diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 6f2e8b82..f03aa8e2 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -33,7 +33,6 @@ from taskflow import logging from taskflow import task as task_atom from taskflow.types import failure from taskflow.types import notifier -from taskflow.types import timing from taskflow.utils import async_utils from taskflow.utils import threading_utils @@ -176,7 +175,7 @@ class _WaitWorkItem(object): 'kind': _KIND_COMPLETE_ME, } if self._channel.put(message): - watch = timing.StopWatch() + watch = timeutils.StopWatch() watch.start() self._barrier.wait() LOG.blather("Waited %s seconds until task '%s' %s emitted" @@ -305,7 +304,7 @@ class _Dispatcher(object): " %s to target '%s'", kind, sender, target) def run(self, queue): - watch = timing.StopWatch(duration=self._dispatch_periodicity) + watch = timeutils.StopWatch(duration=self._dispatch_periodicity) while (not self._dead.is_set() or (self._stop_when_empty and self._targets)): watch.restart() diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 823f83c9..44913064 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -28,7 +28,6 @@ from taskflow.engines.action_engine import executor from taskflow import exceptions as excp from taskflow import logging from taskflow.types import failure as ft -from taskflow.types import timing as tt from taskflow.utils import schema_utils as su # NOTE(skudriashev): This is protocol states and events, which are not @@ -239,7 +238,7 @@ class Request(Message): self._event = ACTION_TO_EVENT[action] self._arguments = arguments self._kwargs = kwargs - self._watch = tt.StopWatch(duration=timeout).start() + self._watch = timeutils.StopWatch(duration=timeout).start() self._state = WAITING self._lock = threading.Lock() self._created_on = timeutils.utcnow() diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 033bd3c7..f1bd0fcd 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -17,6 +17,7 @@ import functools from oslo_utils import reflection +from oslo_utils import timeutils from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr @@ -24,7 +25,6 @@ from taskflow.engines.worker_based import proxy from taskflow import logging from taskflow.types import failure as ft from taskflow.types import notifier as nt -from taskflow.types import timing as tt from taskflow.utils import kombu_utils as ku from taskflow.utils import misc @@ -76,7 +76,7 @@ class Server(object): def _on_receive(content, message): LOG.debug("Submitting message '%s' for execution in the" " future to '%s'", ku.DelayedPretty(message), func_name) - watch = tt.StopWatch() + watch = timeutils.StopWatch() watch.start() try: self._executor.submit(_on_run, watch, content, message) diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 5d212e5c..09a41ab3 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -22,6 +22,7 @@ import threading from futurist import periodics from oslo_utils import reflection +from oslo_utils import timeutils import six from taskflow.engines.worker_based import dispatcher @@ -29,7 +30,6 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base from taskflow.types import notifier -from taskflow.types import timing as tt from taskflow.utils import kombu_utils as ku LOG = logging.getLogger(__name__) @@ -123,7 +123,7 @@ class WorkerFinder(object): """ if workers <= 0: raise ValueError("Worker amount must be greater than zero") - watch = tt.StopWatch(duration=timeout) + watch = timeutils.StopWatch(duration=timeout) watch.start() with self._cond: while self._total_workers() < workers: diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 4d61dc01..5d5e7255 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -34,7 +34,6 @@ from taskflow import exceptions as exc from taskflow.jobs import base from taskflow import logging from taskflow import states -from taskflow.types import timing from taskflow.utils import misc from taskflow.utils import redis_utils as ru @@ -741,7 +740,7 @@ return cmsgpack.pack(result) # up to the provided max-delay. In the future we could try having # a secondary client connected into redis pubsub and use that # instead, but for now this is simpler. - w = timing.StopWatch(duration=timeout) + w = timeutils.StopWatch(duration=timeout) w.start() delay = initial_delay while True: diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 9df706e7..34a87d5a 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -27,6 +27,7 @@ from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import excutils +from oslo_utils import timeutils from oslo_utils import uuidutils import six @@ -34,7 +35,6 @@ from taskflow import exceptions as excp from taskflow.jobs import base from taskflow import logging from taskflow import states -from taskflow.types import timing as tt from taskflow.utils import kazoo_utils from taskflow.utils import misc @@ -672,7 +672,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def wait(self, timeout=None): # Wait until timeout expires (or forever) for jobs to appear. - watch = tt.StopWatch(duration=timeout) + watch = timeutils.StopWatch(duration=timeout) watch.start() with self._job_cond: while True: diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index e346f08f..d91ff007 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -20,12 +20,12 @@ import itertools import time from debtcollector import moves +from oslo_utils import timeutils from taskflow import exceptions as exc from taskflow.listeners import base from taskflow import logging from taskflow import states -from taskflow.types import timing as tt STARTING_STATES = frozenset((states.RUNNING, states.REVERTING)) FINISHED_STATES = frozenset((base.FINISH_STATES + (states.REVERTED,))) @@ -81,7 +81,7 @@ class DurationListener(base.Listener): if state == states.PENDING: self._timers.pop(task_name, None) elif state in STARTING_STATES: - self._timers[task_name] = tt.StopWatch().start() + self._timers[task_name] = timeutils.StopWatch().start() elif state in FINISHED_STATES: timer = self._timers.pop(task_name, None) if timer is not None: diff --git a/taskflow/types/latch.py b/taskflow/types/latch.py index 07783309..160df511 100644 --- a/taskflow/types/latch.py +++ b/taskflow/types/latch.py @@ -16,7 +16,7 @@ import threading -from taskflow.types import timing as tt +from oslo_utils import timeutils class Latch(object): @@ -55,7 +55,7 @@ class Latch(object): timeout expires otherwise false :rtype: boolean """ - watch = tt.StopWatch(duration=timeout) + watch = timeutils.StopWatch(duration=timeout) watch.start() with self._cond: while self._count > 0: diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 57fe1287..2fa7d20a 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -16,13 +16,13 @@ import threading +from debtcollector import moves from oslo_utils import timeutils -#: Moved to oslo.utils (just reference them from there until a later time). -Split = timeutils.Split -#: Moved to oslo.utils (just reference them from there until a later time). -StopWatch = timeutils.StopWatch +# TODO(harlowja): Keep alias class... around until 2.0 is released. +StopWatch = moves.moved_class(timeutils.StopWatch, 'StopWatch', __name__, + version="1.15", removal_version="2.0") class Timeout(object): diff --git a/tools/speed_test.py b/tools/speed_test.py index 45bca783..f9da37ac 100644 --- a/tools/speed_test.py +++ b/tools/speed_test.py @@ -20,13 +20,13 @@ import argparse import cProfile as profiler import pstats +from oslo_utils import timeutils import six from six.moves import range as compat_range from taskflow import engines from taskflow.patterns import linear_flow as lf from taskflow import task -from taskflow.types import timing def print_header(name): @@ -68,7 +68,7 @@ class ProfileIt(object): class TimeIt(object): def __init__(self, name, args): - self.watch = timing.StopWatch() + self.watch = timeutils.StopWatch() self.name = name self.args = args