diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index 1ecfcef2..867f5369 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -165,13 +165,17 @@ class Notify(Message):
         except su.ValidationError as e:
             cls_name = reflection.get_class_name(cls, fully_qualified=False)
             if response:
-                raise excp.InvalidFormat("%s message response data not of the"
-                                         " expected format: %s"
-                                         % (cls_name, e.message), e)
+                excp.raise_with_cause(excp.InvalidFormat,
+                                      "%s message response data not of the"
+                                      " expected format: %s" % (cls_name,
+                                                                e.message),
+                                      cause=e)
             else:
-                raise excp.InvalidFormat("%s message sender data not of the"
-                                         " expected format: %s"
-                                         % (cls_name, e.message), e)
+                excp.raise_with_cause(excp.InvalidFormat,
+                                      "%s message sender data not of the"
+                                      " expected format: %s" % (cls_name,
+                                                                e.message),
+                                      cause=e)
 
 
 _WorkUnit = collections.namedtuple('_WorkUnit', ['task_cls', 'task_name',
@@ -361,9 +365,11 @@ class Request(Message):
             su.schema_validate(data, cls.SCHEMA)
         except su.ValidationError as e:
             cls_name = reflection.get_class_name(cls, fully_qualified=False)
-            raise excp.InvalidFormat("%s message response data not of the"
-                                     " expected format: %s"
-                                     % (cls_name, e.message), e)
+            excp.raise_with_cause(excp.InvalidFormat,
+                                  "%s message response data not of the"
+                                  " expected format: %s" % (cls_name,
+                                                            e.message),
+                                  cause=e)
         else:
             # Validate all failure dictionaries that *may* be present...
             failures = []
@@ -505,9 +511,11 @@ class Response(Message):
             su.schema_validate(data, cls.SCHEMA)
         except su.ValidationError as e:
             cls_name = reflection.get_class_name(cls, fully_qualified=False)
-            raise excp.InvalidFormat("%s message response data not of the"
-                                     " expected format: %s"
-                                     % (cls_name, e.message), e)
+            excp.raise_with_cause(excp.InvalidFormat,
+                                  "%s message response data not of the"
+                                  " expected format: %s" % (cls_name,
+                                                            e.message),
+                                  cause=e)
         else:
             state = data['state']
             if state == FAILURE and 'result' in data:
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 34ec5b42..721c7134 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -17,6 +17,7 @@
 import collections
 import contextlib
 import functools
+import sys
 import threading
 
 from concurrent import futures
@@ -120,22 +121,27 @@ class ZookeeperJob(base.Job):
                 return trans_func(attr)
             else:
                 return attr
-        except k_exceptions.NoNodeError as e:
-            raise excp.NotFound("Can not fetch the %r attribute"
-                                " of job %s (%s), path %s not found"
-                                % (attr_name, self.uuid, self.path, path), e)
-        except self._client.handler.timeout_exception as e:
-            raise excp.JobFailure("Can not fetch the %r attribute"
-                                  " of job %s (%s), operation timed out"
-                                  % (attr_name, self.uuid, self.path), e)
-        except k_exceptions.SessionExpiredError as e:
-            raise excp.JobFailure("Can not fetch the %r attribute"
-                                  " of job %s (%s), session expired"
-                                  % (attr_name, self.uuid, self.path), e)
-        except (AttributeError, k_exceptions.KazooException) as e:
-            raise excp.JobFailure("Can not fetch the %r attribute"
-                                  " of job %s (%s), internal error" %
-                                  (attr_name, self.uuid, self.path), e)
+        except k_exceptions.NoNodeError:
+            excp.raise_with_cause(
+                excp.NotFound,
+                "Can not fetch the %r attribute of job %s (%s),"
+                " path %s not found" % (attr_name, self.uuid,
+                                        self.path, path))
+        except self._client.handler.timeout_exception:
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the %r attribute of job %s (%s),"
+                " operation timed out" % (attr_name, self.uuid, self.path))
+        except k_exceptions.SessionExpiredError:
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the %r attribute of job %s (%s),"
+                " session expired" % (attr_name, self.uuid, self.path))
+        except (AttributeError, k_exceptions.KazooException):
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the %r attribute of job %s (%s),"
+                " internal error" % (attr_name, self.uuid, self.path))
 
     @property
     def last_modified(self):
@@ -189,15 +195,21 @@ class ZookeeperJob(base.Job):
             job_data = misc.decode_json(raw_data)
         except k_exceptions.NoNodeError:
             pass
-        except k_exceptions.SessionExpiredError as e:
-            raise excp.JobFailure("Can not fetch the state of %s,"
-                                  " session expired" % (self.uuid), e)
-        except self._client.handler.timeout_exception as e:
-            raise excp.JobFailure("Can not fetch the state of %s,"
-                                  " operation timed out" % (self.uuid), e)
-        except k_exceptions.KazooException as e:
-            raise excp.JobFailure("Can not fetch the state of %s, internal"
-                                  " error" % (self.uuid), e)
+        except k_exceptions.SessionExpiredError:
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the state of %s,"
+                " session expired" % (self.uuid))
+        except self._client.handler.timeout_exception:
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the state of %s,"
+                " operation timed out" % (self.uuid))
+        except k_exceptions.KazooException:
+            excp.raise_with_cause(
+                excp.JobFailure,
+                "Can not fetch the state of %s,"
+                " internal error" % (self.uuid))
         if not job_data:
             # No data this job has been completed (the owner that we might have
             # fetched will not be able to be fetched again, since the job node
@@ -390,15 +402,17 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
     def _force_refresh(self):
         try:
             children = self._client.get_children(self.path)
-        except self._client.handler.timeout_exception as e:
-            raise excp.JobFailure("Refreshing failure, operation timed out",
-                                  e)
-        except k_exceptions.SessionExpiredError as e:
-            raise excp.JobFailure("Refreshing failure, session expired", e)
+        except self._client.handler.timeout_exception:
+            excp.raise_with_cause(excp.JobFailure,
+                                  "Refreshing failure, operation timed out")
+        except k_exceptions.SessionExpiredError:
+            excp.raise_with_cause(excp.JobFailure,
+                                  "Refreshing failure, session expired")
         except k_exceptions.NoNodeError:
             pass
-        except k_exceptions.KazooException as e:
-            raise excp.JobFailure("Refreshing failure, internal error", e)
+        except k_exceptions.KazooException:
+            excp.raise_with_cause(excp.JobFailure,
+                                  "Refreshing failure, internal error")
         else:
             self._on_job_posting(children, delayed=False)
 
@@ -550,10 +564,11 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             except Exception:
                 owner = None
             if owner:
-                msg = "Job %s already claimed by '%s'" % (job.uuid, owner)
+                message = "Job %s already claimed by '%s'" % (job.uuid, owner)
             else:
-                msg = "Job %s already claimed" % (job.uuid)
-            return excp.UnclaimableJob(msg, cause)
+                message = "Job %s already claimed" % (job.uuid)
+            excp.raise_with_cause(excp.UnclaimableJob,
+                                  message, cause=cause)
 
         with self._wrap(job.uuid, job.path, "Claiming failure: %s"):
             # NOTE(harlowja): post as json which will allow for future changes
@@ -573,21 +588,23 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             try:
                 kazoo_utils.checked_commit(txn)
             except k_exceptions.NodeExistsError as e:
-                raise _unclaimable_try_find_owner(e)
+                _unclaimable_try_find_owner(e)
             except kazoo_utils.KazooTransactionException as e:
                 if len(e.failures) < 2:
                     raise
                 else:
                     if isinstance(e.failures[0], k_exceptions.NoNodeError):
-                        raise excp.NotFound(
+                        excp.raise_with_cause(
+                            excp.NotFound,
                             "Job %s not found to be claimed" % job.uuid,
-                            e.failures[0])
+                            cause=e.failures[0])
                     if isinstance(e.failures[1], k_exceptions.NodeExistsError):
-                        raise _unclaimable_try_find_owner(e.failures[1])
+                        _unclaimable_try_find_owner(e.failures[1])
                     else:
-                        raise excp.UnclaimableJob(
+                        excp.raise_with_cause(
+                            excp.UnclaimableJob,
                             "Job %s claim failed due to transaction"
-                            " not succeeding" % (job.uuid), e)
+                            " not succeeding" % (job.uuid), cause=e)
 
     @contextlib.contextmanager
     def _wrap(self, job_uuid, job_path,
@@ -603,18 +620,18 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
             raise excp.NotFound(fail_msg_tpl % (job_uuid))
         try:
             yield
-        except self._client.handler.timeout_exception as e:
+        except self._client.handler.timeout_exception:
             fail_msg_tpl += ", operation timed out"
-            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
-        except k_exceptions.SessionExpiredError as e:
+            excp.raise_with_cause(excp.JobFailure, fail_msg_tpl % (job_uuid))
+        except k_exceptions.SessionExpiredError:
             fail_msg_tpl += ", session expired"
-            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
+            excp.raise_with_cause(excp.JobFailure, fail_msg_tpl % (job_uuid))
         except k_exceptions.NoNodeError:
             fail_msg_tpl += ", unknown job"
-            raise excp.NotFound(fail_msg_tpl % (job_uuid))
-        except k_exceptions.KazooException as e:
+            excp.raise_with_cause(excp.NotFound, fail_msg_tpl % (job_uuid))
+        except k_exceptions.KazooException:
             fail_msg_tpl += ", internal error"
-            raise excp.JobFailure(fail_msg_tpl % (job_uuid), e)
+            excp.raise_with_cause(excp.JobFailure, fail_msg_tpl % (job_uuid))
 
     def find_owner(self, job):
         with self._wrap(job.uuid, job.path, "Owner query failure: %s"):
@@ -757,8 +774,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
                 timeout = float(timeout)
             self._client.start(timeout=timeout)
         except (self._client.handler.timeout_exception,
-                k_exceptions.KazooException) as e:
-            raise excp.JobFailure("Failed to connect to zookeeper", e)
+                k_exceptions.KazooException):
+            excp.raise_with_cause(excp.JobFailure,
+                                  "Failed to connect to zookeeper")
         try:
             if self._conf.get('check_compatible', True):
                 kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION)
@@ -777,7 +795,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
                 with excutils.save_and_reraise_exception():
                     try_clean()
         except (self._client.handler.timeout_exception,
-                k_exceptions.KazooException) as e:
-            try_clean()
-            raise excp.JobFailure("Failed to do post-connection"
-                                  " initialization", e)
+                k_exceptions.KazooException):
+            exc_type, exc, exc_tb = sys.exc_info()
+            try:
+                try_clean()
+                excp.raise_with_cause(excp.JobFailure,
+                                      "Failed to do post-connection"
+                                      " initialization", cause=exc)
+            finally:
+                del(exc_type, exc, exc_tb)
diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py
index 5185b2b8..e71d5b9c 100644
--- a/taskflow/persistence/backends/impl_dir.py
+++ b/taskflow/persistence/backends/impl_dir.py
@@ -36,8 +36,12 @@ def _storagefailure_wrapper():
         raise
     except Exception as e:
         if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT:
-            raise exc.NotFound('Item not found: %s' % e.filename, e)
-        raise exc.StorageFailure("Storage backend internal error", e)
+            exc.raise_with_cause(exc.NotFound,
+                                 'Item not found: %s' % e.filename,
+                                 cause=e)
+        else:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Storage backend internal error", cause=e)
 
 
 class DirBackend(path_based.PathBasedBackend):
diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py
index e4d135fa..a8a9b6e4 100644
--- a/taskflow/persistence/backends/impl_memory.py
+++ b/taskflow/persistence/backends/impl_memory.py
@@ -257,10 +257,11 @@ class Connection(path_based.PathBasedConnection):
         with lock():
             try:
                 yield
-            except exc.TaskFlowException as e:
+            except exc.TaskFlowException:
                 raise
-            except Exception as e:
-                raise exc.StorageFailure("Storage backend internal error", e)
+            except Exception:
+                exc.raise_with_cause(exc.StorageFailure,
+                                     "Storage backend internal error")
 
     def _join_path(self, *parts):
         return pp.join(*parts)
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index d6eeaaba..d9fdf732 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -405,16 +405,18 @@ class Connection(base.Connection):
                     self._metadata.create_all(bind=conn)
                 else:
                     migration.db_sync(conn)
-        except sa_exc.SQLAlchemyError as e:
-            raise exc.StorageFailure("Failed upgrading database version", e)
+        except sa_exc.SQLAlchemyError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed upgrading database version")
 
     def clear_all(self):
         try:
             logbooks = self._tables.logbooks
             with self._engine.begin() as conn:
                 conn.execute(logbooks.delete())
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed clearing all entries", e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed clearing all entries")
 
     def update_atom_details(self, atom_detail):
         try:
@@ -429,9 +431,10 @@ class Connection(base.Connection):
                 e_ad = self._converter.convert_atom_detail(row)
                 self._update_atom_details(conn, atom_detail, e_ad)
                 return e_ad
-        except sa_exc.SQLAlchemyError as e:
-            raise exc.StorageFailure("Failed updating atom details with"
-                                     " uuid '%s'" % atom_detail.uuid, e)
+        except sa_exc.SQLAlchemyError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed updating atom details"
+                                 " with uuid '%s'" % atom_detail.uuid)
 
     def _insert_flow_details(self, conn, fd, parent_uuid):
         value = fd.to_dict()
@@ -479,9 +482,10 @@ class Connection(base.Connection):
                     self._converter.populate_flow_detail(conn, e_fd)
                 self._update_flow_details(conn, flow_detail, e_fd)
                 return e_fd
-        except sa_exc.SQLAlchemyError as e:
-            raise exc.StorageFailure("Failed updating flow details with"
-                                     " uuid '%s'" % flow_detail.uuid, e)
+        except sa_exc.SQLAlchemyError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed updating flow details with"
+                                 " uuid '%s'" % flow_detail.uuid)
 
     def destroy_logbook(self, book_uuid):
         try:
@@ -492,9 +496,9 @@ class Connection(base.Connection):
                 if r.rowcount == 0:
                     raise exc.NotFound("No logbook found with"
                                        " uuid '%s'" % book_uuid)
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed destroying"
-                                     " logbook '%s'" % book_uuid, e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed destroying logbook '%s'" % book_uuid)
 
     def save_logbook(self, book):
         try:
@@ -523,9 +527,10 @@ class Connection(base.Connection):
                     for fd in book:
                         self._insert_flow_details(conn, fd, book.uuid)
                 return book
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed saving logbook"
-                                     " '%s'" % book.uuid, e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(
+                exc.StorageFailure,
+                "Failed saving logbook '%s'" % book.uuid)
 
     def get_logbook(self, book_uuid, lazy=False):
         try:
@@ -541,9 +546,9 @@ class Connection(base.Connection):
                 if not lazy:
                     self._converter.populate_book(conn, book)
                 return book
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure(
-                "Failed getting logbook '%s'" % book_uuid, e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting logbook '%s'" % book_uuid)
 
     def get_logbooks(self, lazy=False):
         gathered = []
@@ -555,8 +560,9 @@ class Connection(base.Connection):
                     if not lazy:
                         self._converter.populate_book(conn, book)
                     gathered.append(book)
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed getting logbooks", e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting logbooks")
         for book in gathered:
             yield book
 
@@ -568,8 +574,10 @@ class Connection(base.Connection):
                     if not lazy:
                         self._converter.populate_flow_detail(conn, fd)
                     gathered.append(fd)
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed getting flow details", e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting flow details in"
+                                 " logbook '%s'" % book_uuid)
         for flow_details in gathered:
             yield flow_details
 
@@ -587,9 +595,10 @@ class Connection(base.Connection):
                 if not lazy:
                     self._converter.populate_flow_detail(conn, fd)
                 return fd
-        except sa_exc.SQLAlchemyError as e:
-            raise exc.StorageFailure("Failed getting flow details with"
-                                     " uuid '%s'" % fd_uuid, e)
+        except sa_exc.SQLAlchemyError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting flow details with"
+                                 " uuid '%s'" % fd_uuid)
 
     def get_atom_details(self, ad_uuid):
         try:
@@ -602,9 +611,10 @@ class Connection(base.Connection):
                     raise exc.NotFound("No atom details found with uuid"
                                        " '%s'" % ad_uuid)
                 return self._converter.convert_atom_detail(row)
-        except sa_exc.SQLAlchemyError as e:
-            raise exc.StorageFailure("Failed getting atom details with"
-                                     " uuid '%s'" % ad_uuid, e)
+        except sa_exc.SQLAlchemyError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting atom details with"
+                                 " uuid '%s'" % ad_uuid)
 
     def get_atoms_for_flow(self, fd_uuid):
         gathered = []
@@ -612,8 +622,10 @@ class Connection(base.Connection):
             with contextlib.closing(self._engine.connect()) as conn:
                 for ad in self._converter.atom_query_iter(conn, fd_uuid):
                     gathered.append(ad)
-        except sa_exc.DBAPIError as e:
-            raise exc.StorageFailure("Failed getting atom details", e)
+        except sa_exc.DBAPIError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Failed getting atom details in flow"
+                                 " detail '%s'" % fd_uuid)
         for atom_details in gathered:
             yield atom_details
 
diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py
index 5626b289..0d7c00ee 100644
--- a/taskflow/persistence/backends/impl_zookeeper.py
+++ b/taskflow/persistence/backends/impl_zookeeper.py
@@ -67,8 +67,9 @@ class ZkBackend(path_based.PathBasedBackend):
             return
         try:
             k_utils.finalize_client(self._client)
-        except (k_exc.KazooException, k_exc.ZookeeperError) as e:
-            raise exc.StorageFailure("Unable to finalize client", e)
+        except (k_exc.KazooException, k_exc.ZookeeperError):
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Unable to finalize client")
 
 
 class ZkConnection(path_based.PathBasedConnection):
@@ -90,16 +91,21 @@ class ZkConnection(path_based.PathBasedConnection):
         """
         try:
             yield
-        except self._client.handler.timeout_exception as e:
-            raise exc.StorageFailure("Storage backend timeout", e)
-        except k_exc.SessionExpiredError as e:
-            raise exc.StorageFailure("Storage backend session has expired", e)
+        except self._client.handler.timeout_exception:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Storage backend timeout")
+        except k_exc.SessionExpiredError:
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Storage backend session has expired")
         except k_exc.NoNodeError as e:
-            raise exc.NotFound("Storage backend node not found: %s" % e)
+            exc.raise_with_cause(exc.NotFound,
+                                 "Storage backend node not found: %s" % e)
         except k_exc.NodeExistsError as e:
-            raise exc.Duplicate("Storage backend duplicate node: %s" % e)
-        except (k_exc.KazooException, k_exc.ZookeeperError) as e:
-            raise exc.StorageFailure("Storage backend internal error", e)
+            exc.raise_with_cause(exc.Duplicate,
+                                 "Storage backend duplicate node: %s" % e)
+        except (k_exc.KazooException, k_exc.ZookeeperError):
+            exc.raise_with_cause(exc.StorageFailure,
+                                 "Storage backend internal error")
 
     def _join_path(self, *parts):
         return paths.join(*parts)
@@ -145,6 +151,6 @@ class ZkConnection(path_based.PathBasedConnection):
         try:
             if self._conf.get('check_compatible', True):
                 k_utils.check_compatible(self._client, MIN_ZK_VERSION)
-        except exc.IncompatibleVersion as e:
-            raise exc.StorageFailure("Backend storage is not a"
-                                     " compatible version", e)
+        except exc.IncompatibleVersion:
+            exc.raise_with_cause(exc.StorageFailure, "Backend storage is"
+                                 " not a compatible version")
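
Note on the pattern applied throughout this change: every `raise SomeError(msg, e)` becomes `raise_with_cause(SomeError, msg, cause=e)`, which attaches the original exception as the new one's cause rather than smuggling it in as a positional constructor argument. The sketch below is a rough approximation of that behavior, not taskflow's actual implementation (`taskflow.exceptions.raise_with_cause` delegates to `oslo_utils.excutils`); it assumes Python 3 `raise ... from` chaining, and the `StorageFailure` class and `read_item` function here are illustrative stand-ins only.

    import sys


    class StorageFailure(Exception):
        """Illustrative stand-in for taskflow.exceptions.StorageFailure."""


    def raise_with_cause(exc_cls, message, *args, **kwargs):
        # Approximation: raise the new exception chained (via __cause__) to
        # an explicit ``cause`` keyword, or to whatever exception is
        # currently being handled when no cause is given.
        cause = kwargs.pop('cause', None)
        if cause is None:
            cause = sys.exc_info()[1]
        raise exc_cls(message, *args, **kwargs) from cause


    def read_item(path):
        # Usage mirroring the except blocks in this patch: no ``as e``
        # binding is needed, since the active exception is picked up
        # implicitly from the handler context.
        try:
            with open(path) as f:
                return f.read()
        except (IOError, OSError):
            raise_with_cause(StorageFailure,
                             "Storage backend internal error")

Because the helper falls back to the exception currently being handled, most of the `as e` bindings in the touched except clauses could be dropped, which is why so many hunks above only change the `except` line and the raise site.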