 9478226141
			
		
	
	9478226141
	
	
	
		
			
			This allows those who were using it to continue using it until 2.0, where it will be removed; this makes it possible for those users to migrate off that code in a way that is easily doable (without totally breaking their code-bases, until we do that in the 2.0 release). This also removes all internal usage of that stop watch class so that the library itself will not reference it anymore. Change-Id: If313d8e7b9bdc8741db2e2e1dfb381aa3260b971
		
			
				
	
	
		
			522 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			522 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| #    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import abc
 | |
| import collections
 | |
| import threading
 | |
| 
 | |
| import fasteners
 | |
| import futurist
 | |
| from oslo_utils import reflection
 | |
| from oslo_utils import timeutils
 | |
| import six
 | |
| 
 | |
| from taskflow.engines.action_engine import executor
 | |
| from taskflow import exceptions as excp
 | |
| from taskflow import logging
 | |
| from taskflow.types import failure as ft
 | |
| from taskflow.utils import schema_utils as su
 | |
| 
 | |
# NOTE(skudriashev): These are protocol states and events; they are not
# related to task states.
WAITING = 'WAITING'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'
EVENT = 'EVENT'

# The expiry is only active while in these states (once out of them the
# expiry no longer matters, since we have no way of knowing how long a task
# will run for).
WAITING_STATES = (WAITING, PENDING)

# Entering any of these states stops the request expiry timer.
_STOP_TIMER_STATES = (RUNNING, SUCCESS, FAILURE)

# Every state a response may carry (waiting + timer stopping + event).
_ALL_STATES = WAITING_STATES + _STOP_TIMER_STATES + (EVENT,)

# Transitions that a request state can go through; each entry is an
# (old state, new state) pair.
_ALLOWED_TRANSITIONS = (
    # An executor starts to publish a request to a selected worker.
    (WAITING, PENDING),

    # A request expired (no worker was able to process it).
    (WAITING, FAILURE),

    # A worker has started executing a request.
    (PENDING, RUNNING),

    # A worker failed to construct/process a request to run (either the
    # worker did not transition to RUNNING in the given timeout or the worker
    # itself had some type of failure before RUNNING started).
    #
    # Also used by the executor if publishing of the request was attempted
    # but that publishing process did not work out.
    (PENDING, FAILURE),

    # Execution failed due to some type of remote failure.
    (RUNNING, FAILURE),

    # Execution succeeded & has completed.
    (RUNNING, SUCCESS),
)
 | |
| 
 | |
# Actions that can be requested on a remote task.
EXECUTE = 'execute'
REVERT = 'revert'

# Lookup table from a remote task action to its executor event constant.
ACTION_TO_EVENT = {EXECUTE: executor.EXECUTED, REVERT: executor.REVERTED}
 | |
| 
 | |
# NOTE(skudriashev): The request expiration period.
REQUEST_TIMEOUT = 60

# NOTE(skudriashev): Controls how long a queue can be unused before it is
# automatically deleted. Unused means the queue has no consumers, the queue
# has not been redeclared and `queue.get` has not been invoked for a duration
# of at least the expiration period. Here this period equals the request
# timeout: once a request has expired its queue is no longer needed.
QUEUE_EXPIRE_TIMEOUT = REQUEST_TIMEOUT

# Period at which workers notify.
NOTIFY_PERIOD = 5

# The known message types.
NOTIFY, REQUEST, RESPONSE = 'NOTIFY', 'REQUEST', 'RESPONSE'

LOG = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
@six.add_metaclass(abc.ABCMeta)
class Message(object):
    """Abstract base that all message types derive from."""

    @abc.abstractmethod
    def to_dict(self):
        """Return json-serializable message representation."""

    def __str__(self):
        # Rendered as ``<ShortClassName> {dictionary form}``.
        short_name = reflection.get_class_name(self, fully_qualified=False)
        return "<%s> %s" % (short_name, self.to_dict())
 | |
| 
 | |
| 
 | |
