Use a more stable flush method

Instead of using the zake provided flush method (which
does have issues, since it does not guarantee that the
associated watches will have been called) instead use a
new method which creates nodes, waits for there watches
to be triggered, does other work, and then deletes the
node and waits for that watcher to be triggered; this is
more stable (as it depends on the linearity guarantee
of zookeeper and the kazoo threading model).

Change-Id: I12fd9c7bcc5cd9009b4175166edfc924e94161bf
This commit is contained in:
Joshua Harlow 2014-05-09 19:33:41 -07:00
parent 258e009ae3
commit fb5d54d506
2 changed files with 71 additions and 40 deletions

View File

@ -21,6 +21,8 @@ import time
import six import six
from kazoo.recipe import watchers
from zake import fake_client from zake import fake_client
from zake import utils as zake_utils from zake import utils as zake_utils
@ -30,6 +32,7 @@ from taskflow import states
from taskflow import test from taskflow import test
from taskflow.openstack.common import jsonutils from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import uuidutils
from taskflow.persistence.backends import impl_dir from taskflow.persistence.backends import impl_dir
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
@ -55,6 +58,40 @@ def create_board(client=None, persistence=None):
return (client, board) return (client, board)
@contextlib.contextmanager
def flush(client, path=None):
# This uses the linearity guarantee of zookeeper (and associated libraries)
# to create a temporary node, wait until a watcher notifies it's created,
# then yield back for more work, and then at the end of that work delete
# the created node. This ensures that the operations done in the yield
# of this context manager will be applied and all watchers will have fired
# before this context manager exits.
if not path:
path = "/tmp-%s" % uuidutils.generate_uuid()
created = threading.Event()
deleted = threading.Event()
def on_created(data, stat):
if stat is not None:
created.set()
return False # cause this watcher to cease to exist
def on_deleted(data, stat):
if stat is None:
deleted.set()
return False # cause this watcher to cease to exist
watchers.DataWatch(client, path, func=on_created)
client.create(path)
created.wait()
try:
yield
finally:
watchers.DataWatch(client, path, func=on_deleted)
client.delete(path, recursive=True)
deleted.wait()
class TestZookeeperJobs(test.TestCase): class TestZookeeperJobs(test.TestCase):
def setUp(self): def setUp(self):
super(TestZookeeperJobs, self).setUp() super(TestZookeeperJobs, self).setUp()
@ -66,7 +103,6 @@ class TestZookeeperJobs(test.TestCase):
def test_connect(self): def test_connect(self):
self.assertFalse(self.board.connected) self.assertFalse(self.board.connected)
with connect_close(self.board): with connect_close(self.board):
self.client.flush()
self.assertTrue(self.board.connected) self.assertTrue(self.board.connected)
@mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
@ -77,7 +113,6 @@ class TestZookeeperJobs(test.TestCase):
with connect_close(self.board): with connect_close(self.board):
j = self.board.post('test', p_utils.temporary_log_book()) j = self.board.post('test', p_utils.temporary_log_book())
self.client.flush()
self.assertEqual(epoch, j.created_on) self.assertEqual(epoch, j.created_on)
self.assertEqual(epoch, j.last_modified) self.assertEqual(epoch, j.last_modified)
@ -99,8 +134,6 @@ class TestZookeeperJobs(test.TestCase):
with connect_close(self.board): with connect_close(self.board):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()
self.board.post('test', book) self.board.post('test', book)
self.client.flush()
jobs = list(self.board.iterjobs(ensure_fresh=True)) jobs = list(self.board.iterjobs(ensure_fresh=True))
self.assertEqual(1, len(jobs)) self.assertEqual(1, len(jobs))
@ -138,11 +171,9 @@ class TestZookeeperJobs(test.TestCase):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()
with connect_close(self.board): with connect_close(self.board):
self.client.flush()
self.assertTrue(self.board.connected) self.assertTrue(self.board.connected)
self.assertEqual(0, self.board.job_count) self.assertEqual(0, self.board.job_count)
posted_job = self.board.post('test', book) posted_job = self.board.post('test', book)
self.client.flush()
self.assertEqual(self.board, posted_job.board) self.assertEqual(self.board, posted_job.board)
self.assertEqual(1, self.board.job_count) self.assertEqual(1, self.board.job_count)
@ -174,8 +205,8 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_claim(self): def test_posting_claim(self):
with connect_close(self.board): with connect_close(self.board):
self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count) self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
@ -183,8 +214,9 @@ class TestZookeeperJobs(test.TestCase):
j = possible_jobs[0] j = possible_jobs[0]
self.assertEqual(states.UNCLAIMED, j.state) self.assertEqual(states.UNCLAIMED, j.state)
self.board.claim(j, self.board.name) with flush(self.client):
self.client.flush() self.board.claim(j, self.board.name)
self.assertEqual(self.board.name, self.board.find_owner(j)) self.assertEqual(self.board.name, self.board.find_owner(j))
self.assertEqual(states.CLAIMED, j.state) self.assertEqual(states.CLAIMED, j.state)
@ -198,19 +230,19 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_claim_consume(self): def test_posting_claim_consume(self):
with connect_close(self.board): with connect_close(self.board):
self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0] j = possible_jobs[0]
self.board.claim(j, self.board.name) with flush(self.client):
self.client.flush() self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs)) self.assertEqual(0, len(possible_jobs))
self.board.consume(j, self.board.name) with flush(self.client):
self.client.flush() self.board.consume(j, self.board.name)
self.assertEqual(0, len(list(self.board.iterjobs()))) self.assertEqual(0, len(list(self.board.iterjobs())))
self.assertRaises(excp.NotFound, self.assertRaises(excp.NotFound,
@ -219,20 +251,19 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_claim_abandon(self): def test_posting_claim_abandon(self):
with connect_close(self.board): with connect_close(self.board):
self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0] j = possible_jobs[0]
self.board.claim(j, self.board.name) with flush(self.client):
self.client.flush() self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs)) self.assertEqual(0, len(possible_jobs))
with flush(self.client):
self.board.abandon(j, self.board.name) self.board.abandon(j, self.board.name)
self.client.flush()
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
@ -240,13 +271,13 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_claim_diff_owner(self): def test_posting_claim_diff_owner(self):
with connect_close(self.board): with connect_close(self.board):
self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
self.board.claim(possible_jobs[0], self.board.name) with flush(self.client):
self.client.flush() self.board.claim(possible_jobs[0], self.board.name)
possible_jobs = list(self.board.iterjobs()) possible_jobs = list(self.board.iterjobs())
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
@ -258,11 +289,11 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_state_lock_lost(self): def test_posting_state_lock_lost(self):
with connect_close(self.board): with connect_close(self.board):
j = self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state) self.assertEqual(states.UNCLAIMED, j.state)
self.board.claim(j, self.board.name) with flush(self.client):
self.client.flush() self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state) self.assertEqual(states.CLAIMED, j.state)
# Forcefully delete the lock from the backend storage to make # Forcefully delete the lock from the backend storage to make
@ -287,11 +318,11 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_owner_lost(self): def test_posting_owner_lost(self):
with connect_close(self.board): with connect_close(self.board):
j = self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state) self.assertEqual(states.UNCLAIMED, j.state)
self.board.claim(j, self.board.name) with flush(self.client):
self.client.flush() self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state) self.assertEqual(states.CLAIMED, j.state)
# Forcefully delete the owner from the backend storage to make # Forcefully delete the owner from the backend storage to make
@ -317,8 +348,8 @@ class TestZookeeperJobs(test.TestCase):
self.addCleanup(board.close) self.addCleanup(board.close)
with connect_close(board): with connect_close(board):
board.post('test', book) with flush(client):
client.flush() board.post('test', book)
possible_jobs = list(board.iterjobs(only_unclaimed=True)) possible_jobs = list(board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs)) self.assertEqual(1, len(possible_jobs))
@ -334,8 +365,8 @@ class TestZookeeperJobs(test.TestCase):
def test_posting_abandon_no_owner(self): def test_posting_abandon_no_owner(self):
with connect_close(self.board): with connect_close(self.board):
self.board.post('test', p_utils.temporary_log_book()) with flush(self.client):
self.client.flush() self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count) self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) possible_jobs = list(self.board.iterjobs(only_unclaimed=True))

View File

@ -5,7 +5,7 @@ mock>=1.0
python-subunit>=0.0.18 python-subunit>=0.0.18
testrepository>=0.0.18 testrepository>=0.0.18
testtools>=0.9.34 testtools>=0.9.34
zake>=0.0.15 zake>=0.0.18
# docs build jobs # docs build jobs
sphinx>=1.1.2,<1.2 sphinx>=1.1.2,<1.2
oslosphinx oslosphinx