Remove zake
The zake library was archived a few years back and is no longer installable due to its usage of pkg_resources . Use real zookeeper instance for these tests. Change-Id: If806bef4c7aea4704dab8b98833dc8b44e30ef1d Signed-off-by: Takashi Kajinami <kajinamit@oss.nttdata.com>
This commit is contained in:
@@ -13,6 +13,13 @@
|
||||
- ^releasenotes/.*$
|
||||
- ^\.pre-commit-config\.yaml$
|
||||
|
||||
- job:
|
||||
name: taskflow-functional-zookeeper
|
||||
parent: taskflow-functional
|
||||
vars:
|
||||
tox_environment:
|
||||
PIFPAF_DAEMON: zookeeper
|
||||
|
||||
- job:
|
||||
name: taskflow-functional-redis
|
||||
parent: taskflow-functional
|
||||
@@ -39,9 +46,11 @@
|
||||
- release-notes-jobs-python3
|
||||
check:
|
||||
jobs:
|
||||
- taskflow-functional-zookeeper
|
||||
- taskflow-functional-redis
|
||||
- taskflow-functional-etcd
|
||||
gate:
|
||||
jobs:
|
||||
- taskflow-functional-zookeeper
|
||||
- taskflow-functional-redis
|
||||
- taskflow-functional-etcd
|
||||
|
||||
@@ -19,3 +19,5 @@ libpq-dev [platform:dpkg]
|
||||
redis [platform:rpm tests-functional-redis]
|
||||
redis-server [platform:dpkg tests-functional-redis]
|
||||
redis-sentinel [platform:dpkg tests-functional-redis]
|
||||
zookeeperd [platform:dpkg tests-functional-zookeeper]
|
||||
# NOTE(tkajinam): zookeeper package is not available in CentOS/Fedora
|
||||
|
||||
@@ -4,3 +4,5 @@ backend_services_map:
|
||||
- redis-server
|
||||
- redis-sentinel
|
||||
etcd: []
|
||||
zookeeper:
|
||||
- zookeeper
|
||||
|
||||
@@ -4,3 +4,4 @@ backend_services_map:
|
||||
- redis
|
||||
- redis-sentinel
|
||||
etcd: []
|
||||
zookeeper: []
|
||||
|
||||
@@ -18,4 +18,10 @@
|
||||
enabled: no
|
||||
become: yes
|
||||
loop: "{{ backend_services_map[taskflow_backend_daemon] }}"
|
||||
|
||||
- name: Remove ZOO_LOG_DIR
|
||||
lineinfile:
|
||||
path: /etc/zookeeper/conf/environment
|
||||
regexp: '^ZOO_LOG_DIR=.*'
|
||||
state: absent
|
||||
become: yes
|
||||
when: taskflow_backend_daemon == 'zookeeper'
|
||||
|
||||
@@ -28,10 +28,9 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir))
|
||||
sys.path.insert(0, top_dir)
|
||||
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.jobs import backends
|
||||
from taskflow.utils import kazoo_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
# In this example we show how a jobboard can be used to post work for other
|
||||
@@ -155,7 +154,8 @@ def main():
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
with contextlib.closing(fake_client.FakeClient()) as c:
|
||||
client = kazoo_utils.make_client({})
|
||||
with contextlib.closing(client) as c:
|
||||
created = []
|
||||
for i in range(0, PRODUCERS):
|
||||
p = threading_utils.daemon_thread(producer, i + 1, c)
|
||||
|
||||
@@ -32,7 +32,6 @@ sys.path.insert(0, top_dir)
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.conductors import backends as conductors
|
||||
from taskflow import engines
|
||||
@@ -41,6 +40,7 @@ from taskflow.patterns import linear_flow
|
||||
from taskflow.persistence import backends as persistence
|
||||
from taskflow.persistence import models
|
||||
from taskflow import task
|
||||
from taskflow.utils import kazoo_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
# INTRO: This examples shows how a worker/producer can post desired work (jobs)
|
||||
@@ -215,10 +215,8 @@ def main():
|
||||
# This ensures that the needed backend setup/data directories/schema
|
||||
# upgrades and so on... exist before they are attempted to be used...
|
||||
conn.upgrade()
|
||||
fc1 = fake_client.FakeClient()
|
||||
# Done like this to share the same client storage location so the correct
|
||||
# zookeeper features work across clients...
|
||||
fc2 = fake_client.FakeClient(storage=fc1.storage)
|
||||
fc1 = kazoo_utils.make_client({})
|
||||
fc2 = kazoo_utils.make_client({})
|
||||
entities = [
|
||||
generate_reviewer(fc1, saver),
|
||||
generate_conductor(fc2, saver),
|
||||
|
||||
@@ -32,7 +32,11 @@ import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import testtools
|
||||
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
|
||||
ROOT_DIR = os.path.abspath(
|
||||
os.path.dirname(
|
||||
@@ -45,6 +49,9 @@ ROOT_DIR = os.path.abspath(
|
||||
UUID_RE = re.compile('XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
|
||||
.replace('X', '[0-9a-f]'))
|
||||
|
||||
ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
|
||||
impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
|
||||
|
||||
|
||||
def safe_filename(filename):
|
||||
# Translates a filename into a method name, returns falsey if not
|
||||
@@ -114,6 +121,7 @@ class ExampleAdderMeta(type):
|
||||
return type.__new__(cls, name, parents, dct)
|
||||
|
||||
|
||||
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
|
||||
class ExamplesTestCase(test.TestCase, metaclass=ExampleAdderMeta):
|
||||
"""Runs the examples, and checks the outputs against expected outputs."""
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
|
||||
import contextlib
|
||||
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.jobs import backends
|
||||
from taskflow.jobs.backends import impl_redis
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
@@ -35,18 +33,6 @@ class BackendFetchingTest(test.TestCase):
|
||||
with contextlib.closing(backends.fetch('test', conf)) as be:
|
||||
self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard)
|
||||
|
||||
def test_zk_entry_point_existing_client(self):
|
||||
existing_client = fake_client.FakeClient()
|
||||
conf = {
|
||||
'board': 'zookeeper',
|
||||
}
|
||||
kwargs = {
|
||||
'client': existing_client,
|
||||
}
|
||||
with contextlib.closing(backends.fetch('test', conf, **kwargs)) as be:
|
||||
self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard)
|
||||
self.assertIs(existing_client, be._client)
|
||||
|
||||
def test_redis_entry_point_text(self):
|
||||
conf = 'redis'
|
||||
with contextlib.closing(backends.fetch('test', conf)) as be:
|
||||
|
||||
@@ -20,8 +20,6 @@ from kazoo.recipe import watchers
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
import testtools
|
||||
from zake import fake_client
|
||||
from zake import utils as zake_utils
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
@@ -39,7 +37,6 @@ FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
|
||||
TEST_PATH_TPL = '/taskflow/board-test/%s'
|
||||
ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
|
||||
impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
|
||||
TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER
|
||||
LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX
|
||||
|
||||
|
||||
@@ -118,43 +115,26 @@ class ZookeeperBoardTestMixin(base.BoardTestMixin):
|
||||
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
|
||||
class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
def create_board(self, persistence=None):
|
||||
|
||||
def cleanup_path(client, path):
|
||||
if not client.connected:
|
||||
return
|
||||
client.delete(path, recursive=True)
|
||||
|
||||
client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy())
|
||||
path = TEST_PATH_TPL % (uuidutils.generate_uuid())
|
||||
board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
self.addCleanup(self.close_client, client)
|
||||
self.addCleanup(cleanup_path, client, path)
|
||||
self.path = TEST_PATH_TPL % uuidutils.generate_uuid()
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'test-board', {'path': self.path},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
self.addCleanup(cleanup_path, client, self.path)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(kazoo_utils.finalize_client, client)
|
||||
return (client, board)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client, self.board = self.create_board()
|
||||
|
||||
|
||||
class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
def create_board(self, persistence=None):
|
||||
client = fake_client.FakeClient()
|
||||
board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(self.close_client, client)
|
||||
return (client, board)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client, self.board = self.create_board()
|
||||
self.bad_paths = [self.board.path, self.board.trash_path]
|
||||
self.bad_paths.extend(zake_utils.partition_path(self.board.path))
|
||||
|
||||
def test_posting_owner_lost(self):
|
||||
|
||||
with base.connect_close(self.board):
|
||||
@@ -168,12 +148,11 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
# Forcefully delete the owner from the backend storage to make
|
||||
# sure the job becomes unclaimed (this may happen if some admin
|
||||
# manually deletes the lock).
|
||||
paths = list(self.client.storage.paths.items())
|
||||
for (path, value) in paths:
|
||||
if path in self.bad_paths:
|
||||
continue
|
||||
if path.endswith('lock'):
|
||||
value['data'] = misc.binary_encode(jsonutils.dumps({}))
|
||||
children = self.client.get_children(self.path)
|
||||
for p in children:
|
||||
if p.endswith(LOCK_POSTFIX):
|
||||
self.client.set(k_paths.join(self.path, p),
|
||||
misc.binary_encode(jsonutils.dumps({})))
|
||||
self.assertEqual(states.UNCLAIMED, j.state)
|
||||
|
||||
def test_posting_state_lock_lost(self):
|
||||
@@ -189,12 +168,10 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
# Forcefully delete the lock from the backend storage to make
|
||||
# sure the job becomes unclaimed (this may happen if some admin
|
||||
# manually deletes the lock).
|
||||
paths = list(self.client.storage.paths.items())
|
||||
for (path, value) in paths:
|
||||
if path in self.bad_paths:
|
||||
continue
|
||||
if path.endswith("lock"):
|
||||
self.client.storage.pop(path)
|
||||
children = self.client.get_children(self.path)
|
||||
for p in children:
|
||||
if p.endswith(LOCK_POSTFIX):
|
||||
self.client.delete(k_paths.join(self.path, p))
|
||||
self.assertEqual(states.UNCLAIMED, j.state)
|
||||
|
||||
def test_trashing_claimed_job(self):
|
||||
@@ -210,17 +187,8 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
with self.flush(self.client):
|
||||
self.board.trash(j, self.board.name)
|
||||
|
||||
trashed = []
|
||||
jobs = []
|
||||
paths = list(self.client.storage.paths.items())
|
||||
for (path, value) in paths:
|
||||
if path in self.bad_paths:
|
||||
continue
|
||||
if path.find(TRASH_FOLDER) > -1:
|
||||
trashed.append(path)
|
||||
elif (path.find(self.board._job_base) > -1
|
||||
and not path.endswith(LOCK_POSTFIX)):
|
||||
jobs.append(path)
|
||||
trashed = self.client.get_children(self.board.trash_path)
|
||||
jobs = self.client.get_children(self.path)
|
||||
|
||||
self.assertEqual(1, len(trashed))
|
||||
self.assertEqual(0, len(jobs))
|
||||
@@ -240,16 +208,11 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
|
||||
# Remove paths that got created due to the running process that we are
|
||||
# not interested in...
|
||||
paths = {}
|
||||
for (path, data) in self.client.storage.paths.items():
|
||||
if path in self.bad_paths:
|
||||
continue
|
||||
paths[path] = data
|
||||
|
||||
children = self.client.get_children(self.path)
|
||||
# Check the actual data that was posted.
|
||||
self.assertEqual(1, len(paths))
|
||||
path_key = list(paths.keys())[0]
|
||||
self.assertTrue(len(paths[path_key]['data']) > 0)
|
||||
self.assertEqual(1, len(children))
|
||||
child = self.client.get(k_paths.join(self.path, children[0]))
|
||||
self.assertTrue(len(child[0]) > 0)
|
||||
self.assertEqual({
|
||||
'uuid': posted_job.uuid,
|
||||
'name': posted_job.name,
|
||||
@@ -259,7 +222,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
},
|
||||
'priority': 'NORMAL',
|
||||
'details': {},
|
||||
}, jsonutils.loads(misc.binary_decode(paths[path_key]['data'])))
|
||||
}, jsonutils.loads(misc.binary_decode(child[0])))
|
||||
|
||||
def test_register_entity(self):
|
||||
conductor_name = "conductor-abc@localhost:4123"
|
||||
@@ -269,14 +232,12 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
with base.connect_close(self.board):
|
||||
self.board.register_entity(entity_instance)
|
||||
# Check '.entity' node has been created
|
||||
self.assertIn(self.board.entity_path, self.client.storage.paths)
|
||||
self.client.get_children(self.board.entity_path)
|
||||
|
||||
conductor_entity_path = k_paths.join(self.board.entity_path,
|
||||
'conductor',
|
||||
conductor_name)
|
||||
self.assertIn(conductor_entity_path, self.client.storage.paths)
|
||||
conductor_data = (
|
||||
self.client.storage.paths[conductor_entity_path]['data'])
|
||||
conductor_data = self.client.get(conductor_entity_path)[0]
|
||||
self.assertTrue(len(conductor_data) > 0)
|
||||
self.assertEqual({
|
||||
'name': conductor_name,
|
||||
@@ -291,47 +252,3 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
|
||||
self.assertRaises(excp.NotImplementedError,
|
||||
self.board.register_entity,
|
||||
entity_instance_2)
|
||||
|
||||
def test_connect_check_compatible(self):
|
||||
# Valid version
|
||||
client = fake_client.FakeClient()
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'test-board', {'check_compatible': True},
|
||||
client=client)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(self.close_client, client)
|
||||
|
||||
with base.connect_close(board):
|
||||
pass
|
||||
|
||||
# Invalid version, no check
|
||||
client = fake_client.FakeClient(server_version=(3, 2, 0))
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'test-board', {'check_compatible': False},
|
||||
client=client)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(self.close_client, client)
|
||||
|
||||
with base.connect_close(board):
|
||||
pass
|
||||
|
||||
# Invalid version, check_compatible=True
|
||||
client = fake_client.FakeClient(server_version=(3, 2, 0))
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'test-board', {'check_compatible': True},
|
||||
client=client)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(self.close_client, client)
|
||||
|
||||
self.assertRaises(excp.IncompatibleVersion, board.connect)
|
||||
|
||||
# Invalid version, check_compatible='False'
|
||||
client = fake_client.FakeClient(server_version=(3, 2, 0))
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'test-board', {'check_compatible': 'False'},
|
||||
client=client)
|
||||
self.addCleanup(board.close)
|
||||
self.addCleanup(self.close_client, client)
|
||||
|
||||
with base.connect_close(board):
|
||||
pass
|
||||
|
||||
@@ -17,7 +17,6 @@ import contextlib
|
||||
from kazoo import exceptions as kazoo_exceptions
|
||||
from oslo_utils import uuidutils
|
||||
import testtools
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import backends
|
||||
@@ -74,25 +73,3 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
|
||||
conf = {'connection': 'zookeeper:'}
|
||||
with contextlib.closing(backends.fetch(conf)) as be:
|
||||
self.assertIsInstance(be, impl_zookeeper.ZkBackend)
|
||||
|
||||
|
||||
@testtools.skipIf(_ZOOKEEPER_AVAILABLE, 'zookeeper is available')
|
||||
class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin):
|
||||
def _get_connection(self):
|
||||
return self._backend.get_connection()
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
conf = {
|
||||
"path": "/taskflow",
|
||||
}
|
||||
self.client = fake_client.FakeClient()
|
||||
self.client.start()
|
||||
self._backend = impl_zookeeper.ZkBackend(conf, client=self.client)
|
||||
conn = self._backend.get_connection()
|
||||
conn.upgrade()
|
||||
|
||||
def test_zk_persistence_entry_point(self):
|
||||
conf = {'connection': 'zookeeper:'}
|
||||
with contextlib.closing(backends.fetch(conf)) as be:
|
||||
self.assertIsInstance(be, impl_zookeeper.ZkBackend)
|
||||
|
||||
@@ -17,8 +17,9 @@ import contextlib
|
||||
import threading
|
||||
|
||||
import futurist
|
||||
from oslo_utils import uuidutils
|
||||
import testscenarios
|
||||
from zake import fake_client
|
||||
import testtools
|
||||
|
||||
from taskflow.conductors import backends
|
||||
from taskflow import engines
|
||||
@@ -29,17 +30,14 @@ from taskflow.persistence.backends import impl_memory
|
||||
from taskflow import states as st
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import kazoo_utils
|
||||
from taskflow.utils import persistence_utils as pu
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def close_many(*closeables):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
for c in closeables:
|
||||
c.close()
|
||||
TEST_PATH_TPL = '/taskflow/conductor-test/%s'
|
||||
ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
|
||||
impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
|
||||
|
||||
|
||||
def test_factory(blowup):
|
||||
@@ -69,10 +67,10 @@ def single_factory():
|
||||
|
||||
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
['board', 'persistence', 'conductor'])
|
||||
|
||||
|
||||
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
|
||||
class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
test_utils.EngineTestBase, test.TestCase):
|
||||
scenarios = [
|
||||
@@ -88,30 +86,40 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
]
|
||||
|
||||
def make_components(self):
|
||||
client = fake_client.FakeClient()
|
||||
def cleanup_path(client, path):
|
||||
if not client.connected:
|
||||
return
|
||||
client.delete(path, recursive=True)
|
||||
|
||||
client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy())
|
||||
path = TEST_PATH_TPL % uuidutils.generate_uuid()
|
||||
persistence = impl_memory.MemoryBackend()
|
||||
board = impl_zookeeper.ZookeeperJobBoard('testing', {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'testing',
|
||||
{'path': path},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
|
||||
self.addCleanup(cleanup_path, client, path)
|
||||
self.addCleanup(kazoo_utils.finalize_client, client)
|
||||
|
||||
conductor_kwargs = self.conductor_kwargs.copy()
|
||||
conductor_kwargs['persistence'] = persistence
|
||||
conductor = backends.fetch(self.kind, 'testing', board,
|
||||
**conductor_kwargs)
|
||||
return ComponentBundle(board, client, persistence, conductor)
|
||||
return ComponentBundle(board, persistence, conductor)
|
||||
|
||||
def test_connection(self):
|
||||
components = self.make_components()
|
||||
components.conductor.connect()
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
self.assertTrue(components.board.connected)
|
||||
self.assertTrue(components.client.connected)
|
||||
self.assertFalse(components.board.connected)
|
||||
self.assertFalse(components.client.connected)
|
||||
|
||||
def test_run_empty(self):
|
||||
components = self.make_components()
|
||||
components.conductor.connect()
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
components.conductor.stop()
|
||||
@@ -143,7 +151,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
on_job_consumed)
|
||||
components.conductor.notifier.register("job_abandoned",
|
||||
on_job_abandoned)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence)
|
||||
@@ -175,7 +183,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
consumed_event.set()
|
||||
|
||||
components.board.notifier.register(base.REMOVAL, on_consume)
|
||||
with close_many(components.client, components.conductor):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(
|
||||
lambda: components.conductor.run(max_dispatches=5))
|
||||
t.start()
|
||||
@@ -217,7 +225,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
on_job_consumed)
|
||||
components.conductor.notifier.register("job_abandoned",
|
||||
on_job_abandoned)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence)
|
||||
@@ -249,7 +257,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
consumed_event.set()
|
||||
|
||||
components.board.notifier.register(base.REMOVAL, on_consume)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence)
|
||||
@@ -281,7 +289,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
store = {'x': True, 'y': False, 'z': None}
|
||||
|
||||
components.board.notifier.register(base.REMOVAL, on_consume)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence)
|
||||
@@ -314,7 +322,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
store = {'x': True, 'y': False, 'z': None}
|
||||
|
||||
components.board.notifier.register(base.REMOVAL, on_consume)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence,
|
||||
@@ -348,7 +356,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
job_store = {'z': None}
|
||||
|
||||
components.board.notifier.register(base.REMOVAL, on_consume)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence,
|
||||
@@ -400,7 +408,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
on_job_abandoned)
|
||||
components.conductor.notifier.register("running_start",
|
||||
on_running_start)
|
||||
with close_many(components.conductor, components.client):
|
||||
with contextlib.closing(components.conductor):
|
||||
t = threading_utils.daemon_thread(components.conductor.run)
|
||||
t.start()
|
||||
lb, fd = pu.temporary_flow_detail(components.persistence)
|
||||
@@ -418,13 +426,14 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
|
||||
self.assertFalse(consumed_event.is_set())
|
||||
|
||||
|
||||
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
|
||||
class NonBlockingExecutorTest(test.TestCase):
|
||||
def test_bad_wait_timeout(self):
|
||||
persistence = impl_memory.MemoryBackend()
|
||||
client = fake_client.FakeClient()
|
||||
board = impl_zookeeper.ZookeeperJobBoard('testing', {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'testing',
|
||||
test_utils.ZK_TEST_CONFIG.copy(),
|
||||
persistence=persistence)
|
||||
self.assertRaises(ValueError,
|
||||
backends.fetch,
|
||||
'nonblocking', 'testing', board,
|
||||
@@ -433,10 +442,10 @@ class NonBlockingExecutorTest(test.TestCase):
|
||||
|
||||
def test_bad_factory(self):
|
||||
persistence = impl_memory.MemoryBackend()
|
||||
client = fake_client.FakeClient()
|
||||
board = impl_zookeeper.ZookeeperJobBoard('testing', {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
board = impl_zookeeper.ZookeeperJobBoard(
|
||||
'testing',
|
||||
test_utils.ZK_TEST_CONFIG.copy(),
|
||||
persistence=persistence)
|
||||
self.assertRaises(ValueError,
|
||||
backends.fetch,
|
||||
'nonblocking', 'testing', board,
|
||||
|
||||
@@ -19,11 +19,12 @@ import time
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import reflection
|
||||
from zake import fake_client
|
||||
import testtools
|
||||
|
||||
import taskflow.engines
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.jobs import backends as jobs
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
from taskflow.listeners import claims
|
||||
from taskflow.listeners import logging as logging_listeners
|
||||
from taskflow.listeners import timing
|
||||
@@ -34,10 +35,15 @@ from taskflow import task
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import kazoo_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils
|
||||
|
||||
|
||||
ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
|
||||
impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
|
||||
|
||||
|
||||
_LOG_LEVELS = frozenset([
|
||||
logging.CRITICAL,
|
||||
logging.DEBUG,
|
||||
@@ -70,6 +76,7 @@ class EngineMakerMixin:
|
||||
return e
|
||||
|
||||
|
||||
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
|
||||
class TestClaimListener(test.TestCase, EngineMakerMixin):
|
||||
def _make_dummy_flow(self, count):
|
||||
f = lf.Flow('root')
|
||||
@@ -79,7 +86,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = fake_client.FakeClient()
|
||||
self.client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy())
|
||||
self.addCleanup(self.client.stop)
|
||||
self.board = jobs.fetch('test', 'zookeeper', client=self.client)
|
||||
self.addCleanup(self.board.close)
|
||||
@@ -92,7 +99,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
|
||||
if children:
|
||||
arrived.set()
|
||||
|
||||
self.client.ChildrenWatch("/taskflow", set_on_children)
|
||||
self.client.ChildrenWatch("/taskflow/jobs", set_on_children)
|
||||
job = self.board.post('test-1')
|
||||
|
||||
# Make sure it arrived and claimed before doing further work...
|
||||
@@ -105,22 +112,20 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
|
||||
return job
|
||||
|
||||
def _destroy_locks(self):
|
||||
children = self.client.storage.get_children("/taskflow",
|
||||
only_direct=False)
|
||||
children = self.client.get_children("/taskflow/jobs")
|
||||
removed = 0
|
||||
for p, data in children.items():
|
||||
for p in children:
|
||||
if p.endswith(".lock"):
|
||||
self.client.storage.pop(p)
|
||||
self.client.delete("/taskflow/jobs/" + p)
|
||||
removed += 1
|
||||
return removed
|
||||
|
||||
def _change_owner(self, new_owner):
|
||||
children = self.client.storage.get_children("/taskflow",
|
||||
only_direct=False)
|
||||
children = self.client.get_children("/taskflow/jobs")
|
||||
altered = 0
|
||||
for p, data in children.items():
|
||||
for p in children:
|
||||
if p.endswith(".lock"):
|
||||
self.client.set(p, misc.binary_encode(
|
||||
self.client.set("/taskflow/jobs/" + p, misc.binary_encode(
|
||||
jsonutils.dumps({'owner': new_owner})))
|
||||
altered += 1
|
||||
return altered
|
||||
|
||||
@@ -57,10 +57,10 @@ def wrap_all_failures():
|
||||
raise exceptions.WrappedFailure([failure.Failure()])
|
||||
|
||||
|
||||
def zookeeper_available(min_version, timeout=3):
|
||||
def zookeeper_available(min_version, timeout=30):
|
||||
client = kazoo_utils.make_client(ZK_TEST_CONFIG.copy())
|
||||
try:
|
||||
# NOTE(imelnikov): 3 seconds we should be enough for localhost
|
||||
# NOTE(imelnikov): 30 seconds we should be enough for localhost
|
||||
client.start(timeout=float(timeout))
|
||||
if min_version:
|
||||
zk_ver = client.server_version()
|
||||
|
||||
@@ -25,7 +25,6 @@ PyMySQL>=0.7.6 # MIT License
|
||||
psycopg2>=2.8.0 # LGPL/ZPL
|
||||
|
||||
# test
|
||||
zake>=0.1.6 # Apache-2.0
|
||||
pydotplus>=2.0.2 # MIT License
|
||||
oslotest>=3.2.0 # Apache-2.0
|
||||
testtools>=2.2.0 # MIT
|
||||
|
||||
Reference in New Issue
Block a user