diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index a6237889..a68aa996 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -404,7 +404,6 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): # Skip lock paths or non-job-paths (these are not valid jobs) continue child_paths.append(k_paths.join(self.path, c)) - # Figure out what we really should be investigating and what we # shouldn't (remove jobs that exist in our local version, but don't # exist in the children anymore) and accumulate all paths that we @@ -569,14 +568,29 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): entity_type = entity.kind if entity_type == c_base.Conductor.ENTITY_KIND: entity_path = k_paths.join(self.entity_path, entity_type) - self._client.ensure_path(entity_path) - - conductor_name = entity.name - self._client.create(k_paths.join(entity_path, - conductor_name), - value=misc.binary_encode( - jsonutils.dumps(entity.to_dict())), - ephemeral=True) + try: + self._client.ensure_path(entity_path) + self._client.create(k_paths.join(entity_path, entity.name), + value=misc.binary_encode( + jsonutils.dumps(entity.to_dict())), + ephemeral=True) + except k_exceptions.NodeExistsError: + pass + except self._client.handler.timeout_exception: + excp.raise_with_cause( + excp.JobFailure, + "Can not register entity %s under %s, operation" + " timed out" % (entity.name, entity_path)) + except k_exceptions.SessionExpiredError: + excp.raise_with_cause( + excp.JobFailure, + "Can not register entity %s under %s, session" + " expired" % (entity.name, entity_path)) + except k_exceptions.KazooException: + excp.raise_with_cause( + excp.JobFailure, + "Can not register entity %s under %s, internal" + " error" % (entity.name, entity_path)) else: raise excp.NotImplementedError( "Not implemented for other entity type '%s'" % entity_type)