class Notify(Message):
    """Message of the notify type.

    Holds whatever keyword data it was constructed with and can validate
    raw data against either the sender or the response schema.
    """

    #: String constant representing this message type.
    TYPE = NOTIFY

    # NOTE(harlowja): the executor (the entity who initially requests a worker
    # to send back a notification response) schema is different than the
    # worker response schema (that's why there are two schemas here).

    #: Expected notify *response* message schema (in json schema format).
    RESPONSE_SCHEMA = {
        "type": "object",
        "properties": {
            "topic": {"type": "string"},
            "tasks": {
                "type": "array",
                "items": {"type": "string"},
            },
        },
        "required": ["topic", "tasks"],
        "additionalProperties": False,
    }

    #: Expected *sender* request message schema (in json schema format).
    SENDER_SCHEMA = {
        "type": "object",
        "additionalProperties": False,
    }

    def __init__(self, **data):
        self._data = data

    def to_dict(self):
        """Return the keyword data this message was created with."""
        return self._data

    @classmethod
    def validate(cls, data, response):
        """Check ``data`` against the response or the sender schema.

        :param data: raw message data to check
        :param response: when truthy the *response* schema is used,
                         otherwise the *sender* schema is used
        :raises: ``InvalidFormat`` (caused by the schema validation
                 error) when ``data`` does not match the chosen schema
        """
        # Select the schema and its matching error text together so that
        # the failure path below needs no second branch.
        if response:
            schema = cls.RESPONSE_SCHEMA
            template = ("%s message response data not of the"
                        " expected format: %s")
        else:
            schema = cls.SENDER_SCHEMA
            template = ("%s message sender data not of the"
                        " expected format: %s")
        try:
            su.schema_validate(data, schema)
        except su.ValidationError as e:
            cls_name = reflection.get_class_name(cls, fully_qualified=False)
            excp.raise_with_cause(excp.InvalidFormat,
                                  template % (cls_name, e.message),
                                  cause=e)
 | |
| 
 | |
| 
 | |
# Lightweight record of the work parsed out of a request message.
_WorkUnit = collections.namedtuple(
    '_WorkUnit', ('task_cls', 'task_name', 'action', 'arguments'))
 | |
| 
 | |
| 
 | |
