Remove 2.6 classifier + 2.6 compatibility code

Fixes bug 1445827

Depends-On: I02e3c9aacef0b295a2f823a5cbaf11768a90cb82

Change-Id: I1db681803598ac1bc917fd74a99458bc61edf3f1
Joshua Harlow 2015-06-18 15:48:11 -07:00 committed by Joshua Harlow
parent caf37be345
commit 543cf78a6f
18 changed files with 49 additions and 83 deletions

View File

@ -7,9 +7,6 @@ pbr>=0.11,<2.0
# Packages needed for using this library.
# Only needed on python 2.6
ordereddict
# Python 2->3 compatibility library.
six>=1.9.0
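
Context for the dependency removal (not part of the change itself): the ordereddict PyPI package backported collections.OrderedDict to Python 2.6; from 2.7 onward it ships in the standard library and preserves insertion order, which is the property the mappings touched below rely on. A minimal sketch:

import collections

d = collections.OrderedDict()
d['x'] = 0
d['y'] = 1
# Iteration follows insertion order; plain dicts only guarantee this
# from Python 3.7 onward.
assert list(d) == ['x', 'y']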

View File

@ -17,7 +17,6 @@ classifier =
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.6
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.4

View File

@ -19,11 +19,6 @@ import abc
import collections
import itertools
try:
from collections import OrderedDict # noqa
except ImportError:
from ordereddict import OrderedDict # noqa
from oslo_utils import reflection
import six
from six.moves import zip as compat_zip
@ -46,23 +41,24 @@ def _save_as_to_mapping(save_as):
# outside of code so that it's more easily understandable, since what an
# atom returns is pretty crucial for other later operations.
if save_as is None:
return OrderedDict()
return collections.OrderedDict()
if isinstance(save_as, six.string_types):
# NOTE(harlowja): this means that your atom will only return one item
# instead of a dictionary-like object or a indexable object (like a
# list or tuple).
return OrderedDict([(save_as, None)])
return collections.OrderedDict([(save_as, None)])
elif isinstance(save_as, _sequence_types):
# NOTE(harlowja): this means that your atom will return a indexable
# object, like a list or tuple and the results can be mapped by index
# to that tuple/list that is returned for others to use.
return OrderedDict((key, num) for num, key in enumerate(save_as))
return collections.OrderedDict((key, num)
for num, key in enumerate(save_as))
elif isinstance(save_as, _set_types):
# NOTE(harlowja): in the case where a set is given we will not be
# able to determine the numeric ordering in a reliable way (since it
# may be an unordered set) so the only way for us to easily map the
# result of the atom will be via the key itself.
return OrderedDict((key, key) for key in save_as)
return collections.OrderedDict((key, key) for key in save_as)
else:
raise TypeError('Atom provides parameter '
'should be str, set or tuple/list, not %r' % save_as)
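
A worked sketch of the mapping shapes described in the notes above (hypothetical values, following the branches of the helper shown here):

_save_as_to_mapping('result')    # -> OrderedDict([('result', None)]) - single name
_save_as_to_mapping(('x', 'y'))  # -> OrderedDict([('x', 0), ('y', 1)]) - mapped by index
_save_as_to_mapping({'x'})       # -> OrderedDict([('x', 'x')]) - set keys map to themselves
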
@ -76,9 +72,9 @@ def _build_rebind_dict(args, rebind_args):
new name onto the required name).
"""
if rebind_args is None:
return OrderedDict()
return collections.OrderedDict()
elif isinstance(rebind_args, (list, tuple)):
rebind = OrderedDict(compat_zip(args, rebind_args))
rebind = collections.OrderedDict(compat_zip(args, rebind_args))
if len(args) < len(rebind_args):
rebind.update((a, a) for a in rebind_args[len(args):])
return rebind
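
Illustrative example of the positional rebinding above (argument names are hypothetical): zip() pairs each accepted argument with the supplied name, and any leftover rebind names map onto themselves:

args = ['ctx', 'server']
rebind_args = ['context', 'vm', 'volume']
rebind = collections.OrderedDict(zip(args, rebind_args))
# -> OrderedDict([('ctx', 'context'), ('server', 'vm')])
rebind.update((a, a) for a in rebind_args[len(args):])
# -> OrderedDict([('ctx', 'context'), ('server', 'vm'), ('volume', 'volume')])
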
@ -112,7 +108,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
ignore_list = []
# Build the required names.
required = OrderedDict()
required = collections.OrderedDict()
# Add required arguments to required mappings if inference is enabled.
if do_infer:
@ -133,9 +129,9 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
opt_args = sets.OrderedSet(all_args)
opt_args = opt_args - set(itertools.chain(six.iterkeys(required),
iter(ignore_list)))
optional = OrderedDict((a, a) for a in opt_args)
optional = collections.OrderedDict((a, a) for a in opt_args)
else:
optional = OrderedDict()
optional = collections.OrderedDict()
# Check if we are given some extra arguments that we aren't able to accept.
if not reflection.accepts_kwargs(function):
@ -206,14 +202,14 @@ class Atom(object):
self.requires = sets.OrderedSet()
self.optional = sets.OrderedSet()
self.provides = sets.OrderedSet(self.save_as)
self.rebind = OrderedDict()
self.rebind = collections.OrderedDict()
def _build_arg_mapping(self, executor, requires=None, rebind=None,
auto_extract=True, ignore_list=None):
required, optional = _build_arg_mapping(self.name, requires, rebind,
executor, auto_extract,
ignore_list=ignore_list)
rebind = OrderedDict()
rebind = collections.OrderedDict()
for (arg_name, bound_name) in itertools.chain(six.iteritems(required),
six.iteritems(optional)):
rebind.setdefault(arg_name, bound_name)
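
A small sketch of how the two mappings merge (hypothetical names, using plain .items() for brevity): setdefault() keeps the first binding seen, so a required binding is never overridden by an optional one for the same argument:

required = collections.OrderedDict([('ctx', 'context')])
optional = collections.OrderedDict([('ctx', 'ctx'), ('retry', 'retry')])
rebind = collections.OrderedDict()
for arg_name, bound_name in itertools.chain(required.items(), optional.items()):
    rebind.setdefault(arg_name, bound_name)
# rebind == OrderedDict([('ctx', 'context'), ('retry', 'retry')])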

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
try:
from contextlib import ExitStack # noqa
except ImportError:
@ -26,7 +28,6 @@ from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
WAIT_TIMEOUT = 0.5
@ -69,7 +70,7 @@ class BlockingConductor(base.Conductor):
self._wait_timeout = wait_timeout
else:
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
self._dead = threading_utils.Event()
self._dead = threading.Event()
@removals.removed_kwarg('timeout', version="0.8", removal_version="?")
def stop(self, timeout=None):

View File

@ -19,6 +19,7 @@ import collections
from multiprocessing import managers
import os
import pickle
import threading
from oslo_utils import excutils
from oslo_utils import reflection
@ -240,7 +241,7 @@ class _Dispatcher(object):
raise ValueError("Provided dispatch periodicity must be greater"
" than zero and not '%s'" % dispatch_periodicity)
self._targets = {}
self._dead = threading_utils.Event()
self._dead = threading.Event()
self._dispatch_periodicity = dispatch_periodicity
self._stop_when_empty = False

View File

@ -15,6 +15,7 @@
# under the License.
import collections
import threading
import kombu
from kombu import exceptions as kombu_exceptions
@ -22,7 +23,6 @@ import six
from taskflow.engines.worker_based import dispatcher
from taskflow import logging
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@ -75,7 +75,7 @@ class Proxy(object):
self._topic = topic
self._exchange_name = exchange
self._on_wait = on_wait
self._running = threading_utils.Event()
self._running = threading.Event()
self._dispatcher = dispatcher.TypeDispatcher(
# NOTE(skudriashev): Process all incoming messages only if proxy is
# running, otherwise requeue them.

View File

@ -16,6 +16,7 @@
import collections
import contextlib
import threading
from zake import fake_client
@ -93,7 +94,7 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
def test_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading_utils.Event()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
@ -123,7 +124,7 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
def test_fail_run(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading_utils.Event()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()

View File

@ -15,6 +15,7 @@
# under the License.
import contextlib
import threading
import time
from kazoo.recipe import watchers
@ -53,8 +54,8 @@ def flush(client, path=None):
# before this context manager exits.
if not path:
path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
created = threading_utils.Event()
deleted = threading_utils.Event()
created = threading.Event()
deleted = threading.Event()
def on_created(data, stat):
if stat is not None:
@ -126,7 +127,7 @@ class BoardTestMixin(object):
self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)
def test_wait_arrival(self):
ev = threading_utils.Event()
ev = threading.Event()
jobs = []
def poster(wait_post=0.2):

View File

@ -16,6 +16,7 @@
import contextlib
import logging
import threading
import time
from oslo_serialization import jsonutils
@ -38,7 +39,6 @@ from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils
from taskflow.utils import threading_utils
_LOG_LEVELS = frozenset([
@ -89,7 +89,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
self.board.connect()
def _post_claim_job(self, job_name, book=None, details=None):
arrived = threading_utils.Event()
arrived = threading.Event()
def set_on_children(children):
if children:

View File

@ -16,6 +16,7 @@
import collections
import functools
import threading
import time
from taskflow import test
@ -28,17 +29,12 @@ def _spinner(death):
class TestThreadHelpers(test.TestCase):
def test_event_wait(self):
e = tu.Event()
e.set()
self.assertTrue(e.wait())
def test_alive_thread_falsey(self):
for v in [False, 0, None, ""]:
self.assertFalse(tu.is_alive(v))
def test_alive_thread(self):
death = tu.Event()
death = threading.Event()
t = tu.daemon_thread(_spinner, death)
self.assertFalse(tu.is_alive(t))
t.start()
@ -48,7 +44,7 @@ class TestThreadHelpers(test.TestCase):
self.assertFalse(tu.is_alive(t))
def test_daemon_thread(self):
death = tu.Event()
death = threading.Event()
t = tu.daemon_thread(_spinner, death)
self.assertTrue(t.daemon)
@ -59,7 +55,7 @@ class TestThreadBundle(test.TestCase):
def setUp(self):
super(TestThreadBundle, self).setUp()
self.bundle = tu.ThreadBundle()
self.death = tu.Event()
self.death = threading.Event()
self.addCleanup(self.bundle.stop)
self.addCleanup(self.death.set)

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from concurrent import futures
@ -26,7 +27,6 @@ from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.types import failure
from taskflow.utils import threading_utils
class TestWorkerTaskExecutor(test.MockTestCase):
@ -43,7 +43,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.executor_uuid = 'executor-uuid'
self.executor_exchange = 'executor-exchange'
self.executor_topic = 'test-topic1'
self.proxy_started_event = threading_utils.Event()
self.proxy_started_event = threading.Event()
# patch classes
self.proxy_mock, self.proxy_inst_mock = self.patchClass(

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_utils import uuidutils
from taskflow.engines.worker_based import dispatcher
@ -31,7 +33,7 @@ POLLING_INTERVAL = 0.01
class TestMessagePump(test.TestCase):
def test_notify(self):
barrier = threading_utils.Event()
barrier = threading.Event()
on_notify = mock.MagicMock()
on_notify.side_effect = lambda *args, **kwargs: barrier.set()
@ -56,7 +58,7 @@ class TestMessagePump(test.TestCase):
on_notify.assert_called_with({}, mock.ANY)
def test_response(self):
barrier = threading_utils.Event()
barrier = threading.Event()
on_response = mock.MagicMock()
on_response.side_effect = lambda *args, **kwargs: barrier.set()

View File

@ -16,6 +16,7 @@
import contextlib
import string
import threading
import six
@ -26,7 +27,6 @@ from taskflow import retry
from taskflow import task
from taskflow.types import failure
from taskflow.utils import kazoo_utils
from taskflow.utils import threading_utils
ARGS_KEY = '__args__'
KWARGS_KEY = '__kwargs__'
@ -365,7 +365,7 @@ class WaitForOneFromTask(ProgressingTask):
self.wait_states = [wait_states]
else:
self.wait_states = wait_states
self.event = threading_utils.Event()
self.event = threading.Event()
def execute(self):
if not self.event.wait(WAIT_TIMEOUT):

View File

@ -14,10 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
try:
from collections import OrderedDict # noqa
except ImportError:
from ordereddict import OrderedDict # noqa
import collections
import six
@ -66,7 +63,7 @@ class FSM(object):
"""
def __init__(self, start_state):
self._transitions = {}
self._states = OrderedDict()
self._states = collections.OrderedDict()
self._start_state = start_state
self._current = None
self.frozen = False
@ -127,7 +124,7 @@ class FSM(object):
'on_enter': on_enter,
'on_exit': on_exit,
}
self._transitions[state] = OrderedDict()
self._transitions[state] = collections.OrderedDict()
@misc.disallow_when_frozen(FrozenMachine)
def add_reaction(self, state, event, reaction, *args, **kwargs):

View File

@ -16,6 +16,7 @@
import heapq
import inspect
import threading
from debtcollector import removals
from oslo_utils import reflection
@ -23,7 +24,6 @@ import six
from taskflow import logging
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
LOG = logging.getLogger(__name__)
@ -146,7 +146,7 @@ class PeriodicWorker(object):
@removals.removed_kwarg('tombstone', version="0.8", removal_version="?")
def __init__(self, callables, tombstone=None):
if tombstone is None:
self._tombstone = tu.Event()
self._tombstone = threading.Event()
else:
self._tombstone = tombstone
self._callables = []

View File

@ -17,11 +17,6 @@
import collections
import itertools
try:
from collections import OrderedDict # noqa
except ImportError:
from ordereddict import OrderedDict # noqa
import six
@ -51,7 +46,7 @@ class OrderedSet(collections.Set, collections.Hashable):
__slots__ = ['_data']
def __init__(self, iterable=None):
self._data = _merge_in(OrderedDict(), iterable)
self._data = _merge_in(collections.OrderedDict(), iterable)
def __hash__(self):
return self._hash()

View File

@ -14,9 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
import threading
from taskflow.utils import threading_utils
from oslo_utils import timeutils
#: Moved to oslo.utils (just reference them from there until a later time).
Split = timeutils.Split
@ -35,7 +35,7 @@ class Timeout(object):
if timeout < 0:
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
self._timeout = timeout
self._event = threading_utils.Event()
self._event = threading.Event()
def interrupt(self):
self._event.set()

View File

@ -16,7 +16,6 @@
import collections
import multiprocessing
import sys
import threading
import six
@ -25,25 +24,6 @@ from six.moves import _thread
from taskflow.utils import misc
if sys.version_info[0:2] == (2, 6):
# This didn't return that was/wasn't set in 2.6, since we actually care
# whether it did or didn't add that feature by taking the code from 2.7
# that added this functionality...
#
# TODO(harlowja): remove when we can drop 2.6 support.
class Event(threading._Event):
def wait(self, timeout=None):
self.__cond.acquire()
try:
if not self.__flag:
self.__cond.wait(timeout)
return self.__flag
finally:
self.__cond.release()
else:
Event = threading.Event
def is_alive(thread):
"""Helper to determine if a thread is alive (handles none safely)."""
if not thread:
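
The shim removed above existed because Event.wait() on Python 2.6 did not report whether the flag was set; from 2.7 onward the standard library version does, which is the behaviour callers such as WaitForOneFromTask.execute() above depend on. A minimal sketch of what is now relied on:

import threading

ev = threading.Event()
# wait() returns False on timeout and True once the flag is set (2.7+).
assert not ev.wait(0.01)   # timed out, flag never set
ev.set()
assert ev.wait(0.01)       # flag set, returns True immediately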