Merge "Use a automaton machine for WBE request state machine"

This commit is contained in:
Jenkins
2016-04-28 20:29:37 +00:00
committed by Gerrit Code Review
3 changed files with 92 additions and 40 deletions

View File

@@ -153,7 +153,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
else:
if request.expired:
expired_requests[request_uuid] = request
elif request.state == pr.WAITING:
elif request.current_state == pr.WAITING:
waiting_requests[request_uuid] = request
if expired_requests:
with self._ongoing_requests_lock:

View File

@@ -18,6 +18,8 @@ import abc
import collections
import threading
from automaton import exceptions as machine_excp
from automaton import machines
import fasteners
import futurist
from oslo_serialization import jsonutils
@@ -45,29 +47,9 @@ EVENT = 'EVENT'
# for).
WAITING_STATES = (WAITING, PENDING)
_ALL_STATES = (WAITING, PENDING, RUNNING, SUCCESS, FAILURE, EVENT)
_STOP_TIMER_STATES = (RUNNING, SUCCESS, FAILURE)
# Transitions that a request state can go through.
_ALLOWED_TRANSITIONS = (
# Used when a executor starts to publish a request to a selected worker.
(WAITING, PENDING),
# When a request expires (isn't able to be processed by any worker).
(WAITING, FAILURE),
# Worker has started executing a request.
(PENDING, RUNNING),
# Worker failed to construct/process a request to run (either the worker
# did not transition to RUNNING in the given timeout or the worker itself
# had some type of failure before RUNNING started).
#
# Also used by the executor if the request was attempted to be published
# but that did publishing process did not work out.
(PENDING, FAILURE),
# Execution failed due to some type of remote failure.
(RUNNING, FAILURE),
# Execution succeeded & has completed.
(RUNNING, SUCCESS),
)
# Once these states have been entered a request can no longer be
# automatically expired.
STOP_TIMER_STATES = (RUNNING, SUCCESS, FAILURE)
# Remote task actions.
EXECUTE = 'execute'
@@ -108,7 +90,54 @@ NO_RESULT = object()
LOG = logging.getLogger(__name__)
def make_an_event(new_state):
"""Turns a new/target state into an event name."""
return ('on_%s' % new_state).lower()
def build_a_machine(freeze=True):
"""Builds a state machine that requests are allowed to go through."""
m = machines.FiniteMachine()
for st in (WAITING, PENDING, RUNNING):
m.add_state(st)
for st in (SUCCESS, FAILURE):
m.add_state(st, terminal=True)
# When a executor starts to publish a request to a selected worker but the
# executor has not recved confirmation from that worker that anything has
# happened yet.
m.default_start_state = WAITING
m.add_transition(WAITING, PENDING, make_an_event(PENDING))
# When a request expires (isn't able to be processed by any worker).
m.add_transition(WAITING, FAILURE, make_an_event(FAILURE))
# Worker has started executing a request.
m.add_transition(PENDING, RUNNING, make_an_event(RUNNING))
# Worker failed to construct/process a request to run (either the worker
# did not transition to RUNNING in the given timeout or the worker itself
# had some type of failure before RUNNING started).
#
# Also used by the executor if the request was attempted to be published
# but that did publishing process did not work out.
m.add_transition(PENDING, FAILURE, make_an_event(FAILURE))
# Execution failed due to some type of remote failure.
m.add_transition(RUNNING, FAILURE, make_an_event(FAILURE))
# Execution succeeded & has completed.
m.add_transition(RUNNING, SUCCESS, make_an_event(SUCCESS))
# No further changes allowed.
if freeze:
m.freeze()
return m
def failure_to_dict(failure):
"""Attempts to convert a failure object into a jsonifyable dictionary."""
failure_dict = failure.to_dict()
try:
# it's possible the exc_args can't be serialized as JSON
@@ -215,6 +244,21 @@ class Request(Message):
Every request is created in the WAITING state and is expired within the
given timeout if it does not transition out of the (WAITING, PENDING)
states.
State machine a request goes through as it progresses (or expires)::
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] | . | . | . | . |
| PENDING | on_failure | FAILURE | . | . |
| PENDING | on_running | RUNNING | . | . |
| RUNNING | on_failure | FAILURE | . | . |
| RUNNING | on_success | SUCCESS | . | . |
| SUCCESS[$] | . | . | . | . |
| WAITING[^] | on_failure | FAILURE | . | . |
| WAITING[^] | on_pending | PENDING | . | . |
+------------+------------+---------+----------+---------+
"""
#: String constant representing this message type.
@@ -269,13 +313,19 @@ class Request(Message):
self._failures = failures
self._watch = timeutils.StopWatch(duration=timeout).start()
self._lock = threading.Lock()
self.state = WAITING
self._machine = build_a_machine()
self._machine.initialize()
self.task = task
self.uuid = uuid
self.created_on = timeutils.now()
self.future = futurist.Future()
self.future.atom = task
@property
def current_state(self):
"""Current state the request is in."""
return self._machine.current_state
def set_result(self, result):
"""Sets the responses futures result."""
self.future.set_result((self._event, result))
@@ -291,7 +341,7 @@ class Request(Message):
state for more then the given timeout (it is not considered to be
expired in any other state).
"""
if self.state in WAITING_STATES:
if self._machine.current_state in WAITING_STATES:
return self._watch.expired()
return False
@@ -346,23 +396,25 @@ class Request(Message):
"""Transitions the request to a new state.
If transition was performed, it returns True. If transition
should was ignored, it returns False. If transition was not
was ignored, it returns False. If transition was not
valid (and will not be performed), it raises an InvalidState
exception.
"""
old_state = self.state
old_state = self._machine.current_state
if old_state == new_state:
return False
pair = (old_state, new_state)
if pair not in _ALLOWED_TRANSITIONS:
try:
self._machine.process_event(make_an_event(new_state))
except (machine_excp.NotFound, machine_excp.InvalidState) as e:
raise excp.InvalidState("Request transition from %s to %s is"
" not allowed" % pair)
if new_state in _STOP_TIMER_STATES:
self._watch.stop()
self.state = new_state
LOG.debug("Transitioned '%s' from %s state to %s state", self,
old_state, new_state)
return True
" not allowed: %s" % (old_state,
new_state, e))
else:
if new_state in STOP_TIMER_STATES:
self._watch.stop()
LOG.debug("Transitioned '%s' from %s state to %s state", self,
old_state, new_state)
return True
@classmethod
def validate(cls, data):
@@ -435,7 +487,7 @@ class Response(Message):
'properties': {
'state': {
"type": "string",
"enum": list(_ALL_STATES),
"enum": list(build_a_machine().states) + [EVENT],
},
'data': {
"anyOf": [

View File

@@ -121,8 +121,8 @@ class TestProtocol(test.TestCase):
def test_request_transitions(self):
request = self.request()
self.assertEqual(pr.WAITING, request.state)
self.assertIn(request.state, pr.WAITING_STATES)
self.assertEqual(pr.WAITING, request.current_state)
self.assertIn(request.current_state, pr.WAITING_STATES)
self.assertRaises(excp.InvalidState, request.transition, pr.SUCCESS)
self.assertFalse(request.transition(pr.WAITING))
self.assertTrue(request.transition(pr.PENDING))