Cleanup zookeeper integration testing

In the persistence tests only use zake when zookeeper is
not available (and of the right version). When zookeeper is
available skip running zake.

In the jobboard tests split out the tests which are not
specific for zake into a base class (allowing for a future
commit to add a zookeeper integration test).

Change-Id: I50d51639a7f6c03c29d559c485676fddb9a7cf20
This commit is contained in:
Joshua Harlow
2014-05-09 20:03:58 -07:00
parent fb5d54d506
commit 125d015d92
5 changed files with 379 additions and 370 deletions

View File

@@ -0,0 +1,273 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
import threading
import time
from kazoo.recipe import watchers
from taskflow import exceptions as excp
from taskflow.openstack.common import uuidutils
from taskflow.persistence.backends import impl_dir
from taskflow import states
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@contextlib.contextmanager
def connect_close(*args):
try:
for a in args:
a.connect()
yield
finally:
for a in args:
a.close()
@contextlib.contextmanager
def flush(client, path=None):
# This uses the linearity guarantee of zookeeper (and associated libraries)
# to create a temporary node, wait until a watcher notifies it's created,
# then yield back for more work, and then at the end of that work delete
# the created node. This ensures that the operations done in the yield
# of this context manager will be applied and all watchers will have fired
# before this context manager exits.
if not path:
path = "/tmp-%s" % uuidutils.generate_uuid()
created = threading.Event()
deleted = threading.Event()
def on_created(data, stat):
if stat is not None:
created.set()
return False # cause this watcher to cease to exist
def on_deleted(data, stat):
if stat is None:
deleted.set()
return False # cause this watcher to cease to exist
watchers.DataWatch(client, path, func=on_created)
client.create(path)
created.wait()
try:
yield
finally:
watchers.DataWatch(client, path, func=on_deleted)
client.delete(path, recursive=True)
deleted.wait()
class BoardTestMixin(object):
def test_connect(self):
self.assertFalse(self.board.connected)
with connect_close(self.board):
self.assertTrue(self.board.connected)
@mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
"millis_to_datetime")
def test_posting_dates(self, mock_dt):
epoch = misc.millis_to_datetime(0)
mock_dt.return_value = epoch
with connect_close(self.board):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(epoch, j.created_on)
self.assertEqual(epoch, j.last_modified)
self.assertTrue(mock_dt.called)
def test_board_iter(self):
with connect_close(self.board):
it = self.board.iterjobs()
self.assertEqual(it.board, self.board)
self.assertFalse(it.only_unclaimed)
self.assertFalse(it.ensure_fresh)
def test_board_iter_empty(self):
with connect_close(self.board):
jobs_found = list(self.board.iterjobs())
self.assertEqual([], jobs_found)
def test_fresh_iter(self):
with connect_close(self.board):
book = p_utils.temporary_log_book()
self.board.post('test', book)
jobs = list(self.board.iterjobs(ensure_fresh=True))
self.assertEqual(1, len(jobs))
def test_wait_timeout(self):
with connect_close(self.board):
self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)
def test_wait_arrival(self):
ev = threading.Event()
jobs = []
def poster(wait_post=0.2):
ev.wait() # wait until the waiter is active
time.sleep(wait_post)
self.board.post('test', p_utils.temporary_log_book())
def waiter():
ev.set()
it = self.board.wait()
jobs.extend(it)
with connect_close(self.board):
t1 = threading.Thread(target=poster)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=waiter)
t2.daemon = True
t2.start()
for t in (t1, t2):
t.join()
self.assertEqual(1, len(jobs))
def test_posting_claim(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertEqual(states.UNCLAIMED, j.state)
with flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(self.board.name, self.board.find_owner(j))
self.assertEqual(states.CLAIMED, j.state)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
self.assertRaisesAttrAccess(excp.NotFound, j, 'state')
self.assertRaises(excp.NotFound,
self.board.consume, j, self.board.name)
def test_posting_claim_consume(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
with flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
with flush(self.client):
self.board.consume(j, self.board.name)
self.assertEqual(0, len(list(self.board.iterjobs())))
self.assertRaises(excp.NotFound,
self.board.consume, j, self.board.name)
def test_posting_claim_abandon(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
with flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
with flush(self.client):
self.board.abandon(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
def test_posting_claim_diff_owner(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
with flush(self.client):
self.board.claim(possible_jobs[0], self.board.name)
possible_jobs = list(self.board.iterjobs())
self.assertEqual(1, len(possible_jobs))
self.assertRaises(excp.UnclaimableJob, self.board.claim,
possible_jobs[0], self.board.name + "-1")
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
def test_posting_no_post(self):
with connect_close(self.board):
with mock.patch.object(self.client, 'create') as create_func:
create_func.side_effect = IOError("Unable to post")
self.assertRaises(IOError, self.board.post,
'test', p_utils.temporary_log_book())
self.assertEqual(0, self.board.job_count)
def test_posting_with_book(self):
backend = impl_dir.DirBackend(conf={
'path': self.makeTmpDir(),
})
backend.get_connection().upgrade()
book, flow_detail = p_utils.temporary_flow_detail(backend)
self.assertEqual(1, len(book))
client, board = self._create_board(persistence=backend)
self.addCleanup(board.close)
with connect_close(board):
with flush(client):
board.post('test', book)
possible_jobs = list(board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertEqual(1, len(j.book))
self.assertEqual(book.name, j.book.name)
self.assertEqual(book.uuid, j.book.uuid)
flow_details = list(j.book)
self.assertEqual(flow_detail.uuid, flow_details[0].uuid)
self.assertEqual(flow_detail.name, flow_details[0].name)
def test_posting_abandon_no_owner(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name)

View File

@@ -14,163 +14,83 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import mock
import threading
import time
import six import six
from kazoo.recipe import watchers
from zake import fake_client from zake import fake_client
from zake import utils as zake_utils from zake import utils as zake_utils
from taskflow import exceptions as excp
from taskflow.jobs.backends import impl_zookeeper from taskflow.jobs.backends import impl_zookeeper
from taskflow import states from taskflow import states
from taskflow import test from taskflow import test
from taskflow.openstack.common import jsonutils from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import uuidutils from taskflow.tests.unit.jobs import base
from taskflow.persistence.backends import impl_dir
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
@contextlib.contextmanager class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
def connect_close(*args): def _create_board(self, client=None, persistence=None):
try: if not client:
for a in args: client = fake_client.FakeClient()
a.connect() board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
yield client=client,
finally: persistence=persistence)
for a in args: return (client, board)
a.close()
def create_board(client=None, persistence=None):
if not client:
client = fake_client.FakeClient()
board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
client=client,
persistence=persistence)
return (client, board)
@contextlib.contextmanager
def flush(client, path=None):
# This uses the linearity guarantee of zookeeper (and associated libraries)
# to create a temporary node, wait until a watcher notifies it's created,
# then yield back for more work, and then at the end of that work delete
# the created node. This ensures that the operations done in the yield
# of this context manager will be applied and all watchers will have fired
# before this context manager exits.
if not path:
path = "/tmp-%s" % uuidutils.generate_uuid()
created = threading.Event()
deleted = threading.Event()
def on_created(data, stat):
if stat is not None:
created.set()
return False # cause this watcher to cease to exist
def on_deleted(data, stat):
if stat is None:
deleted.set()
return False # cause this watcher to cease to exist
watchers.DataWatch(client, path, func=on_created)
client.create(path)
created.wait()
try:
yield
finally:
watchers.DataWatch(client, path, func=on_deleted)
client.delete(path, recursive=True)
deleted.wait()
class TestZookeeperJobs(test.TestCase):
def setUp(self): def setUp(self):
super(TestZookeeperJobs, self).setUp() super(ZakeJobboardTest, self).setUp()
self.client, self.board = create_board() self.client, self.board = self._create_board()
self.addCleanup(self.board.close) self.addCleanup(self.board.close)
self.bad_paths = [self.board.path] self.bad_paths = [self.board.path]
self.bad_paths.extend(zake_utils.partition_path(self.board.path)) self.bad_paths.extend(zake_utils.partition_path(self.board.path))
def test_connect(self): def test_posting_owner_lost(self):
self.assertFalse(self.board.connected)
with connect_close(self.board):
self.assertTrue(self.board.connected)
@mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." with base.connect_close(self.board):
"millis_to_datetime") with base.flush(self.client):
def test_posting_dates(self, mock_dt): j = self.board.post('test', p_utils.temporary_log_book())
epoch = misc.millis_to_datetime(0) self.assertEqual(states.UNCLAIMED, j.state)
mock_dt.return_value = epoch with base.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
with connect_close(self.board): # Forcefully delete the owner from the backend storage to make
j = self.board.post('test', p_utils.temporary_log_book()) # sure the job becomes unclaimed (this may happen if some admin
self.assertEqual(epoch, j.created_on) # manually deletes the lock).
self.assertEqual(epoch, j.last_modified) paths = list(six.iteritems(self.client.storage.paths))
for (path, value) in paths:
if path in self.bad_paths:
continue
if path.endswith('lock'):
value['data'] = misc.binary_encode(jsonutils.dumps({}))
self.assertEqual(states.UNCLAIMED, j.state)
self.assertTrue(mock_dt.called) def test_posting_state_lock_lost(self):
def test_board_iter(self): with base.connect_close(self.board):
with connect_close(self.board): with base.flush(self.client):
it = self.board.iterjobs() j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(it.board, self.board) self.assertEqual(states.UNCLAIMED, j.state)
self.assertFalse(it.only_unclaimed) with base.flush(self.client):
self.assertFalse(it.ensure_fresh) self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
def test_board_iter_empty(self): # Forcefully delete the lock from the backend storage to make
with connect_close(self.board): # sure the job becomes unclaimed (this may happen if some admin
jobs_found = list(self.board.iterjobs()) # manually deletes the lock).
self.assertEqual([], jobs_found) paths = list(six.iteritems(self.client.storage.paths))
for (path, value) in paths:
def test_fresh_iter(self): if path in self.bad_paths:
with connect_close(self.board): continue
book = p_utils.temporary_log_book() if path.endswith("lock"):
self.board.post('test', book) self.client.storage.pop(path)
jobs = list(self.board.iterjobs(ensure_fresh=True)) self.assertEqual(states.UNCLAIMED, j.state)
self.assertEqual(1, len(jobs))
def test_wait_timeout(self):
with connect_close(self.board):
self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)
def test_wait_arrival(self):
ev = threading.Event()
jobs = []
def poster(wait_post=0.2):
ev.wait() # wait until the waiter is active
time.sleep(wait_post)
self.board.post('test', p_utils.temporary_log_book())
def waiter():
ev.set()
it = self.board.wait()
jobs.extend(it)
with connect_close(self.board):
t1 = threading.Thread(target=poster)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=waiter)
t2.daemon = True
t2.start()
for t in (t1, t2):
t.join()
self.assertEqual(1, len(jobs))
def test_posting_received_raw(self): def test_posting_received_raw(self):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()
with connect_close(self.board): with base.connect_close(self.board):
self.assertTrue(self.board.connected) self.assertTrue(self.board.connected)
self.assertEqual(0, self.board.job_count) self.assertEqual(0, self.board.job_count)
posted_job = self.board.post('test', book) posted_job = self.board.post('test', book)
@@ -201,175 +121,3 @@ class TestZookeeperJobs(test.TestCase):
}, },
'details': {}, 'details': {},
}, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) }, jsonutils.loads(misc.binary_decode(paths[path_key]['data'])))
def test_posting_claim(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertEqual(states.UNCLAIMED, j.state)
with flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(self.board.name, self.board.find_owner(j))
self.assertEqual(states.CLAIMED, j.state)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
self.assertRaisesAttrAccess(excp.NotFound, j, 'state')
self.assertRaises(excp.NotFound,
self.board.consume, j, self.board.name)
def test_posting_claim_consume(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
with flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
with flush(self.client):
self.board.consume(j, self.board.name)
self.assertEqual(0, len(list(self.board.iterjobs())))
self.assertRaises(excp.NotFound,
self.board.consume, j, self.board.name)
def test_posting_claim_abandon(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
with flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
with flush(self.client):
self.board.abandon(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
def test_posting_claim_diff_owner(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
with flush(self.client):
self.board.claim(possible_jobs[0], self.board.name)
possible_jobs = list(self.board.iterjobs())
self.assertEqual(1, len(possible_jobs))
self.assertRaises(excp.UnclaimableJob, self.board.claim,
possible_jobs[0], self.board.name + "-1")
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
def test_posting_state_lock_lost(self):
with connect_close(self.board):
with flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
with flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
# Forcefully delete the lock from the backend storage to make
# sure the job becomes unclaimed (this may happen if some admin
# manually deletes the lock).
paths = list(six.iteritems(self.client.storage.paths))
for (path, value) in paths:
if path in self.bad_paths:
continue
if path.endswith("lock"):
self.client.storage.pop(path)
self.assertEqual(states.UNCLAIMED, j.state)
def test_posting_no_post(self):
with connect_close(self.board):
with mock.patch.object(self.client, 'create') as create_func:
create_func.side_effect = IOError("Unable to post")
self.assertRaises(IOError, self.board.post,
'test', p_utils.temporary_log_book())
self.assertEqual(0, self.board.job_count)
def test_posting_owner_lost(self):
with connect_close(self.board):
with flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
with flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
# Forcefully delete the owner from the backend storage to make
# sure the job becomes unclaimed (this may happen if some admin
# manually deletes the lock).
paths = list(six.iteritems(self.client.storage.paths))
for (path, value) in paths:
if path in self.bad_paths:
continue
if path.endswith('lock'):
value['data'] = misc.binary_encode(jsonutils.dumps({}))
self.assertEqual(states.UNCLAIMED, j.state)
def test_posting_with_book(self):
backend = impl_dir.DirBackend(conf={
'path': self.makeTmpDir(),
})
backend.get_connection().upgrade()
book, flow_detail = p_utils.temporary_flow_detail(backend)
self.assertEqual(1, len(book))
client, board = create_board(persistence=backend)
self.addCleanup(board.close)
with connect_close(board):
with flush(client):
board.post('test', book)
possible_jobs = list(board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertEqual(1, len(j.book))
self.assertEqual(book.name, j.book.name)
self.assertEqual(book.uuid, j.book.uuid)
flow_details = list(j.book)
self.assertEqual(flow_detail.uuid, flow_details[0].uuid)
self.assertEqual(flow_detail.name, flow_details[0].name)
def test_posting_abandon_no_owner(self):
with connect_close(self.board):
with flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name)

View File

@@ -1,45 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 AT&T Labs All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from zake import fake_client
from taskflow.persistence import backends
from taskflow.persistence.backends import impl_zookeeper
from taskflow import test
from taskflow.tests.unit.persistence import base
class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin):
def _get_connection(self):
return self._backend.get_connection()
def setUp(self):
super(ZakePersistenceTest, self).setUp()
conf = {
"path": "/taskflow",
}
client = fake_client.FakeClient()
client.start()
self._backend = impl_zookeeper.ZkBackend(conf, client=client)
conn = self._backend.get_connection()
conn.upgrade()
def test_zk_persistence_entry_point(self):
conf = {'connection': 'zookeeper:'}
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_zookeeper.ZkBackend)

View File

@@ -17,37 +17,18 @@
import contextlib import contextlib
import testtools import testtools
from zake import fake_client
from taskflow.openstack.common import uuidutils from taskflow.openstack.common import uuidutils
from taskflow.persistence import backends
from taskflow.persistence.backends import impl_zookeeper from taskflow.persistence.backends import impl_zookeeper
from taskflow import test from taskflow import test
from taskflow.tests.unit.persistence import base from taskflow.tests.unit.persistence import base
from taskflow.utils import kazoo_utils from taskflow.tests import utils as test_utils
TEST_CONFIG = {
'timeout': 1.0,
'hosts': ["localhost:2181"],
}
TEST_PATH_TPL = '/taskflow/persistence-test/%s' TEST_PATH_TPL = '/taskflow/persistence-test/%s'
_ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
impl_zookeeper.MIN_ZK_VERSION)
def _zookeeper_available():
client = kazoo_utils.make_client(TEST_CONFIG)
try:
# NOTE(imelnikov): 3 seconds we should be enough for localhost
client.start(timeout=3)
zk_ver = client.server_version()
if zk_ver >= impl_zookeeper.MIN_ZK_VERSION:
return True
else:
return False
except Exception:
return False
finally:
kazoo_utils.finalize_client(client)
_ZOOKEEPER_AVAILABLE = _zookeeper_available()
@testtools.skipIf(not _ZOOKEEPER_AVAILABLE, 'zookeeper is not available') @testtools.skipIf(not _ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
@@ -61,7 +42,7 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
def setUp(self): def setUp(self):
super(ZkPersistenceTest, self).setUp() super(ZkPersistenceTest, self).setUp()
conf = TEST_CONFIG.copy() conf = test_utils.ZK_TEST_CONFIG.copy()
# Create a unique path just for this test (so that we don't overwrite # Create a unique path just for this test (so that we don't overwrite
# what other tests are doing). # what other tests are doing).
conf['path'] = TEST_PATH_TPL % (uuidutils.generate_uuid()) conf['path'] = TEST_PATH_TPL % (uuidutils.generate_uuid())
@@ -74,3 +55,30 @@ class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
with contextlib.closing(self._get_connection()) as conn: with contextlib.closing(self._get_connection()) as conn:
conn.upgrade() conn.upgrade()
self.addCleanup(self._clear_all) self.addCleanup(self._clear_all)
def test_zk_persistence_entry_point(self):
conf = {'connection': 'zookeeper:'}
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_zookeeper.ZkBackend)
@testtools.skipIf(_ZOOKEEPER_AVAILABLE, 'zookeeper is available')
class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin):
def _get_connection(self):
return self._backend.get_connection()
def setUp(self):
super(ZakePersistenceTest, self).setUp()
conf = {
"path": "/taskflow",
}
self.client = fake_client.FakeClient()
self.client.start()
self._backend = impl_zookeeper.ZkBackend(conf, client=self.client)
conn = self._backend.get_connection()
conn.upgrade()
def test_zk_persistence_entry_point(self):
conf = {'connection': 'zookeeper:'}
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_zookeeper.ZkBackend)

