diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py new file mode 100644 index 00000000..6995870f --- /dev/null +++ b/taskflow/tests/unit/jobs/base.py @@ -0,0 +1,273 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import mock +import threading +import time + +from kazoo.recipe import watchers + +from taskflow import exceptions as excp +from taskflow.openstack.common import uuidutils +from taskflow.persistence.backends import impl_dir +from taskflow import states +from taskflow.utils import misc +from taskflow.utils import persistence_utils as p_utils + + +@contextlib.contextmanager +def connect_close(*args): + try: + for a in args: + a.connect() + yield + finally: + for a in args: + a.close() + + +@contextlib.contextmanager +def flush(client, path=None): + # This uses the linearity guarantee of zookeeper (and associated libraries) + # to create a temporary node, wait until a watcher notifies it's created, + # then yield back for more work, and then at the end of that work delete + # the created node. This ensures that the operations done in the yield + # of this context manager will be applied and all watchers will have fired + # before this context manager exits. + if not path: + path = "/tmp-%s" % uuidutils.generate_uuid() + created = threading.Event() + deleted = threading.Event() + + def on_created(data, stat): + if stat is not None: + created.set() + return False # cause this watcher to cease to exist + + def on_deleted(data, stat): + if stat is None: + deleted.set() + return False # cause this watcher to cease to exist + + watchers.DataWatch(client, path, func=on_created) + client.create(path) + created.wait() + try: + yield + finally: + watchers.DataWatch(client, path, func=on_deleted) + client.delete(path, recursive=True) + deleted.wait() + + +class BoardTestMixin(object): + def test_connect(self): + self.assertFalse(self.board.connected) + with connect_close(self.board): + self.assertTrue(self.board.connected) + + @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." + "millis_to_datetime") + def test_posting_dates(self, mock_dt): + epoch = misc.millis_to_datetime(0) + mock_dt.return_value = epoch + + with connect_close(self.board): + j = self.board.post('test', p_utils.temporary_log_book()) + self.assertEqual(epoch, j.created_on) + self.assertEqual(epoch, j.last_modified) + + self.assertTrue(mock_dt.called) + + def test_board_iter(self): + with connect_close(self.board): + it = self.board.iterjobs() + self.assertEqual(it.board, self.board) + self.assertFalse(it.only_unclaimed) + self.assertFalse(it.ensure_fresh) + + def test_board_iter_empty(self): + with connect_close(self.board): + jobs_found = list(self.board.iterjobs()) + self.assertEqual([], jobs_found) + + def test_fresh_iter(self): + with connect_close(self.board): + book = p_utils.temporary_log_book() + self.board.post('test', book) + jobs = list(self.board.iterjobs(ensure_fresh=True)) + self.assertEqual(1, len(jobs)) + + def test_wait_timeout(self): + with connect_close(self.board): + self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) + + def test_wait_arrival(self): + ev = threading.Event() + jobs = [] + + def poster(wait_post=0.2): + ev.wait() # wait until the waiter is active + time.sleep(wait_post) + self.board.post('test', p_utils.temporary_log_book()) + + def waiter(): + ev.set() + it = self.board.wait() + jobs.extend(it) + + with connect_close(self.board): + t1 = threading.Thread(target=poster) + t1.daemon = True + t1.start() + t2 = threading.Thread(target=waiter) + t2.daemon = True + t2.start() + for t in (t1, t2): + t.join() + + self.assertEqual(1, len(jobs)) + + def test_posting_claim(self): + + with connect_close(self.board): + with flush(self.client): + self.board.post('test', p_utils.temporary_log_book()) + + self.assertEqual(1, self.board.job_count) + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + j = possible_jobs[0] + self.assertEqual(states.UNCLAIMED, j.state) + + with flush(self.client): + self.board.claim(j, self.board.name) + + self.assertEqual(self.board.name, self.board.find_owner(j)) + self.assertEqual(states.CLAIMED, j.state) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(0, len(possible_jobs)) + + self.assertRaisesAttrAccess(excp.NotFound, j, 'state') + self.assertRaises(excp.NotFound, + self.board.consume, j, self.board.name) + + def test_posting_claim_consume(self): + + with connect_close(self.board): + with flush(self.client): + self.board.post('test', p_utils.temporary_log_book()) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + j = possible_jobs[0] + with flush(self.client): + self.board.claim(j, self.board.name) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(0, len(possible_jobs)) + with flush(self.client): + self.board.consume(j, self.board.name) + + self.assertEqual(0, len(list(self.board.iterjobs()))) + self.assertRaises(excp.NotFound, + self.board.consume, j, self.board.name) + + def test_posting_claim_abandon(self): + + with connect_close(self.board): + with flush(self.client): + self.board.post('test', p_utils.temporary_log_book()) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + j = possible_jobs[0] + with flush(self.client): + self.board.claim(j, self.board.name) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(0, len(possible_jobs)) + with flush(self.client): + self.board.abandon(j, self.board.name) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + + def test_posting_claim_diff_owner(self): + + with connect_close(self.board): + with flush(self.client): + self.board.post('test', p_utils.temporary_log_book()) + + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + with flush(self.client): + self.board.claim(possible_jobs[0], self.board.name) + + possible_jobs = list(self.board.iterjobs()) + self.assertEqual(1, len(possible_jobs)) + self.assertRaises(excp.UnclaimableJob, self.board.claim, + possible_jobs[0], self.board.name + "-1") + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(0, len(possible_jobs)) + + def test_posting_no_post(self): + with connect_close(self.board): + with mock.patch.object(self.client, 'create') as create_func: + create_func.side_effect = IOError("Unable to post") + self.assertRaises(IOError, self.board.post, + 'test', p_utils.temporary_log_book()) + self.assertEqual(0, self.board.job_count) + + def test_posting_with_book(self): + backend = impl_dir.DirBackend(conf={ + 'path': self.makeTmpDir(), + }) + backend.get_connection().upgrade() + book, flow_detail = p_utils.temporary_flow_detail(backend) + self.assertEqual(1, len(book)) + + client, board = self._create_board(persistence=backend) + self.addCleanup(board.close) + + with connect_close(board): + with flush(client): + board.post('test', book) + + possible_jobs = list(board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + j = possible_jobs[0] + self.assertEqual(1, len(j.book)) + self.assertEqual(book.name, j.book.name) + self.assertEqual(book.uuid, j.book.uuid) + + flow_details = list(j.book) + self.assertEqual(flow_detail.uuid, flow_details[0].uuid) + self.assertEqual(flow_detail.name, flow_details[0].name) + + def test_posting_abandon_no_owner(self): + + with connect_close(self.board): + with flush(self.client): + self.board.post('test', p_utils.temporary_log_book()) + + self.assertEqual(1, self.board.job_count) + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.assertEqual(1, len(possible_jobs)) + j = possible_jobs[0] + self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name) diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index ef652c57..9003cfdd 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -14,163 +14,83 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib -import mock -import threading -import time - import six -from kazoo.recipe import watchers - from zake import fake_client from zake import utils as zake_utils -from taskflow import exceptions as excp from taskflow.jobs.backends import impl_zookeeper from taskflow import states from taskflow import test from taskflow.openstack.common import jsonutils -from taskflow.openstack.common import uuidutils -from taskflow.persistence.backends import impl_dir +from taskflow.tests.unit.jobs import base from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils -@contextlib.contextmanager -def connect_close(*args): - try: - for a in args: - a.connect() - yield - finally: - for a in args: - a.close() +class ZakeJobboardTest(test.TestCase, base.BoardTestMixin): + def _create_board(self, client=None, persistence=None): + if not client: + client = fake_client.FakeClient() + board = impl_zookeeper.ZookeeperJobBoard('test-board', {}, + client=client, + persistence=persistence) + return (client, board) - -def create_board(client=None, persistence=None): - if not client: - client = fake_client.FakeClient() - board = impl_zookeeper.ZookeeperJobBoard('test-board', {}, - client=client, - persistence=persistence) - return (client, board) - - -@contextlib.contextmanager -def flush(client, path=None): - # This uses the linearity guarantee of zookeeper (and associated libraries) - # to create a temporary node, wait until a watcher notifies it's created, - # then yield back for more work, and then at the end of that work delete - # the created node. This ensures that the operations done in the yield - # of this context manager will be applied and all watchers will have fired - # before this context manager exits. - if not path: - path = "/tmp-%s" % uuidutils.generate_uuid() - created = threading.Event() - deleted = threading.Event() - - def on_created(data, stat): - if stat is not None: - created.set() - return False # cause this watcher to cease to exist - - def on_deleted(data, stat): - if stat is None: - deleted.set() - return False # cause this watcher to cease to exist - - watchers.DataWatch(client, path, func=on_created) - client.create(path) - created.wait() - try: - yield - finally: - watchers.DataWatch(client, path, func=on_deleted) - client.delete(path, recursive=True) - deleted.wait() - - -class TestZookeeperJobs(test.TestCase): def setUp(self): - super(TestZookeeperJobs, self).setUp() - self.client, self.board = create_board() + super(ZakeJobboardTest, self).setUp() + self.client, self.board = self._create_board() self.addCleanup(self.board.close) self.bad_paths = [self.board.path] self.bad_paths.extend(zake_utils.partition_path(self.board.path)) - def test_connect(self): - self.assertFalse(self.board.connected) - with connect_close(self.board): - self.assertTrue(self.board.connected) + def test_posting_owner_lost(self): - @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." - "millis_to_datetime") - def test_posting_dates(self, mock_dt): - epoch = misc.millis_to_datetime(0) - mock_dt.return_value = epoch + with base.connect_close(self.board): + with base.flush(self.client): + j = self.board.post('test', p_utils.temporary_log_book()) + self.assertEqual(states.UNCLAIMED, j.state) + with base.flush(self.client): + self.board.claim(j, self.board.name) + self.assertEqual(states.CLAIMED, j.state) - with connect_close(self.board): - j = self.board.post('test', p_utils.temporary_log_book()) - self.assertEqual(epoch, j.created_on) - self.assertEqual(epoch, j.last_modified) + # Forcefully delete the owner from the backend storage to make + # sure the job becomes unclaimed (this may happen if some admin + # manually deletes the lock). + paths = list(six.iteritems(self.client.storage.paths)) + for (path, value) in paths: + if path in self.bad_paths: + continue + if path.endswith('lock'): + value['data'] = misc.binary_encode(jsonutils.dumps({})) + self.assertEqual(states.UNCLAIMED, j.state) - self.assertTrue(mock_dt.called) + def test_posting_state_lock_lost(self): - def test_board_iter(self): - with connect_close(self.board): - it = self.board.iterjobs() - self.assertEqual(it.board, self.board) - self.assertFalse(it.only_unclaimed) - self.assertFalse(it.ensure_fresh) + with base.connect_close(self.board): + with base.flush(self.client): + j = self.board.post('test', p_utils.temporary_log_book()) + self.assertEqual(states.UNCLAIMED, j.state) + with base.flush(self.client): + self.board.claim(j, self.board.name) + self.assertEqual(states.CLAIMED, j.state) - def test_board_iter_empty(self): - with connect_close(self.board): - jobs_found = list(self.board.iterjobs()) - self.assertEqual([], jobs_found) - - def test_fresh_iter(self): - with connect_close(self.board): - book = p_utils.temporary_log_book() - self.board.post('test', book) - jobs = list(self.board.iterjobs(ensure_fresh=True)) - self.assertEqual(1, len(jobs)) - - def test_wait_timeout(self): - with connect_close(self.board): - self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) - - def test_wait_arrival(self): - ev = threading.Event() - jobs = [] - - def poster(wait_post=0.2): - ev.wait() # wait until the waiter is active - time.sleep(wait_post) - self.board.post('test', p_utils.temporary_log_book()) - - def waiter(): - ev.set() - it = self.board.wait() - jobs.extend(it) - - with connect_close(self.board): - t1 = threading.Thread(target=poster) - t1.daemon = True - t1.start() - t2 = threading.Thread(target=waiter) - t2.daemon = True - t2.start() - for t in (t1, t2): - t.join() - - self.assertEqual(1, len(jobs)) + # Forcefully delete the lock from the backend storage to make + # sure the job becomes unclaimed (this may happen if some admin + # manually deletes the lock). + paths = list(six.iteritems(self.client.storage.paths)) + for (path, value) in paths: + if path in self.bad_paths: + continue + if path.endswith("lock"): + self.client.storage.pop(path) + self.assertEqual(states.UNCLAIMED, j.state) def test_posting_received_raw(self): book = p_utils.temporary_log_book() - with connect_close(self.board): + with base.connect_close(self.board): self.assertTrue(self.board.connected) self.assertEqual(0, self.board.job_count) posted_job = self.board.post('test', book) @@ -201,175 +121,3 @@ class TestZookeeperJobs(test.TestCase): }, 'details': {}, }, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) - - def test_posting_claim(self): - - with connect_close(self.board): - with flush(self.client): - self.board.post('test', p_utils.temporary_log_book()) - - self.assertEqual(1, self.board.job_count) - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - j = possible_jobs[0] - self.assertEqual(states.UNCLAIMED, j.state) - - with flush(self.client): - self.board.claim(j, self.board.name) - - self.assertEqual(self.board.name, self.board.find_owner(j)) - self.assertEqual(states.CLAIMED, j.state) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(0, len(possible_jobs)) - - self.assertRaisesAttrAccess(excp.NotFound, j, 'state') - self.assertRaises(excp.NotFound, - self.board.consume, j, self.board.name) - - def test_posting_claim_consume(self): - - with connect_close(self.board): - with flush(self.client): - self.board.post('test', p_utils.temporary_log_book()) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - j = possible_jobs[0] - with flush(self.client): - self.board.claim(j, self.board.name) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(0, len(possible_jobs)) - with flush(self.client): - self.board.consume(j, self.board.name) - - self.assertEqual(0, len(list(self.board.iterjobs()))) - self.assertRaises(excp.NotFound, - self.board.consume, j, self.board.name) - - def test_posting_claim_abandon(self): - - with connect_close(self.board): - with flush(self.client): - self.board.post('test', p_utils.temporary_log_book()) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - j = possible_jobs[0] - with flush(self.client): - self.board.claim(j, self.board.name) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(0, len(possible_jobs)) - with flush(self.client): - self.board.abandon(j, self.board.name) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - - def test_posting_claim_diff_owner(self): - - with connect_close(self.board): - with flush(self.client): - self.board.post('test', p_utils.temporary_log_book()) - - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - with flush(self.client): - self.board.claim(possible_jobs[0], self.board.name) - - possible_jobs = list(self.board.iterjobs()) - self.assertEqual(1, len(possible_jobs)) - self.assertRaises(excp.UnclaimableJob, self.board.claim, - possible_jobs[0], self.board.name + "-1") - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(0, len(possible_jobs)) - - def test_posting_state_lock_lost(self): - - with connect_close(self.board): - with flush(self.client): - j = self.board.post('test', p_utils.temporary_log_book()) - self.assertEqual(states.UNCLAIMED, j.state) - with flush(self.client): - self.board.claim(j, self.board.name) - self.assertEqual(states.CLAIMED, j.state) - - # Forcefully delete the lock from the backend storage to make - # sure the job becomes unclaimed (this may happen if some admin - # manually deletes the lock). - paths = list(six.iteritems(self.client.storage.paths)) - for (path, value) in paths: - if path in self.bad_paths: - continue - if path.endswith("lock"): - self.client.storage.pop(path) - self.assertEqual(states.UNCLAIMED, j.state) - - def test_posting_no_post(self): - with connect_close(self.board): - with mock.patch.object(self.client, 'create') as create_func: - create_func.side_effect = IOError("Unable to post") - self.assertRaises(IOError, self.board.post, - 'test', p_utils.temporary_log_book()) - self.assertEqual(0, self.board.job_count) - - def test_posting_owner_lost(self): - - with connect_close(self.board): - with flush(self.client): - j = self.board.post('test', p_utils.temporary_log_book()) - self.assertEqual(states.UNCLAIMED, j.state) - with flush(self.client): - self.board.claim(j, self.board.name) - self.assertEqual(states.CLAIMED, j.state) - - # Forcefully delete the owner from the backend storage to make - # sure the job becomes unclaimed (this may happen if some admin - # manually deletes the lock). - paths = list(six.iteritems(self.client.storage.paths)) - for (path, value) in paths: - if path in self.bad_paths: - continue - if path.endswith('lock'): - value['data'] = misc.binary_encode(jsonutils.dumps({})) - self.assertEqual(states.UNCLAIMED, j.state) - - def test_posting_with_book(self): - backend = impl_dir.DirBackend(conf={ - 'path': self.makeTmpDir(), - }) - backend.get_connection().upgrade() - book, flow_detail = p_utils.temporary_flow_detail(backend) - self.assertEqual(1, len(book)) - - client, board = create_board(persistence=backend) - self.addCleanup(board.close) - - with connect_close(board): - with flush(client): - board.post('test', book) - - possible_jobs = list(board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - j = possible_jobs[0] - self.assertEqual(1, len(j.book)) - self.assertEqual(book.name, j.book.name) - self.assertEqual(book.uuid, j.book.uuid) - - flow_details = list(j.book) - self.assertEqual(flow_detail.uuid, flow_details[0].uuid) - self.assertEqual(flow_detail.name, flow_details[0].name) - - def test_posting_abandon_no_owner(self): - - with connect_close(self.board): - with flush(self.client): - self.board.post('test', p_utils.temporary_log_book()) - - self.assertEqual(1, self.board.job_count) - possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) - self.assertEqual(1, len(possible_jobs)) - j = possible_jobs[0] - self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name) diff --git a/taskflow/tests/unit/persistence/test_zake_persistence.py b/taskflow/tests/unit/persistence/test_zake_persistence.py deleted file mode 100644 index 91f04f9d..00000000 --- a/taskflow/tests/unit/persistence/test_zake_persistence.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 AT&T Labs All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib - -from zake import fake_client - -from taskflow.persistence import backends -from taskflow.persistence.backends import impl_zookeeper -from taskflow import test -from taskflow.tests.unit.persistence import base - - -class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin): - def _get_connection(self): - return self._backend.get_connection() - - def setUp(self): - super(ZakePersistenceTest, self).setUp() - conf = { - "path": "/taskflow", - } - client = fake_client.FakeClient() - client.start() - self._backend = impl_zookeeper.ZkBackend(conf, client=client) - conn = self._backend.get_connection() - conn.upgrade() - - def test_zk_persistence_entry_point(self): - conf = {'connection': 'zookeeper:'} - with contextlib.closing(backends.fetch(conf)) as be: - self.assertIsInstance(be, impl_zookeeper.ZkBackend) diff --git a/taskflow/tests/unit/persistence/test_zk_persistence.py b/taskflow/tests/unit/persistence/test_zk_persistence.py index 04eac378..414db09b 100644 --- a/taskflow/tests/unit/persistence/test_zk_persistence.py +++ b/taskflow/tests/unit/persistence/test_zk_persistence.py @@ -17,37 +17,18 @@ import contextlib import testtools +from zake import fake_client from taskflow.openstack.common import uuidutils +from taskflow.persistence import backends from taskflow.persistence.backends import impl_zookeeper from taskflow import test from taskflow.tests.unit.persistence import base -from taskflow.utils import kazoo_utils +from taskflow.tests import utils as test_utils -TEST_CONFIG = { - 'timeout': 1.0, - 'hosts': ["localhost:2181"], -} TEST_PATH_TPL = '/taskflow/persistence-test/%s' - - -def _zookeeper_available(): - client = kazoo_utils.make_client(TEST_CONFIG) - try: - # NOTE(imelnikov): 3 seconds we should be enough for localhost - client.start(timeout=3) - zk_ver = client.server_version() - if zk_ver >= impl_zookeeper.MIN_ZK_VERSION: - return True - else: - return False - except Exception: - return False - finally: - kazoo_utils.finalize_client(client) - - -_ZOOKEEPER_AVAILABLE = _zookeeper_available() +_ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( + impl_zookeeper.MIN_ZK_VERSION) @testtools.skipIf(not _ZOOKEEPER_AVAILABLE, 'zookeeper is not available') @@ -61,7 +42,7 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin): def setUp(self): super(ZkPersistenceTest, self).setUp() - conf = TEST_CONFIG.copy() + conf = test_utils.ZK_TEST_CONFIG.copy() # Create a unique path just for this test (so that we don't overwrite # what other tests are doing). conf['path'] = TEST_PATH_TPL % (uuidutils.generate_uuid()) @@ -74,3 +55,30 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin): with contextlib.closing(self._get_connection()) as conn: conn.upgrade() self.addCleanup(self._clear_all) + + def test_zk_persistence_entry_point(self): + conf = {'connection': 'zookeeper:'} + with contextlib.closing(backends.fetch(conf)) as be: + self.assertIsInstance(be, impl_zookeeper.ZkBackend) + + +@testtools.skipIf(_ZOOKEEPER_AVAILABLE, 'zookeeper is available') +class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin): + def _get_connection(self): + return self._backend.get_connection() + + def setUp(self): + super(ZakePersistenceTest, self).setUp() + conf = { + "path": "/taskflow", + } + self.client = fake_client.FakeClient() + self.client.start() + self._backend = impl_zookeeper.ZkBackend(conf, client=self.client) + conn = self._backend.get_connection() + conn.upgrade() + + def test_zk_persistence_entry_point(self): + conf = {'connection': 'zookeeper:'} + with contextlib.closing(backends.fetch(conf)) as be: + self.assertIsInstance(be, impl_zookeeper.ZkBackend) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index d8793215..ce3289ec 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -23,12 +23,18 @@ from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task +from taskflow.utils import kazoo_utils from taskflow.utils import misc ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' +ZK_TEST_CONFIG = { + 'timeout': 1.0, + 'hosts': ["localhost:2181"], +} + @contextlib.contextmanager def wrap_all_failures(): @@ -44,6 +50,25 @@ def wrap_all_failures(): raise exceptions.WrappedFailure([misc.Failure()]) +def zookeeper_available(min_version, timeout=3): + client = kazoo_utils.make_client(ZK_TEST_CONFIG.copy()) + try: + # NOTE(imelnikov): 3 seconds we should be enough for localhost + client.start(timeout=float(timeout)) + if min_version: + zk_ver = client.server_version() + if zk_ver >= min_version: + return True + else: + return False + else: + return True + except Exception: + return False + finally: + kazoo_utils.finalize_client(client) + + class DummyTask(task.Task): def execute(self, context, *args, **kwargs):