Merge "Default to using a thread-safe storage unit"

commit a2bb00c68a
Jenkins, 2015-02-07 19:11:20 +00:00, committed by Gerrit Code Review
9 changed files with 26 additions and 79 deletions

View File

@@ -242,9 +242,9 @@ This stage starts by setting up the storage needed for all atoms in the
 previously created graph, ensuring that corresponding
 :py:class:`~taskflow.persistence.logbook.AtomDetail` (or subclass of) objects
 are created for each node in the graph. Once this is done final validation
-occurs on the requirements that are needed to start execution and what storage
-provides. If there is any atom or flow requirements not satisfied then
-execution will not be allowed to continue.
+occurs on the requirements that are needed to start execution and what
+:py:class:`~taskflow.storage.Storage` provides. If any atom or flow
+requirements are not satisfied then execution will not be allowed to continue.
 
 Execution
 ---------
@@ -311,7 +311,8 @@ atoms result will be examined and finalized using a
 :py:class:`~taskflow.engines.action_engine.completer.Completer` implementation.
 It typically will persist results to a provided persistence backend (saved
 into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail`
-and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect
+and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the
+:py:class:`~taskflow.storage.Storage` helper) and reflect
 the new state of the atom. At this point what typically happens falls into two
 categories, one for if that atom failed and one for if it did not. If the atom
 failed it may be set to a new intention such as ``RETRY`` or

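The validation behavior this hunk documents is observable from user code: if an atom requires a value that neither another atom nor storage provides, the engine stops before execution begins. A minimal sketch, assuming the taskflow package is importable (the flow and task names here are invented for illustration):

    import taskflow.engines
    from taskflow import exceptions as exc
    from taskflow import task
    from taskflow.patterns import linear_flow


    class NeedsX(task.Task):
        def execute(self, x):  # 'x' becomes a declared requirement
            return x


    flow = linear_flow.Flow('demo').add(NeedsX())
    try:
        # No store= is supplied, so storage cannot provide 'x' and the
        # validation stage described above refuses to continue.
        taskflow.engines.run(flow)
    except exc.MissingDependencies:
        print('execution was not allowed to continue')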
View File

@@ -279,6 +279,11 @@ Implementations
 .. automodule:: taskflow.persistence.backends.impl_sqlalchemy
 .. automodule:: taskflow.persistence.backends.impl_zookeeper
 
+Storage
+=======
+
+.. automodule:: taskflow.storage
+
 Hierarchy
 =========
 

View File

@@ -28,7 +28,6 @@ from taskflow.engines.action_engine import runtime
 from taskflow.engines import base
 from taskflow import exceptions as exc
 from taskflow import states
-from taskflow import storage as atom_storage
 from taskflow.types import failure
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
@@ -218,7 +217,6 @@ class ActionEngine(base.Engine):
 
 class SerialActionEngine(ActionEngine):
     """Engine that runs tasks in serial manner."""
-    _storage_factory = atom_storage.SingleThreadedStorage
 
     def __init__(self, flow, flow_detail, backend, options):
         super(SerialActionEngine, self).__init__(flow, flow_detail,
@@ -276,8 +274,6 @@ String (case insensitive) Executor used
 
     .. |cf| replace:: concurrent.futures
     """
-    _storage_factory = atom_storage.MultiThreadedStorage
-
     # One of these types should match when a object (non-string) is provided
     # for the 'executor' option.
     #

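Both hunks above delete the same kind of line: a class-attribute factory that let each engine subclass pick a storage class. A generic sketch of that shape (the names below are invented stand-ins, not taskflow's real classes) shows the boilerplate the commit removes:

    class FakeStorage(object):
        def __init__(self, flow_detail, backend=None):
            self.flow_detail = flow_detail
            self.backend = backend


    class BaseEngine(object):
        _storage_factory = None  # each subclass had to pick a storage class

        def build_storage(self, flow_detail, backend):
            return self._storage_factory(flow_detail, backend)


    class SerialEngine(BaseEngine):
        _storage_factory = FakeStorage  # the per-engine line deleted above


    store = SerialEngine().build_storage('my-flow-detail', None)
    print(store.flow_detail)

With a single thread-safe Storage class there is nothing left to select, so the factory attribute and the import that supported it both become dead weight.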
View File

@@ -19,6 +19,7 @@ import abc
 
 import six
 
+from taskflow import storage
 from taskflow.types import notifier
 from taskflow.utils import deprecation
 from taskflow.utils import misc
@@ -74,11 +75,7 @@ class Engine(object):
     @misc.cachedproperty
     def storage(self):
         """The storage unit for this flow."""
-        return self._storage_factory(self._flow_detail, self._backend)
-
-    @abc.abstractproperty
-    def _storage_factory(self):
-        """Storage factory that will be used to generate storage objects."""
+        return storage.Storage(self._flow_detail, backend=self._backend)
 
     @abc.abstractmethod
     def compile(self):

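The replacement body is built lazily and exactly once per engine instance, because `misc.cachedproperty` caches the computed value on first access. A small sketch of that behavior, using `functools.cached_property` (Python 3.8+) as a stand-in for taskflow's helper:

    import functools


    class Engine(object):
        def __init__(self, flow_detail, backend=None):
            self._flow_detail = flow_detail
            self._backend = backend

        @functools.cached_property  # stand-in for misc.cachedproperty
        def storage(self):
            # The real engine builds storage.Storage(...) here; this just
            # records that construction happens exactly once.
            print('constructing storage')
            return {'flow_detail': self._flow_detail,
                    'backend': self._backend}


    engine = Engine('my-flow-detail')
    assert engine.storage is engine.storage  # built once, then cached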
View File

@@ -17,7 +17,6 @@
 from taskflow.engines.action_engine import engine
 from taskflow.engines.worker_based import executor
 from taskflow.engines.worker_based import protocol as pr
-from taskflow import storage as t_storage
 
 
 class WorkerBasedActionEngine(engine.ActionEngine):
@@ -46,8 +45,6 @@ class WorkerBasedActionEngine(engine.ActionEngine):
         (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
     """
 
-    _storage_factory = t_storage.SingleThreadedStorage
-
     def __init__(self, flow, flow_detail, backend, options):
         super(WorkerBasedActionEngine, self).__init__(flow, flow_detail,
                                                       backend, options)

View File

@@ -14,7 +14,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import abc
 import contextlib
 
 from oslo_utils import reflection
@@ -107,9 +106,8 @@ def _item_from_first_of(providers, looking_for):
                           " extraction" % (looking_for, providers))
 
 
-@six.add_metaclass(abc.ABCMeta)
 class Storage(object):
-    """Interface between engines and logbook.
+    """Interface between engines and logbook and its backend (if any).
 
     This class provides a simple interface to save atoms of a given flow and
     associated activity and results to persistence layer (logbook,
@@ -119,15 +117,21 @@ class Storage(object):
     """
 
     injector_name = '_TaskFlow_INJECTOR'
+    """Injector task detail name.
+
+    This task detail is a **special** detail that will be automatically
+    created and saved to store **persistent** injected values (name conflicts
+    with it must be avoided) that are *global* to the flow being executed.
+    """
 
     def __init__(self, flow_detail, backend=None):
         self._result_mappings = {}
         self._reverse_mapping = {}
         self._backend = backend
         self._flowdetail = flow_detail
-        self._lock = self._lock_cls()
         self._transients = {}
         self._injected_args = {}
+        self._lock = lock_utils.ReaderWriterLock()
         # NOTE(imelnikov): failure serialization loses information,
         # so we cache failures here, in atom name -> failure mapping.
@@ -150,16 +154,6 @@ class Storage(object):
             self._set_result_mapping(injector_td.name,
                                      dict((name, name) for name in names))
 
-    @abc.abstractproperty
-    def _lock_cls(self):
-        """Lock class used to generate reader/writer locks.
-
-        These locks are used for protecting read/write access to the
-        underlying storage backend when internally mutating operations occur.
-        They ensure that we read and write data in a consistent manner when
-        being used in a multithreaded situation.
-        """
-
     def _with_connection(self, functor, *args, **kwargs):
         # NOTE(harlowja): Activate the given function with a backend
         # connection, if a backend is provided in the first place, otherwise
@@ -771,13 +765,3 @@ class Storage(object):
                 histories.append((ad.name,
                                   self._translate_into_history(ad)))
         return histories
-
-
-class MultiThreadedStorage(Storage):
-    """Storage that uses locks to protect against concurrent access."""
-    _lock_cls = lock_utils.ReaderWriterLock
-
-
-class SingleThreadedStorage(Storage):
-    """Storage that uses dummy locks when you really don't need locks."""
-    _lock_cls = lock_utils.DummyReaderWriterLock

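The net effect of these hunks is that every Storage instance now owns a real reader/writer lock, so callers get thread safety by default rather than by opting into MultiThreadedStorage. A condensed sketch of that discipline, using threading.Lock as a simplified stand-in for lock_utils.ReaderWriterLock (which additionally allows concurrent readers):

    import threading


    class TinyStorage(object):
        def __init__(self):
            self._lock = threading.Lock()  # real code: a reader/writer lock
            self._atoms = {}

        def ensure_atom(self, name):
            # Mutation happens under the (write) lock, so concurrent callers
            # cannot create duplicate atom details.
            with self._lock:
                self._atoms.setdefault(name, {'name': name})


    store = TinyStorage()
    threads = [threading.Thread(target=store.ensure_atom, args=('my_task',))
               for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(store._atoms) == 1  # mirrors test_many_thread_ensure_same_task

Single-threaded engines now pay the small cost of an uncontended lock, in exchange for never racing when a storage unit is shared across threads.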
View File

@@ -35,7 +35,7 @@ class _RunnerTestMixin(object):
     def _make_runtime(self, flow, initial_state=None):
         compilation = compiler.PatternCompiler(flow).compile()
         flow_detail = pu.create_flow_detail(flow)
-        store = storage.SingleThreadedStorage(flow_detail)
+        store = storage.Storage(flow_detail)
         # This ensures the tasks exist in storage...
         for task in compilation.execution_graph:
             store.ensure_atom(task)

View File

@@ -49,17 +49,14 @@ class StorageTestMixin(object):
         for t in threads:
             t.join()
 
-    def _get_storage(self, flow_detail=None, threaded=False):
+    def _get_storage(self, flow_detail=None):
         if flow_detail is None:
             _lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
-        storage_cls = storage.SingleThreadedStorage
-        if threaded:
-            storage_cls = storage.MultiThreadedStorage
-        return storage_cls(flow_detail=flow_detail, backend=self.backend)
+        return storage.Storage(flow_detail=flow_detail, backend=self.backend)
 
     def test_non_saving_storage(self):
         _lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
-        s = storage.SingleThreadedStorage(flow_detail=flow_detail)
+        s = storage.Storage(flow_detail=flow_detail)
         s.ensure_atom(test_utils.NoopTask('my_task'))
         self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
@@ -311,7 +308,7 @@ class StorageTestMixin(object):
         })
 
     def test_many_thread_ensure_same_task(self):
-        s = self._get_storage(threaded=True)
+        s = self._get_storage()
 
         def ensure_my_task():
             s.ensure_atom(test_utils.NoopTask('my_task'))
@@ -325,7 +322,7 @@ class StorageTestMixin(object):
         self.assertEqual(1, len(s._flowdetail))
 
     def test_many_thread_inject(self):
-        s = self._get_storage(threaded=True)
+        s = self._get_storage()
 
         def inject_values(values):
             s.inject(values)

View File

@@ -259,36 +259,6 @@ class ReaderWriterLock(object):
             self._cond.release()
 
 
-class DummyReaderWriterLock(object):
-    """A dummy reader/writer lock.
-
-    This dummy lock doesn't lock anything but provides the same functions as a
-    normal reader/writer lock class and can be useful in unit tests or other
-    similar scenarios (do *not* use it if locking is actually required).
-    """
-
-    @contextlib.contextmanager
-    def write_lock(self):
-        yield self
-
-    @contextlib.contextmanager
-    def read_lock(self):
-        yield self
-
-    @property
-    def owner(self):
-        return None
-
-    def is_reader(self):
-        return False
-
-    def is_writer(self, check_pending=True):
-        return False
-
-    @property
-    def has_pending_writers(self):
-        return False
-
-
 class MultiLock(object):
     """A class which attempts to obtain & release many locks at once.