class Request(Message):
    """Represents request with execution results.

    Every request is created in the WAITING state and is expired within the
    given timeout if it does not transition out of the (WAITING, PENDING)
    states.
    """

    #: String constant representing this message type.
    TYPE = REQUEST

    #: Expected message schema (in json schema format).
    SCHEMA = {
        "type": "object",
        'properties': {
            # These two are typically only sent on revert actions (that is
            # why they are not included in the required section).
            'result': {},
            'failures': {
                "type": "object",
            },
            'task_cls': {
                'type': 'string',
            },
            'task_name': {
                'type': 'string',
            },
            'task_version': {
                "oneOf": [
                    {
                        "type": "string",
                    },
                    {
                        "type": "array",
                    },
                ],
            },
            'action': {
                "type": "string",
                "enum": list(six.iterkeys(ACTION_TO_EVENT)),
            },
            # Keyword arguments that end up in the revert() or execute()
            # method of the remote task.
            'arguments': {
                "type": "object",
            },
        },
        'required': ['task_cls', 'task_name', 'task_version', 'action'],
    }

    def __init__(self, task, uuid, action, arguments, timeout, **kwargs):
        """Create a request (in the WAITING state) and start its timer.

        :param task: task object; its ``name``, ``version`` and ``notifier``
                     attributes are read by this class
        :param uuid: identifier exposed via the ``uuid`` property
        :param action: a key of ``ACTION_TO_EVENT`` (any other value raises
                       ``KeyError``)
        :param arguments: sent as the ``arguments`` entry of the message
        :param timeout: duration given to the expiry stop watch
        :param kwargs: optional ``result`` and/or ``failures`` values that
                       :meth:`to_dict` includes in the message
        """
        self._task = task
        self._uuid = uuid
        self._action = action
        self._event = ACTION_TO_EVENT[action]
        self._arguments = arguments
        self._kwargs = kwargs
        self._watch = timeutils.StopWatch(duration=timeout).start()
        self._state = WAITING
        # Used by the ``fasteners.locked`` decorated transition method.
        self._lock = threading.Lock()
        self._created_on = timeutils.utcnow()
        self._result = futurist.Future()
        self._result.atom = task
        self._notifier = task.notifier

    @property
    def result(self):
        return self._result

    @property
    def notifier(self):
        return self._notifier

    @property
    def uuid(self):
        return self._uuid

    @property
    def task(self):
        return self._task

    @property
    def state(self):
        return self._state

    @property
    def created_on(self):
        return self._created_on

    @property
    def expired(self):
        """Check if request has expired.

        When a new request is created its state is set to WAITING, the
        creation time is stored and the timeout is given via constructor
        arguments.

        A request is considered to be expired when it has been in the
        WAITING/PENDING state for more than the given timeout (it is not
        considered to be expired in any other state).
        """
        if self._state in WAITING_STATES:
            return self._watch.expired()
        return False

    def to_dict(self):
        """Return json-serializable request.

        To convert requests that have failed due to some exception this will
        convert all `failure.Failure` objects into dictionaries (which will
        then be reconstituted by the receiver).
        """
        request = {
            'task_cls': reflection.get_class_name(self._task),
            'task_name': self._task.name,
            'task_version': self._task.version,
            'action': self._action,
            'arguments': self._arguments,
        }
        if 'result' in self._kwargs:
            result = self._kwargs['result']
            # The result is sent as a (type, data) pair so the receiver can
            # tell a serialized failure from a regular result.
            if isinstance(result, ft.Failure):
                request['result'] = ('failure', result.to_dict())
            else:
                request['result'] = ('success', result)
        if 'failures' in self._kwargs:
            failures = self._kwargs['failures']
            request['failures'] = {}
            for task, failure in six.iteritems(failures):
                request['failures'][task] = failure.to_dict()
        return request

    def set_result(self, result):
        """Set the result future to an ``(event, result)`` pair."""
        self.result.set_result((self._event, result))

    def transition_and_log_error(self, new_state, logger=None):
        """Transitions *and* logs an error if that transitioning raises.

        This overlays the transition function and performs nearly the same
        functionality but instead of raising if the transition was not valid
        it logs a warning to the provided logger and returns False to
        indicate that the transition was not performed (note that this
        is *different* from the transition function where False means
        ignored).

        :param new_state: state to attempt to move to
        :param logger: logger to warn with (the module logger is used
                       when none is provided)
        :returns: whatever :meth:`transition` returned, or False when the
                  transition was invalid
        """
        if logger is None:
            logger = LOG
        moved = False
        try:
            moved = self.transition(new_state)
        except excp.InvalidState:
            # NOTE: ``warn`` is a deprecated alias (and is no longer
            # provided by newer standard library loggers), so ``warning``
            # is used instead.
            logger.warning("Failed to transition '%s' to %s state.", self,
                           new_state, exc_info=True)
        return moved

    @fasteners.locked
    def transition(self, new_state):
        """Transitions the request to a new state.

        If the transition was performed, it returns True. If the transition
        was ignored (the request is already in that state), it returns
        False. If the transition was not valid (and will not be performed),
        it raises an InvalidState exception.
        """
        old_state = self._state
        if old_state == new_state:
            return False
        pair = (old_state, new_state)
        if pair not in _ALLOWED_TRANSITIONS:
            raise excp.InvalidState("Request transition from %s to %s is"
                                    " not allowed" % pair)
        if new_state in _STOP_TIMER_STATES:
            self._watch.stop()
        self._state = new_state
        LOG.debug("Transitioned '%s' from %s state to %s state", self,
                  old_state, new_state)
        return True

    @classmethod
    def validate(cls, data):
        """Validate request message data (and any failures it contains).

        :param data: raw request message data to check
        :raises: ``InvalidFormat`` when the data does not match the schema
                 or when a provided ``result`` is not a two item
                 ``(type, data)`` sequence; failure dictionaries are
                 validated by ``Failure.validate``
        """
        try:
            su.schema_validate(data, cls.SCHEMA)
        except su.ValidationError as e:
            cls_name = reflection.get_class_name(cls, fully_qualified=False)
            excp.raise_with_cause(excp.InvalidFormat,
                                  "%s message request data not of the"
                                  " expected format: %s" % (cls_name,
                                                            e.message),
                                  cause=e)
        else:
            # Validate all failure dictionaries that *may* be present...
            failures = []
            if 'failures' in data:
                failures.extend(six.itervalues(data['failures']))
            result = data.get('result')
            if result is not None:
                # The schema accepts any type for the result, so make sure
                # it really is a (type, data) pair before unpacking it (so
                # that malformed input is reported as an invalid format
                # instead of escaping as a type/value error).
                try:
                    result_data_type, result_data = result
                except (TypeError, ValueError) as e:
                    cls_name = reflection.get_class_name(
                        cls, fully_qualified=False)
                    excp.raise_with_cause(excp.InvalidFormat,
                                          "%s message request result not"
                                          " of the expected (type, data)"
                                          " format: %s" % (cls_name, e),
                                          cause=e)
                else:
                    if result_data_type == 'failure':
                        failures.append(result_data)
            for fail_data in failures:
                ft.Failure.validate(fail_data)

    @staticmethod
    def from_dict(data, task_uuid=None):
        """Parses **validated** data into a work unit.

        All :py:class:`~taskflow.types.failure.Failure` objects that have been
        converted to dict(s) on the remote side will now be converted back
        to :py:class:`~taskflow.types.failure.Failure` objects.

        :param data: validated request message data
        :param task_uuid: when not None, included in the returned arguments
                          under the ``task_uuid`` key
        :returns: a ``_WorkUnit`` of (task_cls, task_name, action, arguments)
        """
        task_cls = data['task_cls']
        task_name = data['task_name']
        action = data['action']
        arguments = data.get('arguments', {})
        result = data.get('result')
        failures = data.get('failures')
        # These arguments will eventually be given to the task executor
        # so they need to be in a format it will accept (and using keyword
        # argument names that it accepts)...
        arguments = {
            'arguments': arguments,
        }
        if task_uuid is not None:
            arguments['task_uuid'] = task_uuid
        if result is not None:
            result_data_type, result_data = result
            if result_data_type == 'failure':
                arguments['result'] = ft.Failure.from_dict(result_data)
            else:
                arguments['result'] = result_data
        if failures is not None:
            arguments['failures'] = {}
            for task, fail_data in six.iteritems(failures):
                arguments['failures'][task] = ft.Failure.from_dict(fail_data)
        return _WorkUnit(task_cls, task_name, action, arguments)
 | |
