Default to using a thread-safe storage unit

Instead of having a nominally useful single-threaded storage unit
that uses a dummy reader/writer lock and a separate multi-threaded
storage unit, just have the one storage unit protect itself from
multi-threaded access by default (with a real reader/writer lock
activated to make this work correctly).
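
For callers the two classes collapse into one; a minimal sketch of
the visible difference (the in-memory backend and throwaway flow
detail here are purely illustrative):

    from taskflow.persistence import backends
    from taskflow import storage
    from taskflow.utils import persistence_utils

    # In-memory backend and a temporary flow detail, just for the demo.
    backend = backends.fetch({'connection': 'memory://'})
    _book, flow_detail = persistence_utils.temporary_flow_detail(backend)

    # Before: the caller had to pick the right class up front.
    #   store = storage.SingleThreadedStorage(flow_detail)   # dummy lock
    #   store = storage.MultiThreadedStorage(flow_detail)    # real lock
    # After: one class that always guards itself with a real
    # reader/writer lock.
    store = storage.Storage(flow_detail, backend=backend)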

Change-Id: Ib6879edb465156a8e54fd5b4002550d1cec49137
Author:       Joshua Harlow
Date:         2015-02-05 16:31:33 -08:00
Committed by: Joshua Harlow
Commit:       08a1846396
Parent:       2cd90745ea

9 changed files with 26 additions and 79 deletions

View File

@@ -242,9 +242,9 @@ This stage starts by setting up the storage needed for all atoms in the
 previously created graph, ensuring that corresponding
 :py:class:`~taskflow.persistence.logbook.AtomDetail` (or subclass of) objects
 are created for each node in the graph. Once this is done final validation
-occurs on the requirements that are needed to start execution and what storage
-provides. If there is any atom or flow requirements not satisfied then
-execution will not be allowed to continue.
+occurs on the requirements that are needed to start execution and what
+:py:class:`~taskflow.storage.Storage` provides. If there is any atom or flow
+requirements not satisfied then execution will not be allowed to continue.
 
 Execution
 ---------
@@ -311,7 +311,8 @@ atoms result will be examined and finalized using a
 :py:class:`~taskflow.engines.action_engine.completer.Completer` implementation.
 It typically will persist results to a provided persistence backend (saved
 into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail`
-and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect
+and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the
+:py:class:`~taskflow.storage.Storage` helper) and reflect
 the new state of the atom. At this point what typically happens falls into two
 categories, one for if that atom failed and one for if it did not. If the atom
 failed it may be set to a new intention such as ``RETRY`` or
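
The validation failure described above can be seen with a short,
hedged example (the flow and task names are made up; taskflow raises
its ``MissingDependencies`` error when a required value has no
provider):

    import taskflow.engines
    from taskflow import exceptions
    from taskflow import task
    from taskflow.patterns import linear_flow


    class NeedsX(task.Task):
        # 'x' becomes a requirement, inferred from execute()'s signature.
        def execute(self, x):
            return x


    flow = linear_flow.Flow('validation-demo').add(NeedsX())
    try:
        # Nothing provides 'x', so validation refuses to start execution.
        taskflow.engines.run(flow)
    except exceptions.MissingDependencies as e:
        print(e)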

View File

@@ -279,6 +279,11 @@ Implementations
 .. automodule:: taskflow.persistence.backends.impl_sqlalchemy
 .. automodule:: taskflow.persistence.backends.impl_zookeeper
 
+Storage
+=======
+
+.. automodule:: taskflow.storage
+
 Hierarchy
 =========

View File

@@ -28,7 +28,6 @@ from taskflow.engines.action_engine import runtime
 from taskflow.engines import base
 from taskflow import exceptions as exc
 from taskflow import states
-from taskflow import storage as atom_storage
 from taskflow.types import failure
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
@@ -218,7 +217,6 @@ class ActionEngine(base.Engine):
 class SerialActionEngine(ActionEngine):
     """Engine that runs tasks in serial manner."""
-    _storage_factory = atom_storage.SingleThreadedStorage
 
     def __init__(self, flow, flow_detail, backend, options):
         super(SerialActionEngine, self).__init__(flow, flow_detail,
@@ -276,8 +274,6 @@ String (case insensitive) Executor used
     .. |cf| replace:: concurrent.futures
     """
 
-    _storage_factory = atom_storage.MultiThreadedStorage
-
     # One of these types should match when a object (non-string) is provided
     # for the 'executor' option.
     #

View File

@@ -19,6 +19,7 @@ import abc
 import six
 
+from taskflow import storage
 from taskflow.types import notifier
 from taskflow.utils import deprecation
 from taskflow.utils import misc
@@ -74,11 +75,7 @@ class Engine(object):
     @misc.cachedproperty
     def storage(self):
         """The storage unit for this flow."""
-        return self._storage_factory(self._flow_detail, self._backend)
-
-    @abc.abstractproperty
-    def _storage_factory(self):
-        """Storage factory that will be used to generate storage objects."""
+        return storage.Storage(self._flow_detail, backend=self._backend)
 
     @abc.abstractmethod
     def compile(self):

View File

@@ -17,7 +17,6 @@
 from taskflow.engines.action_engine import engine
 from taskflow.engines.worker_based import executor
 from taskflow.engines.worker_based import protocol as pr
-from taskflow import storage as t_storage
 
 
 class WorkerBasedActionEngine(engine.ActionEngine):
@@ -46,8 +45,6 @@ class WorkerBasedActionEngine(engine.ActionEngine):
         (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
     """
 
-    _storage_factory = t_storage.SingleThreadedStorage
-
     def __init__(self, flow, flow_detail, backend, options):
         super(WorkerBasedActionEngine, self).__init__(flow, flow_detail,
                                                       backend, options)

View File

@@ -14,7 +14,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import abc
 import contextlib
 
 from oslo_utils import reflection
@@ -107,9 +106,8 @@ def _item_from_first_of(providers, looking_for):
                       " extraction" % (looking_for, providers))
 
 
-@six.add_metaclass(abc.ABCMeta)
 class Storage(object):
-    """Interface between engines and logbook.
+    """Interface between engines and logbook and its backend (if any).
 
     This class provides a simple interface to save atoms of a given flow and
     associated activity and results to persistence layer (logbook,
@@ -119,15 +117,21 @@ class Storage(object):
     """
 
     injector_name = '_TaskFlow_INJECTOR'
+    """Injector task detail name.
+
+    This task detail is a **special** detail that will be automatically
+    created and saved to store **persistent** injected values (name conflicts
+    with it must be avoided) that are *global* to the flow being executed.
+    """
 
     def __init__(self, flow_detail, backend=None):
         self._result_mappings = {}
         self._reverse_mapping = {}
         self._backend = backend
         self._flowdetail = flow_detail
-        self._lock = self._lock_cls()
         self._transients = {}
         self._injected_args = {}
+        self._lock = lock_utils.ReaderWriterLock()
         # NOTE(imelnikov): failure serialization looses information,
         # so we cache failures here, in atom name -> failure mapping.
@@ -150,16 +154,6 @@ class Storage(object):
         self._set_result_mapping(injector_td.name,
                                  dict((name, name) for name in names))
 
-    @abc.abstractproperty
-    def _lock_cls(self):
-        """Lock class used to generate reader/writer locks.
-
-        These locks are used for protecting read/write access to the
-        underlying storage backend when internally mutating operations occur.
-        They ensure that we read and write data in a consistent manner when
-        being used in a multithreaded situation.
-        """
-
     def _with_connection(self, functor, *args, **kwargs):
         # NOTE(harlowja): Activate the given function with a backend
         # connection, if a backend is provided in the first place, otherwise
@@ -771,13 +765,3 @@ class Storage(object):
             histories.append((ad.name,
                               self._translate_into_history(ad)))
         return histories
-
-
-class MultiThreadedStorage(Storage):
-    """Storage that uses locks to protect against concurrent access."""
-    _lock_cls = lock_utils.ReaderWriterLock
-
-
-class SingleThreadedStorage(Storage):
-    """Storage that uses dummy locks when you really don't need locks."""
-    _lock_cls = lock_utils.DummyReaderWriterLock
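
The guarding pattern that this always-created lock supports looks
roughly like the following sketch (illustrative only, not code from
this commit; ``inject`` and ``fetch_all`` are simplified stand-ins
for the real storage methods):

    from taskflow.utils import lock_utils


    class LockedStore(object):
        """Sketch: writers take the write lock, readers the read lock."""

        def __init__(self):
            self._data = {}
            self._lock = lock_utils.ReaderWriterLock()

        def inject(self, pairs):
            # Writers are exclusive: no readers or other writers run
            # concurrently with this block.
            with self._lock.write_lock():
                self._data.update(pairs)

        def fetch_all(self):
            # Readers may proceed in parallel with one another.
            with self._lock.read_lock():
                return dict(self._data)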

View File

@@ -35,7 +35,7 @@ class _RunnerTestMixin(object):
     def _make_runtime(self, flow, initial_state=None):
         compilation = compiler.PatternCompiler(flow).compile()
         flow_detail = pu.create_flow_detail(flow)
-        store = storage.SingleThreadedStorage(flow_detail)
+        store = storage.Storage(flow_detail)
         # This ensures the tasks exist in storage...
         for task in compilation.execution_graph:
             store.ensure_atom(task)

View File

@@ -49,17 +49,14 @@ class StorageTestMixin(object):
         for t in threads:
             t.join()
 
-    def _get_storage(self, flow_detail=None, threaded=False):
+    def _get_storage(self, flow_detail=None):
         if flow_detail is None:
             _lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
-        storage_cls = storage.SingleThreadedStorage
-        if threaded:
-            storage_cls = storage.MultiThreadedStorage
-        return storage_cls(flow_detail=flow_detail, backend=self.backend)
+        return storage.Storage(flow_detail=flow_detail, backend=self.backend)
 
     def test_non_saving_storage(self):
         _lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
-        s = storage.SingleThreadedStorage(flow_detail=flow_detail)
+        s = storage.Storage(flow_detail=flow_detail)
         s.ensure_atom(test_utils.NoopTask('my_task'))
         self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
@@ -311,7 +308,7 @@ class StorageTestMixin(object):
         })
 
     def test_many_thread_ensure_same_task(self):
-        s = self._get_storage(threaded=True)
+        s = self._get_storage()
 
         def ensure_my_task():
             s.ensure_atom(test_utils.NoopTask('my_task'))
@@ -325,7 +322,7 @@ class StorageTestMixin(object):
         self.assertEqual(1, len(s._flowdetail))
 
     def test_many_thread_inject(self):
-        s = self._get_storage(threaded=True)
+        s = self._get_storage()
 
         def inject_values(values):
             s.inject(values)

View File

@@ -241,36 +241,6 @@ class ReaderWriterLock(object):
             self._cond.release()
 
 
-class DummyReaderWriterLock(object):
-    """A dummy reader/writer lock.
-
-    This dummy lock doesn't lock anything but provides the same functions as a
-    normal reader/writer lock class and can be useful in unit tests or other
-    similar scenarios (do *not* use it if locking is actually required).
-    """
-
-    @contextlib.contextmanager
-    def write_lock(self):
-        yield self
-
-    @contextlib.contextmanager
-    def read_lock(self):
-        yield self
-
-    @property
-    def owner(self):
-        return None
-
-    def is_reader(self):
-        return False
-
-    def is_writer(self, check_pending=True):
-        return False
-
-    @property
-    def has_pending_writers(self):
-        return False
-
-
 class MultiLock(object):
     """A class which attempts to obtain & release many locks at once.