Merge "Rework the in-memory backend"
This commit is contained in:
commit
3fed19dd0d
@ -15,16 +15,97 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from taskflow import exceptions as exc
|
from taskflow import exceptions as exc
|
||||||
from taskflow import logging
|
from taskflow import logging
|
||||||
from taskflow.persistence.backends import base
|
from taskflow.persistence.backends import base
|
||||||
from taskflow.persistence import logbook
|
from taskflow.persistence import logbook
|
||||||
|
from taskflow.utils import lock_utils
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class _Memory(object):
|
||||||
|
"""Where the data is really stored."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.log_books = {}
|
||||||
|
self.flow_details = {}
|
||||||
|
self.atom_details = {}
|
||||||
|
|
||||||
|
def clear_all(self):
|
||||||
|
self.log_books.clear()
|
||||||
|
self.flow_details.clear()
|
||||||
|
self.atom_details.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class _MemoryHelper(object):
|
||||||
|
"""Helper functionality for the memory backends & connections."""
|
||||||
|
|
||||||
|
def __init__(self, memory):
|
||||||
|
self._memory = memory
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _fetch_clone_args(incoming):
|
||||||
|
if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)):
|
||||||
|
# We keep our own copy of the added contents of the following
|
||||||
|
# types so we don't need the clone to retain them directly...
|
||||||
|
return {
|
||||||
|
'retain_contents': False,
|
||||||
|
}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def construct(self, uuid, container):
|
||||||
|
"""Reconstructs a object from the given uuid and storage container."""
|
||||||
|
source = container[uuid]
|
||||||
|
clone_kwargs = self._fetch_clone_args(source)
|
||||||
|
clone = source['object'].copy(**clone_kwargs)
|
||||||
|
rebuilder = source.get('rebuilder')
|
||||||
|
if rebuilder:
|
||||||
|
for component in map(rebuilder, source['components']):
|
||||||
|
clone.add(component)
|
||||||
|
return clone
|
||||||
|
|
||||||
|
def merge(self, incoming, saved_info=None):
|
||||||
|
"""Merges the incoming object into the local memories copy."""
|
||||||
|
if saved_info is None:
|
||||||
|
if isinstance(incoming, logbook.LogBook):
|
||||||
|
saved_info = self._memory.log_books.setdefault(
|
||||||
|
incoming.uuid, {})
|
||||||
|
elif isinstance(incoming, logbook.FlowDetail):
|
||||||
|
saved_info = self._memory.flow_details.setdefault(
|
||||||
|
incoming.uuid, {})
|
||||||
|
elif isinstance(incoming, logbook.AtomDetail):
|
||||||
|
saved_info = self._memory.atom_details.setdefault(
|
||||||
|
incoming.uuid, {})
|
||||||
|
else:
|
||||||
|
raise TypeError("Unknown how to merge type '%s'"
|
||||||
|
% type(incoming))
|
||||||
|
try:
|
||||||
|
saved_info['object'].merge(incoming)
|
||||||
|
except KeyError:
|
||||||
|
clone_kwargs = self._fetch_clone_args(incoming)
|
||||||
|
saved_info['object'] = incoming.copy(**clone_kwargs)
|
||||||
|
if isinstance(incoming, logbook.LogBook):
|
||||||
|
flow_details = saved_info.setdefault('components', set())
|
||||||
|
if 'rebuilder' not in saved_info:
|
||||||
|
saved_info['rebuilder'] = functools.partial(
|
||||||
|
self.construct, container=self._memory.flow_details)
|
||||||
|
for flow_detail in incoming:
|
||||||
|
flow_details.add(self.merge(flow_detail))
|
||||||
|
elif isinstance(incoming, logbook.FlowDetail):
|
||||||
|
atom_details = saved_info.setdefault('components', set())
|
||||||
|
if 'rebuilder' not in saved_info:
|
||||||
|
saved_info['rebuilder'] = functools.partial(
|
||||||
|
self.construct, container=self._memory.atom_details)
|
||||||
|
for atom_detail in incoming:
|
||||||
|
atom_details.add(self.merge(atom_detail))
|
||||||
|
return incoming.uuid
|
||||||
|
|
||||||
|
|
||||||
class MemoryBackend(base.Backend):
|
class MemoryBackend(base.Backend):
|
||||||
"""A in-memory (non-persistent) backend.
|
"""A in-memory (non-persistent) backend.
|
||||||
|
|
||||||
@ -33,21 +114,28 @@ class MemoryBackend(base.Backend):
|
|||||||
"""
|
"""
|
||||||
def __init__(self, conf=None):
|
def __init__(self, conf=None):
|
||||||
super(MemoryBackend, self).__init__(conf)
|
super(MemoryBackend, self).__init__(conf)
|
||||||
self._log_books = {}
|
self._memory = _Memory()
|
||||||
self._flow_details = {}
|
self._helper = _MemoryHelper(self._memory)
|
||||||
self._atom_details = {}
|
self._lock = lock_utils.ReaderWriterLock()
|
||||||
|
|
||||||
|
def _construct_from(self, container):
|
||||||
|
return dict((uuid, self._helper.construct(uuid, container))
|
||||||
|
for uuid in six.iterkeys(container))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def log_books(self):
|
def log_books(self):
|
||||||
return self._log_books
|
with self._lock.read_lock():
|
||||||
|
return self._construct_from(self._memory.log_books)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def flow_details(self):
|
def flow_details(self):
|
||||||
return self._flow_details
|
with self._lock.read_lock():
|
||||||
|
return self._construct_from(self._memory.flow_details)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def atom_details(self):
|
def atom_details(self):
|
||||||
return self._atom_details
|
with self._lock.read_lock():
|
||||||
|
return self._construct_from(self._memory.atom_details)
|
||||||
|
|
||||||
def get_connection(self):
|
def get_connection(self):
|
||||||
return Connection(self)
|
return Connection(self)
|
||||||
@ -57,8 +145,13 @@ class MemoryBackend(base.Backend):
|
|||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
class Connection(base.Connection):
|
||||||
|
"""A connection to an in-memory backend."""
|
||||||
|
|
||||||
def __init__(self, backend):
|
def __init__(self, backend):
|
||||||
self._backend = backend
|
self._backend = backend
|
||||||
|
self._helper = backend._helper
|
||||||
|
self._memory = backend._memory
|
||||||
|
self._lock = backend._lock
|
||||||
|
|
||||||
def upgrade(self):
|
def upgrade(self):
|
||||||
pass
|
pass
|
||||||
@ -74,78 +167,70 @@ class Connection(base.Connection):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def clear_all(self):
|
def clear_all(self):
|
||||||
count = 0
|
with self._lock.write_lock():
|
||||||
for book_uuid in list(six.iterkeys(self.backend.log_books)):
|
self._memory.clear_all()
|
||||||
self.destroy_logbook(book_uuid)
|
|
||||||
count += 1
|
|
||||||
return count
|
|
||||||
|
|
||||||
def destroy_logbook(self, book_uuid):
|
def destroy_logbook(self, book_uuid):
|
||||||
|
with self._lock.write_lock():
|
||||||
try:
|
try:
|
||||||
# Do the same cascading delete that the sql layer does.
|
# Do the same cascading delete that the sql layer does.
|
||||||
lb = self.backend.log_books.pop(book_uuid)
|
book_info = self._memory.log_books.pop(book_uuid)
|
||||||
for fd in lb:
|
|
||||||
self.backend.flow_details.pop(fd.uuid, None)
|
|
||||||
for ad in fd:
|
|
||||||
self.backend.atom_details.pop(ad.uuid, None)
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
|
raise exc.NotFound("No logbook found with uuid '%s'"
|
||||||
|
% book_uuid)
|
||||||
|
else:
|
||||||
|
while book_info['components']:
|
||||||
|
flow_uuid = book_info['components'].pop()
|
||||||
|
flow_info = self._memory.flow_details.pop(flow_uuid)
|
||||||
|
while flow_info['components']:
|
||||||
|
atom_uuid = flow_info['components'].pop()
|
||||||
|
self._memory.atom_details.pop(atom_uuid)
|
||||||
|
|
||||||
def update_atom_details(self, atom_detail):
|
def update_atom_details(self, atom_detail):
|
||||||
|
with self._lock.write_lock():
|
||||||
try:
|
try:
|
||||||
e_ad = self.backend.atom_details[atom_detail.uuid]
|
atom_info = self._memory.atom_details[atom_detail.uuid]
|
||||||
|
return self._helper.construct(
|
||||||
|
self._helper.merge(atom_detail, saved_info=atom_info),
|
||||||
|
self._memory.atom_details)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise exc.NotFound("No atom details found with id: %s"
|
raise exc.NotFound("No atom details found with uuid '%s'"
|
||||||
% atom_detail.uuid)
|
% atom_detail.uuid)
|
||||||
return e_ad.merge(atom_detail, deep_copy=True)
|
|
||||||
|
|
||||||
def _save_flowdetail_atoms(self, e_fd, flow_detail):
|
|
||||||
for atom_detail in flow_detail:
|
|
||||||
e_ad = e_fd.find(atom_detail.uuid)
|
|
||||||
if e_ad is None:
|
|
||||||
e_fd.add(atom_detail)
|
|
||||||
self.backend.atom_details[atom_detail.uuid] = atom_detail
|
|
||||||
else:
|
|
||||||
e_ad.merge(atom_detail, deep_copy=True)
|
|
||||||
|
|
||||||
def update_flow_details(self, flow_detail):
|
def update_flow_details(self, flow_detail):
|
||||||
|
with self._lock.write_lock():
|
||||||
try:
|
try:
|
||||||
e_fd = self.backend.flow_details[flow_detail.uuid]
|
flow_info = self._memory.flow_details[flow_detail.uuid]
|
||||||
|
return self._helper.construct(
|
||||||
|
self._helper.merge(flow_detail, saved_info=flow_info),
|
||||||
|
self._memory.flow_details)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise exc.NotFound("No flow details found with id: %s"
|
raise exc.NotFound("No flow details found with uuid '%s'"
|
||||||
% flow_detail.uuid)
|
% flow_detail.uuid)
|
||||||
e_fd.merge(flow_detail, deep_copy=True)
|
|
||||||
self._save_flowdetail_atoms(e_fd, flow_detail)
|
|
||||||
return e_fd
|
|
||||||
|
|
||||||
def save_logbook(self, book):
|
def save_logbook(self, book):
|
||||||
# Get a existing logbook model (or create it if it isn't there).
|
with self._lock.write_lock():
|
||||||
try:
|
return self._helper.construct(self._helper.merge(book),
|
||||||
e_lb = self.backend.log_books[book.uuid]
|
self._memory.log_books)
|
||||||
except KeyError:
|
|
||||||
e_lb = logbook.LogBook(book.name, uuid=book.uuid)
|
|
||||||
self.backend.log_books[e_lb.uuid] = e_lb
|
|
||||||
|
|
||||||
e_lb.merge(book, deep_copy=True)
|
|
||||||
# Add anything in to the new logbook that isn't already in the existing
|
|
||||||
# logbook.
|
|
||||||
for flow_detail in book:
|
|
||||||
try:
|
|
||||||
e_fd = self.backend.flow_details[flow_detail.uuid]
|
|
||||||
except KeyError:
|
|
||||||
e_fd = logbook.FlowDetail(flow_detail.name, flow_detail.uuid)
|
|
||||||
e_lb.add(e_fd)
|
|
||||||
self.backend.flow_details[e_fd.uuid] = e_fd
|
|
||||||
e_fd.merge(flow_detail, deep_copy=True)
|
|
||||||
self._save_flowdetail_atoms(e_fd, flow_detail)
|
|
||||||
return e_lb
|
|
||||||
|
|
||||||
def get_logbook(self, book_uuid):
|
def get_logbook(self, book_uuid):
|
||||||
|
with self._lock.read_lock():
|
||||||
try:
|
try:
|
||||||
return self.backend.log_books[book_uuid]
|
return self._helper.construct(book_uuid,
|
||||||
|
self._memory.log_books)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
|
raise exc.NotFound("No logbook found with uuid '%s'"
|
||||||
|
% book_uuid)
|
||||||
|
|
||||||
def get_logbooks(self):
|
def get_logbooks(self):
|
||||||
for lb in list(six.itervalues(self.backend.log_books)):
|
# Don't hold locks while iterating...
|
||||||
yield lb
|
with self._lock.read_lock():
|
||||||
|
book_uuids = set(six.iterkeys(self._memory.log_books))
|
||||||
|
for book_uuid in book_uuids:
|
||||||
|
try:
|
||||||
|
with self._lock.read_lock():
|
||||||
|
book = self._helper.construct(book_uuid,
|
||||||
|
self._memory.log_books)
|
||||||
|
yield book
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
@ -137,7 +137,7 @@ class LogBook(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_dict(cls, data, unmarshal_time=False):
|
def from_dict(cls, data, unmarshal_time=False):
|
||||||
"""Translates the given data into an instance of this class."""
|
"""Translates the given dictionary into an instance of this class."""
|
||||||
if not unmarshal_time:
|
if not unmarshal_time:
|
||||||
unmarshal_fn = lambda x: x
|
unmarshal_fn = lambda x: x
|
||||||
else:
|
else:
|
||||||
@ -163,6 +163,17 @@ class LogBook(object):
|
|||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len(self._flowdetails_by_id)
|
return len(self._flowdetails_by_id)
|
||||||
|
|
||||||
|
def copy(self, retain_contents=True):
|
||||||
|
"""Copies/clones this log book."""
|
||||||
|
clone = copy.copy(self)
|
||||||
|
if not retain_contents:
|
||||||
|
clone._flowdetails_by_id = {}
|
||||||
|
else:
|
||||||
|
clone._flowdetails_by_id = self._flowdetails_by_id.copy()
|
||||||
|
if self.meta:
|
||||||
|
clone.meta = self.meta.copy()
|
||||||
|
return clone
|
||||||
|
|
||||||
|
|
||||||
class FlowDetail(object):
|
class FlowDetail(object):
|
||||||
"""A container of atom details, a name and associated metadata.
|
"""A container of atom details, a name and associated metadata.
|
||||||
@ -186,7 +197,7 @@ class FlowDetail(object):
|
|||||||
"""Updates the objects state to be the same as the given one."""
|
"""Updates the objects state to be the same as the given one."""
|
||||||
if fd is self:
|
if fd is self:
|
||||||
return self
|
return self
|
||||||
self._atomdetails_by_id = dict(fd._atomdetails_by_id)
|
self._atomdetails_by_id = fd._atomdetails_by_id
|
||||||
self.state = fd.state
|
self.state = fd.state
|
||||||
self.meta = fd.meta
|
self.meta = fd.meta
|
||||||
return self
|
return self
|
||||||
@ -206,6 +217,17 @@ class FlowDetail(object):
|
|||||||
self.state = fd.state
|
self.state = fd.state
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def copy(self, retain_contents=True):
|
||||||
|
"""Copies/clones this flow detail."""
|
||||||
|
clone = copy.copy(self)
|
||||||
|
if not retain_contents:
|
||||||
|
clone._atomdetails_by_id = {}
|
||||||
|
else:
|
||||||
|
clone._atomdetails_by_id = self._atomdetails_by_id.copy()
|
||||||
|
if self.meta:
|
||||||
|
clone.meta = self.meta.copy()
|
||||||
|
return clone
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
"""Translates the internal state of this object to a dictionary.
|
"""Translates the internal state of this object to a dictionary.
|
||||||
|
|
||||||
@ -380,6 +402,7 @@ class AtomDetail(object):
|
|||||||
|
|
||||||
class TaskDetail(AtomDetail):
|
class TaskDetail(AtomDetail):
|
||||||
"""This class represents a task detail for flow task object."""
|
"""This class represents a task detail for flow task object."""
|
||||||
|
|
||||||
def __init__(self, name, uuid):
|
def __init__(self, name, uuid):
|
||||||
super(TaskDetail, self).__init__(name, uuid)
|
super(TaskDetail, self).__init__(name, uuid)
|
||||||
|
|
||||||
@ -410,6 +433,7 @@ class TaskDetail(AtomDetail):
|
|||||||
return self._to_dict_shared()
|
return self._to_dict_shared()
|
||||||
|
|
||||||
def merge(self, other, deep_copy=False):
|
def merge(self, other, deep_copy=False):
|
||||||
|
"""Merges the current object state with the given ones state."""
|
||||||
if not isinstance(other, TaskDetail):
|
if not isinstance(other, TaskDetail):
|
||||||
raise exc.NotImplementedError("Can only merge with other"
|
raise exc.NotImplementedError("Can only merge with other"
|
||||||
" task details")
|
" task details")
|
||||||
@ -421,6 +445,16 @@ class TaskDetail(AtomDetail):
|
|||||||
self.results = copy_fn(other.results)
|
self.results = copy_fn(other.results)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def copy(self):
|
||||||
|
"""Copies/clones this task detail."""
|
||||||
|
clone = copy.copy(self)
|
||||||
|
clone.results = copy.copy(self.results)
|
||||||
|
if self.meta:
|
||||||
|
clone.meta = self.meta.copy()
|
||||||
|
if self.version:
|
||||||
|
clone.version = copy.copy(self.version)
|
||||||
|
return clone
|
||||||
|
|
||||||
|
|
||||||
class RetryDetail(AtomDetail):
|
class RetryDetail(AtomDetail):
|
||||||
"""This class represents a retry detail for retry controller object."""
|
"""This class represents a retry detail for retry controller object."""
|
||||||
@ -434,6 +468,24 @@ class RetryDetail(AtomDetail):
|
|||||||
self.state = state
|
self.state = state
|
||||||
self.intention = states.EXECUTE
|
self.intention = states.EXECUTE
|
||||||
|
|
||||||
|
def copy(self):
|
||||||
|
"""Copies/clones this retry detail."""
|
||||||
|
clone = copy.copy(self)
|
||||||
|
results = []
|
||||||
|
# NOTE(imelnikov): we can't just deep copy Failures, as they
|
||||||
|
# contain tracebacks, which are not copyable.
|
||||||
|
for (data, failures) in self.results:
|
||||||
|
copied_failures = {}
|
||||||
|
for (key, failure) in six.iteritems(failures):
|
||||||
|
copied_failures[key] = failure
|
||||||
|
results.append((data, copied_failures))
|
||||||
|
clone.results = results
|
||||||
|
if self.meta:
|
||||||
|
clone.meta = self.meta.copy()
|
||||||
|
if self.version:
|
||||||
|
clone.version = copy.copy(self.version)
|
||||||
|
return clone
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_results(self):
|
def last_results(self):
|
||||||
try:
|
try:
|
||||||
@ -496,6 +548,7 @@ class RetryDetail(AtomDetail):
|
|||||||
return base
|
return base
|
||||||
|
|
||||||
def merge(self, other, deep_copy=False):
|
def merge(self, other, deep_copy=False):
|
||||||
|
"""Merges the current object state with the given ones state."""
|
||||||
if not isinstance(other, RetryDetail):
|
if not isinstance(other, RetryDetail):
|
||||||
raise exc.NotImplementedError("Can only merge with other"
|
raise exc.NotImplementedError("Can only merge with other"
|
||||||
" retry details")
|
" retry details")
|
||||||
|
@ -559,7 +559,7 @@ class RetryTest(utils.EngineTestBase):
|
|||||||
# we execute retry
|
# we execute retry
|
||||||
engine.storage.save('flow-1_retry', 1)
|
engine.storage.save('flow-1_retry', 1)
|
||||||
# task fails
|
# task fails
|
||||||
fail = failure.Failure.from_exception(RuntimeError('foo')),
|
fail = failure.Failure.from_exception(RuntimeError('foo'))
|
||||||
engine.storage.save('task1', fail, state=st.FAILURE)
|
engine.storage.save('task1', fail, state=st.FAILURE)
|
||||||
if when == 'task fails':
|
if when == 'task fails':
|
||||||
return engine
|
return engine
|
||||||
|
@ -74,7 +74,7 @@ def _are_equal_exc_info_tuples(ei1, ei2):
|
|||||||
|
|
||||||
|
|
||||||
class Failure(object):
|
class Failure(object):
|
||||||
"""Object that represents failure.
|
"""An immutable object that represents failure.
|
||||||
|
|
||||||
Failure objects encapsulate exception information so that they can be
|
Failure objects encapsulate exception information so that they can be
|
||||||
re-used later to re-raise, inspect, examine, log, print, serialize,
|
re-used later to re-raise, inspect, examine, log, print, serialize,
|
||||||
|
Loading…
Reference in New Issue
Block a user