Use sequencing when posting jobs

Retain the posting order by using sequencing when posting
jobs to a jobboard (at least in the impl_zookeeper case
this is possible).

Adjust the interface to recommend that jobboard implementations
post in oldest to newest order (we can not easily enforce this
since some implementations will likely not be able to do this
in every scenario).

Bring in a new zake which has sequencing support (since this
is required to support this feature).

Change-Id: I0c475c3c1d16e9bbc9db6ff0ad6b7103c5948a47
This commit is contained in:
Joshua Harlow
2014-04-08 14:46:56 -07:00
parent 1bc76325c1
commit 7d42e5b33c
4 changed files with 92 additions and 105 deletions

View File

@@ -28,6 +28,7 @@ from taskflow.jobs import job as base_job
from taskflow.jobs import jobboard
from taskflow.openstack.common import excutils
from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import states
from taskflow.utils import kazoo_utils
@@ -48,18 +49,6 @@ ALL_JOB_STATES = (
# Transaction support was added in 3.4.0
MIN_ZK_VERSION = (3, 4, 0)
# Used to ensure that watchers don't try to overwrite jobs that are still being
# posted (and may have not been completly posted yet), these jobs should not be
# yield back until they are in the ready state.
_READY = 'ready'
_POSTING = 'posting'
def _get_paths(base_path, job_uuid):
job_path = k_paths.join(base_path, job_uuid)
lock_path = k_paths.join(base_path, job_uuid, 'lock')
return (job_path, lock_path)
def _check_who(who):
if not isinstance(who, six.string_types):
@@ -69,7 +58,7 @@ def _check_who(who):
class ZookeeperJob(base_job.Job):
def __init__(self, name, board, client, backend,
def __init__(self, name, board, client, backend, path,
uuid=None, details=None, book=None, book_data=None):
super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details)
self._board = board
@@ -82,6 +71,16 @@ class ZookeeperJob(base_job.Job):
if all((self._book, self._book_data)):
raise ValueError("Only one of 'book_data' or 'book'"
" can be provided")
self._path = path
self._lock_path = "%s.lock" % (path)
@property
def lock_path(self):
return self._lock_path
@property
def path(self):
return self._path
@property
def board(self):
@@ -103,9 +102,8 @@ class ZookeeperJob(base_job.Job):
def state(self):
owner = self.board.find_owner(self)
job_data = {}
job_path, _lock_path = _get_paths(self.board.path, self.uuid)
try:
raw_data, _data_stat = self._client.get(job_path)
raw_data, _data_stat = self._client.get(self.path)
job_data = misc.decode_json(raw_data)
except k_exceptions.NoNodeError:
pass
@@ -128,6 +126,12 @@ class ZookeeperJob(base_job.Job):
return states.UNCLAIMED
return states.CLAIMED
def __cmp__(self, other):
return cmp(self.path, other.path)
def __hash__(self):
return hash(self.path)
@property
def book(self):
if self._book is None:
@@ -170,6 +174,9 @@ class ZookeeperJobBoard(jobboard.JobBoard):
self._client.add_listener(self._state_change_listener)
self._bad_paths = frozenset([path])
self._job_watcher = None
# Since we use sequenced ids this will be the path that the sequences
# are prefixed with, for example, job0000000001, job0000000002, ...
self._job_base = k_paths.join(path, "job")
@property
def path(self):
@@ -178,13 +185,7 @@ class ZookeeperJobBoard(jobboard.JobBoard):
@property
def job_count(self):
with self._job_mutate:
known_jobs = list(six.itervalues(self._known_jobs))
count = 0
for (_job, posting_state) in known_jobs:
if posting_state != _READY:
continue
count += 1
return count
return len(self._known_jobs)
def _force_refresh(self, delayed=False):
try:
@@ -208,20 +209,18 @@ class ZookeeperJobBoard(jobboard.JobBoard):
if only_unclaimed:
ok_states = UNCLAIMED_JOB_STATES
with self._job_mutate:
known_jobs = list(six.iteritems(self._known_jobs))
for (path, (job, posting_state)) in known_jobs:
if posting_state != _READY:
continue
known_jobs = list(six.itervalues(self._known_jobs))
for job in sorted(known_jobs):
try:
if job.state in ok_states:
yield job
except excp.JobFailure as e:
LOG.warn("Failed determining the state of job %s"
" due to: %s", job.uuid, e)
except excp.JobFailure:
LOG.warn("Failed determining the state of job: %s (%s)",
job.uuid, job.path, exc_info=True)
except excp.NotFound:
# Someone destroyed it while we are iterating.
with self._job_mutate:
self._remove_job(path)
self._remove_job(job.path)
def _remove_job(self, path):
LOG.debug("Removing job that was at path: %s", path)
@@ -235,11 +234,11 @@ class ZookeeperJobBoard(jobboard.JobBoard):
with self._job_mutate:
if path not in self._known_jobs:
job = ZookeeperJob(job_data['name'], self,
self._client, self._persistence,
self._client, self._persistence, path,
uuid=job_data['uuid'],
book_data=job_data.get("book"),
details=job_data.get("details", {}))
self._known_jobs[path] = (job, _READY)
self._known_jobs[path] = job
except (ValueError, TypeError, KeyError):
LOG.warn("Incorrectly formatted job data found at path: %s",
path, exc_info=True)
@@ -263,9 +262,7 @@ class ZookeeperJobBoard(jobboard.JobBoard):
# Remove jobs that we know about but which are no longer children
with self._job_mutate:
removals = set()
for path, (_job, posting_state) in six.iteritems(self._known_jobs):
if posting_state != _READY:
continue
for path, _job in six.iteritems(self._known_jobs):
if path not in child_paths:
removals.add(path)
for path in removals:
@@ -289,64 +286,54 @@ class ZookeeperJobBoard(jobboard.JobBoard):
else:
child_proc(request)
def _format_job(self, job):
posting = {
'uuid': job.uuid,
'name': job.name,
}
if job.details is not None:
posting['details'] = job.details
if job.book is not None:
posting['book'] = {
'name': job.book.name,
'uuid': job.book.uuid,
}
return misc.binary_encode(jsonutils.dumps(posting))
def post(self, name, book, details=None):
# Didn't work, clean it up.
def try_clean(path):
with self._job_mutate:
self._remove_job(path)
def format_posting(job_uuid):
posting = {
'uuid': job_uuid,
'name': name,
}
if details:
posting['details'] = details
else:
posting['details'] = {}
if book is not None:
posting['book'] = {
'name': book.name,
'uuid': book.uuid,
}
return posting
# NOTE(harlowja): Jobs are not ephemeral, they will persist until they
# are consumed (this may change later, but seems safer to do this until
# further notice).
job = ZookeeperJob(name, self,
self._client, self._persistence,
book=book, details=details)
job_path, _lock_path = _get_paths(self.path, job.uuid)
# NOTE(harlowja): This avoids the watcher thread from attempting to
# overwrite or delete this job which is not yet ready but is in the
# process of being posted.
with self._job_mutate:
self._known_jobs[job_path] = (job, _POSTING)
with self._wrap(job.uuid, "Posting failure: %s", ensure_known=False):
try:
self._client.create(job_path, value=self._format_job(job))
with self._job_mutate:
self._known_jobs[job_path] = (job, _READY)
return job
except k_exceptions.NodeExistsException:
try_clean(job_path)
raise excp.Duplicate("Duplicate job %s already posted"
% job.uuid)
except Exception:
with excutils.save_and_reraise_exception():
try_clean(job_path)
job_uuid = uuidutils.generate_uuid()
with self._wrap(job_uuid, None,
"Posting failure: %s", ensure_known=False):
job_posting = format_posting(job_uuid)
job_posting = misc.binary_encode(jsonutils.dumps(job_posting))
job_path = self._client.create(self._job_base,
value=job_posting,
sequence=True,
ephemeral=False)
job = ZookeeperJob(name, self, self._client,
self._persistence, job_path,
book=book, details=details,
uuid=job_uuid)
with self._job_mutate:
self._known_jobs[job_path] = job
return job
def claim(self, job, who):
_check_who(who)
job_path, lock_path = _get_paths(self.path, job.uuid)
with self._wrap(job.uuid, "Claiming failure: %s"):
with self._wrap(job.uuid, job.path, "Claiming failure: %s"):
# NOTE(harlowja): post as json which will allow for future changes
# more easily than a raw string/text.
value = jsonutils.dumps({
'owner': who,
})
try:
self._client.create(lock_path,
self._client.create(job.lock_path,
value=misc.binary_encode(value),
ephemeral=True)
except k_exceptions.NodeExistsException:
@@ -362,9 +349,14 @@ class ZookeeperJobBoard(jobboard.JobBoard):
raise excp.UnclaimableJob(msg)
@contextlib.contextmanager
def _wrap(self, job_uuid, fail_msg_tpl="Failure: %s", ensure_known=True):
def _wrap(self, job_uuid, job_path,
fail_msg_tpl="Failure: %s", ensure_known=True):
if job_path:
fail_msg_tpl += " (%s)" % (job_path)
if ensure_known:
job_path, _lock_path = _get_paths(self.path, job_uuid)
if not job_path:
raise ValueError("Unable to check if %r is a known path"
% (job_path))
with self._job_mutate:
if job_path not in self._known_jobs:
fail_msg_tpl += ", unknown job"
@@ -385,11 +377,10 @@ class ZookeeperJobBoard(jobboard.JobBoard):
raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
def find_owner(self, job):
_job_path, lock_path = _get_paths(self.path, job.uuid)
with self._wrap(job.uuid, "Owner query failure: %s"):
with self._wrap(job.uuid, job.path, "Owner query failure: %s"):
try:
self._client.sync(lock_path)
raw_data, _lock_stat = self._client.get(lock_path)
self._client.sync(job.lock_path)
raw_data, _lock_stat = self._client.get(job.lock_path)
data = misc.decode_json(raw_data)
owner = data.get("owner")
except k_exceptions.NoNodeError:
@@ -397,16 +388,14 @@ class ZookeeperJobBoard(jobboard.JobBoard):
return owner
def _get_owner_and_data(self, job):
job_path, lock_path = _get_paths(self.path, job.uuid)
lock_data, lock_stat = self._client.get(lock_path)
job_data, job_stat = self._client.get(job_path)
lock_data, lock_stat = self._client.get(job.lock_path)
job_data, job_stat = self._client.get(job.path)
return (misc.decode_json(lock_data), lock_stat,
misc.decode_json(job_data), job_stat)
def consume(self, job, who):
_check_who(who)
job_path, lock_path = _get_paths(self.path, job.uuid)
with self._wrap(job.uuid, "Consumption failure: %s"):
with self._wrap(job.uuid, job.path, "Consumption failure: %s"):
try:
owner_data = self._get_owner_and_data(job)
lock_data, lock_stat, data, data_stat = owner_data
@@ -418,17 +407,15 @@ class ZookeeperJobBoard(jobboard.JobBoard):
raise excp.JobFailure("Can not consume a job %s"
" which is not owned by %s"
% (job.uuid, who))
with self._client.transaction() as txn:
txn.delete(lock_path, version=lock_stat.version)
txn.delete(job_path, version=data_stat.version)
txn.delete(job.lock_path, version=lock_stat.version)
txn.delete(job.path, version=data_stat.version)
with self._job_mutate:
self._remove_job(job_path)
self._remove_job(job.path)
def abandon(self, job, who):
_check_who(who)
job_path, lock_path = _get_paths(self.path, job.uuid)
with self._wrap(job.uuid, "Abandonment failure: %s"):
with self._wrap(job.uuid, job.path, "Abandonment failure: %s"):
try:
owner_data = self._get_owner_and_data(job)
lock_data, lock_stat, data, data_stat = owner_data
@@ -440,9 +427,8 @@ class ZookeeperJobBoard(jobboard.JobBoard):
raise excp.JobFailure("Can not abandon a job %s"
" which is not owned by %s"
% (job.uuid, who))
with self._client.transaction() as txn:
txn.delete(lock_path, version=lock_stat.version)
txn.delete(job.lock_path, version=lock_stat.version)
def _state_change_listener(self, state):
LOG.debug("Kazoo client has changed to state: %s", state)

View File

@@ -37,6 +37,11 @@ class JobBoard(object):
"""Yields back jobs that are currently on this jobboard (claimed
or not claimed).
NOTE(harlowja): the ordering of this iteration should be by posting
order (oldest to newest) if possible, but it is left up to the backing
implementation to provide the order that best suits it (so don't depend
on it always being oldest to newest).
:param only_unclaimed: boolean that indicates whether to only iteration
over unclaimed jobs.
:param ensure_fresh: boolean that requests to only iterate over the

View File

@@ -102,7 +102,6 @@ class TestZookeeperJobs(test.TestCase):
# Check the actual data that was posted.
self.assertEqual(1, len(paths))
path_key = list(six.iterkeys(paths))[0]
self.assertIn(posted_job.uuid, path_key)
self.assertTrue(len(paths[path_key]['data']) > 0)
self.assertDictEqual({
'uuid': posted_job.uuid,
@@ -220,13 +219,10 @@ class TestZookeeperJobs(test.TestCase):
self.assertEqual(states.UNCLAIMED, j.state)
def test_posting_no_post(self):
def bad_format(job):
raise UnicodeError("Could not format")
with connect_close(self.board):
with mock.patch.object(self.board, '_format_job', bad_format):
self.assertRaises(UnicodeError, self.board.post,
with mock.patch.object(self.client, 'create') as create_func:
create_func.side_effect = IOError("Unable to post")
self.assertRaises(IOError, self.board.post,
'test', p_utils.temporary_log_book())
self.assertEqual(0, self.board.job_count)

View File

@@ -5,7 +5,7 @@ mock>=1.0
python-subunit>=0.0.18
testrepository>=0.0.18
testtools>=0.9.34
zake>=0.0.13
zake>=0.0.15
# docs build jobs
sphinx>=1.1.2,<1.2
oslosphinx