diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 008d64c0..4fc7b6eb 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -581,9 +581,10 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): raise excp.JobFailure("Can not consume a job %s" " which is not owned by %s" % (job.uuid, who)) - with self._client.transaction() as txn: - txn.delete(job.lock_path, version=lock_stat.version) - txn.delete(job.path, version=data_stat.version) + txn = self._client.transaction() + txn.delete(job.lock_path, version=lock_stat.version) + txn.delete(job.path, version=data_stat.version) + kazoo_utils.checked_commit(txn) self._remove_job(job.path) def abandon(self, job, who): @@ -600,8 +601,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): raise excp.JobFailure("Can not abandon a job %s" " which is not owned by %s" % (job.uuid, who)) - with self._client.transaction() as txn: - txn.delete(job.lock_path, version=lock_stat.version) + txn = self._client.transaction() + txn.delete(job.lock_path, version=lock_stat.version) + kazoo_utils.checked_commit(txn) def _state_change_listener(self, state): LOG.debug("Kazoo client has changed to state: %s", state)