diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index 11b41de6..3ede5938 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -55,10 +55,10 @@ class ZkBackend(base.Backend): raise ValueError("Zookeeper path must be absolute") self._path = path if client is not None: - self._zk = client + self._client = client self._owned = False else: - self._zk = k_utils.make_client(conf) + self._client = k_utils.make_client(conf) self._owned = True self._validated = False @@ -67,7 +67,7 @@ class ZkBackend(base.Backend): return self._path def get_connection(self): - conn = ZkConnection(self, self._zk) + conn = ZkConnection(self, self._client) if not self._validated: conn.validate() self._validated = True @@ -78,11 +78,11 @@ class ZkBackend(base.Backend): if not self._owned: return try: - self._zk.stop() + self._client.stop() except (k_exc.KazooException, k_exc.ZookeeperError) as e: raise exc.StorageError("Unable to stop client: %s" % e) try: - self._zk.close() + self._client.close() except TypeError: # NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167 pass @@ -264,7 +264,7 @@ class ZkConnection(base.Connection): def save_logbook(self, lb): """Save (update) a log_book transactionally.""" - def create_logbook(lb_path, txn): + def _create_logbook(lb_path, txn): lb_data = p_utils.format_logbook(lb, created_at=None) txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data))) for fd in lb: @@ -285,7 +285,7 @@ class ZkConnection(base.Connection): txn.create(td_path, misc.binary_encode(td_data)) return lb - def update_logbook(lb_path, lb_data, txn): + def _update_logbook(lb_path, lb_data, txn): e_lb = p_utils.unformat_logbook(lb.uuid, misc.decode_json(lb_data)) e_lb = p_utils.logbook_merge(e_lb, lb) lb_data = p_utils.format_logbook(e_lb, created_at=lb.created_at) @@ -309,36 +309,42 @@ class ZkConnection(base.Connection): lb_data, _zstat = self._client.get(lb_path) except k_exc.NoNodeError: # Create a new logbook since it doesn't exist. - e_lb = create_logbook(lb_path, txn) + e_lb = _create_logbook(lb_path, txn) else: # Otherwise update the existing logbook instead. - e_lb = update_logbook(lb_path, lb_data, txn) + e_lb = _update_logbook(lb_path, lb_data, txn) # Finally return (updated) logbook. return e_lb + def _get_logbook(self, lb_uuid): + lb_path = paths.join(self.book_path, lb_uuid) + try: + lb_data, _zstat = self._client.get(lb_path) + except k_exc.NoNodeError: + raise exc.NotFound("No logbook found with id: %s" % lb_uuid) + else: + lb = p_utils.unformat_logbook(lb_uuid, + misc.decode_json(lb_data)) + for fd_uuid in self._client.get_children(lb_path): + lb.add(self._get_flow_details(fd_uuid)) + return lb + def get_logbook(self, lb_uuid): """Read a logbook. *Read-only*, so no need of zk transaction. """ with self._exc_wrapper(): - lb_path = paths.join(self.book_path, lb_uuid) - try: - lb_data, _zstat = self._client.get(lb_path) - except k_exc.NoNodeError: - raise exc.NotFound("No logbook found with id: %s" % lb_uuid) - else: - lb = p_utils.unformat_logbook(lb_uuid, - misc.decode_json(lb_data)) - for fd_uuid in self._client.get_children(lb_path): - lb.add(self._get_flow_details(fd_uuid)) - return lb + return self._get_logbook(lb_uuid) def get_logbooks(self): - """Read all logbooks. *Read-only*, so no need of zk transaction.""" + """Read all logbooks. + + *Read-only*, so no need of zk transaction. + """ with self._exc_wrapper(): for lb_uuid in self._client.get_children(self.book_path): - yield self.get_logbook(lb_uuid) + yield self._get_logbook(lb_uuid) def destroy_logbook(self, lb_uuid): """Detroy (delete) a log_book transactionally."""