From 8a79c25292e6aab0623a0f49a501b77775f3aba3 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Tue, 3 Sep 2013 11:40:21 +0400 Subject: [PATCH] Split utils module In this commit we split utils module into several parts: - flow_utils, with code used in running flows; - threading_utils, with code that helps in working with threads; - reflection, with code that inspects python objects metadata; - misc, with all the other code that used to live in utils.py. We also move graph_utils into taskflow.utils package. This commit just moves code around. It should not change any logic (with exception of complex_graph example). Change-Id: Iebfe45395f0ff502bc00fc7ae14829130b2c6abe --- taskflow/examples/complex_graph.py | 12 +- taskflow/flow.py | 6 +- taskflow/patterns/graph_flow.py | 6 +- taskflow/patterns/linear_flow.py | 12 +- taskflow/patterns/resumption/logbook.py | 2 +- taskflow/patterns/threaded_flow.py | 34 +-- taskflow/persistence/backends/memory/api.py | 12 +- taskflow/task.py | 13 +- taskflow/tests/unit/test_utils.py | 57 +++-- taskflow/utils/__init__.py | 19 ++ taskflow/{utils.py => utils/flow_utils.py} | 228 +------------------- taskflow/{ => utils}/graph_utils.py | 0 taskflow/utils/misc.py | 60 ++++++ taskflow/utils/reflection.py | 68 ++++++ taskflow/utils/threading_utils.py | 146 +++++++++++++ 15 files changed, 374 insertions(+), 301 deletions(-) create mode 100644 taskflow/utils/__init__.py rename taskflow/{utils.py => utils/flow_utils.py} (54%) rename taskflow/{ => utils}/graph_utils.py (100%) create mode 100644 taskflow/utils/misc.py create mode 100644 taskflow/utils/reflection.py create mode 100644 taskflow/utils/threading_utils.py diff --git a/taskflow/examples/complex_graph.py b/taskflow/examples/complex_graph.py index 0b47a6227..09f73c0cd 100644 --- a/taskflow/examples/complex_graph.py +++ b/taskflow/examples/complex_graph.py @@ -10,7 +10,6 @@ sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), from taskflow import decorators from taskflow.patterns import graph_flow as gf -from taskflow import utils def flow_notify(state, details): @@ -105,15 +104,10 @@ def startup(context, **kwargs): } -flow_notifier = utils.TransitionNotifier() -flow_notifier.register('*', flow_notify) - -task_notifier = utils.TransitionNotifier() -task_notifier.register('*', task_notify) - flow = gf.Flow("make-auto") -flow.notifier = flow_notifier -flow.task_notifier = task_notifier +flow.notifier.register('*', flow_notify) +flow.task_notifier.register('*', task_notify) + # Lets build a car!! flow.add(build_spec) diff --git a/taskflow/flow.py b/taskflow/flow.py index 8d03e48af..93c398818 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils from taskflow import exceptions as exc from taskflow import states -from taskflow import utils +from taskflow.utils import flow_utils class Flow(object): @@ -83,8 +83,8 @@ class Flow(object): # progress and record tasks finishing (so that it becomes possible to # store the result of a task in some persistent or semi-persistent # storage backend). - self.notifier = utils.TransitionNotifier() - self.task_notifier = utils.TransitionNotifier() + self.notifier = flow_utils.TransitionNotifier() + self.task_notifier = flow_utils.TransitionNotifier() # Assign this flow a unique identifer. if uuid: self._id = str(uuid) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 504bbcd56..f7c52e5f4 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -24,9 +24,9 @@ from networkx import exception as g_exc from taskflow import decorators from taskflow import exceptions as exc -from taskflow import graph_utils from taskflow.patterns import linear_flow -from taskflow import utils +from taskflow.utils import flow_utils +from taskflow.utils import graph_utils LOG = logging.getLogger(__name__) @@ -47,7 +47,7 @@ class Flow(linear_flow.Flow): # together later after all nodes have been added since if we try # to infer the edges at this stage we likely will fail finding # dependencies from nodes that don't exist. - r = utils.AOTRunner(task) + r = flow_utils.AOTRunner(task) self._graph.add_node(r, uuid=r.uuid, infer=infer) self._reset_internals() return r.uuid diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 329935263..34d99bc1e 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -26,7 +26,7 @@ from taskflow.openstack.common import excutils from taskflow import decorators from taskflow import exceptions as exc from taskflow import states -from taskflow import utils +from taskflow.utils import flow_utils from taskflow import flow @@ -46,7 +46,7 @@ class Flow(flow.Flow): super(Flow, self).__init__(name, parents, uuid) # The tasks which have been applied will be collected here so that they # can be reverted in the correct order on failure. - self._accumulator = utils.RollbackAccumulator() + self._accumulator = flow_utils.RollbackAccumulator() # Tasks results are stored here. Lookup is by the uuid that was # returned from the add function. self.results = {} @@ -63,7 +63,7 @@ class Flow(flow.Flow): @decorators.locked def add(self, task): """Adds a given task to this flow.""" - r = utils.AOTRunner(task) + r = flow_utils.AOTRunner(task) r.runs_before = list(reversed(self._runners)) self._runners.append(r) self._reset_internals() @@ -170,7 +170,8 @@ class Flow(flow.Flow): # Add the task to be rolled back *immediately* so that even if # the task fails while producing results it will be given a # chance to rollback. - rb = utils.Rollback(context, runner, self, self.task_notifier) + rb = flow_utils.Rollback(context, runner, self, + self.task_notifier) self._accumulator.add(rb) self.task_notifier.notify(states.STARTED, details={ 'context': context, @@ -212,7 +213,8 @@ class Flow(flow.Flow): 'flow': self, 'runner': runner, }) - self.rollback(context, utils.FlowFailure(runner, self)) + self.rollback(context, + flow_utils.FlowFailure(runner, self)) run_check_functor = functools.partial(abort_if, ok_states=[states.STARTED, diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py index 07dd9535a..437db1b27 100644 --- a/taskflow/patterns/resumption/logbook.py +++ b/taskflow/patterns/resumption/logbook.py @@ -19,7 +19,7 @@ import logging from taskflow import states -from taskflow import utils +from taskflow.utils import misc as utils LOG = logging.getLogger(__name__) diff --git a/taskflow/patterns/threaded_flow.py b/taskflow/patterns/threaded_flow.py index 0e4de140a..8794a585e 100644 --- a/taskflow/patterns/threaded_flow.py +++ b/taskflow/patterns/threaded_flow.py @@ -18,9 +18,11 @@ from taskflow import exceptions as exc from taskflow import flow -from taskflow import graph_utils from taskflow import states -from taskflow import utils +from taskflow.utils import flow_utils +from taskflow.utils import graph_utils +from taskflow.utils import threading_utils + import collections import functools @@ -122,7 +124,7 @@ class Flow(flow.Flow): def reset(self): # All locks are used so that resets can not happen while running or # cancelling or modifying. - with utils.MultiLock(self._core_locks): + with threading_utils.MultiLock(self._core_locks): super(Flow, self).reset() self.results = {} self.resumer = None @@ -143,7 +145,7 @@ class Flow(flow.Flow): # running. Further state management logic is then used while running # to verify that the flow should still be running when it has been # cancelled. - with utils.MultiLock(self._cancel_locks): + with threading_utils.MultiLock(self._cancel_locks): check() if len(self._graph) == 0: was_empty = True @@ -184,7 +186,7 @@ class Flow(flow.Flow): # All locks must be acquired so that modifications can not be made # while running, cancelling or performing a simultaneous mutation. - with utils.MultiLock(self._core_locks): + with threading_utils.MultiLock(self._core_locks): check() runner = ThreadRunner(task, self, timeout) self._graph.add_node(runner, infer=infer) @@ -236,7 +238,7 @@ class Flow(flow.Flow): # All locks must be acquired so that modifications can not be made # while running, cancelling or performing a simultaneous mutation. - with utils.MultiLock(self._core_locks): + with threading_utils.MultiLock(self._core_locks): check() added = [] for t in tasks: @@ -271,7 +273,7 @@ class Flow(flow.Flow): # All locks must be acquired so that modifications can not be made # while running, cancelling or performing a simultaneous mutation. - with utils.MultiLock(self._core_locks): + with threading_utils.MultiLock(self._core_locks): (provider, consumer) = check_and_fetch() self._graph.add_edge(provider, consumer, reason='manual') LOG.debug("Connecting %s as a manual provider for %s", @@ -332,7 +334,7 @@ class Flow(flow.Flow): for r in self._graph.nodes_iter(): r.reset() r._result_cb = result_cb - executor = utils.ThreadGroupExecutor() + executor = threading_utils.ThreadGroupExecutor() for r in self._graph.nodes_iter(): executor.submit(r, *args, **kwargs) executor.await_termination() @@ -342,7 +344,7 @@ class Flow(flow.Flow): return causes = [] for r in failures: - causes.append(utils.FlowFailure(r, self)) + causes.append(flow_utils.FlowFailure(r, self)) try: self.rollback(context, causes) except exc.InvalidStateException: @@ -396,7 +398,7 @@ class Flow(flow.Flow): # mutation lock to stop simultaneous running and simultaneous mutating # which are not allowed on a running flow. Allow simultaneous cancel # by performing repeated state checking while running. - with utils.MultiLock(self._run_locks): + with threading_utils.MultiLock(self._run_locks): check() connect_and_verify() try: @@ -418,13 +420,13 @@ class Flow(flow.Flow): # All locks must be acquired so that modifications can not be made # while another entity is running, rolling-back, cancelling or # performing a mutation operation. - with utils.MultiLock(self._core_locks): + with threading_utils.MultiLock(self._core_locks): check() - accum = utils.RollbackAccumulator() + accum = flow_utils.RollbackAccumulator() for r in self._graph.nodes_iter(): if r.has_ran(): - accum.add(utils.Rollback(context, r, - self, self.task_notifier)) + accum.add(flow_utils.Rollback(context, r, + self, self.task_notifier)) try: self._change_state(context, states.REVERTING) accum.rollback(cause) @@ -432,7 +434,7 @@ class Flow(flow.Flow): self._change_state(context, states.FAILURE) -class ThreadRunner(utils.Runner): +class ThreadRunner(flow_utils.Runner): """A helper class that will use a countdown latch to avoid calling its callable object until said countdown latch has emptied. After it has been emptied the predecessor tasks will be examined for dependent results @@ -465,7 +467,7 @@ class ThreadRunner(utils.Runner): # simultaneously for a given flow. self._state_lock = flow._state_lock self._cancel_lock = threading.RLock() - self._latch = utils.CountDownLatch() + self._latch = threading_utils.CountDownLatch() # Any related family. self._predecessors = [] self._successors = [] diff --git a/taskflow/persistence/backends/memory/api.py b/taskflow/persistence/backends/memory/api.py index fdf615f76..06375d909 100644 --- a/taskflow/persistence/backends/memory/api.py +++ b/taskflow/persistence/backends/memory/api.py @@ -26,7 +26,7 @@ import threading from taskflow import exceptions as exc from taskflow.openstack.common import timeutils -from taskflow import utils +from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -67,7 +67,7 @@ def _taskdetails_merge(td_e, td_new): def taskdetails_save(td): - with utils.MultiLock(READ_SAVE_ORDER): + with threading_utils.MultiLock(READ_SAVE_ORDER): try: return _taskdetails_merge(TASK_DETAILS[td.uuid], td) except KeyError: @@ -76,7 +76,7 @@ def taskdetails_save(td): def flowdetails_save(fd): try: - with utils.MultiLock(READ_SAVE_ORDER): + with threading_utils.MultiLock(READ_SAVE_ORDER): e_fd = FLOW_DETAILS[fd.uuid] if e_fd.meta != fd.meta: e_fd.meta = fd.meta @@ -99,7 +99,7 @@ def flowdetails_save(fd): def clear_all(): - with utils.MultiLock(READ_SAVE_ORDER): + with threading_utils.MultiLock(READ_SAVE_ORDER): count = 0 for lb_id in list(LOG_BOOKS.iterkeys()): logbook_destroy(lb_id) @@ -117,7 +117,7 @@ def logbook_get(lb_id): def logbook_destroy(lb_id): try: - with utils.MultiLock(READ_SAVE_ORDER): + with threading_utils.MultiLock(READ_SAVE_ORDER): # Do the same cascading delete that the sql layer does. lb = LOG_BOOKS.pop(lb_id) for fd in lb: @@ -131,7 +131,7 @@ def logbook_destroy(lb_id): def logbook_save(lb): # Acquire all the locks that will be needed to perform this operation with # out being affected by other threads doing it at the same time. - with utils.MultiLock(READ_SAVE_ORDER): + with threading_utils.MultiLock(READ_SAVE_ORDER): # Get a existing logbook model (or create it if it isn't there). try: backing_lb = LOG_BOOKS[lb.uuid] diff --git a/taskflow/task.py b/taskflow/task.py index 63ee7a86d..6b7dca69d 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -19,7 +19,8 @@ import abc -from taskflow import utils +from taskflow.utils import misc +from taskflow.utils import reflection class BaseTask(object): @@ -50,7 +51,7 @@ class BaseTask(object): return self._name def __str__(self): - return "%s==%s" % (self.name, utils.get_task_version(self)) + return "%s==%s" % (self.name, misc.get_task_version(self)) @abc.abstractmethod def execute(self, context, *args, **kwargs): @@ -86,10 +87,10 @@ class Task(BaseTask): arguments names will be added to task requirements """ if name is None: - name = utils.get_callable_name(self) + name = reflection.get_callable_name(self) super(Task, self).__init__(name) if requires_from_args: - f_args = utils.get_required_callable_args(self.execute) + f_args = reflection.get_required_callable_args(self.execute) self.requires.update(a for a in f_args if a != 'context') @@ -115,7 +116,7 @@ class FunctorTask(BaseTask): """ name = kwargs.pop('name', None) if name is None: - name = utils.get_callable_name(execute) + name = reflection.get_callable_name(execute) super(FunctorTask, self).__init__(name) self._execute = execute self._revert = kwargs.pop('revert', None) @@ -124,7 +125,7 @@ class FunctorTask(BaseTask): self.provides.update(kwargs.pop('provides', ())) self.requires.update(kwargs.pop('requires', ())) if kwargs.pop('auto_extract', True): - f_args = utils.get_required_callable_args(execute) + f_args = reflection.get_required_callable_args(execute) self.requires.update(a for a in f_args if a != 'context') if kwargs: raise TypeError('__init__() got an unexpected keyword argument %r' diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index eb3d76010..0323369b5 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -20,7 +20,8 @@ import functools from taskflow import decorators from taskflow import test -from taskflow import utils +from taskflow.utils import flow_utils +from taskflow.utils import reflection class UtilTest(test.TestCase): @@ -30,7 +31,7 @@ class UtilTest(test.TestCase): def caller(token, e): context[token] = True - accum = utils.RollbackAccumulator() + accum = flow_utils.RollbackAccumulator() def blowup(): for i in range(0, 10): @@ -94,37 +95,37 @@ class ClassWithInit(object): class GetCallableNameTest(test.TestCase): def test_mere_function(self): - name = utils.get_callable_name(mere_function) + name = reflection.get_callable_name(mere_function) self.assertEquals(name, '.'.join((__name__, 'mere_function'))) def test_method(self): - name = utils.get_callable_name(Class.method) + name = reflection.get_callable_name(Class.method) self.assertEquals(name, '.'.join((__name__, 'Class', 'method'))) def test_instance_method(self): - name = utils.get_callable_name(Class().method) + name = reflection.get_callable_name(Class().method) self.assertEquals(name, '.'.join((__name__, 'Class', 'method'))) def test_static_method(self): # NOTE(imelnikov): static method are just functions, class name # is not recorded anywhere in them - name = utils.get_callable_name(Class.static_method) + name = reflection.get_callable_name(Class.static_method) self.assertEquals(name, '.'.join((__name__, 'static_method'))) def test_class_method(self): - name = utils.get_callable_name(Class.class_method) + name = reflection.get_callable_name(Class.class_method) self.assertEquals(name, '.'.join((__name__, 'Class', 'class_method'))) def test_constructor(self): - name = utils.get_callable_name(Class) + name = reflection.get_callable_name(Class) self.assertEquals(name, '.'.join((__name__, 'Class'))) def test_callable_class(self): - name = utils.get_callable_name(CallableClass()) + name = reflection.get_callable_name(CallableClass()) self.assertEquals(name, '.'.join((__name__, 'CallableClass'))) def test_callable_class_call(self): - name = utils.get_callable_name(CallableClass().__call__) + name = reflection.get_callable_name(CallableClass().__call__) self.assertEquals(name, '.'.join((__name__, 'CallableClass', '__call__'))) @@ -132,40 +133,36 @@ class GetCallableNameTest(test.TestCase): class GetRequiredCallableArgsTest(test.TestCase): def test_mere_function(self): - self.assertEquals(['a', 'b'], - utils.get_required_callable_args(mere_function)) + result = reflection.get_required_callable_args(mere_function) + self.assertEquals(['a', 'b'], result) def test_function_with_defaults(self): - self.assertEquals(['a', 'b'], - utils.get_required_callable_args( - function_with_defaults)) + result = reflection.get_required_callable_args(function_with_defaults) + self.assertEquals(['a', 'b'], result) def test_method(self): - self.assertEquals(['self', 'c', 'd'], - utils.get_required_callable_args(Class.method)) + result = reflection.get_required_callable_args(Class.method) + self.assertEquals(['self', 'c', 'd'], result) def test_instance_method(self): - self.assertEquals(['c', 'd'], - utils.get_required_callable_args(Class().method)) + result = reflection.get_required_callable_args(Class().method) + self.assertEquals(['c', 'd'], result) def test_class_method(self): - self.assertEquals(['g', 'h'], - utils.get_required_callable_args( - Class.class_method)) + result = reflection.get_required_callable_args(Class.class_method) + self.assertEquals(['g', 'h'], result) def test_class_constructor(self): - self.assertEquals(['k', 'l'], - utils.get_required_callable_args( - ClassWithInit)) + result = reflection.get_required_callable_args(ClassWithInit) + self.assertEquals(['k', 'l'], result) def test_class_with_call(self): - self.assertEquals(['i', 'j'], - utils.get_required_callable_args( - CallableClass())) + result = reflection.get_required_callable_args(CallableClass()) + self.assertEquals(['i', 'j'], result) def test_decorators_work(self): @decorators.locked def locked_fun(x, y): pass - self.assertEquals(['x', 'y'], - utils.get_required_callable_args(locked_fun)) + result = reflection.get_required_callable_args(locked_fun) + self.assertEquals(['x', 'y'], result) diff --git a/taskflow/utils/__init__.py b/taskflow/utils/__init__.py new file mode 100644 index 000000000..d7f601295 --- /dev/null +++ b/taskflow/utils/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory' diff --git a/taskflow/utils.py b/taskflow/utils/flow_utils.py similarity index 54% rename from taskflow/utils.py rename to taskflow/utils/flow_utils.py index 2e2362dba..5e5093b9e 100644 --- a/taskflow/utils.py +++ b/taskflow/utils/flow_utils.py @@ -2,8 +2,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,233 +18,18 @@ import collections import copy -import inspect import logging -import threading -import time -import types import weakref -import threading2 - -from distutils import version - from taskflow.openstack.common import uuidutils from taskflow import states +from taskflow import utils +from taskflow.utils import misc + -TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory' LOG = logging.getLogger(__name__) -def await(check_functor, timeout=None): - if timeout is not None: - end_time = time.time() + max(0, timeout) - else: - end_time = None - # Use the same/similar scheme that the python condition class uses. - delay = 0.0005 - while not check_functor(): - time.sleep(delay) - if end_time is not None: - remaining = end_time - time.time() - if remaining <= 0: - return False - delay = min(delay * 2, remaining, 0.05) - else: - delay = min(delay * 2, 0.05) - return True - - -def get_callable_name(function): - """Generate a name from callable - - Tries to do the best to guess fully qualified callable name. - """ - im_class = getattr(function, 'im_class', None) - if im_class is not None: - if im_class is type: - # this is bound class method - im_class = function.im_self - parts = (im_class.__module__, im_class.__name__, - function.__name__) - elif isinstance(function, types.FunctionType): - parts = (function.__module__, function.__name__) - else: - im_class = type(function) - if im_class is type: - im_class = function - parts = (im_class.__module__, im_class.__name__) - return '.'.join(parts) - - -def is_bound_method(method): - return getattr(method, 'im_self', None) is not None - - -def get_required_callable_args(function): - """Get names of argument required by callable""" - - if isinstance(function, type): - bound = True - function = function.__init__ - elif isinstance(function, (types.FunctionType, types.MethodType)): - bound = is_bound_method(function) - function = getattr(function, '__wrapped__', function) - else: - function = function.__call__ - bound = is_bound_method(function) - - argspec = inspect.getargspec(function) - f_args = argspec.args - if argspec.defaults: - f_args = f_args[:-len(argspec.defaults)] - if bound: - f_args = f_args[1:] - return f_args - - -def get_task_version(task): - """Gets a tasks *string* version, whether it is a task object/function.""" - task_version = getattr(task, 'version') - if isinstance(task_version, (list, tuple)): - task_version = '.'.join(str(item) for item in task_version) - if task_version is not None and not isinstance(task_version, basestring): - task_version = str(task_version) - return task_version - - -def is_version_compatible(version_1, version_2): - """Checks for major version compatibility of two *string" versions.""" - try: - version_1_tmp = version.StrictVersion(version_1) - version_2_tmp = version.StrictVersion(version_2) - except ValueError: - version_1_tmp = version.LooseVersion(version_1) - version_2_tmp = version.LooseVersion(version_2) - version_1 = version_1_tmp - version_2 = version_2_tmp - if version_1 == version_2 or version_1.version[0] == version_2.version[0]: - return True - return False - - -class MultiLock(object): - """A class which can attempt to obtain many locks at once and release - said locks when exiting. - - Useful as a context manager around many locks (instead of having to nest - said individual context managers). - """ - - def __init__(self, locks): - assert len(locks) > 0, "Zero locks requested" - self._locks = locks - self._locked = [False] * len(locks) - - def __enter__(self): - - def is_locked(lock): - # NOTE(harlowja): the threading2 lock doesn't seem to have this - # attribute, so thats why we are checking it existing first. - if hasattr(lock, 'locked'): - return lock.locked() - return False - - for i in xrange(0, len(self._locked)): - if self._locked[i] or is_locked(self._locks[i]): - raise threading.ThreadError("Lock %s not previously released" - % (i + 1)) - self._locked[i] = False - for (i, lock) in enumerate(self._locks): - self._locked[i] = lock.acquire() - - def __exit__(self, type, value, traceback): - for (i, locked) in enumerate(self._locked): - try: - if locked: - self._locks[i].release() - self._locked[i] = False - except threading.ThreadError: - LOG.exception("Unable to release lock %s", i + 1) - - -class CountDownLatch(object): - """Similar in concept to the java count down latch.""" - - def __init__(self, count=0): - self.count = count - self.lock = threading.Condition() - - def countDown(self): - with self.lock: - self.count -= 1 - if self.count <= 0: - self.lock.notifyAll() - - def await(self, timeout=None): - end_time = None - if timeout is not None: - timeout = max(0, timeout) - end_time = time.time() + timeout - time_up = False - with self.lock: - while True: - # Stop waiting on these 2 conditions. - if time_up or self.count <= 0: - break - # Was this a spurious wakeup or did we really end?? - self.lock.wait(timeout=timeout) - if end_time is not None: - if time.time() >= end_time: - time_up = True - else: - # Reduce the timeout so that we don't wait extra time - # over what we initially were requested to. - timeout = end_time - time.time() - return self.count <= 0 - - -class LastFedIter(object): - """An iterator which yields back the first item and then yields back - results from the provided iterator. - """ - - def __init__(self, first, rest_itr): - self.first = first - self.rest_itr = rest_itr - - def __iter__(self): - yield self.first - for i in self.rest_itr: - yield i - - -class ThreadGroupExecutor(object): - """A simple thread executor that spins up new threads (or greenthreads) for - each task to be completed (no pool limit is enforced). - - TODO(harlowja): Likely if we use the more advanced executors that come with - the concurrent.futures library we can just get rid of this. - """ - - def __init__(self, daemonize=True): - self._threads = [] - self._group = threading2.ThreadGroup() - self._daemonize = daemonize - - def submit(self, fn, *args, **kwargs): - t = threading2.Thread(target=fn, group=self._group, - args=args, kwargs=kwargs) - t.daemon = self._daemonize - self._threads.append(t) - t.start() - - def await_termination(self, timeout=None): - if not self._threads: - return - return self._group.join(timeout) - - class FlowFailure(object): """When a task failure occurs the following object will be given to revert and can be used to interrogate what caused the failure. @@ -271,7 +55,7 @@ class Runner(object): """ def __init__(self, task, uuid=None): - task_factory = getattr(task, TASK_FACTORY_ATTRIBUTE, None) + task_factory = getattr(task, utils.TASK_FACTORY_ATTRIBUTE, None) if task_factory: self.task = task_factory(task) else: @@ -306,7 +90,7 @@ class Runner(object): @property def version(self): - return get_task_version(self.task) + return misc.get_task_version(self.task) @property def name(self): diff --git a/taskflow/graph_utils.py b/taskflow/utils/graph_utils.py similarity index 100% rename from taskflow/graph_utils.py rename to taskflow/utils/graph_utils.py diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py new file mode 100644 index 000000000..cd055a0ec --- /dev/null +++ b/taskflow/utils/misc.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from distutils import version + + +def get_task_version(task): + """Gets a tasks *string* version, whether it is a task object/function.""" + task_version = getattr(task, 'version') + if isinstance(task_version, (list, tuple)): + task_version = '.'.join(str(item) for item in task_version) + if task_version is not None and not isinstance(task_version, basestring): + task_version = str(task_version) + return task_version + + +def is_version_compatible(version_1, version_2): + """Checks for major version compatibility of two *string" versions.""" + try: + version_1_tmp = version.StrictVersion(version_1) + version_2_tmp = version.StrictVersion(version_2) + except ValueError: + version_1_tmp = version.LooseVersion(version_1) + version_2_tmp = version.LooseVersion(version_2) + version_1 = version_1_tmp + version_2 = version_2_tmp + if version_1 == version_2 or version_1.version[0] == version_2.version[0]: + return True + return False + + +class LastFedIter(object): + """An iterator which yields back the first item and then yields back + results from the provided iterator. + """ + + def __init__(self, first, rest_itr): + self.first = first + self.rest_itr = rest_itr + + def __iter__(self): + yield self.first + for i in self.rest_itr: + yield i diff --git a/taskflow/utils/reflection.py b/taskflow/utils/reflection.py new file mode 100644 index 000000000..36c6801a1 --- /dev/null +++ b/taskflow/utils/reflection.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import inspect +import types + + +def get_callable_name(function): + """Generate a name from callable + + Tries to do the best to guess fully qualified callable name. + """ + im_class = getattr(function, 'im_class', None) + if im_class is not None: + if im_class is type: + # this is bound class method + im_class = function.im_self + parts = (im_class.__module__, im_class.__name__, + function.__name__) + elif isinstance(function, types.FunctionType): + parts = (function.__module__, function.__name__) + else: + im_class = type(function) + if im_class is type: + im_class = function + parts = (im_class.__module__, im_class.__name__) + return '.'.join(parts) + + +def is_bound_method(method): + return getattr(method, 'im_self', None) is not None + + +def get_required_callable_args(function): + """Get names of argument required by callable""" + + if isinstance(function, type): + bound = True + function = function.__init__ + elif isinstance(function, (types.FunctionType, types.MethodType)): + bound = is_bound_method(function) + function = getattr(function, '__wrapped__', function) + else: + function = function.__call__ + bound = is_bound_method(function) + + argspec = inspect.getargspec(function) + f_args = argspec.args + if argspec.defaults: + f_args = f_args[:-len(argspec.defaults)] + if bound: + f_args = f_args[1:] + return f_args diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py new file mode 100644 index 000000000..05c5e3565 --- /dev/null +++ b/taskflow/utils/threading_utils.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading +import threading2 +import time + + +LOG = logging.getLogger(__name__) + + +def await(check_functor, timeout=None): + if timeout is not None: + end_time = time.time() + max(0, timeout) + else: + end_time = None + # Use the same/similar scheme that the python condition class uses. + delay = 0.0005 + while not check_functor(): + time.sleep(delay) + if end_time is not None: + remaining = end_time - time.time() + if remaining <= 0: + return False + delay = min(delay * 2, remaining, 0.05) + else: + delay = min(delay * 2, 0.05) + return True + + +class MultiLock(object): + """A class which can attempt to obtain many locks at once and release + said locks when exiting. + + Useful as a context manager around many locks (instead of having to nest + said individual context managers). + """ + + def __init__(self, locks): + assert len(locks) > 0, "Zero locks requested" + self._locks = locks + self._locked = [False] * len(locks) + + def __enter__(self): + + def is_locked(lock): + # NOTE(harlowja): the threading2 lock doesn't seem to have this + # attribute, so thats why we are checking it existing first. + if hasattr(lock, 'locked'): + return lock.locked() + return False + + for i in xrange(0, len(self._locked)): + if self._locked[i] or is_locked(self._locks[i]): + raise threading.ThreadError("Lock %s not previously released" + % (i + 1)) + self._locked[i] = False + for (i, lock) in enumerate(self._locks): + self._locked[i] = lock.acquire() + + def __exit__(self, type, value, traceback): + for (i, locked) in enumerate(self._locked): + try: + if locked: + self._locks[i].release() + self._locked[i] = False + except threading.ThreadError: + LOG.exception("Unable to release lock %s", i + 1) + + +class CountDownLatch(object): + """Similar in concept to the java count down latch.""" + + def __init__(self, count=0): + self.count = count + self.lock = threading.Condition() + + def countDown(self): + with self.lock: + self.count -= 1 + if self.count <= 0: + self.lock.notifyAll() + + def await(self, timeout=None): + end_time = None + if timeout is not None: + timeout = max(0, timeout) + end_time = time.time() + timeout + time_up = False + with self.lock: + while True: + # Stop waiting on these 2 conditions. + if time_up or self.count <= 0: + break + # Was this a spurious wakeup or did we really end?? + self.lock.wait(timeout=timeout) + if end_time is not None: + if time.time() >= end_time: + time_up = True + else: + # Reduce the timeout so that we don't wait extra time + # over what we initially were requested to. + timeout = end_time - time.time() + return self.count <= 0 + + +class ThreadGroupExecutor(object): + """A simple thread executor that spins up new threads (or greenthreads) for + each task to be completed (no pool limit is enforced). + + TODO(harlowja): Likely if we use the more advanced executors that come with + the concurrent.futures library we can just get rid of this. + """ + + def __init__(self, daemonize=True): + self._threads = [] + self._group = threading2.ThreadGroup() + self._daemonize = daemonize + + def submit(self, fn, *args, **kwargs): + t = threading2.Thread(target=fn, group=self._group, + args=args, kwargs=kwargs) + t.daemon = self._daemonize + self._threads.append(t) + t.start() + + def await_termination(self, timeout=None): + if not self._threads: + return + return self._group.join(timeout)