diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst index 0bab644c..7db76341 100644 --- a/doc/source/persistence.rst +++ b/doc/source/persistence.rst @@ -283,6 +283,10 @@ Interfaces .. automodule:: taskflow.persistence.backends .. automodule:: taskflow.persistence.base + +Models +====== + .. automodule:: taskflow.persistence.logbook Implementations diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index db830b95..c7770feb 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -30,6 +30,9 @@ from taskflow.types import failure as ft LOG = logging.getLogger(__name__) +# Internal helpers... + + def _copy_function(deep_copy): if deep_copy: return copy.deepcopy @@ -64,20 +67,27 @@ def _fix_meta(data): class LogBook(object): - """A container of flow details, a name and associated metadata. + """A collection of flow details and associated metadata. Typically this class contains a collection of flow detail entries for a given engine (or job) so that those entities can track what 'work' has been completed for resumption, reverting and miscellaneous tracking purposes. - The data contained within this class need *not* be backed by the backend - storage in real time. The data in this class will only be guaranteed to be - persisted when a save occurs via some backend connection. + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save occurs via some backend + connection. - NOTE(harlowja): the naming of this class is analogous to a ships log or a - similar type of record used in detailing work that been completed (or work - that has not been completed). + NOTE(harlowja): the naming of this class is analogous to a ship's log or a + similar type of record used in detailing work that has been completed (or + work that has not been completed). + + :ivar created_at: A ``datetime.datetime`` object of when this logbook + was created. + :ivar updated_at: A ``datetime.datetime`` object of when this logbook + was last updated at. + :ivar meta: A dictionary of meta-data associated with this logbook. """ def __init__(self, name, uuid=None): if uuid: @@ -91,7 +101,11 @@ class LogBook(object): self.meta = {} def add(self, fd): - """Adds a new entry to the underlying logbook. + """Adds a new flow detail into this logbook. + + NOTE(harlowja): if an existing flow detail exists with the same + uuid the existing one will be overwritten with the newly provided + one. Does not *guarantee* that the details will be immediately saved. """ @@ -99,12 +113,29 @@ class LogBook(object): self.updated_at = timeutils.utcnow() def find(self, flow_uuid): + """Locate the flow detail corresponding to the given uuid. + + :returns: the flow detail with that uuid + :rtype: :py:class:`.FlowDetail` (or ``None`` if not found) + """ return self._flowdetails_by_id.get(flow_uuid, None) def merge(self, lb, deep_copy=False): """Merges the current object state with the given ones state. - NOTE(harlowja): Does not merge the flow details contained in either. + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Also note that this does **not** merge + the flow details contained in either. + + :returns: this logbook (freshly merged with the incoming object) + :rtype: :py:class:`.LogBook` """ if lb is self: return self @@ -118,26 +149,35 @@ class LogBook(object): return self def to_dict(self, marshal_time=False): - """Translates the internal state of this object to a dictionary. + """Translates the internal state of this object to a ``dict``. - NOTE(harlowja): Does not include the contained flow details. + NOTE(harlowja): The returned ``dict`` does **not** include any + contained flow details. + + :returns: this logbook in ``dict`` form """ if not marshal_time: marshal_fn = lambda x: x else: marshal_fn = _safe_marshal_time - data = { + return { 'name': self.name, 'meta': self.meta, 'uuid': self.uuid, 'updated_at': marshal_fn(self.updated_at), 'created_at': marshal_fn(self.created_at), } - return data @classmethod def from_dict(cls, data, unmarshal_time=False): - """Translates the given dictionary into an instance of this class.""" + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new logbook + :rtype: :py:class:`.LogBook` + """ if not unmarshal_time: unmarshal_fn = lambda x: x else: @@ -150,10 +190,12 @@ class LogBook(object): @property def uuid(self): + """The unique identifer of this logbook.""" return self._uuid @property def name(self): + """The name of this logbook.""" return self._name def __iter__(self): @@ -164,7 +206,19 @@ class LogBook(object): return len(self._flowdetails_by_id) def copy(self, retain_contents=True): - """Copies/clones this log book.""" + """Copies this logbook. + + Creates a shallow copy of this logbook. If this logbook contains + flow details and ``retain_contents`` is truthy (the default) then + the flow details container will be shallow copied (the flow details + contained there-in will **not** be copied). If ``retain_contents`` is + falsey then the copied logbook will have **no** contained flow + details (but it will have the rest of the local objects attributes + copied). + + :returns: a new logbook + :rtype: :py:class:`.LogBook` + """ clone = copy.copy(self) if not retain_contents: clone._flowdetails_by_id = {} @@ -176,15 +230,19 @@ class LogBook(object): class FlowDetail(object): - """A container of atom details, a name and associated metadata. + """A collection of atom details and associated metadata. Typically this class contains a collection of atom detail entries that represent the atoms in a given flow structure (along with any other needed metadata relevant to that flow). - The data contained within this class need *not* be backed by the backend - storage in real time. The data in this class will only be guaranteed to be - persisted when a save/update occurs via some backend connection. + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save (or update) occurs via some backend + connection. + + :ivar state: The state of the flow associated with this flow detail. + :ivar meta: A dictionary of meta-data associated with this flow detail. """ def __init__(self, name, uuid): self._uuid = uuid @@ -194,7 +252,18 @@ class FlowDetail(object): self.meta = {} def update(self, fd): - """Updates the objects state to be the same as the given one.""" + """Updates the objects state to be the same as the given one. + + This will assign the private and public attributes of the given + flow detail directly to this object (replacing any existing + attributes in this object; even if they are the **same**). + + NOTE(harlowja): If the provided object is this object itself + then **no** update is done. + + :returns: this flow detail + :rtype: :py:class:`.FlowDetail` + """ if fd is self: return self self._atomdetails_by_id = fd._atomdetails_by_id @@ -203,9 +272,21 @@ class FlowDetail(object): return self def merge(self, fd, deep_copy=False): - """Merges the current object state with the given ones state. + """Merges the current object state with the given one's state. - NOTE(harlowja): Does not merge the atom details contained in either. + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Also this does **not** merge the atom + details contained in either. + + :returns: this flow detail (freshly merged with the incoming object) + :rtype: :py:class:`.FlowDetail` """ if fd is self: return self @@ -218,7 +299,19 @@ class FlowDetail(object): return self def copy(self, retain_contents=True): - """Copies/clones this flow detail.""" + """Copies this flow detail. + + Creates a shallow copy of this flow detail. If this detail contains + flow details and ``retain_contents`` is truthy (the default) then + the atom details container will be shallow copied (the atom details + contained there-in will **not** be copied). If ``retain_contents`` is + falsey then the copied flow detail will have **no** contained atom + details (but it will have the rest of the local objects attributes + copied). + + :returns: a new flow detail + :rtype: :py:class:`.FlowDetail` + """ clone = copy.copy(self) if not retain_contents: clone._atomdetails_by_id = {} @@ -229,9 +322,12 @@ class FlowDetail(object): return clone def to_dict(self): - """Translates the internal state of this object to a dictionary. + """Translates the internal state of this object to a ``dict``. - NOTE(harlowja): Does not include the contained atom details. + NOTE(harlowja): The returned ``dict`` does **not** include any + contained atom details. + + :returns: this flow detail in ``dict`` form """ return { 'name': self.name, @@ -242,24 +338,46 @@ class FlowDetail(object): @classmethod def from_dict(cls, data): - """Translates the given data into an instance of this class.""" + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new flow detail + :rtype: :py:class:`.FlowDetail` + """ obj = cls(data['name'], data['uuid']) obj.state = data.get('state') obj.meta = _fix_meta(data) return obj def add(self, ad): + """Adds a new atom detail into this flow detail. + + NOTE(harlowja): if an existing atom detail exists with the same + uuid the existing one will be overwritten with the newly provided + one. + + Does not *guarantee* that the details will be immediately saved. + """ self._atomdetails_by_id[ad.uuid] = ad def find(self, ad_uuid): + """Locate the atom detail corresponding to the given uuid. + + :returns: the atom detail with that uuid + :rtype: :py:class:`.AtomDetail` (or ``None`` if not found) + """ return self._atomdetails_by_id.get(ad_uuid) @property def uuid(self): + """The unique identifer of this flow detail.""" return self._uuid @property def name(self): + """The name of this flow detail.""" return self._name def __iter__(self): @@ -272,39 +390,48 @@ class FlowDetail(object): @six.add_metaclass(abc.ABCMeta) class AtomDetail(object): - """A base container of atom specific runtime information and metadata. + """A collection of atom specific runtime information and metadata. - This is a base class that contains attributes that are used to connect - a atom to the persistence layer during, after, or before it is running - including any results it may have produced, any state that it may be - in (failed for example), any exception that occurred when running and any - associated stacktrace that may have occurring during that exception being - thrown and any other metadata that should be stored along-side the details - about the connected atom. + This is a base **abstract** class that contains attributes that are used + to connect a atom to the persistence layer before, during, or after it is + running. It includes any results it may have produced, any state that it + may be in (for example ``FAILURE``), any exception that occurred when + running, and any associated stacktrace that may have occurring during an + exception being thrown. It may also contain any other metadata that + should also be stored along-side the details about the connected atom. - The data contained within this class need *not* backed by the backend - storage in real time. The data in this class will only be guaranteed to be - persisted when a save/update occurs via some backend connection. + The data contained within this class need **not** be persisted to the + backend storage in real time. The data in this class will only be + guaranteed to be persisted when a save (or update) occurs via some backend + connection. + + :ivar state: The state of the atom associated with this atom detail. + :ivar intention: The execution strategy of the atom associated + with this atom detail (used by an engine/others to + determine if the associated atom needs to be + executed, reverted, retried and so-on). + :ivar meta: A dictionary of meta-data associated with this atom detail. + :ivar version: A version tuple or string that represents the + atom version this atom detail is associated with (typically + used for introspection and any data migration + strategies). + :ivar results: Any results the atom produced from either its + ``execute`` method or from other sources. + :ivar failure: If the atom failed (possibly due to its ``execute`` + method raising) this will be a + :py:class:`~taskflow.types.failure.Failure` object that + represents that failure (if there was no failure this + will be set to none). """ + def __init__(self, name, uuid): self._uuid = uuid self._name = name - # TODO(harlowja): decide if these should be passed in and therefore - # immutable or let them be assigned? - # - # The state the atom was last in. self.state = None - # The intention of action that would be applied to the atom. self.intention = states.EXECUTE - # The results it may have produced (useful for reverting). self.results = None - # An Failure object that holds exception the atom may have thrown - # (or part of it), useful for knowing what failed. self.failure = None self.meta = {} - # The version of the atom this atom details was associated with which - # is quite useful for determining what versions of atoms this detail - # information can be associated with. self.version = None @property @@ -318,7 +445,18 @@ class AtomDetail(object): return self.results def update(self, ad): - """Updates the objects state to be the same as the given one.""" + """Updates the object's state to be the same as the given one. + + This will assign the private and public attributes of the given + atom detail directly to this object (replacing any existing + attributes in this object; even if they are the **same**). + + NOTE(harlowja): If the provided object is this object itself + then **no** update is done. + + :returns: this atom detail + :rtype: :py:class:`.AtomDetail` + """ if ad is self: return self self.state = ad.state @@ -331,7 +469,24 @@ class AtomDetail(object): @abc.abstractmethod def merge(self, other, deep_copy=False): - """Merges the current object state with the given ones state.""" + """Merges the current object state with the given ones state. + + If ``deep_copy`` is provided as truthy then the + local object will use ``copy.deepcopy`` to replace this objects + local attributes with the provided objects attributes (**only** if + there is a difference between this objects attributes and the + provided attributes). If ``deep_copy`` is falsey (the default) then a + reference copy will occur instead when a difference is detected. + + NOTE(harlowja): If the provided object is this object itself + then **no** merging is done. Do note that **no** results are merged + in this method. That operation **must** to be the responsibilty of + subclasses to implement and override this abstract method + and provide that merging themselves as they see fit. + + :returns: this atom detail (freshly merged with the incoming object) + :rtype: :py:class:`.AtomDetail` + """ copy_fn = _copy_function(deep_copy) # NOTE(imelnikov): states and intentions are just strings, # so there is no need to copy them (strings are immutable in python). @@ -353,15 +508,15 @@ class AtomDetail(object): self.version = copy_fn(other.version) return self - @abc.abstractmethod - def to_dict(self): - """Translates the internal state of this object to a dictionary.""" - @abc.abstractmethod def put(self, state, result): """Puts a result (acquired in the given state) into this detail.""" - def _to_dict_shared(self): + def to_dict(self): + """Translates the internal state of this object to a ``dict``. + + :returns: this atom detail in ``dict`` form + """ if self.failure: failure = self.failure.to_dict() else: @@ -377,42 +532,75 @@ class AtomDetail(object): 'uuid': self.uuid, } - def _from_dict_shared(self, data): - self.state = data.get('state') - self.intention = data.get('intention') - self.results = data.get('results') - self.version = data.get('version') - self.meta = _fix_meta(data) + @classmethod + def from_dict(cls, data): + """Translates the given ``dict`` into an instance of this class. + + NOTE(harlowja): the ``dict`` provided should come from a prior + call to :meth:`.to_dict`. + + :returns: a new atom detail + :rtype: :py:class:`.AtomDetail` + """ + obj = cls(data['name'], data['uuid']) + obj.state = data.get('state') + obj.intention = data.get('intention') + obj.results = data.get('results') + obj.version = data.get('version') + obj.meta = _fix_meta(data) failure = data.get('failure') if failure: - self.failure = ft.Failure.from_dict(failure) + obj.failure = ft.Failure.from_dict(failure) + return obj @property def uuid(self): + """The unique identifer of this atom detail.""" return self._uuid @property def name(self): + """The name of this atom detail.""" return self._name @abc.abstractmethod def reset(self, state): - """Resets detail results and failures.""" + """Resets this atom detail and sets ``state`` attribute value.""" + + @abc.abstractmethod + def copy(self): + """Copies this atom detail.""" class TaskDetail(AtomDetail): - """This class represents a task detail for flow task object.""" + """A task detail (an atom detail typically associated with a |tt| atom). - def __init__(self, name, uuid): - super(TaskDetail, self).__init__(name, uuid) + .. |tt| replace:: :py:class:`~taskflow.task.BaseTask` + """ def reset(self, state): + """Resets this task detail and sets ``state`` attribute value. + + This sets any previously set ``results`` and ``failure`` attributes + back to ``None`` and sets the state to the provided one, as well as + setting this task details ``intention`` attribute to ``EXECUTE``. + """ self.results = None self.failure = None self.state = state self.intention = states.EXECUTE def put(self, state, result): + """Puts a result (acquired in the given state) into this detail. + + If the result is a :py:class:`~taskflow.types.failure.Failure` object + then the ``failure`` attribute will be set (and the ``results`` + attribute will be set to ``None``); if the result is not a + :py:class:`~taskflow.types.failure.Failure` object then the + ``results`` attribute will be set (and the ``failure`` attribute + will be set to ``None``). In either case the ``state`` + attribute will be set to the provided state. + """ self.state = state if _was_failure(state, result): self.failure = result @@ -421,38 +609,52 @@ class TaskDetail(AtomDetail): self.results = result self.failure = None - @classmethod - def from_dict(cls, data): - """Translates the given data into an instance of this class.""" - obj = cls(data['name'], data['uuid']) - obj._from_dict_shared(data) - return obj - - def to_dict(self): - """Translates the internal state of this object to a dictionary.""" - return self._to_dict_shared() - def merge(self, other, deep_copy=False): - """Merges the current object state with the given ones state.""" + """Merges the current task detail with the given one. + + NOTE(harlowja): This merge does **not** copy and replace + the ``results`` attribute if it differs. Instead the current + objects ``results`` attribute directly becomes (via assignment) the + other objects ``results`` attribute. Also note that if the provided + object is this object itself then **no** merging is done. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if this is copied at a deeper level (for example by + using ``copy.deepcopy`` or by using ``copy.copy``). + + :returns: this task detail (freshly merged with the incoming object) + :rtype: :py:class:`.TaskDetail` + """ if not isinstance(other, TaskDetail): raise exc.NotImplementedError("Can only merge with other" " task details") if other is self: return self super(TaskDetail, self).merge(other, deep_copy=deep_copy) - copy_fn = _copy_function(deep_copy) if self.results != other.results: - self.results = copy_fn(other.results) + self.results = other.results return self def copy(self): - """Copies/clones this task detail.""" + """Copies this task detail. + + Creates a shallow copy of this task detail (any meta-data and + version information that this object maintains is shallow + copied via ``copy.copy``). + + NOTE(harlowja): This copy does **not** perform ``copy.copy`` on + the ``results`` attribute of this object (before assigning to the + copy). Instead the current objects ``results`` attribute directly + becomes (via assignment) the copied objects ``results`` attribute. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if this is copied at a deeper level (for example by + using ``copy.deepcopy`` or by using ``copy.copy``). + + :returns: a new task detail + :rtype: :py:class:`.TaskDetail` + """ clone = copy.copy(self) - # Just directly assign to the clone (do **not** copy). - # - # See: https://bugs.launchpad.net/taskflow/+bug/1452978 for - # what happens if this is cloned/copied (even using copy.copy to - # try to do a shallow copy). clone.results = self.results if self.meta: clone.meta = self.meta.copy() @@ -462,19 +664,53 @@ class TaskDetail(AtomDetail): class RetryDetail(AtomDetail): - """This class represents a retry detail for retry controller object.""" + """A retry detail (an atom detail typically associated with a |rt| atom). + + .. |rt| replace:: :py:class:`~taskflow.retry.Retry` + """ + def __init__(self, name, uuid): super(RetryDetail, self).__init__(name, uuid) self.results = [] def reset(self, state): + """Resets this retry detail and sets ``state`` attribute value. + + This sets any previously added ``results`` back to an empty list + and resets the ``failure`` attribute back to ``None`` and sets the + state to the provided one, as well as setting this atom + details ``intention`` attribute to ``EXECUTE``. + """ self.results = [] self.failure = None self.state = state self.intention = states.EXECUTE def copy(self): - """Copies/clones this retry detail.""" + """Copies this retry detail. + + Creates a shallow copy of this retry detail (any meta-data and + version information that this object maintains is shallow + copied via ``copy.copy``). + + NOTE(harlowja): This copy does **not** copy + the incoming objects ``results`` attribute. Instead this + objects ``results`` attribute list is iterated over and a new list + is constructed with each ``(data, failures)`` element in that list + having its ``failures`` (a dictionary of each named + :py:class:`~taskflow.types.failure.Failure` object that + occured) copied but its ``data`` is left untouched. After + this is done that new list becomes (via assignment) the cloned + objects ``results`` attribute. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if the ``data`` in ``results`` is copied at a + deeper level (for example by using ``copy.deepcopy`` or by + using ``copy.copy``). + + :returns: a new retry detail + :rtype: :py:class:`.RetryDetail` + """ clone = copy.copy(self) results = [] # NOTE(imelnikov): we can't just deep copy Failures, as they @@ -493,6 +729,7 @@ class RetryDetail(AtomDetail): @property def last_results(self): + """The last result that was produced.""" try: return self.results[-1][0] except IndexError as e: @@ -500,12 +737,30 @@ class RetryDetail(AtomDetail): @property def last_failures(self): + """The last failure dictionary that was produced. + + NOTE(harlowja): This is **not** the same as the + local ``failure`` attribute as the obtained failure dictionary in + the ``results`` attribute (which is what this returns) is from + associated atom failures (which is different from the directly + related failure of the retry unit associated with this + atom detail). + """ try: return self.results[-1][1] except IndexError as e: raise exc.NotFound("Last failures not found", e) def put(self, state, result): + """Puts a result (acquired in the given state) into this detail. + + If the result is a :py:class:`~taskflow.types.failure.Failure` object + then the ``failure`` attribute will be set; if the result is not a + :py:class:`~taskflow.types.failure.Failure` object then the + ``results`` attribute will be appended to (and the ``failure`` + attribute will be set to ``None``). In either case the ``state`` + attribute will be set to the provided state. + """ # Do not clean retry history (only on reset does this happen). self.state = state if _was_failure(state, result): @@ -516,7 +771,7 @@ class RetryDetail(AtomDetail): @classmethod def from_dict(cls, data): - """Translates the given data into an instance of this class.""" + """Translates the given ``dict`` into an instance of this class.""" def decode_results(results): if not results: @@ -529,13 +784,12 @@ class RetryDetail(AtomDetail): new_results.append((data, new_failures)) return new_results - obj = cls(data['name'], data['uuid']) - obj._from_dict_shared(data) + obj = super(RetryDetail, cls).from_dict(data) obj.results = decode_results(obj.results) return obj def to_dict(self): - """Translates the internal state of this object to a dictionary.""" + """Translates the internal state of this object to a ``dict``.""" def encode_results(results): if not results: @@ -548,12 +802,33 @@ class RetryDetail(AtomDetail): new_results.append((data, new_failures)) return new_results - base = self._to_dict_shared() + base = super(RetryDetail, self).to_dict() base['results'] = encode_results(base.get('results')) return base def merge(self, other, deep_copy=False): - """Merges the current object state with the given ones state.""" + """Merges the current retry detail with the given one. + + NOTE(harlowja): This merge does **not** deep copy + the incoming objects ``results`` attribute (if it differs). Instead + the incoming objects ``results`` attribute list is **always** iterated + over and a new list is constructed with + each ``(data, failures)`` element in that list having + its ``failures`` (a dictionary of each named + :py:class:`~taskflow.types.failure.Failure` objects that + occurred) copied but its ``data`` is left untouched. After + this is done that new list becomes (via assignment) this + objects ``results`` attribute. Also note that if the provided object + is this object itself then **no** merging is done. + + See: https://bugs.launchpad.net/taskflow/+bug/1452978 for + what happens if the ``data`` in ``results`` is copied at a + deeper level (for example by using ``copy.deepcopy`` or by + using ``copy.copy``). + + :returns: this retry detail (freshly merged with the incoming object) + :rtype: :py:class:`.RetryDetail` + """ if not isinstance(other, RetryDetail): raise exc.NotImplementedError("Can only merge with other" " retry details")