Split utils module

In this commit we split utils module into several parts:
- flow_utils, with code used in running flows;
- threading_utils, with code that helps in working with threads;
- reflection, with code that inspects python objects metadata;
- misc, with all the other code that used to live in utils.py.

We also move graph_utils into taskflow.utils package.

This commit just moves code around. It should not change any logic (with
exception of complex_graph example).

Change-Id: Iebfe45395f0ff502bc00fc7ae14829130b2c6abe
This commit is contained in:
Ivan A. Melnikov
2013-09-03 11:40:21 +04:00
parent 98baedda27
commit 8a79c25292
15 changed files with 374 additions and 301 deletions

View File

@@ -10,7 +10,6 @@ sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
from taskflow import decorators
from taskflow.patterns import graph_flow as gf
from taskflow import utils
def flow_notify(state, details):
@@ -105,15 +104,10 @@ def startup(context, **kwargs):
}
flow_notifier = utils.TransitionNotifier()
flow_notifier.register('*', flow_notify)
task_notifier = utils.TransitionNotifier()
task_notifier.register('*', task_notify)
flow = gf.Flow("make-auto")
flow.notifier = flow_notifier
flow.task_notifier = task_notifier
flow.notifier.register('*', flow_notify)
flow.task_notifier.register('*', task_notify)
# Lets build a car!!
flow.add(build_spec)

View File

@@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
from taskflow.utils import flow_utils
class Flow(object):
@@ -83,8 +83,8 @@ class Flow(object):
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
self.notifier = flow_utils.TransitionNotifier()
self.task_notifier = flow_utils.TransitionNotifier()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)

View File

@@ -24,9 +24,9 @@ from networkx import exception as g_exc
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import graph_utils
from taskflow.patterns import linear_flow
from taskflow import utils
from taskflow.utils import flow_utils
from taskflow.utils import graph_utils
LOG = logging.getLogger(__name__)
@@ -47,7 +47,7 @@ class Flow(linear_flow.Flow):
# together later after all nodes have been added since if we try
# to infer the edges at this stage we likely will fail finding
# dependencies from nodes that don't exist.
r = utils.AOTRunner(task)
r = flow_utils.AOTRunner(task)
self._graph.add_node(r, uuid=r.uuid, infer=infer)
self._reset_internals()
return r.uuid

View File

