diff --git a/.zuul.yaml b/.zuul.yaml index 6eb155648..0113708fb 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -13,6 +13,13 @@ - ^releasenotes/.*$ - ^\.pre-commit-config\.yaml$ +- job: + name: taskflow-functional-zookeeper + parent: taskflow-functional + vars: + tox_environment: + PIFPAF_DAEMON: zookeeper + - job: name: taskflow-functional-redis parent: taskflow-functional @@ -39,9 +46,11 @@ - release-notes-jobs-python3 check: jobs: + - taskflow-functional-zookeeper - taskflow-functional-redis - taskflow-functional-etcd gate: jobs: + - taskflow-functional-zookeeper - taskflow-functional-redis - taskflow-functional-etcd diff --git a/bindep.txt b/bindep.txt index e2d278974..c9d9729d1 100644 --- a/bindep.txt +++ b/bindep.txt @@ -19,3 +19,5 @@ libpq-dev [platform:dpkg] redis [platform:rpm tests-functional-redis] redis-server [platform:dpkg tests-functional-redis] redis-sentinel [platform:dpkg tests-functional-redis] +zookeeperd [platform:dpkg tests-functional-zookeeper] +# NOTE(tkajinam): zookeeper package is not available in CentOS/Fedora diff --git a/playbooks/tests/functional/Debian.yaml b/playbooks/tests/functional/Debian.yaml index 47e11046c..3f37a404e 100644 --- a/playbooks/tests/functional/Debian.yaml +++ b/playbooks/tests/functional/Debian.yaml @@ -4,3 +4,5 @@ backend_services_map: - redis-server - redis-sentinel etcd: [] + zookeeper: + - zookeeper diff --git a/playbooks/tests/functional/RedHat.yaml b/playbooks/tests/functional/RedHat.yaml index 37fe5a668..604a3c4cb 100644 --- a/playbooks/tests/functional/RedHat.yaml +++ b/playbooks/tests/functional/RedHat.yaml @@ -4,3 +4,4 @@ backend_services_map: - redis - redis-sentinel etcd: [] + zookeeper: [] diff --git a/playbooks/tests/functional/pre.yml b/playbooks/tests/functional/pre.yml index 5e4e02afb..fbc46fd7b 100644 --- a/playbooks/tests/functional/pre.yml +++ b/playbooks/tests/functional/pre.yml @@ -18,4 +18,10 @@ enabled: no become: yes loop: "{{ backend_services_map[taskflow_backend_daemon] }}" - + - name: Remove ZOO_LOG_DIR + lineinfile: + path: /etc/zookeeper/conf/environment + regexp: '^ZOO_LOG_DIR=.*' + state: absent + become: yes + when: taskflow_backend_daemon == 'zookeeper' diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py index aa197cfee..b7f7b374e 100644 --- a/taskflow/examples/jobboard_produce_consume_colors.py +++ b/taskflow/examples/jobboard_produce_consume_colors.py @@ -28,10 +28,9 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) -from zake import fake_client - from taskflow import exceptions as excp from taskflow.jobs import backends +from taskflow.utils import kazoo_utils from taskflow.utils import threading_utils # In this example we show how a jobboard can be used to post work for other @@ -155,7 +154,8 @@ def main(): except ImportError: pass - with contextlib.closing(fake_client.FakeClient()) as c: + client = kazoo_utils.make_client({}) + with contextlib.closing(client) as c: created = [] for i in range(0, PRODUCERS): p = threading_utils.daemon_thread(producer, i + 1, c) diff --git a/taskflow/examples/tox_conductor.py b/taskflow/examples/tox_conductor.py index ed79c3bc2..159ee8b12 100644 --- a/taskflow/examples/tox_conductor.py +++ b/taskflow/examples/tox_conductor.py @@ -32,7 +32,6 @@ sys.path.insert(0, top_dir) from oslo_utils import timeutils from oslo_utils import uuidutils -from zake import fake_client from taskflow.conductors import backends as conductors from taskflow import engines @@ -41,6 +40,7 @@ from taskflow.patterns import linear_flow from taskflow.persistence import backends as persistence from taskflow.persistence import models from taskflow import task +from taskflow.utils import kazoo_utils from taskflow.utils import threading_utils # INTRO: This examples shows how a worker/producer can post desired work (jobs) @@ -215,10 +215,8 @@ def main(): # This ensures that the needed backend setup/data directories/schema # upgrades and so on... exist before they are attempted to be used... conn.upgrade() - fc1 = fake_client.FakeClient() - # Done like this to share the same client storage location so the correct - # zookeeper features work across clients... - fc2 = fake_client.FakeClient(storage=fc1.storage) + fc1 = kazoo_utils.make_client({}) + fc2 = kazoo_utils.make_client({}) entities = [ generate_reviewer(fc1, saver), generate_conductor(fc2, saver), diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py index a8bdbfdd6..75f07816d 100644 --- a/taskflow/tests/test_examples.py +++ b/taskflow/tests/test_examples.py @@ -32,7 +32,11 @@ import re import subprocess import sys +import testtools + +from taskflow.jobs.backends import impl_zookeeper from taskflow import test +from taskflow.tests import utils as test_utils ROOT_DIR = os.path.abspath( os.path.dirname( @@ -45,6 +49,9 @@ ROOT_DIR = os.path.abspath( UUID_RE = re.compile('XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX' .replace('X', '[0-9a-f]')) +ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( + impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) + def safe_filename(filename): # Translates a filename into a method name, returns falsey if not @@ -114,6 +121,7 @@ class ExampleAdderMeta(type): return type.__new__(cls, name, parents, dct) +@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class ExamplesTestCase(test.TestCase, metaclass=ExampleAdderMeta): """Runs the examples, and checks the outputs against expected outputs.""" diff --git a/taskflow/tests/unit/jobs/test_entrypoint.py b/taskflow/tests/unit/jobs/test_entrypoint.py index 656589233..d96e33872 100644 --- a/taskflow/tests/unit/jobs/test_entrypoint.py +++ b/taskflow/tests/unit/jobs/test_entrypoint.py @@ -14,8 +14,6 @@ import contextlib -from zake import fake_client - from taskflow.jobs import backends from taskflow.jobs.backends import impl_redis from taskflow.jobs.backends import impl_zookeeper @@ -35,18 +33,6 @@ class BackendFetchingTest(test.TestCase): with contextlib.closing(backends.fetch('test', conf)) as be: self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard) - def test_zk_entry_point_existing_client(self): - existing_client = fake_client.FakeClient() - conf = { - 'board': 'zookeeper', - } - kwargs = { - 'client': existing_client, - } - with contextlib.closing(backends.fetch('test', conf, **kwargs)) as be: - self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard) - self.assertIs(existing_client, be._client) - def test_redis_entry_point_text(self): conf = 'redis' with contextlib.closing(backends.fetch('test', conf)) as be: diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 3cb0fddbd..123bb0346 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -20,8 +20,6 @@ from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import uuidutils import testtools -from zake import fake_client -from zake import utils as zake_utils from taskflow import exceptions as excp from taskflow.jobs.backends import impl_zookeeper @@ -39,7 +37,6 @@ FLUSH_PATH_TPL = '/taskflow/flush-test/%s' TEST_PATH_TPL = '/taskflow/board-test/%s' ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) -TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX @@ -118,43 +115,26 @@ class ZookeeperBoardTestMixin(base.BoardTestMixin): @testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin): def create_board(self, persistence=None): - def cleanup_path(client, path): if not client.connected: return client.delete(path, recursive=True) client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy()) - path = TEST_PATH_TPL % (uuidutils.generate_uuid()) - board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path}, - client=client, - persistence=persistence) - self.addCleanup(self.close_client, client) - self.addCleanup(cleanup_path, client, path) + self.path = TEST_PATH_TPL % uuidutils.generate_uuid() + board = impl_zookeeper.ZookeeperJobBoard( + 'test-board', {'path': self.path}, + client=client, + persistence=persistence) + self.addCleanup(cleanup_path, client, self.path) self.addCleanup(board.close) + self.addCleanup(kazoo_utils.finalize_client, client) return (client, board) def setUp(self): super().setUp() self.client, self.board = self.create_board() - -class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): - def create_board(self, persistence=None): - client = fake_client.FakeClient() - board = impl_zookeeper.ZookeeperJobBoard('test-board', {}, - client=client, - persistence=persistence) - self.addCleanup(board.close) - self.addCleanup(self.close_client, client) - return (client, board) - - def setUp(self): - super().setUp() - self.client, self.board = self.create_board() - self.bad_paths = [self.board.path, self.board.trash_path] - self.bad_paths.extend(zake_utils.partition_path(self.board.path)) - def test_posting_owner_lost(self): with base.connect_close(self.board): @@ -168,12 +148,11 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Forcefully delete the owner from the backend storage to make # sure the job becomes unclaimed (this may happen if some admin # manually deletes the lock). - paths = list(self.client.storage.paths.items()) - for (path, value) in paths: - if path in self.bad_paths: - continue - if path.endswith('lock'): - value['data'] = misc.binary_encode(jsonutils.dumps({})) + children = self.client.get_children(self.path) + for p in children: + if p.endswith(LOCK_POSTFIX): + self.client.set(k_paths.join(self.path, p), + misc.binary_encode(jsonutils.dumps({}))) self.assertEqual(states.UNCLAIMED, j.state) def test_posting_state_lock_lost(self): @@ -189,12 +168,10 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Forcefully delete the lock from the backend storage to make # sure the job becomes unclaimed (this may happen if some admin # manually deletes the lock). - paths = list(self.client.storage.paths.items()) - for (path, value) in paths: - if path in self.bad_paths: - continue - if path.endswith("lock"): - self.client.storage.pop(path) + children = self.client.get_children(self.path) + for p in children: + if p.endswith(LOCK_POSTFIX): + self.client.delete(k_paths.join(self.path, p)) self.assertEqual(states.UNCLAIMED, j.state) def test_trashing_claimed_job(self): @@ -210,17 +187,8 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): with self.flush(self.client): self.board.trash(j, self.board.name) - trashed = [] - jobs = [] - paths = list(self.client.storage.paths.items()) - for (path, value) in paths: - if path in self.bad_paths: - continue - if path.find(TRASH_FOLDER) > -1: - trashed.append(path) - elif (path.find(self.board._job_base) > -1 - and not path.endswith(LOCK_POSTFIX)): - jobs.append(path) + trashed = self.client.get_children(self.board.trash_path) + jobs = self.client.get_children(self.path) self.assertEqual(1, len(trashed)) self.assertEqual(0, len(jobs)) @@ -240,16 +208,11 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): # Remove paths that got created due to the running process that we are # not interested in... - paths = {} - for (path, data) in self.client.storage.paths.items(): - if path in self.bad_paths: - continue - paths[path] = data - + children = self.client.get_children(self.path) # Check the actual data that was posted. - self.assertEqual(1, len(paths)) - path_key = list(paths.keys())[0] - self.assertTrue(len(paths[path_key]['data']) > 0) + self.assertEqual(1, len(children)) + child = self.client.get(k_paths.join(self.path, children[0])) + self.assertTrue(len(child[0]) > 0) self.assertEqual({ 'uuid': posted_job.uuid, 'name': posted_job.name, @@ -259,7 +222,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): }, 'priority': 'NORMAL', 'details': {}, - }, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) + }, jsonutils.loads(misc.binary_decode(child[0]))) def test_register_entity(self): conductor_name = "conductor-abc@localhost:4123" @@ -269,14 +232,12 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): with base.connect_close(self.board): self.board.register_entity(entity_instance) # Check '.entity' node has been created - self.assertIn(self.board.entity_path, self.client.storage.paths) + self.client.get_children(self.board.entity_path) conductor_entity_path = k_paths.join(self.board.entity_path, 'conductor', conductor_name) - self.assertIn(conductor_entity_path, self.client.storage.paths) - conductor_data = ( - self.client.storage.paths[conductor_entity_path]['data']) + conductor_data = self.client.get(conductor_entity_path)[0] self.assertTrue(len(conductor_data) > 0) self.assertEqual({ 'name': conductor_name, @@ -291,47 +252,3 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): self.assertRaises(excp.NotImplementedError, self.board.register_entity, entity_instance_2) - - def test_connect_check_compatible(self): - # Valid version - client = fake_client.FakeClient() - board = impl_zookeeper.ZookeeperJobBoard( - 'test-board', {'check_compatible': True}, - client=client) - self.addCleanup(board.close) - self.addCleanup(self.close_client, client) - - with base.connect_close(board): - pass - - # Invalid version, no check - client = fake_client.FakeClient(server_version=(3, 2, 0)) - board = impl_zookeeper.ZookeeperJobBoard( - 'test-board', {'check_compatible': False}, - client=client) - self.addCleanup(board.close) - self.addCleanup(self.close_client, client) - - with base.connect_close(board): - pass - - # Invalid version, check_compatible=True - client = fake_client.FakeClient(server_version=(3, 2, 0)) - board = impl_zookeeper.ZookeeperJobBoard( - 'test-board', {'check_compatible': True}, - client=client) - self.addCleanup(board.close) - self.addCleanup(self.close_client, client) - - self.assertRaises(excp.IncompatibleVersion, board.connect) - - # Invalid version, check_compatible='False' - client = fake_client.FakeClient(server_version=(3, 2, 0)) - board = impl_zookeeper.ZookeeperJobBoard( - 'test-board', {'check_compatible': 'False'}, - client=client) - self.addCleanup(board.close) - self.addCleanup(self.close_client, client) - - with base.connect_close(board): - pass diff --git a/taskflow/tests/unit/persistence/test_zk_persistence.py b/taskflow/tests/unit/persistence/test_zk_persistence.py index afe80bd7f..07ca692a9 100644 --- a/taskflow/tests/unit/persistence/test_zk_persistence.py +++ b/taskflow/tests/unit/persistence/test_zk_persistence.py @@ -17,7 +17,6 @@ import contextlib from kazoo import exceptions as kazoo_exceptions from oslo_utils import uuidutils import testtools -from zake import fake_client from taskflow import exceptions as exc from taskflow.persistence import backends @@ -74,25 +73,3 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin): conf = {'connection': 'zookeeper:'} with contextlib.closing(backends.fetch(conf)) as be: self.assertIsInstance(be, impl_zookeeper.ZkBackend) - - -@testtools.skipIf(_ZOOKEEPER_AVAILABLE, 'zookeeper is available') -class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin): - def _get_connection(self): - return self._backend.get_connection() - - def setUp(self): - super().setUp() - conf = { - "path": "/taskflow", - } - self.client = fake_client.FakeClient() - self.client.start() - self._backend = impl_zookeeper.ZkBackend(conf, client=self.client) - conn = self._backend.get_connection() - conn.upgrade() - - def test_zk_persistence_entry_point(self): - conf = {'connection': 'zookeeper:'} - with contextlib.closing(backends.fetch(conf)) as be: - self.assertIsInstance(be, impl_zookeeper.ZkBackend) diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py index 645a95d2f..23acdd209 100644 --- a/taskflow/tests/unit/test_conductors.py +++ b/taskflow/tests/unit/test_conductors.py @@ -17,8 +17,9 @@ import contextlib import threading import futurist +from oslo_utils import uuidutils import testscenarios -from zake import fake_client +import testtools from taskflow.conductors import backends from taskflow import engines @@ -29,17 +30,14 @@ from taskflow.persistence.backends import impl_memory from taskflow import states as st from taskflow import test from taskflow.tests import utils as test_utils +from taskflow.utils import kazoo_utils from taskflow.utils import persistence_utils as pu from taskflow.utils import threading_utils -@contextlib.contextmanager -def close_many(*closeables): - try: - yield - finally: - for c in closeables: - c.close() +TEST_PATH_TPL = '/taskflow/conductor-test/%s' +ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( + impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) def test_factory(blowup): @@ -69,10 +67,10 @@ def single_factory(): ComponentBundle = collections.namedtuple('ComponentBundle', - ['board', 'client', - 'persistence', 'conductor']) + ['board', 'persistence', 'conductor']) +@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class ManyConductorTest(testscenarios.TestWithScenarios, test_utils.EngineTestBase, test.TestCase): scenarios = [ @@ -88,30 +86,40 @@ class ManyConductorTest(testscenarios.TestWithScenarios, ] def make_components(self): - client = fake_client.FakeClient() + def cleanup_path(client, path): + if not client.connected: + return + client.delete(path, recursive=True) + + client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy()) + path = TEST_PATH_TPL % uuidutils.generate_uuid() persistence = impl_memory.MemoryBackend() - board = impl_zookeeper.ZookeeperJobBoard('testing', {}, - client=client, - persistence=persistence) + board = impl_zookeeper.ZookeeperJobBoard( + 'testing', + {'path': path}, + client=client, + persistence=persistence) + + self.addCleanup(cleanup_path, client, path) + self.addCleanup(kazoo_utils.finalize_client, client) + conductor_kwargs = self.conductor_kwargs.copy() conductor_kwargs['persistence'] = persistence conductor = backends.fetch(self.kind, 'testing', board, **conductor_kwargs) - return ComponentBundle(board, client, persistence, conductor) + return ComponentBundle(board, persistence, conductor) def test_connection(self): components = self.make_components() components.conductor.connect() - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): self.assertTrue(components.board.connected) - self.assertTrue(components.client.connected) self.assertFalse(components.board.connected) - self.assertFalse(components.client.connected) def test_run_empty(self): components = self.make_components() components.conductor.connect() - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() components.conductor.stop() @@ -143,7 +151,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, on_job_consumed) components.conductor.notifier.register("job_abandoned", on_job_abandoned) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) @@ -175,7 +183,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, consumed_event.set() components.board.notifier.register(base.REMOVAL, on_consume) - with close_many(components.client, components.conductor): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread( lambda: components.conductor.run(max_dispatches=5)) t.start() @@ -217,7 +225,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, on_job_consumed) components.conductor.notifier.register("job_abandoned", on_job_abandoned) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) @@ -249,7 +257,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, consumed_event.set() components.board.notifier.register(base.REMOVAL, on_consume) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) @@ -281,7 +289,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, store = {'x': True, 'y': False, 'z': None} components.board.notifier.register(base.REMOVAL, on_consume) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) @@ -314,7 +322,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, store = {'x': True, 'y': False, 'z': None} components.board.notifier.register(base.REMOVAL, on_consume) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence, @@ -348,7 +356,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, job_store = {'z': None} components.board.notifier.register(base.REMOVAL, on_consume) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence, @@ -400,7 +408,7 @@ class ManyConductorTest(testscenarios.TestWithScenarios, on_job_abandoned) components.conductor.notifier.register("running_start", on_running_start) - with close_many(components.conductor, components.client): + with contextlib.closing(components.conductor): t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) @@ -418,13 +426,14 @@ class ManyConductorTest(testscenarios.TestWithScenarios, self.assertFalse(consumed_event.is_set()) +@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class NonBlockingExecutorTest(test.TestCase): def test_bad_wait_timeout(self): persistence = impl_memory.MemoryBackend() - client = fake_client.FakeClient() - board = impl_zookeeper.ZookeeperJobBoard('testing', {}, - client=client, - persistence=persistence) + board = impl_zookeeper.ZookeeperJobBoard( + 'testing', + test_utils.ZK_TEST_CONFIG.copy(), + persistence=persistence) self.assertRaises(ValueError, backends.fetch, 'nonblocking', 'testing', board, @@ -433,10 +442,10 @@ class NonBlockingExecutorTest(test.TestCase): def test_bad_factory(self): persistence = impl_memory.MemoryBackend() - client = fake_client.FakeClient() - board = impl_zookeeper.ZookeeperJobBoard('testing', {}, - client=client, - persistence=persistence) + board = impl_zookeeper.ZookeeperJobBoard( + 'testing', + test_utils.ZK_TEST_CONFIG.copy(), + persistence=persistence) self.assertRaises(ValueError, backends.fetch, 'nonblocking', 'testing', board, diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 10156f55c..ac11910af 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -19,11 +19,12 @@ import time from oslo_serialization import jsonutils from oslo_utils import reflection -from zake import fake_client +import testtools import taskflow.engines from taskflow import exceptions as exc from taskflow.jobs import backends as jobs +from taskflow.jobs.backends import impl_zookeeper from taskflow.listeners import claims from taskflow.listeners import logging as logging_listeners from taskflow.listeners import timing @@ -34,10 +35,15 @@ from taskflow import task from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils +from taskflow.utils import kazoo_utils from taskflow.utils import misc from taskflow.utils import persistence_utils +ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( + impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) + + _LOG_LEVELS = frozenset([ logging.CRITICAL, logging.DEBUG, @@ -70,6 +76,7 @@ class EngineMakerMixin: return e +@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class TestClaimListener(test.TestCase, EngineMakerMixin): def _make_dummy_flow(self, count): f = lf.Flow('root') @@ -79,7 +86,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): def setUp(self): super().setUp() - self.client = fake_client.FakeClient() + self.client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy()) self.addCleanup(self.client.stop) self.board = jobs.fetch('test', 'zookeeper', client=self.client) self.addCleanup(self.board.close) @@ -92,7 +99,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): if children: arrived.set() - self.client.ChildrenWatch("/taskflow", set_on_children) + self.client.ChildrenWatch("/taskflow/jobs", set_on_children) job = self.board.post('test-1') # Make sure it arrived and claimed before doing further work... @@ -105,22 +112,20 @@ class TestClaimListener(test.TestCase, EngineMakerMixin): return job def _destroy_locks(self): - children = self.client.storage.get_children("/taskflow", - only_direct=False) + children = self.client.get_children("/taskflow/jobs") removed = 0 - for p, data in children.items(): + for p in children: if p.endswith(".lock"): - self.client.storage.pop(p) + self.client.delete("/taskflow/jobs/" + p) removed += 1 return removed def _change_owner(self, new_owner): - children = self.client.storage.get_children("/taskflow", - only_direct=False) + children = self.client.get_children("/taskflow/jobs") altered = 0 - for p, data in children.items(): + for p in children: if p.endswith(".lock"): - self.client.set(p, misc.binary_encode( + self.client.set("/taskflow/jobs/" + p, misc.binary_encode( jsonutils.dumps({'owner': new_owner}))) altered += 1 return altered diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index a495535d3..fe755677d 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -57,10 +57,10 @@ def wrap_all_failures(): raise exceptions.WrappedFailure([failure.Failure()]) -def zookeeper_available(min_version, timeout=3): +def zookeeper_available(min_version, timeout=30): client = kazoo_utils.make_client(ZK_TEST_CONFIG.copy()) try: - # NOTE(imelnikov): 3 seconds we should be enough for localhost + # NOTE(imelnikov): 30 seconds we should be enough for localhost client.start(timeout=float(timeout)) if min_version: zk_ver = client.server_version() diff --git a/test-requirements.txt b/test-requirements.txt index f6be06590..afe63abbc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -25,7 +25,6 @@ PyMySQL>=0.7.6 # MIT License psycopg2>=2.8.0 # LGPL/ZPL # test -zake>=0.1.6 # Apache-2.0 pydotplus>=2.0.2 # MIT License oslotest>=3.2.0 # Apache-2.0 testtools>=2.2.0 # MIT