Rework the in-memory backend

This avoids storing direct copies of incoming objects and
makes sure that we always merge incoming objects (if a
saved object already exists) or create a copy of the
incoming object if it does not exist when storing.

On retrieval we also always return copies instead of
returning the data that is stored internally to avoid the
problems that can be hard to detect when users (engine
or other) modify those source objects.

Fixes bug 1365830

Also fixes a retry test case issue that was discovered due
to this more easily useable/understandable memory backend
changes...

Change-Id: I2afdda7beb71e35f7e12d9fd7ccf90b6c5447274
This commit is contained in:
Joshua Harlow 2014-09-25 18:27:05 -07:00
parent 22415f7c1d
commit 84b387f8bb
4 changed files with 211 additions and 73 deletions

View File

@ -15,16 +15,97 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import functools
import six import six
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow import logging from taskflow import logging
from taskflow.persistence.backends import base from taskflow.persistence.backends import base
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class _Memory(object):
"""Where the data is really stored."""
def __init__(self):
self.log_books = {}
self.flow_details = {}
self.atom_details = {}
def clear_all(self):
self.log_books.clear()
self.flow_details.clear()
self.atom_details.clear()
class _MemoryHelper(object):
"""Helper functionality for the memory backends & connections."""
def __init__(self, memory):
self._memory = memory
@staticmethod
def _fetch_clone_args(incoming):
if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)):
# We keep our own copy of the added contents of the following
# types so we don't need the clone to retain them directly...
return {
'retain_contents': False,
}
return {}
def construct(self, uuid, container):
"""Reconstructs a object from the given uuid and storage container."""
source = container[uuid]
clone_kwargs = self._fetch_clone_args(source)
clone = source['object'].copy(**clone_kwargs)
rebuilder = source.get('rebuilder')
if rebuilder:
for component in map(rebuilder, source['components']):
clone.add(component)
return clone
def merge(self, incoming, saved_info=None):
"""Merges the incoming object into the local memories copy."""
if saved_info is None:
if isinstance(incoming, logbook.LogBook):
saved_info = self._memory.log_books.setdefault(
incoming.uuid, {})
elif isinstance(incoming, logbook.FlowDetail):
saved_info = self._memory.flow_details.setdefault(
incoming.uuid, {})
elif isinstance(incoming, logbook.AtomDetail):
saved_info = self._memory.atom_details.setdefault(
incoming.uuid, {})
else:
raise TypeError("Unknown how to merge type '%s'"
% type(incoming))
try:
saved_info['object'].merge(incoming)
except KeyError:
clone_kwargs = self._fetch_clone_args(incoming)
saved_info['object'] = incoming.copy(**clone_kwargs)
if isinstance(incoming, logbook.LogBook):
flow_details = saved_info.setdefault('components', set())
if 'rebuilder' not in saved_info:
saved_info['rebuilder'] = functools.partial(
self.construct, container=self._memory.flow_details)
for flow_detail in incoming:
flow_details.add(self.merge(flow_detail))
elif isinstance(incoming, logbook.FlowDetail):
atom_details = saved_info.setdefault('components', set())
if 'rebuilder' not in saved_info:
saved_info['rebuilder'] = functools.partial(
self.construct, container=self._memory.atom_details)
for atom_detail in incoming:
atom_details.add(self.merge(atom_detail))
return incoming.uuid
class MemoryBackend(base.Backend): class MemoryBackend(base.Backend):
"""A in-memory (non-persistent) backend. """A in-memory (non-persistent) backend.
@ -33,21 +114,28 @@ class MemoryBackend(base.Backend):
""" """
def __init__(self, conf=None): def __init__(self, conf=None):
super(MemoryBackend, self).__init__(conf) super(MemoryBackend, self).__init__(conf)
self._log_books = {} self._memory = _Memory()
self._flow_details = {} self._helper = _MemoryHelper(self._memory)
self._atom_details = {} self._lock = lock_utils.ReaderWriterLock()
def _construct_from(self, container):
return dict((uuid, self._helper.construct(uuid, container))
for uuid in six.iterkeys(container))
@property @property
def log_books(self): def log_books(self):
return self._log_books with self._lock.read_lock():
return self._construct_from(self._memory.log_books)
@property @property
def flow_details(self): def flow_details(self):
return self._flow_details with self._lock.read_lock():
return self._construct_from(self._memory.flow_details)
@property @property
def atom_details(self): def atom_details(self):
return self._atom_details with self._lock.read_lock():
return self._construct_from(self._memory.atom_details)
def get_connection(self): def get_connection(self):
return Connection(self) return Connection(self)
@ -57,8 +145,13 @@ class MemoryBackend(base.Backend):
class Connection(base.Connection): class Connection(base.Connection):
"""A connection to an in-memory backend."""
def __init__(self, backend): def __init__(self, backend):
self._backend = backend self._backend = backend
self._helper = backend._helper
self._memory = backend._memory
self._lock = backend._lock
def upgrade(self): def upgrade(self):
pass pass
@ -74,78 +167,70 @@ class Connection(base.Connection):
pass pass
def clear_all(self): def clear_all(self):
count = 0 with self._lock.write_lock():
for book_uuid in list(six.iterkeys(self.backend.log_books)): self._memory.clear_all()
self.destroy_logbook(book_uuid)
count += 1
return count
def destroy_logbook(self, book_uuid): def destroy_logbook(self, book_uuid):
try: with self._lock.write_lock():
# Do the same cascading delete that the sql layer does. try:
lb = self.backend.log_books.pop(book_uuid) # Do the same cascading delete that the sql layer does.
for fd in lb: book_info = self._memory.log_books.pop(book_uuid)
self.backend.flow_details.pop(fd.uuid, None) except KeyError:
for ad in fd: raise exc.NotFound("No logbook found with uuid '%s'"
self.backend.atom_details.pop(ad.uuid, None) % book_uuid)
except KeyError: else:
raise exc.NotFound("No logbook found with id: %s" % book_uuid) while book_info['components']:
flow_uuid = book_info['components'].pop()
flow_info = self._memory.flow_details.pop(flow_uuid)
while flow_info['components']:
atom_uuid = flow_info['components'].pop()
self._memory.atom_details.pop(atom_uuid)
def update_atom_details(self, atom_detail): def update_atom_details(self, atom_detail):
try: with self._lock.write_lock():
e_ad = self.backend.atom_details[atom_detail.uuid] try:
except KeyError: atom_info = self._memory.atom_details[atom_detail.uuid]
raise exc.NotFound("No atom details found with id: %s" return self._helper.construct(
% atom_detail.uuid) self._helper.merge(atom_detail, saved_info=atom_info),
return e_ad.merge(atom_detail, deep_copy=True) self._memory.atom_details)
except KeyError:
def _save_flowdetail_atoms(self, e_fd, flow_detail): raise exc.NotFound("No atom details found with uuid '%s'"
for atom_detail in flow_detail: % atom_detail.uuid)
e_ad = e_fd.find(atom_detail.uuid)
if e_ad is None:
e_fd.add(atom_detail)
self.backend.atom_details[atom_detail.uuid] = atom_detail
else:
e_ad.merge(atom_detail, deep_copy=True)
def update_flow_details(self, flow_detail): def update_flow_details(self, flow_detail):
try: with self._lock.write_lock():
e_fd = self.backend.flow_details[flow_detail.uuid] try:
except KeyError: flow_info = self._memory.flow_details[flow_detail.uuid]
raise exc.NotFound("No flow details found with id: %s" return self._helper.construct(
% flow_detail.uuid) self._helper.merge(flow_detail, saved_info=flow_info),
e_fd.merge(flow_detail, deep_copy=True) self._memory.flow_details)
self._save_flowdetail_atoms(e_fd, flow_detail) except KeyError:
return e_fd raise exc.NotFound("No flow details found with uuid '%s'"
% flow_detail.uuid)
def save_logbook(self, book): def save_logbook(self, book):
# Get a existing logbook model (or create it if it isn't there). with self._lock.write_lock():
try: return self._helper.construct(self._helper.merge(book),
e_lb = self.backend.log_books[book.uuid] self._memory.log_books)
except KeyError:
e_lb = logbook.LogBook(book.name, uuid=book.uuid)
self.backend.log_books[e_lb.uuid] = e_lb
e_lb.merge(book, deep_copy=True)
# Add anything in to the new logbook that isn't already in the existing
# logbook.
for flow_detail in book:
try:
e_fd = self.backend.flow_details[flow_detail.uuid]
except KeyError:
e_fd = logbook.FlowDetail(flow_detail.name, flow_detail.uuid)
e_lb.add(e_fd)
self.backend.flow_details[e_fd.uuid] = e_fd
e_fd.merge(flow_detail, deep_copy=True)
self._save_flowdetail_atoms(e_fd, flow_detail)
return e_lb
def get_logbook(self, book_uuid): def get_logbook(self, book_uuid):
try: with self._lock.read_lock():
return self.backend.log_books[book_uuid] try:
except KeyError: return self._helper.construct(book_uuid,
raise exc.NotFound("No logbook found with id: %s" % book_uuid) self._memory.log_books)
except KeyError:
raise exc.NotFound("No logbook found with uuid '%s'"
% book_uuid)
def get_logbooks(self): def get_logbooks(self):
for lb in list(six.itervalues(self.backend.log_books)): # Don't hold locks while iterating...
yield lb with self._lock.read_lock():
book_uuids = set(six.iterkeys(self._memory.log_books))
for book_uuid in book_uuids:
try:
with self._lock.read_lock():
book = self._helper.construct(book_uuid,
self._memory.log_books)
yield book
except KeyError:
pass

