diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index a68aa996..9ee9c627 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -285,16 +285,18 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._emit_notifications = bool(emit_notifications) self._connected = False - def _emit(self, state, details): + def _try_emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo threads # and queue(s)... worker = self._worker - if worker is None: + if worker is None or not self._emit_notifications: + # Worker has been destroyed or we aren't supposed to emit anything + # in the first place... return try: worker.submit(self.notifier.notify, state, details) except RuntimeError: - # Notification thread is shutdown just skip submitting a + # Notification thread is/was shutdown just skip submitting a # notification... pass @@ -353,7 +355,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): job = self._known_jobs.pop(path, None) if job is not None: LOG.debug("Removed job that was at path '%s'", path) - self._emit(base.REMOVAL, details={'job': job}) + self._try_emit(base.REMOVAL, details={'job': job}) def _process_child(self, path, request): """Receives the result of a child data fetch request.""" @@ -393,7 +395,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._known_jobs[path] = job self._job_cond.notify_all() if job is not None: - self._emit(base.POSTED, details={'job': job}) + self._try_emit(base.POSTED, details={'job': job}) def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) @@ -464,7 +466,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): with self._job_cond: self._known_jobs[job_path] = job self._job_cond.notify_all() - self._emit(base.POSTED, details={'job': job}) + self._try_emit(base.POSTED, details={'job': job}) return job @base.check_who