View File

@@ -23,12 +23,18 @@ from taskflow import exceptions
from taskflow.persistence.backends import impl_memory from taskflow.persistence.backends import impl_memory
from taskflow import retry from taskflow import retry
from taskflow import task from taskflow import task
from taskflow.utils import kazoo_utils
from taskflow.utils import misc from taskflow.utils import misc
ARGS_KEY = '__args__' ARGS_KEY = '__args__'
KWARGS_KEY = '__kwargs__' KWARGS_KEY = '__kwargs__'
ORDER_KEY = '__order__' ORDER_KEY = '__order__'
ZK_TEST_CONFIG = {
'timeout': 1.0,
'hosts': ["localhost:2181"],
}
@contextlib.contextmanager @contextlib.contextmanager
def wrap_all_failures(): def wrap_all_failures():
@@ -44,6 +50,25 @@ def wrap_all_failures():
raise exceptions.WrappedFailure([misc.Failure()]) raise exceptions.WrappedFailure([misc.Failure()])
def zookeeper_available(min_version, timeout=3):
client = kazoo_utils.make_client(ZK_TEST_CONFIG.copy())
try:
# NOTE(imelnikov): 3 seconds we should be enough for localhost
client.start(timeout=float(timeout))
if min_version:
zk_ver = client.server_version()
if zk_ver >= min_version:
return True
else:
return False
else:
return True
except Exception:
return False
finally:
kazoo_utils.finalize_client(client)
class DummyTask(task.Task): class DummyTask(task.Task):
def execute(self, context, *args, **kwargs): def execute(self, context, *args, **kwargs):