View File

@ -137,7 +137,7 @@ class LogBook(object):
@classmethod @classmethod
def from_dict(cls, data, unmarshal_time=False): def from_dict(cls, data, unmarshal_time=False):
"""Translates the given data into an instance of this class.""" """Translates the given dictionary into an instance of this class."""
if not unmarshal_time: if not unmarshal_time:
unmarshal_fn = lambda x: x unmarshal_fn = lambda x: x
else: else:
@ -163,6 +163,17 @@ class LogBook(object):
def __len__(self): def __len__(self):
return len(self._flowdetails_by_id) return len(self._flowdetails_by_id)
def copy(self, retain_contents=True):
"""Copies/clones this log book."""
clone = copy.copy(self)
if not retain_contents:
clone._flowdetails_by_id = {}
else:
clone._flowdetails_by_id = self._flowdetails_by_id.copy()
if self.meta:
clone.meta = self.meta.copy()
return clone
class FlowDetail(object): class FlowDetail(object):
"""A container of atom details, a name and associated metadata. """A container of atom details, a name and associated metadata.
@ -186,7 +197,7 @@ class FlowDetail(object):
"""Updates the objects state to be the same as the given one.""" """Updates the objects state to be the same as the given one."""
if fd is self: if fd is self:
return self return self
self._atomdetails_by_id = dict(fd._atomdetails_by_id) self._atomdetails_by_id = fd._atomdetails_by_id
self.state = fd.state self.state = fd.state
self.meta = fd.meta self.meta = fd.meta
return self return self
@ -206,6 +217,17 @@ class FlowDetail(object):
self.state = fd.state self.state = fd.state
return self return self
def copy(self, retain_contents=True):
"""Copies/clones this flow detail."""
clone = copy.copy(self)
if not retain_contents:
clone._atomdetails_by_id = {}
else:
clone._atomdetails_by_id = self._atomdetails_by_id.copy()
if self.meta:
clone.meta = self.meta.copy()
return clone
def to_dict(self): def to_dict(self):
"""Translates the internal state of this object to a dictionary. """Translates the internal state of this object to a dictionary.
@ -380,6 +402,7 @@ class AtomDetail(object):
class TaskDetail(AtomDetail): class TaskDetail(AtomDetail):
"""This class represents a task detail for flow task object.""" """This class represents a task detail for flow task object."""
def __init__(self, name, uuid): def __init__(self, name, uuid):
super(TaskDetail, self).__init__(name, uuid) super(TaskDetail, self).__init__(name, uuid)
@ -410,6 +433,7 @@ class TaskDetail(AtomDetail):
return self._to_dict_shared() return self._to_dict_shared()
def merge(self, other, deep_copy=False): def merge(self, other, deep_copy=False):
"""Merges the current object state with the given ones state."""
if not isinstance(other, TaskDetail): if not isinstance(other, TaskDetail):
raise exc.NotImplementedError("Can only merge with other" raise exc.NotImplementedError("Can only merge with other"
" task details") " task details")
@ -421,6 +445,16 @@ class TaskDetail(AtomDetail):
self.results = copy_fn(other.results) self.results = copy_fn(other.results)
return self return self
def copy(self):
"""Copies/clones this task detail."""
clone = copy.copy(self)
clone.results = copy.copy(self.results)
if self.meta:
clone.meta = self.meta.copy()
if self.version:
clone.version = copy.copy(self.version)
return clone
class RetryDetail(AtomDetail): class RetryDetail(AtomDetail):
"""This class represents a retry detail for retry controller object.""" """This class represents a retry detail for retry controller object."""
@ -434,6 +468,24 @@ class RetryDetail(AtomDetail):
self.state = state self.state = state
self.intention = states.EXECUTE self.intention = states.EXECUTE
def copy(self):
"""Copies/clones this retry detail."""
clone = copy.copy(self)
results = []
# NOTE(imelnikov): we can't just deep copy Failures, as they
# contain tracebacks, which are not copyable.
for (data, failures) in self.results:
copied_failures = {}
for (key, failure) in six.iteritems(failures):
copied_failures[key] = failure
results.append((data, copied_failures))
clone.results = results
if self.meta:
clone.meta = self.meta.copy()
if self.version:
clone.version = copy.copy(self.version)
return clone
@property @property
def last_results(self): def last_results(self):
try: try:
@ -496,6 +548,7 @@ class RetryDetail(AtomDetail):
return base return base
def merge(self, other, deep_copy=False): def merge(self, other, deep_copy=False):
"""Merges the current object state with the given ones state."""
if not isinstance(other, RetryDetail): if not isinstance(other, RetryDetail):
raise exc.NotImplementedError("Can only merge with other" raise exc.NotImplementedError("Can only merge with other"
" retry details") " retry details")

View File

@ -559,7 +559,7 @@ class RetryTest(utils.EngineTestBase):
# we execute retry # we execute retry
engine.storage.save('flow-1_retry', 1) engine.storage.save('flow-1_retry', 1)
# task fails # task fails
fail = failure.Failure.from_exception(RuntimeError('foo')), fail = failure.Failure.from_exception(RuntimeError('foo'))
engine.storage.save('task1', fail, state=st.FAILURE) engine.storage.save('task1', fail, state=st.FAILURE)
if when == 'task fails': if when == 'task fails':
return engine return engine

View File

@ -74,7 +74,7 @@ def _are_equal_exc_info_tuples(ei1, ei2):
class Failure(object): class Failure(object):
"""Object that represents failure. """An immutable object that represents failure.
Failure objects encapsulate exception information so that they can be Failure objects encapsulate exception information so that they can be
re-used later to re-raise, inspect, examine, log, print, serialize, re-used later to re-raise, inspect, examine, log, print, serialize,