Source code for tornadio2.gen

# -*- coding: utf-8 -*-
#
# Copyright: (c) 2011 by the Serge S. Koval, see AUTHORS for more details.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
    tornadio2.gen
    ~~~~~~~~~~~~~

    Generator-based interface to make it easier to work in an asynchronous environment.
"""

import functools
import types

from collections import deque

from tornado.gen import engine, Runner, Task, Wait, WaitAll, Callback


class SyncRunner(Runner):
    """``tornado.gen.Runner`` subclass that reports generator completion.

    After each ``run()`` pass, if the runner has been marked as finished,
    the callback supplied at construction time is invoked.
    """

    def __init__(self, gen, callback):
        """Create the runner.

        `gen`
            Generator to drive
        `callback`
            Zero-argument callable invoked once the generator has finished
        """
        # Store the callback before handing the generator to the base class,
        # so it is already available whenever ``run`` gets invoked.
        self._callback = callback
        super(SyncRunner, self).__init__(gen)

    def run(self):
        """Drive the generator and fire the completion callback when done.

        Does nothing if the runner is currently running or already finished.
        """
        if not (self.running or self.finished):
            try:
                super(SyncRunner, self).run()
            finally:
                # Checked in ``finally`` so the notification is also sent
                # when the base ``run`` exits by raising, provided the
                # runner was marked as finished.
                if self.finished:
                    self._callback()
class CallQueue(object):
    """Per-method call state: the active runner plus the pending calls.

    ``runner`` is ``None`` while nothing is running; ``queue`` is a
    ``deque`` that starts out empty.
    """

    # Only these two attributes exist; instances carry no ``__dict__``.
    __slots__ = ('runner', 'queue')

    def __init__(self):
        """Start with an empty queue and no active runner."""
        self.queue = deque()
        self.runner = None
def sync_engine(func):
    """Queued version of the ``tornado.gen.engine``.

    Prevents calling of the wrapped function if there is already one instance
    of the function running asynchronously. Calls made while an instance is
    running are queued and executed one after another, in arrival order,
    without blocking io_loop.

    This decorator can only be used on class methods, as it requires ``self``
    to make sure that calls are scheduled on instance level (connection)
    instead of class level (method). The per-instance state is stored in the
    ``_call_queue`` attribute of ``self``, keyed by the wrapped function.

    `func`
        Method to wrap. If it returns a generator, the generator is driven by
        a ``SyncRunner``; otherwise the call simply completes immediately.

    The wrapper itself always returns ``None``.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Invoke the wrapped method; start a runner if it is a generator
        def run(call_args, call_kwargs):
            result = func(self, *call_args, **call_kwargs)

            if isinstance(result, types.GeneratorType):
                # Assign the runner *before* running it: the generator may
                # finish synchronously, in which case ``finished`` resets
                # ``state.runner`` from inside ``run()``.
                state.runner = SyncRunner(result, finished)
                state.runner.run()
            else:
                return result

        # Completion callback: release the slot and start pending calls
        def finished():
            state.runner = None

            # Keep draining until a queued call actually starts a generator
            # (which sets ``state.runner``) or the queue is empty. A queued
            # call that is not a generator completes immediately, so the
            # next one has to be started here - nothing else will trigger
            # it. No exception is swallowed: errors raised by the wrapped
            # method (including ``IndexError``) propagate to the caller.
            while state.queue and state.runner is None:
                next_args, next_kwargs = state.queue.popleft()
                run(next_args, next_kwargs)

        # Get call queue for this instance and wrapped method
        call_queues = getattr(self, '_call_queue', None)
        if call_queues is None:
            call_queues = self._call_queue = dict()

        state = call_queues.get(func, None)
        if state is None:
            call_queues[func] = state = CallQueue()

        if state.runner is not None:
            # Something is already running, queue the call
            state.queue.append((args, kwargs))
        else:
            # Otherwise run it right away
            run(args, kwargs)

    return wrapper