@@ -26,7 +26,7 @@ from taskflow.openstack.common import excutils
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
from taskflow.utils import flow_utils
from taskflow import flow
@@ -46,7 +46,7 @@ class Flow(flow.Flow):
super(Flow, self).__init__(name, parents, uuid)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
self._accumulator = flow_utils.RollbackAccumulator()
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
@@ -63,7 +63,7 @@ class Flow(flow.Flow):
@decorators.locked
def add(self, task):
"""Adds a given task to this flow."""
r = utils.AOTRunner(task)
r = flow_utils.AOTRunner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
@@ -170,7 +170,8 @@ class Flow(flow.Flow):
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.Rollback(context, runner, self, self.task_notifier)
rb = flow_utils.Rollback(context, runner, self,
self.task_notifier)
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
@@ -212,7 +213,8 @@ class Flow(flow.Flow):
'flow': self,
'runner': runner,
})
self.rollback(context, utils.FlowFailure(runner, self))
self.rollback(context,
flow_utils.FlowFailure(runner, self))
run_check_functor = functools.partial(abort_if,
ok_states=[states.STARTED,

View File

@@ -19,7 +19,7 @@
import logging
from taskflow import states
from taskflow import utils
from taskflow.utils import misc as utils
LOG = logging.getLogger(__name__)

View File

@@ -18,9 +18,11 @@
from taskflow import exceptions as exc
from taskflow import flow
from taskflow import graph_utils
from taskflow import states
from taskflow import utils
from taskflow.utils import flow_utils
from taskflow.utils import graph_utils
from taskflow.utils import threading_utils
import collections
import functools
@@ -122,7 +124,7 @@ class Flow(flow.Flow):
def reset(self):
# All locks are used so that resets can not happen while running or
# cancelling or modifying.
with utils.MultiLock(self._core_locks):
with threading_utils.MultiLock(self._core_locks):
super(Flow, self).reset()
self.results = {}
self.resumer = None
@@ -143,7 +145,7 @@ class Flow(flow.Flow):
# running. Further state management logic is then used while running
# to verify that the flow should still be running when it has been
# cancelled.
with utils.MultiLock(self._cancel_locks):
with threading_utils.MultiLock(self._cancel_locks):
check()
if len(self._graph) == 0:
was_empty = True
@@ -184,7 +186,7 @@ class Flow(flow.Flow):
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
with threading_utils.MultiLock(self._core_locks):
check()
runner = ThreadRunner(task, self, timeout)
self._graph.add_node(runner, infer=infer)
@@ -236,7 +238,7 @@ class Flow(flow.Flow):
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
with threading_utils.MultiLock(self._core_locks):
check()
added = []
for t in tasks:
@@ -271,7 +273,7 @@ class Flow(flow.Flow):
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
with threading_utils.MultiLock(self._core_locks):
(provider, consumer) = check_and_fetch()
self._graph.add_edge(provider, consumer, reason='manual')
LOG.debug("Connecting %s as a manual provider for %s",
@@ -332,7 +334,7 @@ class Flow(flow.Flow):
for r in self._graph.nodes_iter():
r.reset()
r._result_cb = result_cb
executor = utils.ThreadGroupExecutor()
executor = threading_utils.ThreadGroupExecutor()
for r in self._graph.nodes_iter():
executor.submit(r, *args, **kwargs)
executor.await_termination()
@@ -342,7 +344,7 @@ class Flow(flow.Flow):
return
causes = []
for r in failures:
causes.append(utils.FlowFailure(r, self))
causes.append(flow_utils.FlowFailure(r, self))
try:
self.rollback(context, causes)
except exc.InvalidStateException:
@@ -396,7 +398,7 @@ class Flow(flow.Flow):
# mutation lock to stop simultaneous running and simultaneous mutating
# which are not allowed on a running flow. Allow simultaneous cancel
# by performing repeated state checking while running.
with utils.MultiLock(self._run_locks):
with threading_utils.MultiLock(self._run_locks):
check()
connect_and_verify()
try:
@@ -418,13 +420,13 @@ class Flow(flow.Flow):
# All locks must be acquired so that modifications can not be made
# while another entity is running, rolling-back, cancelling or
# performing a mutation operation.
with utils.MultiLock(self._core_locks):
with threading_utils.MultiLock(self._core_locks):
check()
accum = utils.RollbackAccumulator()
accum = flow_utils.RollbackAccumulator()
for r in self._graph.nodes_iter():
if r.has_ran():
accum.add(utils.Rollback(context, r,
self, self.task_notifier))
accum.add(flow_utils.Rollback(context, r,
self, self.task_notifier))
try:
self._change_state(context, states.REVERTING)
accum.rollback(cause)
@@ -432,7 +434,7 @@ class Flow(flow.Flow):
self._change_state(context, states.FAILURE)
class ThreadRunner(utils.Runner):
class ThreadRunner(flow_utils.Runner):
"""A helper class that will use a countdown latch to avoid calling its
callable object until said countdown latch has emptied. After it has
been emptied the predecessor tasks will be examined for dependent results
@@ -465,7 +467,7 @@ class ThreadRunner(utils.Runner):
# simultaneously for a given flow.
self._state_lock = flow._state_lock
self._cancel_lock = threading.RLock()
self._latch = utils.CountDownLatch()
self._latch = threading_utils.CountDownLatch()
# Any related family.
self._predecessors = []
self._successors = []

View File

@@ -26,7 +26,7 @@ import threading
from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow import utils
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -67,7 +67,7 @@ def _taskdetails_merge(td_e, td_new):
def taskdetails_save(td):
with utils.MultiLock(READ_SAVE_ORDER):
with threading_utils.MultiLock(READ_SAVE_ORDER):
try:
return _taskdetails_merge(TASK_DETAILS[td.uuid], td)
except KeyError:
@@ -76,7 +76,7 @@ def taskdetails_save(td):
def flowdetails_save(fd):
try:
with utils.MultiLock(READ_SAVE_ORDER):
with threading_utils.MultiLock(READ_SAVE_ORDER):
e_fd = FLOW_DETAILS[fd.uuid]
if e_fd.meta != fd.meta:
e_fd.meta = fd.meta
@@ -99,7 +99,7 @@ def flowdetails_save(fd):
def clear_all():
with utils.MultiLock(READ_SAVE_ORDER):
with threading_utils.MultiLock(READ_SAVE_ORDER):
count = 0
for lb_id in list(LOG_BOOKS.iterkeys()):
logbook_destroy(lb_id)
@@ -117,7 +117,7 @@ def logbook_get(lb_id):
def logbook_destroy(lb_id):
try:
with utils.MultiLock(READ_SAVE_ORDER):
with threading_utils.MultiLock(READ_SAVE_ORDER):
# Do the same cascading delete that the sql layer does.
lb = LOG_BOOKS.pop(lb_id)
for fd in lb:
@@ -131,7 +131,7 @@ def logbook_destroy(lb_id):
def logbook_save(lb):
# Acquire all the locks that will be needed to perform this operation with
# out being affected by other threads doing it at the same time.
with utils.MultiLock(READ_SAVE_ORDER):
with threading_utils.MultiLock(READ_SAVE_ORDER):
# Get a existing logbook model (or create it if it isn't there).
try:
backing_lb = LOG_BOOKS[lb.uuid]

View File

@@ -19,7 +19,8 @@
import abc
from taskflow import utils
from taskflow.utils import misc
from taskflow.utils import reflection
class BaseTask(object):
@@ -50,7 +51,7 @@ class BaseTask(object):
return self._name
def __str__(self):
return "%s==%s" % (self.name, utils.get_task_version(self))
return "%s==%s" % (self.name, misc.get_task_version(self))
@abc.abstractmethod
def execute(self, context, *args, **kwargs):
@@ -86,10 +87,10 @@ class Task(BaseTask):
arguments names will be added to task requirements
"""
if name is None:
name = utils.get_callable_name(self)
name = reflection.get_callable_name(self)
super(Task, self).__init__(name)
if requires_from_args:
f_args = utils.get_required_callable_args(self.execute)
f_args = reflection.get_required_callable_args(self.execute)
self.requires.update(a for a in f_args if a != 'context')
@@ -115,7 +116,7 @@ class FunctorTask(BaseTask):
"""
name = kwargs.pop('name', None)
if name is None:
name = utils.get_callable_name(execute)
name = reflection.get_callable_name(execute)
super(FunctorTask, self).__init__(name)
self._execute = execute
self._revert = kwargs.pop('revert', None)
@@ -124,7 +125,7 @@ class FunctorTask(BaseTask):
self.provides.update(kwargs.pop('provides', ()))
self.requires.update(kwargs.pop('requires', ()))
if kwargs.pop('auto_extract', True):
f_args = utils.get_required_callable_args(execute)
f_args = reflection.get_required_callable_args(execute)
self.requires.update(a for a in f_args if a != 'context')
if kwargs:
raise TypeError('__init__() got an unexpected keyword argument %r'

View File

@@ -20,7 +20,8 @@ import functools
from taskflow import decorators
from taskflow import test
from taskflow import utils
from taskflow.utils import flow_utils
from taskflow.utils import reflection
class UtilTest(test.TestCase):
@@ -30,7 +31,7 @@ class UtilTest(test.TestCase):
def caller(token, e):
context[token] = True
accum = utils.RollbackAccumulator()
accum = flow_utils.RollbackAccumulator()
def blowup():
for i in range(0, 10):
@@ -94,37 +95,37 @@ class ClassWithInit(object):
class GetCallableNameTest(test.TestCase):
def test_mere_function(self):
name = utils.get_callable_name(mere_function)
name = reflection.get_callable_name(mere_function)
self.assertEquals(name, '.'.join((__name__, 'mere_function')))
def test_method(self):
name = utils.get_callable_name(Class.method)
name = reflection.get_callable_name(Class.method)
self.assertEquals(name, '.'.join((__name__, 'Class', 'method')))
def test_instance_method(self):
name = utils.get_callable_name(Class().method)
name = reflection.get_callable_name(Class().method)
self.assertEquals(name, '.'.join((__name__, 'Class', 'method')))
def test_static_method(self):
# NOTE(imelnikov): static method are just functions, class name
# is not recorded anywhere in them
name = utils.get_callable_name(Class.static_method)
name = reflection.get_callable_name(Class.static_method)
self.assertEquals(name, '.'.join((__name__, 'static_method')))
def test_class_method(self):
name = utils.get_callable_name(Class.class_method)
name = reflection.get_callable_name(Class.class_method)
self.assertEquals(name, '.'.join((__name__, 'Class', 'class_method')))
def test_constructor(self):
name = utils.get_callable_name(Class)
name = reflection.get_callable_name(Class)
self.assertEquals(name, '.'.join((__name__, 'Class')))
def test_callable_class(self):
name = utils.get_callable_name(CallableClass())
name = reflection.get_callable_name(CallableClass())
self.assertEquals(name, '.'.join((__name__, 'CallableClass')))
def test_callable_class_call(self):
name = utils.get_callable_name(CallableClass().__call__)
name = reflection.get_callable_name(CallableClass().__call__)
self.assertEquals(name, '.'.join((__name__, 'CallableClass',
'__call__')))
@@ -132,40 +133,36 @@ class GetCallableNameTest(test.TestCase):
class GetRequiredCallableArgsTest(test.TestCase):
def test_mere_function(self):
self.assertEquals(['a', 'b'],
utils.get_required_callable_args(mere_function))
result = reflection.get_required_callable_args(mere_function)
self.assertEquals(['a', 'b'], result)
def test_function_with_defaults(self):
self.assertEquals(['a', 'b'],
utils.get_required_callable_args(
function_with_defaults))
result = reflection.get_required_callable_args(function_with_defaults)
self.assertEquals(['a', 'b'], result)
def test_method(self):
self.assertEquals(['self', 'c', 'd'],
utils.get_required_callable_args(Class.method))
result = reflection.get_required_callable_args(Class.method)
self.assertEquals(['self', 'c', 'd'], result)
def test_instance_method(self):
self.assertEquals(['c', 'd'],
utils.get_required_callable_args(Class().method))
result = reflection.get_required_callable_args(Class().method)
self.assertEquals(['c', 'd'], result)
def test_class_method(self):
self.assertEquals(['g', 'h'],
utils.get_required_callable_args(
Class.class_method))
result = reflection.get_required_callable_args(Class.class_method)
self.assertEquals(['g', 'h'], result)
def test_class_constructor(self):
self.assertEquals(['k', 'l'],
utils.get_required_callable_args(
ClassWithInit))
result = reflection.get_required_callable_args(ClassWithInit)
self.assertEquals(['k', 'l'], result)
def test_class_with_call(self):
self.assertEquals(['i', 'j'],
utils.get_required_callable_args(
CallableClass()))
result = reflection.get_required_callable_args(CallableClass())
self.assertEquals(['i', 'j'], result)
def test_decorators_work(self):
@decorators.locked
def locked_fun(x, y):
pass
self.assertEquals(['x', 'y'],
utils.get_required_callable_args(locked_fun))
result = reflection.get_required_callable_args(locked_fun)
self.assertEquals(['x', 'y'], result)

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'

View File

@@ -2,8 +2,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -19,233 +18,18 @@
import collections
import copy
import inspect
import logging
import threading
import time
import types
import weakref
import threading2
from distutils import version
from taskflow.openstack.common import uuidutils
from taskflow import states
from taskflow import utils
from taskflow.utils import misc
TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
def get_callable_name(function):
"""Generate a name from callable
Tries to do the best to guess fully qualified callable name.
"""
im_class = getattr(function, 'im_class', None)
if im_class is not None:
if im_class is type:
# this is bound class method
im_class = function.im_self
parts = (im_class.__module__, im_class.__name__,
function.__name__)
elif isinstance(function, types.FunctionType):
parts = (function.__module__, function.__name__)
else:
im_class = type(function)
if im_class is type:
im_class = function
parts = (im_class.__module__, im_class.__name__)
return '.'.join(parts)
def is_bound_method(method):
return getattr(method, 'im_self', None) is not None
def get_required_callable_args(function):
"""Get names of argument required by callable"""
if isinstance(function, type):
bound = True
function = function.__init__
elif isinstance(function, (types.FunctionType, types.MethodType)):
bound = is_bound_method(function)
function = getattr(function, '__wrapped__', function)
else:
function = function.__call__
bound = is_bound_method(function)
argspec = inspect.getargspec(function)
f_args = argspec.args
if argspec.defaults:
f_args = f_args[:-len(argspec.defaults)]
if bound:
f_args = f_args[1:]
return f_args
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = getattr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = '.'.join(str(item) for item in task_version)
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
try:
version_1_tmp = version.StrictVersion(version_1)
version_2_tmp = version.StrictVersion(version_2)
except ValueError:
version_1_tmp = version.LooseVersion(version_1)
version_2_tmp = version.LooseVersion(version_2)
version_1 = version_1_tmp
version_2 = version_2_tmp
if version_1 == version_2 or version_1.version[0] == version_2.version[0]:
return True
return False
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
def __init__(self, count=0):
self.count = count
self.lock = threading.Condition()
def countDown(self):
with self.lock:
self.count -= 1
if self.count <= 0:
self.lock.notifyAll()
def await(self, timeout=None):
end_time = None
if timeout is not None:
timeout = max(0, timeout)
end_time = time.time() + timeout
time_up = False
with self.lock:
while True:
# Stop waiting on these 2 conditions.
if time_up or self.count <= 0:
break
# Was this a spurious wakeup or did we really end??
self.lock.wait(timeout=timeout)
if end_time is not None:
if time.time() >= end_time:
time_up = True
else:
# Reduce the timeout so that we don't wait extra time
# over what we initially were requested to.
timeout = end_time - time.time()
return self.count <= 0
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i
class ThreadGroupExecutor(object):
"""A simple thread executor that spins up new threads (or greenthreads) for
each task to be completed (no pool limit is enforced).
TODO(harlowja): Likely if we use the more advanced executors that come with
the concurrent.futures library we can just get rid of this.
"""
def __init__(self, daemonize=True):
self._threads = []
self._group = threading2.ThreadGroup()
self._daemonize = daemonize
def submit(self, fn, *args, **kwargs):
t = threading2.Thread(target=fn, group=self._group,
args=args, kwargs=kwargs)
t.daemon = self._daemonize
self._threads.append(t)
t.start()
def await_termination(self, timeout=None):
if not self._threads:
return
return self._group.join(timeout)
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
@@ -271,7 +55,7 @@ class Runner(object):
"""
def __init__(self, task, uuid=None):
task_factory = getattr(task, TASK_FACTORY_ATTRIBUTE, None)
task_factory = getattr(task, utils.TASK_FACTORY_ATTRIBUTE, None)
if task_factory:
self.task = task_factory(task)
else:
@@ -306,7 +90,7 @@ class Runner(object):
@property
def version(self):
return get_task_version(self.task)
return misc.get_task_version(self.task)
@property
def name(self):

60
taskflow/utils/misc.py Normal file
View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from distutils import version
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = getattr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = '.'.join(str(item) for item in task_version)
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
try:
version_1_tmp = version.StrictVersion(version_1)
version_2_tmp = version.StrictVersion(version_2)
except ValueError:
version_1_tmp = version.LooseVersion(version_1)
version_2_tmp = version.LooseVersion(version_2)
version_1 = version_1_tmp
version_2 = version_2_tmp
if version_1 == version_2 or version_1.version[0] == version_2.version[0]:
return True
return False
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i

View File

@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import types
def get_callable_name(function):
"""Generate a name from callable
Tries to do the best to guess fully qualified callable name.
"""
im_class = getattr(function, 'im_class', None)
if im_class is not None:
if im_class is type:
# this is bound class method
im_class = function.im_self
parts = (im_class.__module__, im_class.__name__,
function.__name__)
elif isinstance(function, types.FunctionType):
parts = (function.__module__, function.__name__)
else:
im_class = type(function)
if im_class is type:
im_class = function
parts = (im_class.__module__, im_class.__name__)
return '.'.join(parts)
def is_bound_method(method):
return getattr(method, 'im_self', None) is not None
def get_required_callable_args(function):
"""Get names of argument required by callable"""
if isinstance(function, type):
bound = True
function = function.__init__
elif isinstance(function, (types.FunctionType, types.MethodType)):
bound = is_bound_method(function)
function = getattr(function, '__wrapped__', function)
else:
function = function.__call__
bound = is_bound_method(function)
argspec = inspect.getargspec(function)
f_args = argspec.args
if argspec.defaults:
f_args = f_args[:-len(argspec.defaults)]
if bound:
f_args = f_args[1:]
return f_args

View File

@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import threading2
import time
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
def __init__(self, count=0):
self.count = count
self.lock = threading.Condition()
def countDown(self):
with self.lock:
self.count -= 1
if self.count <= 0:
self.lock.notifyAll()
def await(self, timeout=None):
end_time = None
if timeout is not None:
timeout = max(0, timeout)
end_time = time.time() + timeout
time_up = False
with self.lock:
while True:
# Stop waiting on these 2 conditions.
if time_up or self.count <= 0:
break
# Was this a spurious wakeup or did we really end??
self.lock.wait(timeout=timeout)
if end_time is not None:
if time.time() >= end_time:
time_up = True
else:
# Reduce the timeout so that we don't wait extra time
# over what we initially were requested to.
timeout = end_time - time.time()
return self.count <= 0
class ThreadGroupExecutor(object):
"""A simple thread executor that spins up new threads (or greenthreads) for
each task to be completed (no pool limit is enforced).
TODO(harlowja): Likely if we use the more advanced executors that come with
the concurrent.futures library we can just get rid of this.
"""
def __init__(self, daemonize=True):
self._threads = []
self._group = threading2.ThreadGroup()
self._daemonize = daemonize
def submit(self, fn, *args, **kwargs):
t = threading2.Thread(target=fn, group=self._group,
args=args, kwargs=kwargs)
t.daemon = self._daemonize
self._threads.append(t)
t.start()
def await_termination(self, timeout=None):
if not self._threads:
return
return self._group.join(timeout)