Merge "Allow loading conductors via entrypoints"
This commit is contained in:
@@ -67,14 +67,15 @@ Interfaces
|
||||
Implementations
|
||||
===============
|
||||
|
||||
.. automodule:: taskflow.conductors.single_threaded
|
||||
.. automodule:: taskflow.conductors.backends
|
||||
.. automodule:: taskflow.conductors.backends.impl_blocking
|
||||
|
||||
Hierarchy
|
||||
=========
|
||||
|
||||
.. inheritance-diagram::
|
||||
taskflow.conductors.base
|
||||
taskflow.conductors.single_threaded
|
||||
taskflow.conductors.backends.impl_blocking
|
||||
:parts: 1
|
||||
|
||||
.. _railroad conductors: http://en.wikipedia.org/wiki/Conductor_%28transportation%29
|
||||
|
||||
@@ -37,6 +37,9 @@ packages =
|
||||
taskflow.jobboards =
|
||||
zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard
|
||||
|
||||
taskflow.conductors =
|
||||
blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor
|
||||
|
||||
taskflow.persistence =
|
||||
dir = taskflow.persistence.backends.impl_dir:DirBackend
|
||||
file = taskflow.persistence.backends.impl_dir:DirBackend
|
||||
|
||||
45
taskflow/conductors/backends/__init__.py
Normal file
45
taskflow/conductors/backends/__init__.py
Normal file
@@ -0,0 +1,45 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import stevedore.driver
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
|
||||
# NOTE(harlowja): this is the entrypoint namespace, not the module namespace.
|
||||
CONDUCTOR_NAMESPACE = 'taskflow.conductors'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch(kind, name, jobboard, namespace=CONDUCTOR_NAMESPACE, **kwargs):
|
||||
"""Fetch a conductor backend with the given options.
|
||||
|
||||
This fetch method will look for the entrypoint 'kind' in the entrypoint
|
||||
namespace, and then attempt to instantiate that entrypoint using the
|
||||
provided name, jobboard and any board specific kwargs.
|
||||
"""
|
||||
LOG.debug('Looking for %r conductor driver in %r', kind, namespace)
|
||||
try:
|
||||
mgr = stevedore.driver.DriverManager(
|
||||
namespace, kind,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(name, jobboard),
|
||||
invoke_kwds=kwargs)
|
||||
return mgr.driver
|
||||
except RuntimeError as e:
|
||||
raise exc.NotFound("Could not find conductor %s" % (kind), e)
|
||||
175
taskflow/conductors/backends/impl_blocking.py
Normal file
175
taskflow/conductors/backends/impl_blocking.py
Normal file
@@ -0,0 +1,175 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.utils import deprecation
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
NO_CONSUME_EXCEPTIONS = tuple([
|
||||
excp.ExecutionFailure,
|
||||
excp.StorageFailure,
|
||||
])
|
||||
|
||||
|
||||
class BlockingConductor(base.Conductor):
|
||||
"""A conductor that runs jobs in its own dispatching loop.
|
||||
|
||||
This conductor iterates over jobs in the provided jobboard (waiting for
|
||||
the given timeout if no jobs exist) and attempts to claim them, work on
|
||||
those jobs in its local thread (blocking further work from being claimed
|
||||
and consumed) and then consume those work units after completetion. This
|
||||
process will repeat until the conductor has been stopped or other critical
|
||||
error occurs.
|
||||
|
||||
NOTE(harlowja): consumption occurs even if a engine fails to run due to
|
||||
a task failure. This is only skipped when an execution failure or
|
||||
a storage failure occurs which are *usually* correctable by re-running on
|
||||
a different conductor (storage failures and execution failures may be
|
||||
transient issues that can be worked around by later execution). If a job
|
||||
after completing can not be consumed or abandoned the conductor relies
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None,
|
||||
engine_options=None, wait_timeout=None):
|
||||
super(BlockingConductor, self).__init__(
|
||||
name, jobboard, persistence=persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
self._wait_timeout = tt.Timeout(float(wait_timeout))
|
||||
elif isinstance(wait_timeout, tt.Timeout):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading_utils.Event()
|
||||
|
||||
@deprecation.removed_kwarg('timeout',
|
||||
version="0.8", removal_version="?")
|
||||
def stop(self, timeout=None):
|
||||
"""Requests the conductor to stop dispatching.
|
||||
|
||||
This method can be used to request that a conductor stop its
|
||||
consumption & dispatching loop.
|
||||
|
||||
The method returns immediately regardless of whether the conductor has
|
||||
been stopped.
|
||||
|
||||
:param timeout: This parameter is **deprecated** and is present for
|
||||
backward compatibility **only**. In order to wait for
|
||||
the conductor to gracefully shut down, :meth:`wait`
|
||||
should be used instead.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
return not self._dead.is_set()
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
engine = self._engine_from_job(job)
|
||||
consume = True
|
||||
with logging_listener.LoggingListener(engine, log=LOG):
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
try:
|
||||
engine.run()
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
if LOG.isEnabledFor(logging.WARNING):
|
||||
if consume:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s [%s failures]", job, len(e))
|
||||
else:
|
||||
LOG.warn("Job execution failed (consumption"
|
||||
" proceeding): %s [%s failures]", job, len(e))
|
||||
# Show the failure/s + traceback (if possible)...
|
||||
for i, f in enumerate(e):
|
||||
LOG.warn("%s. %s", i + 1, f.pformat(traceback=True))
|
||||
except NO_CONSUME_EXCEPTIONS:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s", job, exc_info=True)
|
||||
consume = False
|
||||
except Exception:
|
||||
LOG.warn("Job execution failed (consumption proceeding): %s",
|
||||
job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return async_utils.make_completed_future(consume)
|
||||
|
||||
def run(self):
|
||||
self._dead.clear()
|
||||
try:
|
||||
while True:
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
dispatched = 0
|
||||
for job in self._jobboard.iterjobs():
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
LOG.debug("Trying to claim job: %s", job)
|
||||
try:
|
||||
self._jobboard.claim(job, self._name)
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
consume = False
|
||||
try:
|
||||
f = self._dispatch_job(job)
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
dispatched += 1
|
||||
consume = f.result()
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except (excp.JobFailure, excp.NotFound):
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
if dispatched == 0 and not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
finally:
|
||||
self._dead.set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits for the conductor to gracefully exit.
|
||||
|
||||
This method waits for the conductor to gracefully exit. An optional
|
||||
timeout can be provided, which will cause the method to return
|
||||
within the specified timeout. If the timeout is reached, the returned
|
||||
value will be False.
|
||||
|
||||
:param timeout: Maximum number of seconds that the :meth:`wait` method
|
||||
should block for.
|
||||
"""
|
||||
return self._dead.wait(timeout)
|
||||
@@ -24,7 +24,7 @@ from taskflow.utils import lock_utils
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Conductor(object):
|
||||
"""Conductors conduct jobs & assist in associated runtime interactions.
|
||||
"""Base for all conductor implementations.
|
||||
|
||||
Conductors act as entities which extract jobs from a jobboard, assign
|
||||
there work to some engine (using some desired configuration) and then wait
|
||||
@@ -34,8 +34,8 @@ class Conductor(object):
|
||||
period of time will finish up the prior failed conductors work.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None):
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None, engine_options=None):
|
||||
self._name = name
|
||||
self._jobboard = jobboard
|
||||
self._engine = engine
|
||||
@@ -101,7 +101,7 @@ class Conductor(object):
|
||||
|
||||
@lock_utils.locked
|
||||
def close(self):
|
||||
"""Closes the jobboard, disallowing further use."""
|
||||
"""Closes the contained jobboard, disallowing further use."""
|
||||
self._jobboard.close()
|
||||
|
||||
@abc.abstractmethod
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
@@ -12,163 +14,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.conductors.backends import impl_blocking
|
||||
from taskflow.utils import deprecation
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
NO_CONSUME_EXCEPTIONS = tuple([
|
||||
excp.ExecutionFailure,
|
||||
excp.StorageFailure,
|
||||
])
|
||||
# TODO(harlowja): remove this module soon...
|
||||
deprecation.removed_module(__name__,
|
||||
replacement_name="the conductor entrypoints",
|
||||
version="0.8", removal_version="?")
|
||||
|
||||
|
||||
class SingleThreadedConductor(base.Conductor):
|
||||
"""A conductor that runs jobs in its own dispatching loop.
|
||||
|
||||
This conductor iterates over jobs in the provided jobboard (waiting for
|
||||
the given timeout if no jobs exist) and attempts to claim them, work on
|
||||
those jobs in its local thread (blocking further work from being claimed
|
||||
and consumed) and then consume those work units after completetion. This
|
||||
process will repeat until the conductor has been stopped or other critical
|
||||
error occurs.
|
||||
|
||||
NOTE(harlowja): consumption occurs even if a engine fails to run due to
|
||||
a task failure. This is only skipped when an execution failure or
|
||||
a storage failure occurs which are *usually* correctable by re-running on
|
||||
a different conductor (storage failures and execution failures may be
|
||||
transient issues that can be worked around by later execution). If a job
|
||||
after completing can not be consumed or abandoned the conductor relies
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None, wait_timeout=None):
|
||||
super(SingleThreadedConductor, self).__init__(
|
||||
name, jobboard, persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
self._wait_timeout = tt.Timeout(float(wait_timeout))
|
||||
elif isinstance(wait_timeout, tt.Timeout):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading_utils.Event()
|
||||
|
||||
@deprecation.removed_kwarg('timeout',
|
||||
version="0.8", removal_version="?")
|
||||
def stop(self, timeout=None):
|
||||
"""Requests the conductor to stop dispatching.
|
||||
|
||||
This method can be used to request that a conductor stop its
|
||||
consumption & dispatching loop.
|
||||
|
||||
The method returns immediately regardless of whether the conductor has
|
||||
been stopped.
|
||||
|
||||
:param timeout: This parameter is **deprecated** and is present for
|
||||
backward compatibility **only**. In order to wait for
|
||||
the conductor to gracefully shut down, :meth:`wait`
|
||||
should be used instead.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
return not self._dead.is_set()
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
engine = self._engine_from_job(job)
|
||||
consume = True
|
||||
with logging_listener.LoggingListener(engine, log=LOG):
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
try:
|
||||
engine.run()
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
if LOG.isEnabledFor(logging.WARNING):
|
||||
if consume:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s [%s failures]", job, len(e))
|
||||
else:
|
||||
LOG.warn("Job execution failed (consumption"
|
||||
" proceeding): %s [%s failures]", job, len(e))
|
||||
# Show the failure/s + traceback (if possible)...
|
||||
for i, f in enumerate(e):
|
||||
LOG.warn("%s. %s", i + 1, f.pformat(traceback=True))
|
||||
except NO_CONSUME_EXCEPTIONS:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s", job, exc_info=True)
|
||||
consume = False
|
||||
except Exception:
|
||||
LOG.warn("Job execution failed (consumption proceeding): %s",
|
||||
job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return async_utils.make_completed_future(consume)
|
||||
|
||||
def run(self):
|
||||
self._dead.clear()
|
||||
try:
|
||||
while True:
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
dispatched = 0
|
||||
for job in self._jobboard.iterjobs():
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
LOG.debug("Trying to claim job: %s", job)
|
||||
try:
|
||||
self._jobboard.claim(job, self._name)
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
consume = False
|
||||
try:
|
||||
f = self._dispatch_job(job)
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
dispatched += 1
|
||||
consume = f.result()
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except (excp.JobFailure, excp.NotFound):
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
if dispatched == 0 and not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
finally:
|
||||
self._dead.set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits for the conductor to gracefully exit.
|
||||
|
||||
This method waits for the conductor to gracefully exit. An optional
|
||||
timeout can be provided, which will cause the method to return
|
||||
within the specified timeout. If the timeout is reached, the returned
|
||||
value will be False.
|
||||
|
||||
:param timeout: Maximum number of seconds that the :meth:`wait` method
|
||||
should block for.
|
||||
"""
|
||||
return self._dead.wait(timeout)
|
||||
# TODO(harlowja): remove this proxy/legacy class soon...
|
||||
SingleThreadedConductor = deprecation.moved_inheritable_class(
|
||||
impl_blocking.BlockingConductor, 'SingleThreadedConductor',
|
||||
__name__, version="0.8", removal_version="?")
|
||||
|
||||
@@ -19,7 +19,7 @@ import contextlib
|
||||
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.conductors import single_threaded as stc
|
||||
from taskflow.conductors import backends
|
||||
from taskflow import engines
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
from taskflow.jobs import base
|
||||
@@ -50,10 +50,13 @@ def test_factory(blowup):
|
||||
return f
|
||||
|
||||
|
||||
class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
|
||||
|
||||
class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
KIND = 'blocking'
|
||||
|
||||
def make_components(self, name='testing', wait_timeout=0.1):
|
||||
client = fake_client.FakeClient()
|
||||
@@ -61,9 +64,10 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
board = impl_zookeeper.ZookeeperJobBoard(name, {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
conductor = stc.SingleThreadedConductor(name, board, persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
return self.ComponentBundle(board, client, persistence, conductor)
|
||||
conductor = backends.fetch(self.KIND, name, board,
|
||||
persistence=persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
return ComponentBundle(board, client, persistence, conductor)
|
||||
|
||||
def test_connection(self):
|
||||
components = self.make_components()
|
||||
@@ -118,8 +118,8 @@ class MovedClassProxy(object):
|
||||
type(self).__name__, id(self), wrapped, id(wrapped))
|
||||
|
||||
|
||||
def _generate_moved_message(prefix, postfix=None, message=None,
|
||||
version=None, removal_version=None):
|
||||
def _generate_message(prefix, postfix=None, message=None,
|
||||
version=None, removal_version=None):
|
||||
message_components = [prefix]
|
||||
if version:
|
||||
message_components.append(" in version '%s'" % version)
|
||||
@@ -143,9 +143,9 @@ def renamed_kwarg(old_name, new_name, message=None,
|
||||
|
||||
prefix = _KWARG_MOVED_PREFIX_TPL % old_name
|
||||
postfix = _KWARG_MOVED_POSTFIX_TPL % new_name
|
||||
out_message = _generate_moved_message(prefix, postfix=postfix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix, postfix=postfix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
@@ -165,9 +165,9 @@ def removed_kwarg(old_name, message=None,
|
||||
"""Decorates a kwarg accepting function to deprecate a removed kwarg."""
|
||||
|
||||
prefix = _KWARG_MOVED_PREFIX_TPL % old_name
|
||||
out_message = _generate_moved_message(prefix, postfix=None,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix, postfix=None,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
@@ -204,7 +204,7 @@ def _moved_decorator(kind, new_attribute_name, message=None,
|
||||
old_name = ".".join((base_name, old_attribute_name))
|
||||
new_name = ".".join((base_name, new_attribute_name))
|
||||
prefix = _KIND_MOVED_PREFIX_TPL % (kind, old_name, new_name)
|
||||
out_message = _generate_moved_message(
|
||||
out_message = _generate_message(
|
||||
prefix, message=message,
|
||||
version=version, removal_version=removal_version)
|
||||
deprecation(out_message, stacklevel=stacklevel)
|
||||
@@ -215,6 +215,20 @@ def _moved_decorator(kind, new_attribute_name, message=None,
|
||||
return decorator
|
||||
|
||||
|
||||
def removed_module(module_name, replacement_name=None, message=None,
|
||||
version=None, removal_version=None, stacklevel=4):
|
||||
prefix = "The '%s' module usage is deprecated" % module_name
|
||||
if replacement_name:
|
||||
postfix = ", please use %s instead" % replacement_name
|
||||
else:
|
||||
postfix = None
|
||||
out_message = _generate_message(prefix,
|
||||
postfix=postfix, message=message,
|
||||
version=version,
|
||||
removal_version=removal_version)
|
||||
deprecation(out_message, stacklevel=stacklevel)
|
||||
|
||||
|
||||
def moved_property(new_attribute_name, message=None,
|
||||
version=None, removal_version=None, stacklevel=3):
|
||||
"""Decorates a *instance* property that was moved to another location."""
|
||||
@@ -240,9 +254,9 @@ def moved_inheritable_class(new_class, old_class_name, old_module_name,
|
||||
old_name = ".".join((old_module_name, old_class_name))
|
||||
new_name = reflection.get_class_name(new_class)
|
||||
prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name)
|
||||
out_message = _generate_moved_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
@@ -273,7 +287,7 @@ def moved_class(new_class, old_class_name, old_module_name, message=None,
|
||||
old_name = ".".join((old_module_name, old_class_name))
|
||||
new_name = reflection.get_class_name(new_class)
|
||||
prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name)
|
||||
out_message = _generate_moved_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
return MovedClassProxy(new_class, out_message, stacklevel=stacklevel)
|
||||
|
||||
Reference in New Issue
Block a user