diff --git a/doc/source/conductors.rst b/doc/source/conductors.rst index 56fb0e0e..16191ff0 100644 --- a/doc/source/conductors.rst +++ b/doc/source/conductors.rst @@ -67,14 +67,15 @@ Interfaces Implementations =============== -.. automodule:: taskflow.conductors.single_threaded +.. automodule:: taskflow.conductors.backends +.. automodule:: taskflow.conductors.backends.impl_blocking Hierarchy ========= .. inheritance-diagram:: taskflow.conductors.base - taskflow.conductors.single_threaded + taskflow.conductors.backends.impl_blocking :parts: 1 .. _railroad conductors: http://en.wikipedia.org/wiki/Conductor_%28transportation%29 diff --git a/setup.cfg b/setup.cfg index fcaff44d..d9ffce21 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,9 @@ packages = taskflow.jobboards = zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard +taskflow.conductors = + blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor + taskflow.persistence = dir = taskflow.persistence.backends.impl_dir:DirBackend file = taskflow.persistence.backends.impl_dir:DirBackend diff --git a/taskflow/conductors/backends/__init__.py b/taskflow/conductors/backends/__init__.py new file mode 100644 index 00000000..0fd75305 --- /dev/null +++ b/taskflow/conductors/backends/__init__.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import stevedore.driver + +from taskflow import exceptions as exc + +# NOTE(harlowja): this is the entrypoint namespace, not the module namespace. +CONDUCTOR_NAMESPACE = 'taskflow.conductors' + +LOG = logging.getLogger(__name__) + + +def fetch(kind, name, jobboard, namespace=CONDUCTOR_NAMESPACE, **kwargs): + """Fetch a conductor backend with the given options. + + This fetch method will look for the entrypoint 'kind' in the entrypoint + namespace, and then attempt to instantiate that entrypoint using the + provided name, jobboard and any board specific kwargs. + """ + LOG.debug('Looking for %r conductor driver in %r', kind, namespace) + try: + mgr = stevedore.driver.DriverManager( + namespace, kind, + invoke_on_load=True, + invoke_args=(name, jobboard), + invoke_kwds=kwargs) + return mgr.driver + except RuntimeError as e: + raise exc.NotFound("Could not find conductor %s" % (kind), e) diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py new file mode 100644 index 00000000..c53248ee --- /dev/null +++ b/taskflow/conductors/backends/impl_blocking.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from taskflow.conductors import base +from taskflow import exceptions as excp +from taskflow.listeners import logging as logging_listener +from taskflow import logging +from taskflow.types import timing as tt +from taskflow.utils import async_utils +from taskflow.utils import deprecation +from taskflow.utils import threading_utils + +LOG = logging.getLogger(__name__) +WAIT_TIMEOUT = 0.5 +NO_CONSUME_EXCEPTIONS = tuple([ + excp.ExecutionFailure, + excp.StorageFailure, +]) + + +class BlockingConductor(base.Conductor): + """A conductor that runs jobs in its own dispatching loop. + + This conductor iterates over jobs in the provided jobboard (waiting for + the given timeout if no jobs exist) and attempts to claim them, work on + those jobs in its local thread (blocking further work from being claimed + and consumed) and then consume those work units after completetion. This + process will repeat until the conductor has been stopped or other critical + error occurs. + + NOTE(harlowja): consumption occurs even if a engine fails to run due to + a task failure. This is only skipped when an execution failure or + a storage failure occurs which are *usually* correctable by re-running on + a different conductor (storage failures and execution failures may be + transient issues that can be worked around by later execution). If a job + after completing can not be consumed or abandoned the conductor relies + upon the jobboard capabilities to automatically abandon these jobs. + """ + + def __init__(self, name, jobboard, + persistence=None, engine=None, + engine_options=None, wait_timeout=None): + super(BlockingConductor, self).__init__( + name, jobboard, persistence=persistence, + engine=engine, engine_options=engine_options) + if wait_timeout is None: + wait_timeout = WAIT_TIMEOUT + if isinstance(wait_timeout, (int, float) + six.string_types): + self._wait_timeout = tt.Timeout(float(wait_timeout)) + elif isinstance(wait_timeout, tt.Timeout): + self._wait_timeout = wait_timeout + else: + raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) + self._dead = threading_utils.Event() + + @deprecation.removed_kwarg('timeout', + version="0.8", removal_version="?") + def stop(self, timeout=None): + """Requests the conductor to stop dispatching. + + This method can be used to request that a conductor stop its + consumption & dispatching loop. + + The method returns immediately regardless of whether the conductor has + been stopped. + + :param timeout: This parameter is **deprecated** and is present for + backward compatibility **only**. In order to wait for + the conductor to gracefully shut down, :meth:`wait` + should be used instead. + """ + self._wait_timeout.interrupt() + + @property + def dispatching(self): + return not self._dead.is_set() + + def _dispatch_job(self, job): + engine = self._engine_from_job(job) + consume = True + with logging_listener.LoggingListener(engine, log=LOG): + LOG.debug("Dispatching engine %s for job: %s", engine, job) + try: + engine.run() + except excp.WrappedFailure as e: + if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): + consume = False + if LOG.isEnabledFor(logging.WARNING): + if consume: + LOG.warn("Job execution failed (consumption being" + " skipped): %s [%s failures]", job, len(e)) + else: + LOG.warn("Job execution failed (consumption" + " proceeding): %s [%s failures]", job, len(e)) + # Show the failure/s + traceback (if possible)... + for i, f in enumerate(e): + LOG.warn("%s. %s", i + 1, f.pformat(traceback=True)) + except NO_CONSUME_EXCEPTIONS: + LOG.warn("Job execution failed (consumption being" + " skipped): %s", job, exc_info=True) + consume = False + except Exception: + LOG.warn("Job execution failed (consumption proceeding): %s", + job, exc_info=True) + else: + LOG.info("Job completed successfully: %s", job) + return async_utils.make_completed_future(consume) + + def run(self): + self._dead.clear() + try: + while True: + if self._wait_timeout.is_stopped(): + break + dispatched = 0 + for job in self._jobboard.iterjobs(): + if self._wait_timeout.is_stopped(): + break + LOG.debug("Trying to claim job: %s", job) + try: + self._jobboard.claim(job, self._name) + except (excp.UnclaimableJob, excp.NotFound): + LOG.debug("Job already claimed or consumed: %s", job) + continue + consume = False + try: + f = self._dispatch_job(job) + except Exception: + LOG.warn("Job dispatching failed: %s", job, + exc_info=True) + else: + dispatched += 1 + consume = f.result() + try: + if consume: + self._jobboard.consume(job, self._name) + else: + self._jobboard.abandon(job, self._name) + except (excp.JobFailure, excp.NotFound): + if consume: + LOG.warn("Failed job consumption: %s", job, + exc_info=True) + else: + LOG.warn("Failed job abandonment: %s", job, + exc_info=True) + if dispatched == 0 and not self._wait_timeout.is_stopped(): + self._wait_timeout.wait() + finally: + self._dead.set() + + def wait(self, timeout=None): + """Waits for the conductor to gracefully exit. + + This method waits for the conductor to gracefully exit. An optional + timeout can be provided, which will cause the method to return + within the specified timeout. If the timeout is reached, the returned + value will be False. + + :param timeout: Maximum number of seconds that the :meth:`wait` method + should block for. + """ + return self._dead.wait(timeout) diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index f7546c3e..48344c53 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -24,7 +24,7 @@ from taskflow.utils import lock_utils @six.add_metaclass(abc.ABCMeta) class Conductor(object): - """Conductors conduct jobs & assist in associated runtime interactions. + """Base for all conductor implementations. Conductors act as entities which extract jobs from a jobboard, assign there work to some engine (using some desired configuration) and then wait @@ -34,8 +34,8 @@ class Conductor(object): period of time will finish up the prior failed conductors work. """ - def __init__(self, name, jobboard, persistence, - engine=None, engine_options=None): + def __init__(self, name, jobboard, + persistence=None, engine=None, engine_options=None): self._name = name self._jobboard = jobboard self._engine = engine @@ -101,7 +101,7 @@ class Conductor(object): @lock_utils.locked def close(self): - """Closes the jobboard, disallowing further use.""" + """Closes the contained jobboard, disallowing further use.""" self._jobboard.close() @abc.abstractmethod diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index a52dc347..aa90645f 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -12,163 +14,15 @@ # License for the specific language governing permissions and limitations # under the License. -import six - -from taskflow.conductors import base -from taskflow import exceptions as excp -from taskflow.listeners import logging as logging_listener -from taskflow import logging -from taskflow.types import timing as tt -from taskflow.utils import async_utils +from taskflow.conductors.backends import impl_blocking from taskflow.utils import deprecation -from taskflow.utils import threading_utils -LOG = logging.getLogger(__name__) -WAIT_TIMEOUT = 0.5 -NO_CONSUME_EXCEPTIONS = tuple([ - excp.ExecutionFailure, - excp.StorageFailure, -]) +# TODO(harlowja): remove this module soon... +deprecation.removed_module(__name__, + replacement_name="the conductor entrypoints", + version="0.8", removal_version="?") - -class SingleThreadedConductor(base.Conductor): - """A conductor that runs jobs in its own dispatching loop. - - This conductor iterates over jobs in the provided jobboard (waiting for - the given timeout if no jobs exist) and attempts to claim them, work on - those jobs in its local thread (blocking further work from being claimed - and consumed) and then consume those work units after completetion. This - process will repeat until the conductor has been stopped or other critical - error occurs. - - NOTE(harlowja): consumption occurs even if a engine fails to run due to - a task failure. This is only skipped when an execution failure or - a storage failure occurs which are *usually* correctable by re-running on - a different conductor (storage failures and execution failures may be - transient issues that can be worked around by later execution). If a job - after completing can not be consumed or abandoned the conductor relies - upon the jobboard capabilities to automatically abandon these jobs. - """ - - def __init__(self, name, jobboard, persistence, - engine=None, engine_options=None, wait_timeout=None): - super(SingleThreadedConductor, self).__init__( - name, jobboard, persistence, - engine=engine, engine_options=engine_options) - if wait_timeout is None: - wait_timeout = WAIT_TIMEOUT - if isinstance(wait_timeout, (int, float) + six.string_types): - self._wait_timeout = tt.Timeout(float(wait_timeout)) - elif isinstance(wait_timeout, tt.Timeout): - self._wait_timeout = wait_timeout - else: - raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) - self._dead = threading_utils.Event() - - @deprecation.removed_kwarg('timeout', - version="0.8", removal_version="?") - def stop(self, timeout=None): - """Requests the conductor to stop dispatching. - - This method can be used to request that a conductor stop its - consumption & dispatching loop. - - The method returns immediately regardless of whether the conductor has - been stopped. - - :param timeout: This parameter is **deprecated** and is present for - backward compatibility **only**. In order to wait for - the conductor to gracefully shut down, :meth:`wait` - should be used instead. - """ - self._wait_timeout.interrupt() - - @property - def dispatching(self): - return not self._dead.is_set() - - def _dispatch_job(self, job): - engine = self._engine_from_job(job) - consume = True - with logging_listener.LoggingListener(engine, log=LOG): - LOG.debug("Dispatching engine %s for job: %s", engine, job) - try: - engine.run() - except excp.WrappedFailure as e: - if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): - consume = False - if LOG.isEnabledFor(logging.WARNING): - if consume: - LOG.warn("Job execution failed (consumption being" - " skipped): %s [%s failures]", job, len(e)) - else: - LOG.warn("Job execution failed (consumption" - " proceeding): %s [%s failures]", job, len(e)) - # Show the failure/s + traceback (if possible)... - for i, f in enumerate(e): - LOG.warn("%s. %s", i + 1, f.pformat(traceback=True)) - except NO_CONSUME_EXCEPTIONS: - LOG.warn("Job execution failed (consumption being" - " skipped): %s", job, exc_info=True) - consume = False - except Exception: - LOG.warn("Job execution failed (consumption proceeding): %s", - job, exc_info=True) - else: - LOG.info("Job completed successfully: %s", job) - return async_utils.make_completed_future(consume) - - def run(self): - self._dead.clear() - try: - while True: - if self._wait_timeout.is_stopped(): - break - dispatched = 0 - for job in self._jobboard.iterjobs(): - if self._wait_timeout.is_stopped(): - break - LOG.debug("Trying to claim job: %s", job) - try: - self._jobboard.claim(job, self._name) - except (excp.UnclaimableJob, excp.NotFound): - LOG.debug("Job already claimed or consumed: %s", job) - continue - consume = False - try: - f = self._dispatch_job(job) - except Exception: - LOG.warn("Job dispatching failed: %s", job, - exc_info=True) - else: - dispatched += 1 - consume = f.result() - try: - if consume: - self._jobboard.consume(job, self._name) - else: - self._jobboard.abandon(job, self._name) - except (excp.JobFailure, excp.NotFound): - if consume: - LOG.warn("Failed job consumption: %s", job, - exc_info=True) - else: - LOG.warn("Failed job abandonment: %s", job, - exc_info=True) - if dispatched == 0 and not self._wait_timeout.is_stopped(): - self._wait_timeout.wait() - finally: - self._dead.set() - - def wait(self, timeout=None): - """Waits for the conductor to gracefully exit. - - This method waits for the conductor to gracefully exit. An optional - timeout can be provided, which will cause the method to return - within the specified timeout. If the timeout is reached, the returned - value will be False. - - :param timeout: Maximum number of seconds that the :meth:`wait` method - should block for. - """ - return self._dead.wait(timeout) +# TODO(harlowja): remove this proxy/legacy class soon... +SingleThreadedConductor = deprecation.moved_inheritable_class( + impl_blocking.BlockingConductor, 'SingleThreadedConductor', + __name__, version="0.8", removal_version="?") diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_blocking.py similarity index 90% rename from taskflow/tests/unit/conductor/test_conductor.py rename to taskflow/tests/unit/conductor/test_blocking.py index 8b21e56d..33de7807 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_blocking.py @@ -19,7 +19,7 @@ import contextlib from zake import fake_client -from taskflow.conductors import single_threaded as stc +from taskflow.conductors import backends from taskflow import engines from taskflow.jobs.backends import impl_zookeeper from taskflow.jobs import base @@ -50,10 +50,13 @@ def test_factory(blowup): return f -class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): - ComponentBundle = collections.namedtuple('ComponentBundle', - ['board', 'client', - 'persistence', 'conductor']) +ComponentBundle = collections.namedtuple('ComponentBundle', + ['board', 'client', + 'persistence', 'conductor']) + + +class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase): + KIND = 'blocking' def make_components(self, name='testing', wait_timeout=0.1): client = fake_client.FakeClient() @@ -61,9 +64,10 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): board = impl_zookeeper.ZookeeperJobBoard(name, {}, client=client, persistence=persistence) - conductor = stc.SingleThreadedConductor(name, board, persistence, - wait_timeout=wait_timeout) - return self.ComponentBundle(board, client, persistence, conductor) + conductor = backends.fetch(self.KIND, name, board, + persistence=persistence, + wait_timeout=wait_timeout) + return ComponentBundle(board, client, persistence, conductor) def test_connection(self): components = self.make_components() diff --git a/taskflow/utils/deprecation.py b/taskflow/utils/deprecation.py index 9426f118..55ed982d 100644 --- a/taskflow/utils/deprecation.py +++ b/taskflow/utils/deprecation.py @@ -118,8 +118,8 @@ class MovedClassProxy(object): type(self).__name__, id(self), wrapped, id(wrapped)) -def _generate_moved_message(prefix, postfix=None, message=None, - version=None, removal_version=None): +def _generate_message(prefix, postfix=None, message=None, + version=None, removal_version=None): message_components = [prefix] if version: message_components.append(" in version '%s'" % version) @@ -143,9 +143,9 @@ def renamed_kwarg(old_name, new_name, message=None, prefix = _KWARG_MOVED_PREFIX_TPL % old_name postfix = _KWARG_MOVED_POSTFIX_TPL % new_name - out_message = _generate_moved_message(prefix, postfix=postfix, - message=message, version=version, - removal_version=removal_version) + out_message = _generate_message(prefix, postfix=postfix, + message=message, version=version, + removal_version=removal_version) def decorator(f): @@ -165,9 +165,9 @@ def removed_kwarg(old_name, message=None, """Decorates a kwarg accepting function to deprecate a removed kwarg.""" prefix = _KWARG_MOVED_PREFIX_TPL % old_name - out_message = _generate_moved_message(prefix, postfix=None, - message=message, version=version, - removal_version=removal_version) + out_message = _generate_message(prefix, postfix=None, + message=message, version=version, + removal_version=removal_version) def decorator(f): @@ -204,7 +204,7 @@ def _moved_decorator(kind, new_attribute_name, message=None, old_name = ".".join((base_name, old_attribute_name)) new_name = ".".join((base_name, new_attribute_name)) prefix = _KIND_MOVED_PREFIX_TPL % (kind, old_name, new_name) - out_message = _generate_moved_message( + out_message = _generate_message( prefix, message=message, version=version, removal_version=removal_version) deprecation(out_message, stacklevel=stacklevel) @@ -215,6 +215,20 @@ def _moved_decorator(kind, new_attribute_name, message=None, return decorator +def removed_module(module_name, replacement_name=None, message=None, + version=None, removal_version=None, stacklevel=4): + prefix = "The '%s' module usage is deprecated" % module_name + if replacement_name: + postfix = ", please use %s instead" % replacement_name + else: + postfix = None + out_message = _generate_message(prefix, + postfix=postfix, message=message, + version=version, + removal_version=removal_version) + deprecation(out_message, stacklevel=stacklevel) + + def moved_property(new_attribute_name, message=None, version=None, removal_version=None, stacklevel=3): """Decorates a *instance* property that was moved to another location.""" @@ -240,9 +254,9 @@ def moved_inheritable_class(new_class, old_class_name, old_module_name, old_name = ".".join((old_module_name, old_class_name)) new_name = reflection.get_class_name(new_class) prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name) - out_message = _generate_moved_message(prefix, - message=message, version=version, - removal_version=removal_version) + out_message = _generate_message(prefix, + message=message, version=version, + removal_version=removal_version) def decorator(f): @@ -273,7 +287,7 @@ def moved_class(new_class, old_class_name, old_module_name, message=None, old_name = ".".join((old_module_name, old_class_name)) new_name = reflection.get_class_name(new_class) prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name) - out_message = _generate_moved_message(prefix, - message=message, version=version, - removal_version=removal_version) + out_message = _generate_message(prefix, + message=message, version=version, + removal_version=removal_version) return MovedClassProxy(new_class, out_message, stacklevel=stacklevel)