From f7daf2217735259622886917d7bb7c33b122408d Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 28 Jul 2014 15:15:53 -0700 Subject: [PATCH] Use checked_commit() around consume() and abandon() To ensure we reliably handle when a transaction fails we should use the checked_commit() helper function instead of the currently not fully exception handling kazoo transaction commit function. This should be addressed in the future with: - https://github.com/python-zk/kazoo/pull/224 - https://github.com/python-zk/kazoo/pull/225 Those have not merged yet (or been released) so we need to use a similar function in the meantime. Change-Id: Icf83b7d4955c11227e733287170a7bd3ab372bd2 --- taskflow/jobs/backends/impl_zookeeper.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 4260ab91..8416d305 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -581,9 +581,10 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): raise excp.JobFailure("Can not consume a job %s" " which is not owned by %s" % (job.uuid, who)) - with self._client.transaction() as txn: - txn.delete(job.lock_path, version=lock_stat.version) - txn.delete(job.path, version=data_stat.version) + txn = self._client.transaction() + txn.delete(job.lock_path, version=lock_stat.version) + txn.delete(job.path, version=data_stat.version) + kazoo_utils.checked_commit(txn) self._remove_job(job.path) def abandon(self, job, who): @@ -600,8 +601,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): raise excp.JobFailure("Can not abandon a job %s" " which is not owned by %s" % (job.uuid, who)) - with self._client.transaction() as txn: - txn.delete(job.lock_path, version=lock_stat.version) + txn = self._client.transaction() + txn.delete(job.lock_path, version=lock_stat.version) + kazoo_utils.checked_commit(txn) def _state_change_listener(self, state): LOG.debug("Kazoo client has changed to state: %s", state)