Remove six

This library no longer supports Python 2, so usage of six can be
removed. This also removes a workaround for the pickle library that
was needed only in Python 2.

Change-Id: I19d298cf0f402d65f0b142dea0bf35cf992332a9
This commit is contained in:
Takashi Kajinami 2022-05-17 22:56:45 +09:00
parent b5b69e8110
commit 44f17d005f
79 changed files with 266 additions and 455 deletions

View File

@ -43,11 +43,6 @@ Miscellaneous
.. automodule:: taskflow.utils.misc
Mixins
~~~~~~
.. automodule:: taskflow.utils.mixins
Persistence
~~~~~~~~~~~

View File

@ -7,9 +7,6 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
# Packages needed for using this library.
# Python 2->3 compatibility library.
six>=1.10.0 # MIT
# For async and/or periodic work
futurist>=1.2.0 # Apache-2.0

View File

@ -21,8 +21,6 @@ from collections import abc as cabc
import itertools
from oslo_utils import reflection
import six
from six.moves import zip as compat_zip
from taskflow.types import sets
from taskflow.utils import misc
@ -47,7 +45,7 @@ def _save_as_to_mapping(save_as):
# atom returns is pretty crucial for other later operations.
if save_as is None:
return collections.OrderedDict()
if isinstance(save_as, six.string_types):
if isinstance(save_as, str):
# NOTE(harlowja): this means that your atom will only return one item
# instead of a dictionary-like object or a indexable object (like a
# list or tuple).
@ -83,7 +81,7 @@ def _build_rebind_dict(req_args, rebind_args):
# the required argument names (if they are the same length then
# this determines how to remap the required argument names to the
# rebound ones).
rebind = collections.OrderedDict(compat_zip(req_args, rebind_args))
rebind = collections.OrderedDict(zip(req_args, rebind_args))
if len(req_args) < len(rebind_args):
# Extra things were rebound, that may be because of *args
# or **kwargs (or some other reason); so just keep all of them
@ -128,7 +126,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Add additional manually provided requirements to required mappings.
if reqs:
if isinstance(reqs, six.string_types):
if isinstance(reqs, str):
required.update({reqs: reqs})
else:
required.update((a, a) for a in reqs)
@ -139,7 +137,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Determine if there are optional arguments that we may or may not take.
if do_infer:
opt_args = sets.OrderedSet(all_args)
opt_args = opt_args - set(itertools.chain(six.iterkeys(required),
opt_args = opt_args - set(itertools.chain(required.keys(),
iter(ignore_list)))
optional = collections.OrderedDict((a, a) for a in opt_args)
else:
@ -147,7 +145,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Check if we are given some extra arguments that we aren't able to accept.
if not reflection.accepts_kwargs(function):
extra_args = sets.OrderedSet(six.iterkeys(required))
extra_args = sets.OrderedSet(required.keys())
extra_args -= all_args
if extra_args:
raise ValueError('Extra arguments given to atom %s: %s'
@ -161,8 +159,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
return required, optional
@six.add_metaclass(abc.ABCMeta)
class Atom(object):
class Atom(object, metaclass=abc.ABCMeta):
"""An unit of work that causes a flow to progress (in some manner).
An atom is a named object that operates with input data to perform
@ -299,13 +296,13 @@ class Atom(object):
# key value, then well there is no rebinding happening, otherwise
# there will be.
rebind = collections.OrderedDict()
for (arg_name, bound_name) in itertools.chain(six.iteritems(required),
six.iteritems(optional)):
for (arg_name, bound_name) in itertools.chain(required.items(),
optional.items()):
rebind.setdefault(arg_name, bound_name)
requires = sets.OrderedSet(six.itervalues(required))
optional = sets.OrderedSet(six.itervalues(optional))
requires = sets.OrderedSet(required.values())
optional = sets.OrderedSet(optional.values())
if self.inject:
inject_keys = frozenset(six.iterkeys(self.inject))
inject_keys = frozenset(self.inject.keys())
requires -= inject_keys
optional -= inject_keys
return rebind, requires, optional

View File

@ -20,7 +20,6 @@ import threading
from oslo_utils import excutils
from oslo_utils import timeutils
import six
from taskflow.conductors import base
from taskflow import exceptions as excp
@ -34,8 +33,7 @@ from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class ExecutorConductor(base.Conductor):
class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
"""Dispatches jobs from blocking :py:meth:`.run` method to some executor.
This conductor iterates over jobs in the provided jobboard (waiting for

View File

@ -13,7 +13,6 @@
# under the License.
import futurist
import six
from taskflow.conductors.backends import impl_executor
from taskflow.utils import threading_utils as tu
@ -63,7 +62,7 @@ class NonBlockingConductor(impl_executor.ExecutorConductor):
if executor_factory is None:
self._executor_factory = self._default_executor_factory
else:
if not six.callable(executor_factory):
if not callable(executor_factory):
raise ValueError("Provided keyword argument 'executor_factory'"
" must be callable")
self._executor_factory = executor_factory

View File

@ -17,7 +17,6 @@ import os
import threading
import fasteners
import six
from taskflow import engines
from taskflow import exceptions as excp
@ -26,8 +25,7 @@ from taskflow.types import notifier
from taskflow.utils import misc
@six.add_metaclass(abc.ABCMeta)
class Conductor(object):
class Conductor(object, metaclass=abc.ABCMeta):
"""Base for all conductor implementations.
Conductors act as entities which extract jobs from a jobboard, assign

View File

@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from taskflow.utils import misc
@ -74,7 +72,7 @@ class Depth(misc.StrEnum):
if isinstance(desired_depth, cls):
# Nothing to do in the first place...
return desired_depth
if not isinstance(desired_depth, six.string_types):
if not isinstance(desired_depth, str):
raise TypeError("Unexpected desired depth type, string type"
" expected, not %s" % type(desired_depth))
try:

View File

@ -16,13 +16,10 @@
import abc
import six
from taskflow import states
@six.add_metaclass(abc.ABCMeta)
class Action(object):
class Action(object, metaclass=abc.ABCMeta):
"""An action that handles executing, state changes, ... of atoms."""
NO_RESULT = object()

View File

@ -18,7 +18,6 @@ import threading
import fasteners
from oslo_utils import excutils
import six
from taskflow import flow
from taskflow import logging
@ -165,7 +164,7 @@ class FlowCompiler(object):
decomposed = dict(
(child, self._deep_compiler_func(child, parent=tree_node)[0])
for child in flow)
decomposed_graphs = list(six.itervalues(decomposed))
decomposed_graphs = list(decomposed.values())
graph = gr.merge_graphs(graph, *decomposed_graphs,
overlap_detector=_overlap_occurrence_detector)
for u, v, attr_dict in flow.iter_links():

View File

@ -19,7 +19,6 @@ import weakref
from oslo_utils import reflection
from oslo_utils import strutils
import six
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
@ -30,8 +29,7 @@ from taskflow import states as st
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Strategy(object):
class Strategy(object, metaclass=abc.ABCMeta):
"""Failure resolution strategy base class."""
strategy = None

View File

@ -17,8 +17,6 @@
import abc
import itertools
import six
from taskflow import deciders
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import traversal
@ -28,8 +26,7 @@ from taskflow import states
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Decider(object):
class Decider(object, metaclass=abc.ABCMeta):
"""Base class for deciders.
Provides interface to be implemented by sub-classes.
@ -135,7 +132,7 @@ class IgnoreDecider(Decider):
states_intentions = runtime.storage.get_atoms_states(
ed.from_node.name for ed in self._edge_deciders
if ed.kind in compiler.ATOMS)
for atom_name in six.iterkeys(states_intentions):
for atom_name in states_intentions.keys():
atom_state, _atom_intention = states_intentions[atom_name]
if atom_state != states.IGNORE:
history[atom_name] = runtime.storage.get(atom_name)
@ -155,7 +152,7 @@ class IgnoreDecider(Decider):
LOG.trace("Out of %s deciders there were %s 'do no run it'"
" voters, %s 'do run it' voters and %s 'ignored'"
" voters for transition to atom '%s' given history %s",
sum(len(eds) for eds in six.itervalues(voters)),
sum(len(eds) for eds in voters.values()),
list(ed.from_node.name
for ed in voters['do_not_run_it']),
list(ed.from_node.name for ed in voters['run_it']),

View File

@ -22,11 +22,11 @@ import threading
from automaton import runners
from concurrent import futures
import fasteners
import functools
import networkx as nx
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
@ -65,7 +65,7 @@ def _pre_check(check_compiled=True, check_storage_ensured=True,
def decorator(meth):
do_what = meth.__name__
@six.wraps(meth)
@functools.wraps(meth)
def wrapper(self, *args, **kwargs):
if check_compiled and not self._compiled:
raise exc.InvalidState("Can not %s an engine which"
@ -335,8 +335,8 @@ class ActionEngine(base.Engine):
e_failures = self.storage.get_execute_failures()
r_failures = self.storage.get_revert_failures()
er_failures = itertools.chain(
six.itervalues(e_failures),
six.itervalues(r_failures))
e_failures.values(),
r_failures.values())
failure.Failure.reraise_if_any(er_failures)
finally:
if w is not None:
@ -594,7 +594,7 @@ String (case insensitive) Executor used
executor_cls = cls._default_executor_cls
# Match the desired executor to a class that will work with it...
desired_executor = options.get('executor')
if isinstance(desired_executor, six.string_types):
if isinstance(desired_executor, str):
matched_executor_cls = None
for m in cls._executor_str_matchers:
if m.matches(desired_executor):

View File

@ -17,7 +17,6 @@
import abc
import futurist
import six
from taskflow import task as ta
from taskflow.types import failure
@ -106,8 +105,7 @@ class SerialRetryExecutor(object):
return fut
@six.add_metaclass(abc.ABCMeta)
class TaskExecutor(object):
class TaskExecutor(object, metaclass=abc.ABCMeta):
"""Executes and reverts tasks.
This class takes task and its arguments and executes or reverts it.

View File

@ -30,7 +30,6 @@ import time
import futurist
from oslo_utils import excutils
import six
from taskflow.engines.action_engine import executor as base
from taskflow import logging
@ -80,19 +79,6 @@ SCHEMAS = {
},
}
# See http://bugs.python.org/issue1457119 for why this is so complex...
_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError]
try:
import cPickle
_DECODE_ENCODE_ERRORS.append(cPickle.PickleError)
del cPickle
except (ImportError, AttributeError):
pass
_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS)
# Use the best pickle from here on out...
from six.moves import cPickle as pickle
class UnknownSender(Exception):
"""Exception raised when message from unknown sender is recvd."""
@ -142,13 +128,13 @@ class Reader(object):
])
def __init__(self, auth_key, dispatch_func, msg_limit=-1):
if not six.callable(dispatch_func):
if not callable(dispatch_func):
raise ValueError("Expected provided dispatch function"
" to be callable")
self.auth_key = auth_key
self.dispatch_func = dispatch_func
msg_limiter = iter_utils.iter_forever(msg_limit)
self.msg_count = six.next(msg_limiter)
self.msg_count = next(msg_limiter)
self._msg_limiter = msg_limiter
self._buffer = misc.BytesIO()
self._state = None
@ -200,7 +186,7 @@ class Reader(object):
# (instead of the receiver discarding it after the fact)...
functools.partial(_decode_message, self.auth_key, data,
self._memory['mac']))
self.msg_count = six.next(self._msg_limiter)
self.msg_count = next(self._msg_limiter)
self._memory.clear()
def _transition(self):
@ -267,7 +253,7 @@ def _create_random_string(desired_length):
def _calculate_hmac(auth_key, body):
mac = hmac.new(auth_key, body, hashlib.md5).hexdigest()
if isinstance(mac, six.text_type):
if isinstance(mac, str):
mac = mac.encode("ascii")
return mac
@ -427,11 +413,8 @@ class DispatcherHandler(asyncore.dispatcher):
CHUNK_SIZE = 8192
def __init__(self, sock, addr, dispatcher):
if six.PY2:
asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock)
else:
super(DispatcherHandler, self).__init__(map=dispatcher.map,
sock=sock)
super(DispatcherHandler, self).__init__(map=dispatcher.map,
sock=sock)
self.blobs_to_write = list(dispatcher.challenge_pieces)
self.reader = Reader(dispatcher.auth_key, self._dispatch)
self.targets = dispatcher.targets
@ -508,7 +491,7 @@ class DispatcherHandler(asyncore.dispatcher):
except (IOError, UnknownSender):
LOG.warning("Invalid received message", exc_info=True)
self.handle_close()
except _DECODE_ENCODE_ERRORS:
except (pickle.PickleError, TypeError):
LOG.warning("Badly formatted message", exc_info=True)
self.handle_close()
except (ValueError, su.ValidationError):
@ -526,10 +509,7 @@ class Dispatcher(asyncore.dispatcher):
MAX_BACKLOG = 5
def __init__(self, map, auth_key, identity):
if six.PY2:
asyncore.dispatcher.__init__(self, map=map)
else:
super(Dispatcher, self).__init__(map=map)
super(Dispatcher, self).__init__(map=map)
self.identity = identity
self.challenge_pieces = _encode_message(auth_key, CHALLENGE,
identity, reverse=True)

View File

@ -17,14 +17,11 @@
import abc
import six
from taskflow.types import notifier
from taskflow.utils import misc
@six.add_metaclass(abc.ABCMeta)
class Engine(object):
class Engine(object, metaclass=abc.ABCMeta):
"""Base for all engines implementations.
:ivar Engine.notifier: A notification object that will dispatch

View File

@ -18,7 +18,6 @@ import contextlib
from oslo_utils import importutils
from oslo_utils import reflection
import six
import stevedore.driver
from taskflow import exceptions as exc
@ -68,7 +67,7 @@ def _fetch_factory(factory_name):
def _fetch_validate_factory(flow_factory):
if isinstance(flow_factory, six.string_types):
if isinstance(flow_factory, str):
factory_fun = _fetch_factory(flow_factory)
factory_name = flow_factory
else:

View File

@ -18,7 +18,6 @@ import functools
import threading
from oslo_utils import timeutils
import six
from taskflow.engines.action_engine import executor
from taskflow.engines.worker_based import dispatcher
@ -141,7 +140,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if not self._ongoing_requests:
return
with self._ongoing_requests_lock:
ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests))
ongoing_requests_uuids = set(self._ongoing_requests.keys())
waiting_requests = {}
expired_requests = {}
for request_uuid in ongoing_requests_uuids:

View File

@ -25,7 +25,6 @@ import futurist
from oslo_serialization import jsonutils
from oslo_utils import reflection
from oslo_utils import timeutils
import six
from taskflow.engines.action_engine import executor
from taskflow import exceptions as excp
@ -148,8 +147,7 @@ def failure_to_dict(failure):
return failure.to_dict(include_args=False)
@six.add_metaclass(abc.ABCMeta)
class Message(object):
class Message(object, metaclass=abc.ABCMeta):
"""Base class for all message types."""
def __repr__(self):
@ -292,7 +290,7 @@ class Request(Message):
},
'action': {
"type": "string",
"enum": list(six.iterkeys(ACTION_TO_EVENT)),
"enum": list(ACTION_TO_EVENT.keys()),
},
# Keyword arguments that end up in the revert() or execute()
# method of the remote task.
@ -367,7 +365,7 @@ class Request(Message):
request['result'] = ('success', result)
if self._failures:
request['failures'] = {}
for atom_name, failure in six.iteritems(self._failures):
for atom_name, failure in self._failures.items():
request['failures'][atom_name] = failure_to_dict(failure)
return request
@ -431,7 +429,7 @@ class Request(Message):
# Validate all failure dictionaries that *may* be present...
failures = []
if 'failures' in data:
failures.extend(six.itervalues(data['failures']))
failures.extend(data['failures'].values())
result = data.get('result')
if result is not None:
result_data_type, result_data = result
@ -470,7 +468,7 @@ class Request(Message):
arguments['result'] = result_data
if failures is not None:
arguments['failures'] = {}
for task, fail_data in six.iteritems(failures):
for task, fail_data in failures.items():
arguments['failures'][task] = ft.Failure.from_dict(fail_data)
return _WorkUnit(task_cls, task_name, action, arguments)

View File

@ -19,7 +19,6 @@ import threading
import kombu
from kombu import exceptions as kombu_exceptions
import six
from taskflow.engines.worker_based import dispatcher
from taskflow import logging
@ -85,7 +84,7 @@ class Proxy(object):
ensure_options = self.DEFAULT_RETRY_OPTIONS.copy()
if retry_options is not None:
# Override the defaults with any user provided values...
for k in set(six.iterkeys(ensure_options)):
for k in set(ensure_options.keys()):
if k in retry_options:
# Ensure that the right type is passed in...
val = retry_options[k]
@ -154,7 +153,7 @@ class Proxy(object):
def publish(self, msg, routing_key, reply_to=None, correlation_id=None):
"""Publish message to the named exchange with given routing key."""
if isinstance(routing_key, six.string_types):
if isinstance(routing_key, str):
routing_keys = [routing_key]
else:
routing_keys = routing_key

View File

@ -19,7 +19,6 @@ import threading
from oslo_utils import reflection
from oslo_utils import timeutils
import six
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
@ -39,7 +38,7 @@ class TopicWorker(object):
def __init__(self, topic, tasks, identity=_NO_IDENTITY):
self.tasks = []
for task in tasks:
if not isinstance(task, six.string_types):
if not isinstance(task, str):
task = reflection.get_class_name(task)
self.tasks.append(task)
self.topic = topic
@ -47,7 +46,7 @@ class TopicWorker(object):
self.last_seen = None
def performs(self, task):
if not isinstance(task, six.string_types):
if not isinstance(task, str):
task = reflection.get_class_name(task)
return task in self.tasks
@ -215,18 +214,18 @@ class ProxyWorkerFinder(object):
dead_workers = {}
with self._cond:
now = timeutils.now()
for topic, worker in six.iteritems(self._workers):
for topic, worker in self._workers.items():
if worker.last_seen is None:
continue
secs_since_last_seen = max(0, now - worker.last_seen)
if secs_since_last_seen >= self._worker_expiry:
dead_workers[topic] = (worker, secs_since_last_seen)
for topic in six.iterkeys(dead_workers):
for topic in dead_workers.keys():
self._workers.pop(topic)
if dead_workers:
self._cond.notify_all()
if dead_workers and LOG.isEnabledFor(logging.INFO):
for worker, secs_since_last_seen in six.itervalues(dead_workers):
for worker, secs_since_last_seen in dead_workers.values():
LOG.info("Removed worker '%s' as it has not responded to"
" notification requests in %0.3f seconds",
worker, secs_since_last_seen)
@ -245,7 +244,7 @@ class ProxyWorkerFinder(object):
"""Gets a worker that can perform a given task."""
available_workers = []
with self._cond:
for worker in six.itervalues(self._workers):
for worker in self._workers.values():
if worker.performs(task):
available_workers.append(worker)
if available_workers:

View File

@ -21,7 +21,7 @@ import shutil
import sys
import tempfile
from six.moves import urllib_parse
from urllib import parse as urllib_parse
from taskflow import exceptions
from taskflow.persistence import backends

View File

@ -30,8 +30,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
import six
from six.moves import range as compat_range
from zake import fake_client
from taskflow import exceptions as excp
@ -139,7 +137,7 @@ def producer(ident, client):
name = "P-%s" % (ident)
safe_print(name, "started")
with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
for i in compat_range(0, PRODUCER_UNITS):
for i in range(0, PRODUCER_UNITS):
job_name = "%s-%s" % (name, i)
details = {
'color': random.choice(['red', 'blue']),
@ -151,22 +149,22 @@ def producer(ident, client):
def main():
if six.PY3:
# TODO(harlowja): Hack to make eventlet work right, remove when the
# following is fixed: https://github.com/eventlet/eventlet/issues/230
from taskflow.utils import eventlet_utils as _eu # noqa
try:
import eventlet as _eventlet # noqa
except ImportError:
pass
# TODO(harlowja): Hack to make eventlet work right, remove when the
# following is fixed: https://github.com/eventlet/eventlet/issues/230
from taskflow.utils import eventlet_utils as _eu # noqa
try:
import eventlet as _eventlet # noqa
except ImportError:
pass
with contextlib.closing(fake_client.FakeClient()) as c:
created = []
for i in compat_range(0, PRODUCERS):
for i in range(0, PRODUCERS):
p = threading_utils.daemon_thread(producer, i + 1, c)
created.append(p)
p.start()
consumed = collections.deque()
for i in compat_range(0, WORKERS):
for i in range(0, WORKERS):
w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
created.append(w)
w.start()

View File

@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
import futurist
from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
@ -86,9 +85,9 @@ def main():
tbl = []
cols = random.randint(1, 100)
rows = random.randint(1, 100)
for _i in compat_range(0, rows):
for _i in range(0, rows):
row = []
for _j in compat_range(0, cols):
for _j in range(0, cols):
row.append(random.random())
tbl.append(row)
@ -112,7 +111,7 @@ def main():
#
# TODO(harlowja): probably easier just to sort instead of search...
computed_tbl = []
for i in compat_range(0, len(tbl)):
for i in range(0, len(tbl)):
for t in f:
if t.index == i:
computed_tbl.append(e.storage.get(t.name))

View File

@ -18,8 +18,6 @@ import logging
import os
import sys
import six
logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
@ -81,6 +79,6 @@ for f in flows:
while engine_iters:
for it in list(engine_iters):
try:
print(six.next(it))
print(next(it))
except StopIteration:
engine_iters.remove(it)

View File

@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
import futurist
import six
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
@ -73,7 +72,7 @@ with futurist.ThreadPoolExecutor() as ex:
# and there is no more engine work to be done.
for it in cloned_iters:
try:
six.next(it)
next(it)
except StopIteration:
try:
iters.remove(it)

View File

@ -33,8 +33,6 @@ sys.path.insert(0, self_dir)
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).
import six
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
@ -51,7 +49,7 @@ class TotalReducer(task.Task):
def execute(self, *args, **kwargs):
# Reduces all mapped summed outputs into a single value.
total = 0
for (k, v) in six.iteritems(kwargs):
for (k, v) in kwargs.items():
# If any other kwargs was passed in, we don't want to use those
# in the calculation of the total...
if k.startswith('reduction_'):

View File

@ -34,7 +34,6 @@ sys.path.insert(0, top_dir)
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from zake import fake_client
from taskflow.conductors import backends as conductors
@ -114,7 +113,7 @@ def review_iter():
"""Makes reviews (never-ending iterator/generator)."""
review_id_gen = itertools.count(0)
while True:
review_id = six.next(review_id_gen)
review_id = next(review_id_gen)
review = {
'id': review_id,
}
@ -172,7 +171,7 @@ def generate_reviewer(client, saver, name=NAME):
review_generator = review_iter()
with contextlib.closing(jb):
while not no_more.is_set():
review = six.next(review_generator)
review = next(review_generator)
details = {
'store': {
'review': review,

View File

@ -25,8 +25,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
from six.moves import range as compat_range
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
@ -124,7 +122,7 @@ if __name__ == "__main__":
try:
# Create a set of worker threads to simulate actual remote workers...
print('Running %s workers.' % (MEMORY_WORKERS))
for i in compat_range(0, MEMORY_WORKERS):
for i in range(0, MEMORY_WORKERS):
# Give each one its own unique topic name so that they can
# correctly communicate with the engine (they will all share the
# same exchange).

View File

@ -24,8 +24,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
from six.moves import range as compat_range
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
@ -84,7 +82,7 @@ class MandelCalculator(task.Task):
def mandelbrot(x, y, max_iters):
c = complex(x, y)
z = 0.0j
for i in compat_range(max_iters):
for i in range(max_iters):
z = z * z + c
if (z.real * z.real + z.imag * z.imag) >= 4:
return i
@ -95,10 +93,10 @@ class MandelCalculator(task.Task):
pixel_size_x = (max_x - min_x) / width
pixel_size_y = (max_y - min_y) / height
block = []
for y in compat_range(chunk[0], chunk[1]):
for y in range(chunk[0], chunk[1]):
row = []
imag = min_y + y * pixel_size_y
for x in compat_range(0, width):
for x in range(0, width):
real = min_x + x * pixel_size_x
row.append(mandelbrot(real, imag, max_iters))
block.append(row)
@ -133,7 +131,7 @@ def calculate(engine_conf):
# Compose our workflow.
height, _width = IMAGE_SIZE
chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
for i in compat_range(0, CHUNK_COUNT):
for i in range(0, CHUNK_COUNT):
chunk_name = 'chunk_%s' % i
task_name = "calculation_%s" % i
# Break the calculation up into chunk size pieces.
@ -225,7 +223,7 @@ def create_fractal():
try:
# Create a set of workers to simulate actual remote workers.
print('Running %s workers.' % (WORKERS))
for i in compat_range(0, WORKERS):
for i in range(0, WORKERS):
worker_conf['topic'] = 'calculator_%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)

View File

@ -14,13 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
import traceback
from oslo_utils import excutils
from oslo_utils import reflection
import six
from taskflow.utils import mixins
def raise_with_cause(exc_cls, message, *args, **kwargs):
@ -89,7 +88,7 @@ class TaskFlowException(Exception):
if indent < 0:
raise ValueError("Provided 'indent' must be greater than"
" or equal to zero instead of %s" % indent)
buf = six.StringIO()
buf = io.StringIO()
if show_root_class:
buf.write(reflection.get_class_name(self, fully_qualified=False))
buf.write(": ")
@ -244,7 +243,7 @@ class NotImplementedError(NotImplementedError):
"""
class WrappedFailure(mixins.StrMixin, Exception):
class WrappedFailure(Exception):
"""Wraps one or several failure objects.
When exception/s cannot be re-raised (for example, because the value and
@ -298,17 +297,17 @@ class WrappedFailure(mixins.StrMixin, Exception):
return None
def __bytes__(self):
buf = six.BytesIO()
buf = io.BytesIO()
buf.write(b'WrappedFailure: [')
causes_gen = (six.binary_type(cause) for cause in self._causes)
causes_gen = (bytes(cause) for cause in self._causes)
buf.write(b", ".join(causes_gen))
buf.write(b']')
return buf.getvalue()
def __unicode__(self):
buf = six.StringIO()
def __str__(self):
buf = io.StringIO()
buf.write(u'WrappedFailure: [')
causes_gen = (six.text_type(cause) for cause in self._causes)
causes_gen = (str(cause) for cause in self._causes)
buf.write(u", ".join(causes_gen))
buf.write(u']')
return buf.getvalue()

View File

@ -17,7 +17,6 @@
import abc
from oslo_utils import reflection
import six
# Link metadata keys that have inherent/special meaning.
#
@ -43,8 +42,7 @@ _CHOP_PAT_LEN = len(_CHOP_PAT)
LINK_DECIDER_DEPTH = 'decider_depth'
@six.add_metaclass(abc.ABCMeta)
class Flow(object):
class Flow(object, metaclass=abc.ABCMeta):
"""The base abstract class of all flow implementations.
A flow is a structure that defines relationships between tasks. You can
@ -60,7 +58,7 @@ class Flow(object):
"""
def __init__(self, name, retry=None):
self._name = six.text_type(name)
self._name = str(name)
self._retry = retry
# NOTE(akarpinska): if retry doesn't have a name,
# the name of its owner will be assigned

View File

@ -30,8 +30,6 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
from redis import exceptions as redis_exceptions
from redis import sentinel
import six
from six.moves import range as compat_range
from taskflow import exceptions as exc
from taskflow.jobs import base
@ -620,9 +618,9 @@ return cmsgpack.pack(result)
key_pieces = [key_piece]
if more_key_pieces:
key_pieces.extend(more_key_pieces)
for i in compat_range(0, len(namespace_pieces)):
for i in range(0, len(namespace_pieces)):
namespace_pieces[i] = misc.binary_encode(namespace_pieces[i])
for i in compat_range(0, len(key_pieces)):
for i in range(0, len(key_pieces)):
key_pieces[i] = misc.binary_encode(key_pieces[i])
namespace = b"".join(namespace_pieces)
key = self.KEY_PIECE_SEP.join(key_pieces)
@ -696,7 +694,7 @@ return cmsgpack.pack(result)
'already_claimed': self.SCRIPT_ALREADY_CLAIMED,
}
prepared_scripts = {}
for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES):
for n, raw_script_tpl in self.SCRIPT_TEMPLATES.items():
script_tpl = string.Template(raw_script_tpl)
script_blob = script_tpl.substitute(**script_params)
script = self._client.register_script(script_blob)
@ -761,7 +759,7 @@ return cmsgpack.pack(result)
})
with _translate_failures():
raw_posting = self._dumps(posting)
raw_job_uuid = six.b(job_uuid)
raw_job_uuid = job_uuid.encode('latin-1')
was_posted = bool(self._client.hsetnx(self.listings_key,
raw_job_uuid, raw_posting))
if not was_posted:
@ -813,7 +811,7 @@ return cmsgpack.pack(result)
with _translate_failures():
raw_postings = self._client.hgetall(self.listings_key)
postings = []
for raw_job_key, raw_posting in six.iteritems(raw_postings):
for raw_job_key, raw_posting in raw_postings.items():
try:
job_data = self._loads(raw_posting)
try:

View File

@ -30,7 +30,6 @@ from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from taskflow.conductors import base as c_base
from taskflow import exceptions as excp
@ -373,7 +372,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if ensure_fresh:
self._force_refresh()
with self._job_cond:
return sorted(six.itervalues(self._known_jobs))
return sorted(self._known_jobs.values())
def _force_refresh(self):
try:
@ -479,7 +478,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths = []
pending_removals = []
with self._job_cond:
for path in six.iterkeys(self._known_jobs):
for path in self._known_jobs.keys():
if path not in child_paths:
pending_removals.append(path)
for path in child_paths:

View File

@ -18,12 +18,12 @@
import abc
import collections
import contextlib
import functools
import time
import enum
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from taskflow import exceptions as excp
from taskflow import states
@ -105,8 +105,7 @@ class JobPriority(enum.Enum):
return tuple(values)
@six.add_metaclass(abc.ABCMeta)
class Job(object):
class Job(object, metaclass=abc.ABCMeta):
"""A abstraction that represents a named and trackable unit of work.
A job connects a logbook, a owner, a priority, last modified and created