Retrieve the store from flowdetails as well, if it exists

Gives users a more permanent way to provide an initial set of
arguments to a flow.

Change-Id: Ib9c3d60882548120d467a645bbac9be78408bac3
Implements: blueprint flow-details-keep-store
This commit is contained in:
Greg Hill 2015-10-28 09:07:58 -05:00
parent 31764bfb96
commit 5ce07b2de1
4 changed files with 213 additions and 5 deletions

View File

@ -175,6 +175,66 @@ might look like:
time.sleep(coffee_break_time)
...
There are a few ways to provide arguments to the flow. The first option is to
add a ``store`` to the flowdetail object in the
:py:class:`logbook <taskflow.persistence.models.LogBook>`.
You can also provide a ``store`` in the
:py:class:`job <taskflow.jobs.base.Job>` itself when posting it to the
job board. If both ``store`` values are found, they will be combined,
with the :py:class:`job <taskflow.jobs.base.Job>` ``store``
overriding the :py:class:`logbook <taskflow.persistence.models.LogBook>`
``store``.
.. code-block:: python
import uuid
from taskflow import engines
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models
from taskflow.jobs import backends as job_backends
...
persistence = persistence_backends.fetch({
"connection': "mysql",
"user": ...,
"password": ...,
})
board = job_backends.fetch('my-board', {
"board": "zookeeper",
}, persistence=persistence)
book = models.LogBook('my-book', uuid.uuid4())
flow_detail = models.FlowDetail('my-job', uuid.uuid4())
book.add(flow_detail)
connection = persistence.get_connection()
connection.save_logbook(book)
flow_detail.meta['store'] = {'a': 1, 'c': 3}
job_details = {
"flow_uuid": flow_detail.uuid,
"store": {'a': 2, 'b': 1}
}
engines.save_factory_details(flow_detail, flow_factory,
factory_args=[],
factory_kwargs={},
backend=persistence)
jobboard = get_jobboard(zk_client)
jobboard.connect()
job = jobboard.post('my-job', book=book, details=job_details)
# the flow global parameters are now the combined store values
# {'a': 2, 'b': 1', 'c': 3}
...
Types
=====

View File

@ -112,10 +112,14 @@ class Conductor(object):
def _engine_from_job(self, job):
"""Extracts an engine from a job (via some manner)."""
flow_detail = self._flow_detail_from_job(job)
if job.details and 'store' in job.details:
store = dict(job.details["store"])
else:
store = {}
if flow_detail.meta and 'store' in flow_detail.meta:
store.update(flow_detail.meta['store'])
if job.details and 'store' in job.details:
store.update(job.details["store"])
engine = engines.load_from_detail(flow_detail, store=store,
engine=self._engine,
backend=self._persistence,

View File

@ -53,6 +53,12 @@ def test_factory(blowup):
return f
def test_store_factory():
f = lf.Flow("test")
f.add(test_utils.TaskMultiArg('task1'))
return f
def single_factory():
return futurist.ThreadPoolExecutor(max_workers=1)
@ -229,6 +235,137 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
self.assertIsNotNone(fd)
self.assertEqual(st.REVERTED, fd.state)
def test_missing_store(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
components.board.notifier.register(base.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_store_factory,
[], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertIsNone(fd.state)
def test_job_store(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
store = {'x': True, 'y': False, 'z': None}
components.board.notifier.register(base.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_store_factory,
[], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid,
'store': store})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
def test_flowdetails_store(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
store = {'x': True, 'y': False, 'z': None}
components.board.notifier.register(base.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence,
meta={'store': store})
engines.save_factory_details(fd, test_store_factory,
[], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
def test_combined_store(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
flow_store = {'x': True, 'y': False}
job_store = {'z': None}
components.board.notifier.register(base.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence,
meta={'store': flow_store})
engines.save_factory_details(fd, test_store_factory,
[], {},
backend=components.persistence)
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid,
'store': job_store})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
with contextlib.closing(persistence.get_connection()) as conn:
lb = conn.get_logbook(lb.uuid)
fd = lb.find(fd.uuid)
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
class NonBlockingExecutorTest(test.TestCase):
def test_bad_wait_timeout(self):

View File

@ -37,7 +37,7 @@ def temporary_log_book(backend=None):
return book
def temporary_flow_detail(backend=None):
def temporary_flow_detail(backend=None, meta=None):
"""Creates a temporary flow detail and logbook in the given backend.
Mainly useful for tests and other use cases where a temporary flow detail
@ -45,7 +45,14 @@ def temporary_flow_detail(backend=None):
"""
flow_id = uuidutils.generate_uuid()
book = temporary_log_book(backend)
book.add(models.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
flow_detail = models.FlowDetail(name='tmp-flow-detail', uuid=flow_id)
if meta is not None:
if flow_detail.meta is None:
flow_detail.meta = {}
flow_detail.meta.update(meta)
book.add(flow_detail)
if backend is not None:
with contextlib.closing(backend.get_connection()) as conn:
conn.save_logbook(book)