Merge "Use checked_commit() around consume() and abandon()"
This commit is contained in:
@@ -581,9 +581,10 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
raise excp.JobFailure("Can not consume a job %s"
|
||||
" which is not owned by %s"
|
||||
% (job.uuid, who))
|
||||
with self._client.transaction() as txn:
|
||||
txn.delete(job.lock_path, version=lock_stat.version)
|
||||
txn.delete(job.path, version=data_stat.version)
|
||||
txn = self._client.transaction()
|
||||
txn.delete(job.lock_path, version=lock_stat.version)
|
||||
txn.delete(job.path, version=data_stat.version)
|
||||
kazoo_utils.checked_commit(txn)
|
||||
self._remove_job(job.path)
|
||||
|
||||
def abandon(self, job, who):
|
||||
@@ -600,8 +601,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
raise excp.JobFailure("Can not abandon a job %s"
|
||||
" which is not owned by %s"
|
||||
% (job.uuid, who))
|
||||
with self._client.transaction() as txn:
|
||||
txn.delete(job.lock_path, version=lock_stat.version)
|
||||
txn = self._client.transaction()
|
||||
txn.delete(job.lock_path, version=lock_stat.version)
|
||||
kazoo_utils.checked_commit(txn)
|
||||
|
||||
def _state_change_listener(self, state):
|
||||
LOG.debug("Kazoo client has changed to state: %s", state)
|
||||
|
Reference in New Issue
Block a user