From 543cf78a6f26fe4929b53a7bfb9cbdc5f2624fbd Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 18 Jun 2015 15:48:11 -0700 Subject: [PATCH] Remove 2.6 classifier + 2.6 compatibility code Fixes bug 1445827 Depends-On: I02e3c9aacef0b295a2f823a5cbaf11768a90cb82 Change-Id: I1db681803598ac1bc917fd74a99458bc61edf3f1 --- requirements.txt | 3 -- setup.cfg | 1 - taskflow/atom.py | 28 ++++++++----------- taskflow/conductors/backends/impl_blocking.py | 5 ++-- taskflow/engines/action_engine/executor.py | 3 +- taskflow/engines/worker_based/proxy.py | 4 +-- .../tests/unit/conductor/test_blocking.py | 5 ++-- taskflow/tests/unit/jobs/base.py | 7 +++-- taskflow/tests/unit/test_listeners.py | 4 +-- .../tests/unit/test_utils_threading_utils.py | 12 +++----- .../tests/unit/worker_based/test_executor.py | 4 +-- .../unit/worker_based/test_message_pump.py | 6 ++-- taskflow/tests/utils.py | 4 +-- taskflow/types/fsm.py | 9 ++---- taskflow/types/periodic.py | 4 +-- taskflow/types/sets.py | 7 +---- taskflow/types/timing.py | 6 ++-- taskflow/utils/threading_utils.py | 20 ------------- 18 files changed, 49 insertions(+), 83 deletions(-) diff --git a/requirements.txt b/requirements.txt index a2ce9064e..f69bfa5eb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,9 +7,6 @@ pbr>=0.11,<2.0 # Packages needed for using this library. -# Only needed on python 2.6 -ordereddict - # Python 2->3 compatibility library. six>=1.9.0 diff --git a/setup.cfg b/setup.cfg index a45b6dd8c..ac5b566a0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,7 +17,6 @@ classifier = Operating System :: POSIX :: Linux Programming Language :: Python Programming Language :: Python :: 2 - Programming Language :: Python :: 2.6 Programming Language :: Python :: 2.7 Programming Language :: Python :: 3 Programming Language :: Python :: 3.4 diff --git a/taskflow/atom.py b/taskflow/atom.py index 82671df7e..fba04a975 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -19,11 +19,6 @@ import abc import collections import itertools -try: - from collections import OrderedDict # noqa -except ImportError: - from ordereddict import OrderedDict # noqa - from oslo_utils import reflection import six from six.moves import zip as compat_zip @@ -46,23 +41,24 @@ def _save_as_to_mapping(save_as): # outside of code so that it's more easily understandable, since what an # atom returns is pretty crucial for other later operations. if save_as is None: - return OrderedDict() + return collections.OrderedDict() if isinstance(save_as, six.string_types): # NOTE(harlowja): this means that your atom will only return one item # instead of a dictionary-like object or a indexable object (like a # list or tuple). - return OrderedDict([(save_as, None)]) + return collections.OrderedDict([(save_as, None)]) elif isinstance(save_as, _sequence_types): # NOTE(harlowja): this means that your atom will return a indexable # object, like a list or tuple and the results can be mapped by index # to that tuple/list that is returned for others to use. - return OrderedDict((key, num) for num, key in enumerate(save_as)) + return collections.OrderedDict((key, num) + for num, key in enumerate(save_as)) elif isinstance(save_as, _set_types): # NOTE(harlowja): in the case where a set is given we will not be # able to determine the numeric ordering in a reliable way (since it # may be an unordered set) so the only way for us to easily map the # result of the atom will be via the key itself. - return OrderedDict((key, key) for key in save_as) + return collections.OrderedDict((key, key) for key in save_as) else: raise TypeError('Atom provides parameter ' 'should be str, set or tuple/list, not %r' % save_as) @@ -76,9 +72,9 @@ def _build_rebind_dict(args, rebind_args): new name onto the required name). """ if rebind_args is None: - return OrderedDict() + return collections.OrderedDict() elif isinstance(rebind_args, (list, tuple)): - rebind = OrderedDict(compat_zip(args, rebind_args)) + rebind = collections.OrderedDict(compat_zip(args, rebind_args)) if len(args) < len(rebind_args): rebind.update((a, a) for a in rebind_args[len(args):]) return rebind @@ -112,7 +108,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, ignore_list = [] # Build the required names. - required = OrderedDict() + required = collections.OrderedDict() # Add required arguments to required mappings if inference is enabled. if do_infer: @@ -133,9 +129,9 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, opt_args = sets.OrderedSet(all_args) opt_args = opt_args - set(itertools.chain(six.iterkeys(required), iter(ignore_list))) - optional = OrderedDict((a, a) for a in opt_args) + optional = collections.OrderedDict((a, a) for a in opt_args) else: - optional = OrderedDict() + optional = collections.OrderedDict() # Check if we are given some extra arguments that we aren't able to accept. if not reflection.accepts_kwargs(function): @@ -206,14 +202,14 @@ class Atom(object): self.requires = sets.OrderedSet() self.optional = sets.OrderedSet() self.provides = sets.OrderedSet(self.save_as) - self.rebind = OrderedDict() + self.rebind = collections.OrderedDict() def _build_arg_mapping(self, executor, requires=None, rebind=None, auto_extract=True, ignore_list=None): required, optional = _build_arg_mapping(self.name, requires, rebind, executor, auto_extract, ignore_list=ignore_list) - rebind = OrderedDict() + rebind = collections.OrderedDict() for (arg_name, bound_name) in itertools.chain(six.iteritems(required), six.iteritems(optional)): rebind.setdefault(arg_name, bound_name) diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index fb8a3c3a7..1546c0ada 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + try: from contextlib import ExitStack # noqa except ImportError: @@ -26,7 +28,6 @@ from taskflow.listeners import logging as logging_listener from taskflow import logging from taskflow.types import timing as tt from taskflow.utils import async_utils -from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) WAIT_TIMEOUT = 0.5 @@ -69,7 +70,7 @@ class BlockingConductor(base.Conductor): self._wait_timeout = wait_timeout else: raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) - self._dead = threading_utils.Event() + self._dead = threading.Event() @removals.removed_kwarg('timeout', version="0.8", removal_version="?") def stop(self, timeout=None): diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index c630c1618..2f8ddb099 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -19,6 +19,7 @@ import collections from multiprocessing import managers import os import pickle +import threading from oslo_utils import excutils from oslo_utils import reflection @@ -240,7 +241,7 @@ class _Dispatcher(object): raise ValueError("Provided dispatch periodicity must be greater" " than zero and not '%s'" % dispatch_periodicity) self._targets = {} - self._dead = threading_utils.Event() + self._dead = threading.Event() self._dispatch_periodicity = dispatch_periodicity self._stop_when_empty = False diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index e9d2ec22a..386e37798 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -15,6 +15,7 @@ # under the License. import collections +import threading import kombu from kombu import exceptions as kombu_exceptions @@ -22,7 +23,6 @@ import six from taskflow.engines.worker_based import dispatcher from taskflow import logging -from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -75,7 +75,7 @@ class Proxy(object): self._topic = topic self._exchange_name = exchange self._on_wait = on_wait - self._running = threading_utils.Event() + self._running = threading.Event() self._dispatcher = dispatcher.TypeDispatcher( # NOTE(skudriashev): Process all incoming messages only if proxy is # running, otherwise requeue them. diff --git a/taskflow/tests/unit/conductor/test_blocking.py b/taskflow/tests/unit/conductor/test_blocking.py index 33de78079..caab904f5 100644 --- a/taskflow/tests/unit/conductor/test_blocking.py +++ b/taskflow/tests/unit/conductor/test_blocking.py @@ -16,6 +16,7 @@ import collections import contextlib +import threading from zake import fake_client @@ -93,7 +94,7 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase): def test_run(self): components = self.make_components() components.conductor.connect() - consumed_event = threading_utils.Event() + consumed_event = threading.Event() def on_consume(state, details): consumed_event.set() @@ -123,7 +124,7 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase): def test_fail_run(self): components = self.make_components() components.conductor.connect() - consumed_event = threading_utils.Event() + consumed_event = threading.Event() def on_consume(state, details): consumed_event.set() diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py index 8a8bee221..654702aed 100644 --- a/taskflow/tests/unit/jobs/base.py +++ b/taskflow/tests/unit/jobs/base.py @@ -15,6 +15,7 @@ # under the License. import contextlib +import threading import time from kazoo.recipe import watchers @@ -53,8 +54,8 @@ def flush(client, path=None): # before this context manager exits. if not path: path = FLUSH_PATH_TPL % uuidutils.generate_uuid() - created = threading_utils.Event() - deleted = threading_utils.Event() + created = threading.Event() + deleted = threading.Event() def on_created(data, stat): if stat is not None: @@ -126,7 +127,7 @@ class BoardTestMixin(object): self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) def test_wait_arrival(self): - ev = threading_utils.Event() + ev = threading.Event() jobs = [] def poster(wait_post=0.2): diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 13aa95031..80bac47fe 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -16,6 +16,7 @@ import contextlib import logging +import threading import time from oslo_serialization import jsonutils @@ -38,7 +39,6 @@ from taskflow.test import mock from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils -from taskflow.utils import threading_utils _LOG_LEVELS = frozenset([ @@ -89,7 +89,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): self.board.connect() def _post_claim_job(self, job_name, book=None, details=None): - arrived = threading_utils.Event() + arrived = threading.Event() def set_on_children(children): if children: diff --git a/taskflow/tests/unit/test_utils_threading_utils.py b/taskflow/tests/unit/test_utils_threading_utils.py index 66ef2d09b..f354a989e 100644 --- a/taskflow/tests/unit/test_utils_threading_utils.py +++ b/taskflow/tests/unit/test_utils_threading_utils.py @@ -16,6 +16,7 @@ import collections import functools +import threading import time from taskflow import test @@ -28,17 +29,12 @@ def _spinner(death): class TestThreadHelpers(test.TestCase): - def test_event_wait(self): - e = tu.Event() - e.set() - self.assertTrue(e.wait()) - def test_alive_thread_falsey(self): for v in [False, 0, None, ""]: self.assertFalse(tu.is_alive(v)) def test_alive_thread(self): - death = tu.Event() + death = threading.Event() t = tu.daemon_thread(_spinner, death) self.assertFalse(tu.is_alive(t)) t.start() @@ -48,7 +44,7 @@ class TestThreadHelpers(test.TestCase): self.assertFalse(tu.is_alive(t)) def test_daemon_thread(self): - death = tu.Event() + death = threading.Event() t = tu.daemon_thread(_spinner, death) self.assertTrue(t.daemon) @@ -59,7 +55,7 @@ class TestThreadBundle(test.TestCase): def setUp(self): super(TestThreadBundle, self).setUp() self.bundle = tu.ThreadBundle() - self.death = tu.Event() + self.death = threading.Event() self.addCleanup(self.bundle.stop) self.addCleanup(self.death.set) diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index e7831783d..d944b64bf 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import threading import time from concurrent import futures @@ -26,7 +27,6 @@ from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils from taskflow.types import failure -from taskflow.utils import threading_utils class TestWorkerTaskExecutor(test.MockTestCase): @@ -43,7 +43,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.executor_uuid = 'executor-uuid' self.executor_exchange = 'executor-exchange' self.executor_topic = 'test-topic1' - self.proxy_started_event = threading_utils.Event() + self.proxy_started_event = threading.Event() # patch classes self.proxy_mock, self.proxy_inst_mock = self.patchClass( diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index 59bddbda7..c1a16f585 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + from oslo_utils import uuidutils from taskflow.engines.worker_based import dispatcher @@ -31,7 +33,7 @@ POLLING_INTERVAL = 0.01 class TestMessagePump(test.TestCase): def test_notify(self): - barrier = threading_utils.Event() + barrier = threading.Event() on_notify = mock.MagicMock() on_notify.side_effect = lambda *args, **kwargs: barrier.set() @@ -56,7 +58,7 @@ class TestMessagePump(test.TestCase): on_notify.assert_called_with({}, mock.ANY) def test_response(self): - barrier = threading_utils.Event() + barrier = threading.Event() on_response = mock.MagicMock() on_response.side_effect = lambda *args, **kwargs: barrier.set() diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 43f208bba..6148e0a61 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -16,6 +16,7 @@ import contextlib import string +import threading import six @@ -26,7 +27,6 @@ from taskflow import retry from taskflow import task from taskflow.types import failure from taskflow.utils import kazoo_utils -from taskflow.utils import threading_utils ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' @@ -365,7 +365,7 @@ class WaitForOneFromTask(ProgressingTask): self.wait_states = [wait_states] else: self.wait_states = wait_states - self.event = threading_utils.Event() + self.event = threading.Event() def execute(self): if not self.event.wait(WAIT_TIMEOUT): diff --git a/taskflow/types/fsm.py b/taskflow/types/fsm.py index afb4eb1d2..1ed3193f5 100644 --- a/taskflow/types/fsm.py +++ b/taskflow/types/fsm.py @@ -14,10 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -try: - from collections import OrderedDict # noqa -except ImportError: - from ordereddict import OrderedDict # noqa +import collections import six @@ -66,7 +63,7 @@ class FSM(object): """ def __init__(self, start_state): self._transitions = {} - self._states = OrderedDict() + self._states = collections.OrderedDict() self._start_state = start_state self._current = None self.frozen = False @@ -127,7 +124,7 @@ class FSM(object): 'on_enter': on_enter, 'on_exit': on_exit, } - self._transitions[state] = OrderedDict() + self._transitions[state] = collections.OrderedDict() @misc.disallow_when_frozen(FrozenMachine) def add_reaction(self, state, event, reaction, *args, **kwargs): diff --git a/taskflow/types/periodic.py b/taskflow/types/periodic.py index 7314988c8..20d9a52e9 100644 --- a/taskflow/types/periodic.py +++ b/taskflow/types/periodic.py @@ -16,6 +16,7 @@ import heapq import inspect +import threading from debtcollector import removals from oslo_utils import reflection @@ -23,7 +24,6 @@ import six from taskflow import logging from taskflow.utils import misc -from taskflow.utils import threading_utils as tu LOG = logging.getLogger(__name__) @@ -146,7 +146,7 @@ class PeriodicWorker(object): @removals.removed_kwarg('tombstone', version="0.8", removal_version="?") def __init__(self, callables, tombstone=None): if tombstone is None: - self._tombstone = tu.Event() + self._tombstone = threading.Event() else: self._tombstone = tombstone self._callables = [] diff --git a/taskflow/types/sets.py b/taskflow/types/sets.py index 527ad2a7c..a462189f2 100644 --- a/taskflow/types/sets.py +++ b/taskflow/types/sets.py @@ -17,11 +17,6 @@ import collections import itertools -try: - from collections import OrderedDict # noqa -except ImportError: - from ordereddict import OrderedDict # noqa - import six @@ -51,7 +46,7 @@ class OrderedSet(collections.Set, collections.Hashable): __slots__ = ['_data'] def __init__(self, iterable=None): - self._data = _merge_in(OrderedDict(), iterable) + self._data = _merge_in(collections.OrderedDict(), iterable) def __hash__(self): return self._hash() diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 791bce3ef..57fe12876 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -14,9 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_utils import timeutils +import threading -from taskflow.utils import threading_utils +from oslo_utils import timeutils #: Moved to oslo.utils (just reference them from there until a later time). Split = timeutils.Split @@ -35,7 +35,7 @@ class Timeout(object): if timeout < 0: raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) self._timeout = timeout - self._event = threading_utils.Event() + self._event = threading.Event() def interrupt(self): self._event.set() diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index b2790657c..e859fffa5 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -16,7 +16,6 @@ import collections import multiprocessing -import sys import threading import six @@ -25,25 +24,6 @@ from six.moves import _thread from taskflow.utils import misc -if sys.version_info[0:2] == (2, 6): - # This didn't return that was/wasn't set in 2.6, since we actually care - # whether it did or didn't add that feature by taking the code from 2.7 - # that added this functionality... - # - # TODO(harlowja): remove when we can drop 2.6 support. - class Event(threading._Event): - def wait(self, timeout=None): - self.__cond.acquire() - try: - if not self.__flag: - self.__cond.wait(timeout) - return self.__flag - finally: - self.__cond.release() -else: - Event = threading.Event - - def is_alive(thread): """Helper to determine if a thread is alive (handles none safely).""" if not thread: