diff --git a/taskflow/job.py b/taskflow/job.py index 589c39a1..0857b451 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -18,8 +18,6 @@ import abc import logging -import re -import types from taskflow import exceptions as exc from taskflow import states @@ -30,73 +28,6 @@ from taskflow.openstack.common import uuidutils LOG = logging.getLogger(__name__) -def _get_task_version(task): - """Gets a tasks *string* version, whether it is a task object/function.""" - task_version = utils.get_attr(task, 'version') - if isinstance(task_version, (list, tuple)): - task_version = utils.join(task_version, with_what=".") - if task_version is not None and not isinstance(task_version, basestring): - task_version = str(task_version) - return task_version - - -def _get_task_name(task): - """Gets a tasks *string* name, whether it is a task object/function.""" - task_name = "" - if isinstance(task, (types.MethodType, types.FunctionType)): - # If its a function look for the attributes that should have been - # set using the task() decorator provided in the decorators file. If - # those have not been set, then we should at least have enough basic - # information (not a version) to form a useful task name. - task_name = utils.get_attr(task, 'name') - if not task_name: - name_pieces = [a for a in utils.get_many_attr(task, - '__module__', - '__name__') - if a is not None] - task_name = utils.join(name_pieces, ".") - else: - task_name = str(task) - return task_name - - -def _is_version_compatible(version_1, version_2): - """Checks for major version compatibility of two *string" versions.""" - if version_1 == version_2: - # Equivalent exactly, so skip the rest. - return True - - def _convert_to_pieces(version): - try: - pieces = [] - for p in version.split("."): - p = p.strip() - if not len(p): - pieces.append(0) - continue - # Clean off things like 1alpha, or 2b and just select the - # digit that starts that entry instead. - p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p) - if p_match: - p = p_match.group(1) - pieces.append(int(p)) - except (AttributeError, TypeError, ValueError): - pieces = [] - return pieces - - version_1_pieces = _convert_to_pieces(version_1) - version_2_pieces = _convert_to_pieces(version_2) - if len(version_1_pieces) == 0 or len(version_2_pieces) == 0: - return False - - # Ensure major version compatibility to start. - major1 = version_1_pieces[0] - major2 = version_2_pieces[0] - if major1 != major2: - return False - return True - - class Claimer(object): """A base class for objects that can attempt to claim a given job, so that said job can be worked on.""" @@ -155,100 +86,6 @@ class Job(object): self._state = new_state # TODO(harlowja): add logbook info? - def _workflow_listener(self, state, details): - """Ensure that when we receive an event from said workflow that we - make sure a logbook entry exists for that flow.""" - flow = details['flow'] - if flow.name in self.logbook: - return - self.logbook.add_flow(flow.name) - - def _task_listener(self, state, details): - """Store the result of the task under the given flow in the log - book so that it can be retrieved later.""" - flow = details['flow'] - metadata = {} - flow_details = self.logbook[flow.name] - if state in (states.SUCCESS, states.FAILURE): - metadata['result'] = details['result'] - - task = details['task'] - name = _get_task_name(task) - if name not in flow_details: - metadata['states'] = [state] - metadata['version'] = _get_task_version(task) - flow_details.add_task(name, metadata) - else: - details = flow_details[name] - - # Warn about task versions possibly being incompatible - my_version = _get_task_version(task) - prev_version = details.metadata.get('version') - if not _is_version_compatible(my_version, prev_version): - LOG.warn("Updating a task with a different version than the" - " one being listened to (%s != %s)", - prev_version, my_version) - - past_states = details.metadata.get('states', []) - past_states.append(state) - details.metadata['states'] = past_states - details.metadata.update(metadata) - - def _task_result_fetcher(self, _context, flow, task, task_uuid): - flow_details = self.logbook[flow.name] - - # See if it completed before (or failed before) so that we can use its - # results instead of having to recompute it. - not_found = (False, False, None) - name = _get_task_name(task) - if name not in flow_details: - return not_found - - details = flow_details[name] - has_completed = False - was_failure = False - task_states = details.metadata.get('states', []) - for state in task_states: - if state in (states.SUCCESS, states.FAILURE): - if state == states.FAILURE: - was_failure = True - has_completed = True - break - - # Warn about task versions possibly being incompatible - my_version = _get_task_version(task) - prev_version = details.metadata.get('version') - if not _is_version_compatible(my_version, prev_version): - LOG.warn("Fetching task results from a task with a different" - " version from the one being requested (%s != %s)", - prev_version, my_version) - - if has_completed: - return (True, was_failure, details.metadata.get('result')) - - return not_found - - def associate(self, flow, parents=True): - """Attachs the needed resumption and state change tracking listeners - to the given workflow so that the workflow can be resumed/tracked - using the jobs components.""" - flow.task_notifier.register('*', self._task_listener) - flow.notifier.register('*', self._workflow_listener) - flow.result_fetcher = self._task_result_fetcher - if parents and flow.parents: - for p in flow.parents: - self.associate(p, parents) - - def disassociate(self, flow, parents=True): - """Detaches the needed resumption and state change tracking listeners - from the given workflow.""" - flow.notifier.deregister('*', self._workflow_listener) - flow.task_notifier.deregister('*', self._task_listener) - flow.result_fetcher = None - if parents and flow.parents: - for p in flow.parents: - self.disassociate(p, parents) - @property def logbook(self): """Fetches (or creates) a logbook entry for this job.""" @@ -271,24 +108,9 @@ class Job(object): self._change_state(states.CLAIMED) def run(self, flow, *args, **kwargs): - already_associated = [] - - def associate_all(a_flow): - if a_flow in already_associated: - return - # Associate with the flow. - self.associate(a_flow) - already_associated.append(a_flow) - # Ensure we are associated with all the flows parents. - if a_flow.parents: - for p in a_flow.parents: - associate_all(p) - if flow.state != states.PENDING: raise exc.InvalidStateException("Unable to run %s when in" " state %s" % (flow, flow.state)) - - associate_all(flow) return flow.run(self.context, *args, **kwargs) def unclaim(self): diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py index ccea3669..a20a7096 100644 --- a/taskflow/patterns/base.py +++ b/taskflow/patterns/base.py @@ -89,8 +89,8 @@ class Flow(object): def __str__(self): lines = ["Flow: %s" % (self.name)] - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (self.state)) + return "; ".join(lines) @abc.abstractmethod def add(self, task): diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 83a7433a..c9683cb4 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -50,6 +50,7 @@ class Flow(linear_flow.Flow): r = utils.Runner(task) self._graph.add_node(r, uuid=r.uuid) self._runners = [] + self._leftoff_at = None return r.uuid def _add_dependency(self, provider, requirer): @@ -58,11 +59,10 @@ class Flow(linear_flow.Flow): def __str__(self): lines = ["GraphFlow: %s" % (self.name)] - lines.append(" Number of tasks: %s" % (self._graph.number_of_nodes())) - lines.append(" Number of dependencies: %s" - % (self._graph.number_of_edges())) - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (self._graph.number_of_nodes())) + lines.append("%s" % (self._graph.number_of_edges())) + lines.append("%s" % (self.state)) + return "; ".join(lines) @decorators.locked def remove(self, task_uuid): @@ -76,10 +76,11 @@ class Flow(linear_flow.Flow): for r in remove_nodes: self._graph.remove_node(r) self._runners = [] + self._leftoff_at = None def _ordering(self): try: - return self._connect() + return iter(self._connect()) except g_exc.NetworkXUnfeasible: raise exc.InvalidStateException("Unable to correctly determine " "the path through the provided " diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 81228451..6ab08387 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -18,7 +18,6 @@ import collections import copy -import functools import logging from taskflow.openstack.common import excutils @@ -46,23 +45,17 @@ class Flow(base.Flow): # The tasks which have been applied will be collected here so that they # can be reverted in the correct order on failure. self._accumulator = utils.RollbackAccumulator() - # This should be a functor that returns whether a given task has - # already ran by returning a pair of (has_result, was_error, result). - # - # NOTE(harlowja): This allows for resumption by skipping tasks which - # have already occurred. The previous return value is needed due to - # the contract we have with tasks that they will be given the value - # they returned if reversion is triggered. - self.result_fetcher = None # Tasks results are stored here. Lookup is by the uuid that was # returned from the add function. self.results = {} - # The last index in the order we left off at before being - # interrupted (or failing). - self._left_off_at = 0 + # The previously left off iterator that can be used to resume from + # the last task (if interrupted and soft-reset). + self._leftoff_at = None # All runners to run are collected here. self._runners = [] self._connected = False + # The resumption strategy to use. + self.resumer = None @decorators.locked def add_many(self, tasks): @@ -78,6 +71,7 @@ class Flow(base.Flow): r = utils.Runner(task) r.runs_before = list(reversed(self._runners)) self._connected = False + self._leftoff_at = None self._runners.append(r) return r.uuid @@ -104,10 +98,9 @@ class Flow(base.Flow): def __str__(self): lines = ["LinearFlow: %s" % (self.name)] - lines.append(" Number of tasks: %s" % (len(self._runners))) - lines.append(" Last index: %s" % (self._left_off_at)) - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (len(self._runners))) + lines.append("%s" % (self.state)) + return "; ".join(lines) @decorators.locked def remove(self, task_uuid): @@ -116,6 +109,7 @@ class Flow(base.Flow): if r.uuid == task_uuid: self._runners.pop(i) self._connected = False + self._leftoff_at = None removed = True break if not removed: @@ -132,22 +126,26 @@ class Flow(base.Flow): return self._runners def _ordering(self): - return self._connect() + return iter(self._connect()) @decorators.locked def run(self, context, *args, **kwargs): super(Flow, self).run(context, *args, **kwargs) - if self.result_fetcher: - result_fetcher = functools.partial(self.result_fetcher, context) - else: - result_fetcher = None + def resume_it(): + if self._leftoff_at is not None: + return ([], self._leftoff_at) + if self.resumer: + (finished, leftover) = self.resumer.resume(self, + self._ordering()) + else: + finished = [] + leftover = self._ordering() + return (finished, leftover) self._change_state(context, states.STARTED) try: - run_order = self._ordering() - if self._left_off_at > 0: - run_order = run_order[self._left_off_at:] + those_finished, leftover = resume_it() except Exception: with excutils.save_and_reraise_exception(): self._change_state(context, states.FAILURE) @@ -169,6 +167,9 @@ class Flow(base.Flow): result = runner(context, *args, **kwargs) else: if failed: + # TODO(harlowja): make this configurable?? + # If we previously failed, we want to fail again at + # the same place. if not result: # If no exception or exception message was provided # or captured from the previous run then we need to @@ -196,8 +197,6 @@ class Flow(base.Flow): # intentionally). rb.result = result runner.result = result - # Alter the index we have ran at. - self._left_off_at += 1 self.results[runner.uuid] = copy.deepcopy(result) self.task_notifier.notify(states.SUCCESS, details={ 'context': context, @@ -207,7 +206,7 @@ class Flow(base.Flow): 'task_uuid': runner.uuid, }) except Exception as e: - cause = utils.FlowFailure(runner.task, self, e) + cause = utils.FlowFailure(runner, self, e) with excutils.save_and_reraise_exception(): # Notify any listeners that the task has errored. self.task_notifier.notify(states.FAILURE, details={ @@ -219,51 +218,41 @@ class Flow(base.Flow): }) self.rollback(context, cause) - # Ensure in a ready to run state. - for runner in run_order: - runner.reset() - - last_runner = 0 - was_interrupted = False - if result_fetcher: + if len(those_finished): self._change_state(context, states.RESUMING) - for (i, runner) in enumerate(run_order): - if self.state == states.INTERRUPTED: - was_interrupted = True - break - (has_result, was_error, result) = result_fetcher(self, - runner.task, - runner.uuid) - if not has_result: - break + for (r, details) in those_finished: # Fake running the task so that we trigger the same # notifications and state changes (and rollback that # would have happened in a normal flow). - last_runner = i + 1 - run_it(runner, failed=was_error, result=result, - simulate_run=True) + failed = states.FAILURE in details.get('states', []) + result = details.get('result') + run_it(r, failed=failed, result=result, simulate_run=True) - if was_interrupted: + self._leftoff_at = leftover + self._change_state(context, states.RUNNING) + if self.state == states.INTERRUPTED: return - self._change_state(context, states.RUNNING) - for runner in run_order[last_runner:]: + was_interrupted = False + for r in leftover: + r.reset() + run_it(r) if self.state == states.INTERRUPTED: was_interrupted = True break - run_it(runner) if not was_interrupted: # Only gets here if everything went successfully. self._change_state(context, states.SUCCESS) + self._leftoff_at = None @decorators.locked def reset(self): super(Flow, self).reset() self.results = {} - self.result_fetcher = None + self.resumer = None self._accumulator.reset() - self._left_off_at = 0 + self._leftoff_at = None self._connected = False @decorators.locked diff --git a/taskflow/patterns/resumption/__init__.py b/taskflow/patterns/resumption/__init__.py new file mode 100644 index 00000000..830dd2e7 --- /dev/null +++ b/taskflow/patterns/resumption/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py new file mode 100644 index 00000000..a77cd665 --- /dev/null +++ b/taskflow/patterns/resumption/logbook.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from taskflow import states +from taskflow import utils + +LOG = logging.getLogger(__name__) + + +class Resumption(object): + # NOTE(harlowja): This allows for resumption by skipping tasks which + # have already occurred, aka fast-forwarding through a workflow to + # the last point it stopped (if possible). + def __init__(self, logbook): + self._logbook = logbook + + def record_for(self, flow): + + def _task_listener(state, details): + """Store the result of the task under the given flow in the log + book so that it can be retrieved later.""" + task_id = details['task_uuid'] + task = details['task'] + flow = details['flow'] + LOG.debug("Recording %s:%s of %s has finished state %s", + utils.get_task_name(task), task_id, flow, state) + # TODO(harlowja): switch to using uuids + flow_id = flow.name + metadata = {} + flow_details = self._logbook[flow_id] + if state in (states.SUCCESS, states.FAILURE): + metadata['result'] = details['result'] + if task_id not in flow_details: + metadata['states'] = [state] + metadata['version'] = utils.get_task_version(task) + flow_details.add_task(task_id, metadata) + else: + details = flow_details[task_id] + immediate_version = utils.get_task_version(task) + recorded_version = details.metadata.get('version') + if recorded_version is not None: + if not utils.is_version_compatible(recorded_version, + immediate_version): + LOG.warn("Updating a task with a different version" + " than the one being listened to (%s != %s)", + recorded_version, immediate_version) + past_states = details.metadata.get('states', []) + if state not in past_states: + past_states.append(state) + details.metadata['states'] = past_states + if metadata: + details.metadata.update(metadata) + + def _workflow_listener(state, details): + """Ensure that when we receive an event from said workflow that we + make sure a logbook entry exists for that flow.""" + flow = details['flow'] + old_state = details['old_state'] + LOG.debug("%s has transitioned from %s to %s", flow, old_state, + state) + # TODO(harlowja): switch to using uuids + flow_id = flow.name + if flow_id in self._logbook: + return + self._logbook.add_flow(flow_id) + + flow.task_notifier.register('*', _task_listener) + flow.notifier.register('*', _workflow_listener) + + def _reconcile_versions(self, desired_version, task_details): + # For now don't do anything to reconcile the desired version + # from the actual version present in the task details, but in the + # future we could try to alter the task details to be in the older + # format (or more complicated logic...) + return task_details + + def _get_details(self, flow_details, runner): + task_id = runner.uuid + if task_id not in flow_details: + return (False, None) + details = flow_details[task_id] + has_completed = False + for state in details.metadata.get('states', []): + if state in (states.SUCCESS, states.FAILURE): + has_completed = True + break + if not has_completed: + return (False, None) + immediate_version = utils.get_task_version(runner.task) + recorded_version = details.metadata.get('version') + if recorded_version is not None: + if not utils.is_version_compatible(recorded_version, + immediate_version): + LOG.warn("Fetching runner metadata from a task with" + " a different version from the one being" + " processed (%s != %s)", recorded_version, + immediate_version) + details = self._reconcile_versions(immediate_version, details) + return (True, details) + + def resume(self, flow, ordering): + """Splits the initial ordering into two segments, the first which + has already completed (or errored) and the second which has not + completed or errored.""" + + # TODO(harlowja): switch to using uuids + flow_id = flow.name + if flow_id not in self._logbook: + LOG.debug("No record of %s", flow) + return ([], ordering) + flow_details = self._logbook[flow_id] + ran_already = [] + for r in ordering: + LOG.debug("Checking if ran %s of %s", r, flow) + (has_ran, details) = self._get_details(flow_details, r) + LOG.debug(has_ran) + if not has_ran: + # We need to put back the last task we took out since it did + # not run and therefore needs to, thats why we have this + # different iterator (which can do this). + return (ran_already, utils.LastFedIter(r, ordering)) + LOG.debug("Already ran %s", r) + ran_already.append((r, details.metadata)) + return (ran_already, iter([])) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index c60a8e83..1955fee3 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -22,7 +22,9 @@ from taskflow import decorators from taskflow import exceptions as exc from taskflow import states +from taskflow.backends import memory from taskflow.patterns import linear_flow as lw +from taskflow.patterns.resumption import logbook as lr from taskflow.tests import utils @@ -216,25 +218,11 @@ class LinearFlowTest(unittest2.TestCase): def test_interrupt_flow(self): wf = lw.Flow("the-int-action") - result_storage = {} - # If we interrupt we need to know how to resume so attach the needed # parts to do that... - - def result_fetcher(_ctx, _wf, task, task_uuid): - if task.name in result_storage: - return (True, False, result_storage.get(task.name)) - return (False, False, None) - - def task_listener(state, details): - if state not in (states.SUCCESS, states.FAILURE,): - return - task = details['task'] - if task.name not in result_storage: - result_storage[task.name] = details['result'] - - wf.result_fetcher = result_fetcher - wf.task_notifier.register('*', task_listener) + tracker = lr.Resumption(memory.MemoryLogBook()) + tracker.record_for(wf) + wf.resumer = tracker wf.add(self.make_reverting_task(1)) wf.add(self.make_interrupt_task(2, wf)) @@ -250,9 +238,8 @@ class LinearFlowTest(unittest2.TestCase): # And now reset and resume. wf.reset() - wf.result_fetcher = result_fetcher - wf.task_notifier.register('*', task_listener) - + tracker.record_for(wf) + wf.resumer = tracker self.assertEquals(states.PENDING, wf.state) wf.run(context) self.assertEquals(2, len(context)) diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index 9d5d93e2..6a1bcd5f 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -28,6 +28,7 @@ from taskflow import states from taskflow.backends import memory from taskflow.patterns import linear_flow as lw +from taskflow.patterns.resumption import logbook as lr from taskflow.tests import utils @@ -75,7 +76,9 @@ class MemoryBackendTest(unittest2.TestCase): wf = lw.Flow('dummy') for _i in range(0, 5): wf.add(utils.null_functor) - j.associate(wf) + tracker = lr.Resumption(j.logbook) + tracker.record_for(wf) + wf.resumer = tracker j.state = states.RUNNING wf.run(j.context) j.state = states.SUCCESS @@ -118,7 +121,10 @@ class MemoryBackendTest(unittest2.TestCase): self.assertEquals('me', j.owner) wf = lw.Flow("the-int-action") - j.associate(wf) + tracker = lr.Resumption(j.logbook) + tracker.record_for(wf) + wf.resumer = tracker + self.assertEquals(states.PENDING, wf.state) call_log = [] @@ -142,7 +148,6 @@ class MemoryBackendTest(unittest2.TestCase): wf.add(task_1) wf.add(task_1_5) # Interrupt it after task_1 finishes wf.add(task_2) - wf.run(j.context) self.assertEquals(1, len(j.logbook)) @@ -150,8 +155,9 @@ class MemoryBackendTest(unittest2.TestCase): self.assertEquals(1, len(call_log)) wf.reset() - j.associate(wf) self.assertEquals(states.PENDING, wf.state) + tracker.record_for(wf) + wf.resumer = tracker wf.run(j.context) self.assertEquals(1, len(j.logbook)) @@ -171,7 +177,9 @@ class MemoryBackendTest(unittest2.TestCase): wf = lw.Flow('the-line-action') self.assertEquals(states.PENDING, wf.state) - j.associate(wf) + tracker = lr.Resumption(j.logbook) + tracker.record_for(wf) + wf.resumer = tracker call_log = [] diff --git a/taskflow/utils.py b/taskflow/utils.py index bfe6ea63..e1332446 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -20,9 +20,11 @@ import collections import contextlib import copy import logging +import re import sys import threading import time +import types from taskflow.openstack.common import uuidutils @@ -52,6 +54,73 @@ def get_many_attr(obj, *attrs): return many +def get_task_version(task): + """Gets a tasks *string* version, whether it is a task object/function.""" + task_version = get_attr(task, 'version') + if isinstance(task_version, (list, tuple)): + task_version = join(task_version, with_what=".") + if task_version is not None and not isinstance(task_version, basestring): + task_version = str(task_version) + return task_version + + +def get_task_name(task): + """Gets a tasks *string* name, whether it is a task object/function.""" + task_name = "" + if isinstance(task, (types.MethodType, types.FunctionType)): + # If its a function look for the attributes that should have been + # set using the task() decorator provided in the decorators file. If + # those have not been set, then we should at least have enough basic + # information (not a version) to form a useful task name. + task_name = get_attr(task, 'name') + if not task_name: + name_pieces = [a for a in get_many_attr(task, + '__module__', + '__name__') + if a is not None] + task_name = join(name_pieces, ".") + else: + task_name = str(task) + return task_name + + +def is_version_compatible(version_1, version_2): + """Checks for major version compatibility of two *string" versions.""" + if version_1 == version_2: + # Equivalent exactly, so skip the rest. + return True + + def _convert_to_pieces(version): + try: + pieces = [] + for p in version.split("."): + p = p.strip() + if not len(p): + pieces.append(0) + continue + # Clean off things like 1alpha, or 2b and just select the + # digit that starts that entry instead. + p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p) + if p_match: + p = p_match.group(1) + pieces.append(int(p)) + except (AttributeError, TypeError, ValueError): + pieces = [] + return pieces + + version_1_pieces = _convert_to_pieces(version_1) + version_2_pieces = _convert_to_pieces(version_2) + if len(version_1_pieces) == 0 or len(version_2_pieces) == 0: + return False + + # Ensure major version compatibility to start. + major1 = version_1_pieces[0] + major2 = version_2_pieces[0] + if major1 != major2: + return False + return True + + def await(check_functor, timeout=None): if timeout is not None: end_time = time.time() + max(0, timeout) @@ -71,12 +140,26 @@ def await(check_functor, timeout=None): return True +class LastFedIter(object): + """An iterator which yields back the first item and then yields back + results from the provided iterator.""" + + def __init__(self, first, rest_itr): + self.first = first + self.rest_itr = rest_itr + + def __iter__(self): + yield self.first + for i in self.rest_itr: + yield i + + class FlowFailure(object): """When a task failure occurs the following object will be given to revert and can be used to interrogate what caused the failure.""" - def __init__(self, task, flow, exception): - self.task = task + def __init__(self, runner, flow, exception): + self.runner = runner self.flow = flow self.exc = exception self.exc_info = sys.exc_info() @@ -121,7 +204,7 @@ class Runner(object): self.result = None def __str__(self): - return "%s@%s" % (self.task, self.uuid) + return "%s:%s" % (self.task, self.uuid) def __call__(self, *args, **kwargs): # Find all of our inputs first.