| 
 | |
| 
 | |
class Response(Message):
    """Represents response message type."""

    #: String constant representing this message type.
    TYPE = RESPONSE

    #: Expected message schema (in json schema format).
    SCHEMA = {
        "type": "object",
        'properties': {
            'state': {
                "type": "string",
                "enum": list(_ALL_STATES),
            },
            'data': {
                "anyOf": [
                    {
                        "$ref": "#/definitions/event",
                    },
                    {
                        "$ref": "#/definitions/completion",
                    },
                    {
                        "$ref": "#/definitions/empty",
                    },
                ],
            },
        },
        "required": ["state", 'data'],
        "additionalProperties": False,
        "definitions": {
            "event": {
                "type": "object",
                "properties": {
                    'event_type': {
                        'type': 'string',
                    },
                    'details': {
                        'type': 'object',
                    },
                },
                "required": ["event_type", 'details'],
                "additionalProperties": False,
            },
            # Used when sending *only* request state changes (and no data is
            # expected).
            "empty": {
                "type": "object",
                "additionalProperties": False,
            },
            "completion": {
                "type": "object",
                "properties": {
                    # This can be any arbitrary type that a task returns, so
                    # that's why we can't be strict about what type it is
                    # since any of the json serializable types are allowed.
                    "result": {},
                },
                "required": ["result"],
                "additionalProperties": False,
            },
        },
    }

    def __init__(self, state, **data):
        self._state = state
        self._data = data

    @classmethod
    def from_dict(cls, data):
        """Build a response from message data.

        When the state is FAILURE and a ``result`` is present, that result
        is converted back into a failure object via ``Failure.from_dict``.

        :param data: mapping with ``state`` and ``data`` keys
        :returns: a new instance of this class
        """
        state = data['state']
        # Work on a shallow copy so that the caller's nested dictionary is
        # not modified when the result gets replaced below.
        payload = dict(data['data'])
        if state == FAILURE and 'result' in payload:
            payload['result'] = ft.Failure.from_dict(payload['result'])
        return cls(state, **payload)

    @property
    def state(self):
        return self._state

    @property
    def data(self):
        return self._data

    def to_dict(self):
        """Return a dictionary with ``state`` and ``data`` keys."""
        return dict(state=self._state, data=self._data)

    @classmethod
    def validate(cls, data):
        """Validate response message data (and any failure it contains).

        :param data: raw response message data to check
        :raises: ``InvalidFormat`` when the data does not match the schema;
                 a failure result is validated by ``Failure.validate``
        """
        try:
            su.schema_validate(data, cls.SCHEMA)
        except su.ValidationError as e:
            cls_name = reflection.get_class_name(cls, fully_qualified=False)
            excp.raise_with_cause(excp.InvalidFormat,
                                  "%s message response data not of the"
                                  " expected format: %s" % (cls_name,
                                                            e.message),
                                  cause=e)
        else:
            state = data['state']
            # The result lives in the nested 'data' payload (the outer
            # message only has 'state' and 'data' keys), which is also
            # where from_dict() reads it from.
            payload = data['data']
            if state == FAILURE and 'result' in payload:
                ft.Failure.validate(payload['result'])
 |