diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 10f529eb..55c7b8eb 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -351,6 +351,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._job_base = k_paths.join(path, JOB_PREFIX) self._worker = None self._emit_notifications = bool(emit_notifications) + self._connected = False def _emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo queue. @@ -715,7 +716,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): @property def connected(self): - return self._client.connected + return self._connected and self._client.connected @lock_utils.locked(lock='_open_close_lock') def close(self): @@ -729,6 +730,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): with self._job_cond: self._known_jobs.clear() LOG.debug("Stopped & cleared local state") + self._connected = False @lock_utils.locked(lock='_open_close_lock') def connect(self, timeout=10.0): @@ -763,6 +765,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self.path, func=self._on_job_posting, allow_session_lost=True) + self._connected = True except excp.IncompatibleVersion: with excutils.save_and_reraise_exception(): try_clean()