Files
deb-python-taskflow/taskflow/tests/unit/conductor/test_blocking.py
Joshua Harlow eaad725508 Allow loading conductors via entrypoints
Add an easy to use helper function that is exposed to
fetch a conductor in a way that matches how engines
and persistence backends and jobboards are loaded.

This also renames the single-threaded conductor to the
blocking conductor and exposes an entrypoint backend
fetch() function to load current and future
conductors.

Change-Id: Id146d847c329d3e8510a8f24c3ec8b918680ddb5
2015-02-16 18:30:18 -08:00

152 lines
5.9 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
from zake import fake_client
from taskflow.conductors import backends
from taskflow import engines
from taskflow.jobs.backends import impl_zookeeper
from taskflow.jobs import base
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import states as st
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.utils import persistence_utils as pu
from taskflow.utils import threading_utils
@contextlib.contextmanager
def close_many(*closeables):
    """Context manager that closes every given object on exit.

    Each object's ``close()`` is called, in the order given, once the
    ``with`` body finishes (whether it finished normally or by raising).

    A failing ``close()`` does not stop the remaining objects from being
    closed; after all of them have been attempted the first failure that
    was seen is re-raised.

    :param closeables: objects exposing a no-argument ``close()`` method
    """
    try:
        yield
    finally:
        first_failure = None
        for closeable in closeables:
            try:
                closeable.close()
            except Exception as e:
                # Keep going so that one broken close() can not leak the
                # objects that come after it; remember only the first error.
                if first_failure is None:
                    first_failure = e
        if first_failure is not None:
            raise first_failure
def test_factory(blowup):
    """Build the single-task linear flow used by the conductor tests.

    :param blowup: when truthy the flow's only task is a
                   ``test_utils.FailingTask``, otherwise it is a
                   ``test_utils.ProgressingTask``
    :returns: a linear flow named ``test`` holding one task named ``test1``
    """
    if blowup:
        task_cls = test_utils.FailingTask
    else:
        task_cls = test_utils.ProgressingTask
    flow = lf.Flow("test")
    flow.add(task_cls("test1"))
    return flow
# Record grouping the objects created for a single test so that each one
# can be reached by name (or unpacked positionally, in this field order).
ComponentBundle = collections.namedtuple(
    'ComponentBundle',
    (
        'board',
        'client',
        'persistence',
        'conductor',
    ),
)
class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
    """Tests for the conductor that ``backends.fetch`` returns for ``KIND``.

    Every test creates its own set of components via
    :meth:`make_components`.
    """

    # Kind name handed to ``backends.fetch`` to select the conductor.
    KIND = 'blocking'

    def make_components(self, name='testing', wait_timeout=0.1):
        """Create a client, persistence backend, job board and conductor.

        :param name: name given to both the job board and the conductor
        :param wait_timeout: forwarded to ``backends.fetch`` as its
                             ``wait_timeout`` keyword argument
        :returns: a ``ComponentBundle`` holding the created objects
        """
        zk_client = fake_client.FakeClient()
        backend = impl_memory.MemoryBackend()
        jobboard = impl_zookeeper.ZookeeperJobBoard(
            name, {}, client=zk_client, persistence=backend)
        conductor = backends.fetch(
            self.KIND, name, jobboard,
            persistence=backend, wait_timeout=wait_timeout)
        return ComponentBundle(board=jobboard, client=zk_client,
                               persistence=backend, conductor=conductor)

    def test_connection(self):
        """Board and client are connected while open, and not after close."""
        bundle = self.make_components()
        bundle.conductor.connect()
        watched = (bundle.board, bundle.client)
        with close_many(bundle.conductor, bundle.client):
            for component in watched:
                self.assertTrue(component.connected)
        # The conductor and the client have been closed at this point.
        for component in watched:
            self.assertFalse(component.connected)

    def test_run_empty(self):
        """A running conductor with nothing posted stops when asked to."""
        bundle = self.make_components()
        conductor = bundle.conductor
        conductor.connect()
        with close_many(conductor, bundle.client):
            runner = threading_utils.daemon_thread(conductor.run)
            runner.start()
            conductor.stop()
            stopped = conductor.wait(test_utils.WAIT_TIMEOUT)
            self.assertTrue(stopped)
            self.assertFalse(conductor.dispatching)
            runner.join()

    def test_run(self):
        """A posted job whose flow does not blow up is stored as SUCCESS."""
        bundle = self.make_components()
        conductor = bundle.conductor
        backend = bundle.persistence
        conductor.connect()
        job_consumed = threading_utils.Event()

        def on_consume(state, details):
            job_consumed.set()

        # Get signalled when the board emits its REMOVAL notification.
        bundle.board.notifier.register(base.REMOVAL, on_consume)
        with close_many(conductor, bundle.client):
            runner = threading_utils.daemon_thread(conductor.run)
            runner.start()
            book, flow_detail = pu.temporary_flow_detail(backend)
            engines.save_factory_details(flow_detail, test_factory,
                                         [False], {}, backend=backend)
            job_details = {'flow_uuid': flow_detail.uuid}
            bundle.board.post('poke', book, details=job_details)
            self.assertTrue(job_consumed.wait(test_utils.WAIT_TIMEOUT))
            conductor.stop()
            self.assertTrue(conductor.wait(test_utils.WAIT_TIMEOUT))
            self.assertFalse(conductor.dispatching)

        # Re-read the flow detail from persistence to check its final state.
        with contextlib.closing(backend.get_connection()) as conn:
            stored_book = conn.get_logbook(book.uuid)
            stored_detail = stored_book.find(flow_detail.uuid)
        self.assertIsNotNone(stored_detail)
        self.assertEqual(st.SUCCESS, stored_detail.state)

    def test_fail_run(self):
        """A posted job whose flow blows up is stored as REVERTED."""
        bundle = self.make_components()
        conductor = bundle.conductor
        backend = bundle.persistence
        conductor.connect()
        job_consumed = threading_utils.Event()

        def on_consume(state, details):
            job_consumed.set()

        # Get signalled when the board emits its REMOVAL notification.
        bundle.board.notifier.register(base.REMOVAL, on_consume)
        with close_many(conductor, bundle.client):
            runner = threading_utils.daemon_thread(conductor.run)
            runner.start()
            book, flow_detail = pu.temporary_flow_detail(backend)
            engines.save_factory_details(flow_detail, test_factory,
                                         [True], {}, backend=backend)
            job_details = {'flow_uuid': flow_detail.uuid}
            bundle.board.post('poke', book, details=job_details)
            self.assertTrue(job_consumed.wait(test_utils.WAIT_TIMEOUT))
            conductor.stop()
            self.assertTrue(conductor.wait(test_utils.WAIT_TIMEOUT))
            self.assertFalse(conductor.dispatching)

        # Re-read the flow detail from persistence to check its final state.
        with contextlib.closing(backend.get_connection()) as conn:
            stored_book = conn.get_logbook(book.uuid)
            stored_detail = stored_book.find(flow_detail.uuid)
        self.assertIsNotNone(stored_detail)
        self.assertEqual(st.REVERTED, stored_detail.state)