diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 70c448bf..02150378 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -153,7 +153,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): else: if request.expired: expired_requests[request_uuid] = request - elif request.state == pr.WAITING: + elif request.current_state == pr.WAITING: waiting_requests[request_uuid] = request if expired_requests: with self._ongoing_requests_lock: diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 1784ab3a..78991ffd 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -18,6 +18,8 @@ import abc import collections import threading +from automaton import exceptions as machine_excp +from automaton import machines import fasteners import futurist from oslo_serialization import jsonutils @@ -45,29 +47,9 @@ EVENT = 'EVENT' # for). WAITING_STATES = (WAITING, PENDING) -_ALL_STATES = (WAITING, PENDING, RUNNING, SUCCESS, FAILURE, EVENT) -_STOP_TIMER_STATES = (RUNNING, SUCCESS, FAILURE) - -# Transitions that a request state can go through. -_ALLOWED_TRANSITIONS = ( - # Used when a executor starts to publish a request to a selected worker. - (WAITING, PENDING), - # When a request expires (isn't able to be processed by any worker). - (WAITING, FAILURE), - # Worker has started executing a request. - (PENDING, RUNNING), - # Worker failed to construct/process a request to run (either the worker - # did not transition to RUNNING in the given timeout or the worker itself - # had some type of failure before RUNNING started). - # - # Also used by the executor if the request was attempted to be published - # but that did publishing process did not work out. - (PENDING, FAILURE), - # Execution failed due to some type of remote failure. - (RUNNING, FAILURE), - # Execution succeeded & has completed. 
- (RUNNING, SUCCESS), -) +# Once these states have been entered a request can no longer be +# automatically expired. +STOP_TIMER_STATES = (RUNNING, SUCCESS, FAILURE) # Remote task actions. EXECUTE = 'execute' @@ -108,7 +90,54 @@ NO_RESULT = object() LOG = logging.getLogger(__name__) +def make_an_event(new_state): + """Turns a new/target state into an event name.""" + return ('on_%s' % new_state).lower() + + +def build_a_machine(freeze=True): + """Builds a state machine that requests are allowed to go through.""" + + m = machines.FiniteMachine() + for st in (WAITING, PENDING, RUNNING): + m.add_state(st) + for st in (SUCCESS, FAILURE): + m.add_state(st, terminal=True) + + # When an executor starts to publish a request to a selected worker but the + # executor has not received confirmation from that worker that anything has + # happened yet. + m.default_start_state = WAITING + m.add_transition(WAITING, PENDING, make_an_event(PENDING)) + + # When a request expires (isn't able to be processed by any worker). + m.add_transition(WAITING, FAILURE, make_an_event(FAILURE)) + + # Worker has started executing a request. + m.add_transition(PENDING, RUNNING, make_an_event(RUNNING)) + + # Worker failed to construct/process a request to run (either the worker + # did not transition to RUNNING in the given timeout or the worker itself + # had some type of failure before RUNNING started). + # + # Also used by the executor if the request was attempted to be published + # but that publishing process did not work out. + m.add_transition(PENDING, FAILURE, make_an_event(FAILURE)) + + # Execution failed due to some type of remote failure. + m.add_transition(RUNNING, FAILURE, make_an_event(FAILURE)) + + # Execution succeeded & has completed. + m.add_transition(RUNNING, SUCCESS, make_an_event(SUCCESS)) + + # No further changes allowed. 
+ if freeze: + m.freeze() + return m + + def failure_to_dict(failure): + """Attempts to convert a failure object into a jsonifyable dictionary.""" failure_dict = failure.to_dict() try: # it's possible the exc_args can't be serialized as JSON @@ -215,6 +244,21 @@ class Request(Message): Every request is created in the WAITING state and is expired within the given timeout if it does not transition out of the (WAITING, PENDING) states. + + State machine a request goes through as it progresses (or expires):: + + +------------+------------+---------+----------+---------+ + | Start | Event | End | On Enter | On Exit | + +------------+------------+---------+----------+---------+ + | FAILURE[$] | . | . | . | . | + | PENDING | on_failure | FAILURE | . | . | + | PENDING | on_running | RUNNING | . | . | + | RUNNING | on_failure | FAILURE | . | . | + | RUNNING | on_success | SUCCESS | . | . | + | SUCCESS[$] | . | . | . | . | + | WAITING[^] | on_failure | FAILURE | . | . | + | WAITING[^] | on_pending | PENDING | . | . | + +------------+------------+---------+----------+---------+ """ #: String constant representing this message type. @@ -269,13 +313,19 @@ class Request(Message): self._failures = failures self._watch = timeutils.StopWatch(duration=timeout).start() self._lock = threading.Lock() - self.state = WAITING + self._machine = build_a_machine() + self._machine.initialize() self.task = task self.uuid = uuid self.created_on = timeutils.now() self.future = futurist.Future() self.future.atom = task + @property + def current_state(self): + """Current state the request is in.""" + return self._machine.current_state + def set_result(self, result): """Sets the responses futures result.""" self.future.set_result((self._event, result)) @@ -291,7 +341,7 @@ class Request(Message): state for more then the given timeout (it is not considered to be expired in any other state). 
""" - if self.state in WAITING_STATES: + if self._machine.current_state in WAITING_STATES: return self._watch.expired() return False @@ -346,23 +396,25 @@ class Request(Message): """Transitions the request to a new state. If transition was performed, it returns True. If transition - should was ignored, it returns False. If transition was not + was ignored, it returns False. If transition was not valid (and will not be performed), it raises an InvalidState exception. """ - old_state = self.state + old_state = self._machine.current_state if old_state == new_state: return False - pair = (old_state, new_state) - if pair not in _ALLOWED_TRANSITIONS: + try: + self._machine.process_event(make_an_event(new_state)) + except (machine_excp.NotFound, machine_excp.InvalidState) as e: raise excp.InvalidState("Request transition from %s to %s is" - " not allowed" % pair) - if new_state in _STOP_TIMER_STATES: - self._watch.stop() - self.state = new_state - LOG.debug("Transitioned '%s' from %s state to %s state", self, - old_state, new_state) - return True + " not allowed: %s" % (old_state, + new_state, e)) + else: + if new_state in STOP_TIMER_STATES: + self._watch.stop() + LOG.debug("Transitioned '%s' from %s state to %s state", self, + old_state, new_state) + return True @classmethod def validate(cls, data): @@ -435,7 +487,7 @@ class Response(Message): 'properties': { 'state': { "type": "string", - "enum": list(_ALL_STATES), + "enum": list(build_a_machine().states) + [EVENT], }, 'data': { "anyOf": [ diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index ebd67432..86a7e898 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -121,8 +121,8 @@ class TestProtocol(test.TestCase): def test_request_transitions(self): request = self.request() - self.assertEqual(pr.WAITING, request.state) - self.assertIn(request.state, pr.WAITING_STATES) + 
self.assertEqual(pr.WAITING, request.current_state) + self.assertIn(request.current_state, pr.WAITING_STATES) self.assertRaises(excp.InvalidState, request.transition, pr.SUCCESS) self.assertFalse(request.transition(pr.WAITING)) self.assertTrue(request.transition(pr.PENDING))