diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 644ca453..155ffe41 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -403,3 +403,9 @@ class Connection(base.Connection): def get_logbook(self, book_uuid): return self._run_with_process_lock("book", self._get_logbook, book_uuid) + + def get_flow_details(self, fd_uuid): + return self._get_flow_details(fd_uuid) + + def get_atom_details(self, ad_uuid): + return self._get_atom_details(ad_uuid) diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 5e94afb1..020deff8 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -234,3 +234,17 @@ class Connection(base.Connection): yield book except KeyError: pass + + def get_flow_details(self, fd_uuid): + try: + with self._lock.read_lock(): + return self._memory.flow_details[fd_uuid] + except KeyError: + raise exc.NotFound("No flow details found '%s'" % fd_uuid) + + def get_atom_details(self, ad_uuid): + try: + with self._lock.read_lock(): + return self._memory.atom_details[ad_uuid] + except KeyError: + raise exc.NotFound("No atom details found '%s'" % ad_uuid) diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 5796e273..a49d2492 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -558,5 +558,35 @@ class Connection(base.Connection): for book in gathered: yield book + def get_flow_details(self, fd_uuid): + try: + flowdetails = self._tables.flowdetails + with self._engine.begin() as conn: + q = (sql.select([flowdetails]). + where(flowdetails.c.uuid == fd_uuid)) + row = conn.execute(q).first() + if not row: + raise exc.NotFound("No flow details found with uuid" + " '%s'" % fd_uuid) + return self._converter.convert_flow_detail(row) + except sa_exc.SQLAlchemyError as e: + raise exc.StorageFailure("Failed getting flow details with" + " uuid '%s'" % fd_uuid, e) + + def get_atom_details(self, ad_uuid): + try: + atomdetails = self._tables.atomdetails + with self._engine.begin() as conn: + q = (sql.select([atomdetails]). + where(atomdetails.c.uuid == ad_uuid)) + row = conn.execute(q).first() + if not row: + raise exc.NotFound("No atom details found with uuid" + " '%s'" % ad_uuid) + return self._converter.convert_atom_detail(row) + except sa_exc.SQLAlchemyError as e: + raise exc.StorageFailure("Failed getting atom details with" + " uuid '%s'" % ad_uuid, e) + def close(self): pass diff --git a/taskflow/persistence/base.py b/taskflow/persistence/base.py index 00fb29be..0ce09259 100644 --- a/taskflow/persistence/base.py +++ b/taskflow/persistence/base.py @@ -118,6 +118,16 @@ class Connection(object): """Return an iterable of logbook objects.""" pass + @abc.abstractmethod + def get_flow_details(self, fd_uuid): + """Fetches a flowdetails object matching the given uuid.""" + pass + + @abc.abstractmethod + def get_atom_details(self, ad_uuid): + """Fetches a atomdetails object matching the given uuid.""" + pass + def _format_atom(atom_detail): return {