diff --git a/taskflow/engines/action_engine/graph_analyzer.py b/taskflow/engines/action_engine/graph_analyzer.py index dcede332..7ca7182f 100644 --- a/taskflow/engines/action_engine/graph_analyzer.py +++ b/taskflow/engines/action_engine/graph_analyzer.py @@ -100,7 +100,7 @@ class GraphAnalyzer(object): for prev_task in self._graph.predecessors(task): task_names.append(prev_task.name) - task_states = self._storage.get_tasks_states(task_names) + task_states = self._storage.get_atoms_states(task_names) return all(state == st.SUCCESS and intention == st.EXECUTE for state, intention in six.itervalues(task_states)) @@ -117,7 +117,7 @@ class GraphAnalyzer(object): for prev_task in self._graph.successors(task): task_names.append(prev_task.name) - task_states = self._storage.get_tasks_states(task_names) + task_states = self._storage.get_atoms_states(task_names) return all(state in (st.PENDING, st.REVERTED) for state, intention in six.itervalues(task_states)) @@ -151,4 +151,4 @@ class GraphAnalyzer(object): return True def get_state(self, node): - return self._storage.get_task_state(node.name) + return self._storage.get_atom_state(node.name) diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index 7b1d4fb1..f4b59940 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -37,7 +37,7 @@ class RetryAction(object): return kwargs def change_state(self, retry, state, result=None): - old_state = self._storage.get_task_state(retry.name) + old_state = self._storage.get_atom_state(retry.name) if old_state == state: return state != states.PENDING if state in SAVE_RESULT_STATES: @@ -45,9 +45,8 @@ class RetryAction(object): elif state == states.REVERTED: self._storage.cleanup_retry_history(retry.name, state) else: - self._storage.set_task_state(retry.name, state) - - retry_uuid = self._storage.get_task_uuid(retry.name) + self._storage.set_atom_state(retry.name, state) + retry_uuid = self._storage.get_atom_uuid(retry.name) details = dict(retry_name=retry.name, retry_uuid=retry_uuid, result=result) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 3f05f6e1..3f9bd193 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -32,17 +32,16 @@ class TaskAction(object): self._notifier = notifier def change_state(self, task, state, result=None, progress=None): - old_state = self._storage.get_task_state(task.name) + old_state = self._storage.get_atom_state(task.name) if old_state == state: return state != states.PENDING if state in SAVE_RESULT_STATES: self._storage.save(task.name, result, state) else: - self._storage.set_task_state(task.name, state) + self._storage.set_atom_state(task.name, state) if progress is not None: self._storage.set_task_progress(task.name, progress) - - task_uuid = self._storage.get_task_uuid(task.name) + task_uuid = self._storage.get_atom_uuid(task.name) details = dict(task_name=task.name, task_uuid=task_uuid, result=result) @@ -65,7 +64,7 @@ class TaskAction(object): if not self.change_state(task, states.RUNNING, progress=0.0): return kwargs = self._storage.fetch_mapped_args(task.rebind) - task_uuid = self._storage.get_task_uuid(task.name) + task_uuid = self._storage.get_atom_uuid(task.name) return self._task_executor.execute_task(task, task_uuid, kwargs, self._on_update_progress) @@ -80,7 +79,7 @@ class TaskAction(object): if not self.change_state(task, states.REVERTING, progress=0.0): return kwargs = self._storage.fetch_mapped_args(task.rebind) - task_uuid = self._storage.get_task_uuid(task.name) + task_uuid = self._storage.get_atom_uuid(task.name) task_result = self._storage.get(task.name) failures = self._storage.get_failures() future = self._task_executor.revert_task(task, task_uuid, kwargs, diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index f93e815c..70859053 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -22,7 +22,6 @@ from concurrent import futures from taskflow.engines.action_engine import executor from taskflow.utils import misc -from taskflow.utils import persistence_utils as pu from taskflow.utils import reflection # NOTE(skudriashev): This is protocol events, not related to the task states. @@ -146,14 +145,14 @@ class Request(Message): if 'result' in self._kwargs: result = self._kwargs['result'] if isinstance(result, misc.Failure): - request['result'] = ('failure', pu.failure_to_dict(result)) + request['result'] = ('failure', result.to_dict()) else: request['result'] = ('success', result) if 'failures' in self._kwargs: failures = self._kwargs['failures'] request['failures'] = {} for task, failure in six.iteritems(failures): - request['failures'][task] = pu.failure_to_dict(failure) + request['failures'][task] = failure.to_dict() return request def set_result(self, result): @@ -183,7 +182,7 @@ class Response(Message): state = data['state'] data = data['data'] if state == FAILURE and 'result' in data: - data['result'] = pu.failure_from_dict(data['result']) + data['result'] = misc.Failure.from_dict(data['result']) return cls(state, **data) @property diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index ff92603b..02f56647 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -22,7 +22,6 @@ from kombu import exceptions as kombu_exc from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow.utils import misc -from taskflow.utils import persistence_utils as pu LOG = logging.getLogger(__name__) @@ -86,13 +85,13 @@ class Server(object): if result is not None: data_type, data = result if data_type == 'failure': - action_args['result'] = pu.failure_from_dict(data) + action_args['result'] = misc.Failure.from_dict(data) else: action_args['result'] = data if failures is not None: action_args['failures'] = {} for k, v in failures.items(): - action_args['failures'][k] = pu.failure_from_dict(v) + action_args['failures'][k] = misc.Failure.from_dict(v) return task_cls, action, action_args @staticmethod @@ -164,7 +163,7 @@ class Server(object): except ValueError: with misc.capture_failure() as failure: LOG.exception("Failed to parse request") - reply_callback(result=pu.failure_to_dict(failure)) + reply_callback(result=failure.to_dict()) return # get task endpoint @@ -174,7 +173,7 @@ class Server(object): with misc.capture_failure() as failure: LOG.exception("The '%s' task endpoint does not exist", task_cls) - reply_callback(result=pu.failure_to_dict(failure)) + reply_callback(result=failure.to_dict()) return else: reply_callback(state=pr.RUNNING) @@ -185,10 +184,10 @@ class Server(object): except Exception: with misc.capture_failure() as failure: LOG.exception("The %s task execution failed", endpoint) - reply_callback(result=pu.failure_to_dict(failure)) + reply_callback(result=failure.to_dict()) else: if isinstance(result, misc.Failure): - reply_callback(result=pu.failure_to_dict(result)) + reply_callback(result=result.to_dict()) else: reply_callback(state=pr.SUCCESS, result=result) diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index 449488c5..c6cb3cd4 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -48,7 +48,7 @@ class TimingListener(base.ListenerBase): } try: # Don't let storage failures throw exceptions in a listener method. - self._engine.storage.update_task_metadata(task_name, meta_update) + self._engine.storage.update_atom_metadata(task_name, meta_update) except excp.StorageError: LOG.warn("Failure to store duration update %s for task %s", meta_update, task_name, exc_info=True) diff --git a/taskflow/persistence/backends/base.py b/taskflow/persistence/backends/base.py index 4287c98a..58aa3554 100644 --- a/taskflow/persistence/backends/base.py +++ b/taskflow/persistence/backends/base.py @@ -18,6 +18,8 @@ import abc import six +from taskflow.persistence import logbook + @six.add_metaclass(abc.ABCMeta) class Backend(object): @@ -75,11 +77,11 @@ class Connection(object): pass @abc.abstractmethod - def update_task_details(self, task_detail): - """Updates a given task details and returns the updated version. + def update_atom_details(self, atom_detail): + """Updates a given atom details and returns the updated version. NOTE(harlowja): the details that is to be updated must already have - been created by saving a flow details with the given task detail inside + been created by saving a flow details with the given atom detail inside of it. """ pass @@ -113,3 +115,10 @@ class Connection(object): def get_logbooks(self): """Return an iterable of logbook objects.""" pass + + +def _format_atom(atom_detail): + return { + 'atom': atom_detail.to_dict(), + 'type': logbook.atom_detail_type(atom_detail), + } diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 20431ff6..7c0b3c9b 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -25,9 +25,9 @@ import six from taskflow import exceptions as exc from taskflow.openstack.common import jsonutils from taskflow.persistence.backends import base +from taskflow.persistence import logbook from taskflow.utils import lock_utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) @@ -64,7 +64,7 @@ class Connection(base.Connection): self._backend = backend self._file_cache = self._backend._file_cache self._flow_path = os.path.join(self._backend.base_path, 'flows') - self._task_path = os.path.join(self._backend.base_path, 'tasks') + self._atom_path = os.path.join(self._backend.base_path, 'atoms') self._book_path = os.path.join(self._backend.base_path, 'books') def validate(self): @@ -73,7 +73,7 @@ class Connection(base.Connection): self._backend.base_path, self._backend.lock_path, self._flow_path, - self._task_path, + self._atom_path, self._book_path, ] for p in paths: @@ -141,37 +141,38 @@ class Connection(base.Connection): def close(self): pass - def _save_task_details(self, task_detail, ignore_missing): - # See if we have an existing task detail to merge with. - e_td = None + def _save_atom_details(self, atom_detail, ignore_missing): + # See if we have an existing atom detail to merge with. + e_ad = None try: - e_td = self._get_task_details(task_detail.uuid, lock=False) + e_ad = self._get_atom_details(atom_detail.uuid, lock=False) except EnvironmentError: if not ignore_missing: - raise exc.NotFound("No task details found with id: %s" - % task_detail.uuid) - if e_td is not None: - task_detail = p_utils.task_details_merge(e_td, task_detail) - td_path = os.path.join(self._task_path, task_detail.uuid) - td_data = p_utils.format_task_detail(task_detail) - self._write_to(td_path, jsonutils.dumps(td_data)) - return task_detail + raise exc.NotFound("No atom details found with id: %s" + % atom_detail.uuid) + if e_ad is not None: + atom_detail = e_ad.merge(atom_detail) + ad_path = os.path.join(self._atom_path, atom_detail.uuid) + ad_data = base._format_atom(atom_detail) + self._write_to(ad_path, jsonutils.dumps(ad_data)) + return atom_detail - def update_task_details(self, task_detail): - return self._run_with_process_lock("task", - self._save_task_details, - task_detail, + def update_atom_details(self, atom_detail): + return self._run_with_process_lock("atom", + self._save_atom_details, + atom_detail, ignore_missing=False) - def _get_task_details(self, uuid, lock=True): + def _get_atom_details(self, uuid, lock=True): def _get(): - td_path = os.path.join(self._task_path, uuid) - td_data = misc.decode_json(self._read_from(td_path)) - return p_utils.unformat_task_detail(uuid, td_data) + ad_path = os.path.join(self._atom_path, uuid) + ad_data = misc.decode_json(self._read_from(ad_path)) + ad_cls = logbook.atom_detail_class(ad_data['type']) + return ad_cls.from_dict(ad_data['atom']) if lock: - return self._run_with_process_lock('task', _get) + return self._run_with_process_lock('atom', _get) else: return _get() @@ -181,17 +182,17 @@ class Connection(base.Connection): fd_path = os.path.join(self._flow_path, uuid) meta_path = os.path.join(fd_path, 'metadata') meta = misc.decode_json(self._read_from(meta_path)) - fd = p_utils.unformat_flow_detail(uuid, meta) - td_to_load = [] - td_path = os.path.join(fd_path, 'tasks') + fd = logbook.FlowDetail.from_dict(meta) + ad_to_load = [] + ad_path = os.path.join(fd_path, 'atoms') try: - td_to_load = [f for f in os.listdir(td_path) - if os.path.islink(os.path.join(td_path, f))] + ad_to_load = [f for f in os.listdir(ad_path) + if os.path.islink(os.path.join(ad_path, f))] except EnvironmentError as e: if e.errno != errno.ENOENT: raise - for t_uuid in td_to_load: - fd.add(self._get_task_details(t_uuid)) + for ad_uuid in ad_to_load: + fd.add(self._get_atom_details(ad_uuid)) return fd if lock: @@ -199,13 +200,13 @@ class Connection(base.Connection): else: return _get() - def _save_tasks_and_link(self, task_details, local_task_path): - for task_detail in task_details: - self._save_task_details(task_detail, ignore_missing=True) - src_td_path = os.path.join(self._task_path, task_detail.uuid) - target_td_path = os.path.join(local_task_path, task_detail.uuid) + def _save_atoms_and_link(self, atom_details, local_atom_path): + for atom_detail in atom_details: + self._save_atom_details(atom_detail, ignore_missing=True) + src_ad_path = os.path.join(self._atom_path, atom_detail.uuid) + target_ad_path = os.path.join(local_atom_path, atom_detail.uuid) try: - os.symlink(src_td_path, target_td_path) + os.symlink(src_ad_path, target_ad_path) except EnvironmentError as e: if e.errno != errno.EEXIST: raise @@ -220,22 +221,21 @@ class Connection(base.Connection): raise exc.NotFound("No flow details found with id: %s" % flow_detail.uuid) if e_fd is not None: - e_fd = p_utils.flow_details_merge(e_fd, flow_detail) - for td in flow_detail: - if e_fd.find(td.uuid) is None: - e_fd.add(td) + e_fd = e_fd.merge(flow_detail) + for ad in flow_detail: + if e_fd.find(ad.uuid) is None: + e_fd.add(ad) flow_detail = e_fd flow_path = os.path.join(self._flow_path, flow_detail.uuid) misc.ensure_tree(flow_path) - self._write_to( - os.path.join(flow_path, 'metadata'), - jsonutils.dumps(p_utils.format_flow_detail(flow_detail))) + self._write_to(os.path.join(flow_path, 'metadata'), + jsonutils.dumps(flow_detail.to_dict())) if len(flow_detail): - task_path = os.path.join(flow_path, 'tasks') - misc.ensure_tree(task_path) - self._run_with_process_lock('task', - self._save_tasks_and_link, - list(flow_detail), task_path) + atom_path = os.path.join(flow_path, 'atoms') + misc.ensure_tree(atom_path) + self._run_with_process_lock('atom', + self._save_atoms_and_link, + list(flow_detail), atom_path) return flow_detail def update_flow_details(self, flow_detail): @@ -263,18 +263,15 @@ class Connection(base.Connection): except exc.NotFound: pass if e_lb is not None: - e_lb = p_utils.logbook_merge(e_lb, book) + e_lb = e_lb.merge(book) for fd in book: if e_lb.find(fd.uuid) is None: e_lb.add(fd) book = e_lb book_path = os.path.join(self._book_path, book.uuid) misc.ensure_tree(book_path) - created_at = None - if e_lb is not None: - created_at = e_lb.created_at - self._write_to(os.path.join(book_path, 'metadata'), jsonutils.dumps( - p_utils.format_logbook(book, created_at=created_at))) + self._write_to(os.path.join(book_path, 'metadata'), + jsonutils.dumps(book.to_dict(marshal_time=True))) if len(book): flow_path = os.path.join(book_path, 'flows') misc.ensure_tree(flow_path) @@ -290,7 +287,7 @@ class Connection(base.Connection): def upgrade(self): def _step_create(): - for path in (self._book_path, self._flow_path, self._task_path): + for path in (self._book_path, self._flow_path, self._atom_path): try: misc.ensure_tree(path) except EnvironmentError as e: @@ -310,15 +307,15 @@ class Connection(base.Connection): def clear_all(self): def _step_clear(): - for d in (self._book_path, self._flow_path, self._task_path): + for d in (self._book_path, self._flow_path, self._atom_path): if os.path.isdir(d): shutil.rmtree(d) - def _step_task(): - self._run_with_process_lock("task", _step_clear) + def _step_atom(): + self._run_with_process_lock("atom", _step_clear) def _step_flow(): - self._run_with_process_lock("flow", _step_task) + self._run_with_process_lock("flow", _step_atom) def _step_book(): self._run_with_process_lock("book", _step_flow) @@ -328,21 +325,21 @@ class Connection(base.Connection): def destroy_logbook(self, book_uuid): - def _destroy_tasks(task_details): - for task_detail in task_details: - task_path = os.path.join(self._task_path, task_detail.uuid) + def _destroy_atoms(atom_details): + for atom_detail in atom_details: + atom_path = os.path.join(self._atom_path, atom_detail.uuid) try: - shutil.rmtree(task_path) + shutil.rmtree(atom_path) except EnvironmentError as e: if e.errno != errno.ENOENT: - raise exc.StorageFailure("Unable to remove task" - " directory %s" % task_path, + raise exc.StorageFailure("Unable to remove atom" + " directory %s" % atom_path, e) def _destroy_flows(flow_details): for flow_detail in flow_details: flow_path = os.path.join(self._flow_path, flow_detail.uuid) - self._run_with_process_lock("task", _destroy_tasks, + self._run_with_process_lock("atom", _destroy_atoms, list(flow_detail)) try: shutil.rmtree(flow_path) @@ -376,7 +373,7 @@ class Connection(base.Connection): raise exc.NotFound("No logbook found with id: %s" % book_uuid) else: raise - lb = p_utils.unformat_logbook(book_uuid, meta) + lb = logbook.LogBook.from_dict(meta, unmarshal_time=True) fd_path = os.path.join(book_path, 'flows') fd_uuids = [] try: diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index b42ebdc1..d3ad18fa 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -19,11 +19,11 @@ import logging +import six + from taskflow import exceptions as exc -from taskflow.openstack.common import timeutils from taskflow.persistence.backends import base from taskflow.persistence import logbook -from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) @@ -36,7 +36,7 @@ class MemoryBackend(base.Backend): super(MemoryBackend, self).__init__(conf) self._log_books = {} self._flow_details = {} - self._task_details = {} + self._atom_details = {} @property def log_books(self): @@ -47,8 +47,8 @@ class MemoryBackend(base.Backend): return self._flow_details @property - def task_details(self): - return self._task_details + def atom_details(self): + return self._atom_details def get_connection(self): return Connection(self) @@ -76,8 +76,8 @@ class Connection(base.Connection): def clear_all(self): count = 0 - for uuid in list(self.backend.log_books.keys()): - self.destroy_logbook(uuid) + for book_uuid in list(six.iterkeys(self.backend.log_books)): + self.destroy_logbook(book_uuid) count += 1 return count @@ -87,33 +87,27 @@ class Connection(base.Connection): lb = self.backend.log_books.pop(book_uuid) for fd in lb: self.backend.flow_details.pop(fd.uuid, None) - for td in fd: - self.backend.task_details.pop(td.uuid, None) + for ad in fd: + self.backend.atom_details.pop(ad.uuid, None) except KeyError: raise exc.NotFound("No logbook found with id: %s" % book_uuid) - def update_task_details(self, task_detail): + def update_atom_details(self, atom_detail): try: - e_td = self.backend.task_details[task_detail.uuid] + e_ad = self.backend.atom_details[atom_detail.uuid] except KeyError: - raise exc.NotFound("No task details found with id: %s" - % task_detail.uuid) - return p_utils.task_details_merge(e_td, task_detail, deep_copy=True) + raise exc.NotFound("No atom details found with id: %s" + % atom_detail.uuid) + return e_ad.merge(atom_detail, deep_copy=True) - def _save_flowdetail_tasks(self, e_fd, flow_detail): - for task_detail in flow_detail: - e_td = e_fd.find(task_detail.uuid) - if e_td is None: - if task_detail.atom_type == logbook.TASK_DETAIL: - e_td = logbook.TaskDetail(name=task_detail.name, - uuid=task_detail.uuid) - else: - e_td = logbook.RetryDetail(name=task_detail.name, - uuid=task_detail.uuid) - e_fd.add(e_td) - if task_detail.uuid not in self.backend.task_details: - self.backend.task_details[task_detail.uuid] = e_td - p_utils.task_details_merge(e_td, task_detail, deep_copy=True) + def _save_flowdetail_atoms(self, e_fd, flow_detail): + for atom_detail in flow_detail: + e_ad = e_fd.find(atom_detail.uuid) + if e_ad is None: + e_fd.add(atom_detail) + self.backend.atom_details[atom_detail.uuid] = atom_detail + else: + e_ad.merge(atom_detail, deep_copy=True) def update_flow_details(self, flow_detail): try: @@ -121,8 +115,8 @@ class Connection(base.Connection): except KeyError: raise exc.NotFound("No flow details found with id: %s" % flow_detail.uuid) - p_utils.flow_details_merge(e_fd, flow_detail, deep_copy=True) - self._save_flowdetail_tasks(e_fd, flow_detail) + e_fd.merge(flow_detail, deep_copy=True) + self._save_flowdetail_atoms(e_fd, flow_detail) return e_fd def save_logbook(self, book): @@ -130,28 +124,21 @@ class Connection(base.Connection): try: e_lb = self.backend.log_books[book.uuid] except KeyError: - e_lb = logbook.LogBook(book.name, book.uuid, - updated_at=book.updated_at, - created_at=timeutils.utcnow()) + e_lb = logbook.LogBook(book.name, uuid=book.uuid) self.backend.log_books[e_lb.uuid] = e_lb - else: - # TODO(harlowja): figure out a better way to set this property - # without actually setting a 'private' property. - e_lb._updated_at = timeutils.utcnow() - p_utils.logbook_merge(e_lb, book, deep_copy=True) + e_lb.merge(book, deep_copy=True) # Add anything in to the new logbook that isn't already in the existing # logbook. for flow_detail in book: try: e_fd = self.backend.flow_details[flow_detail.uuid] except KeyError: - e_fd = logbook.FlowDetail(name=flow_detail.name, - uuid=flow_detail.uuid) - e_lb.add(flow_detail) - self.backend.flow_details[flow_detail.uuid] = e_fd - p_utils.flow_details_merge(e_fd, flow_detail, deep_copy=True) - self._save_flowdetail_tasks(e_fd, flow_detail) + e_fd = logbook.FlowDetail(flow_detail.name, flow_detail.uuid) + e_lb.add(e_fd) + self.backend.flow_details[e_fd.uuid] = e_fd + e_fd.merge(flow_detail, deep_copy=True) + self._save_flowdetail_atoms(e_fd, flow_detail) return e_lb def get_logbook(self, book_uuid): @@ -160,9 +147,6 @@ class Connection(base.Connection): except KeyError: raise exc.NotFound("No logbook found with id: %s" % book_uuid) - def _get_logbooks(self): - return list(self.backend.log_books.values()) - def get_logbooks(self): - for lb in self._get_logbooks(): + for lb in list(six.itervalues(self.backend.log_books)): yield lb diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index c61c357e..2067ff5a 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -37,7 +37,6 @@ from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence import logbook from taskflow.utils import eventlet_utils from taskflow.utils import misc -from taskflow.utils import persistence_utils LOG = logging.getLogger(__name__) @@ -347,17 +346,17 @@ class Connection(base.Connection): def clear_all(self): return self._run_in_session(self._clear_all) - def _update_task_details(self, session, td): - # Must already exist since a tasks details has a strong connection to - # a flow details, and tasks details can not be saved on there own since - # they *must* have a connection to an existing flow details. - td_m = _task_details_get_model(td.uuid, session=session) - td_m = _taskdetails_merge(td_m, td) - td_m = session.merge(td_m) - return _convert_td_to_external(td_m) + def _update_atom_details(self, session, ad): + # Must already exist since a atoms details has a strong connection to + # a flow details, and atom details can not be saved on there own since + # they *must* have a connection to an existing flow detail. + ad_m = _atom_details_get_model(ad.uuid, session=session) + ad_m = _atomdetails_merge(ad_m, ad) + ad_m = session.merge(ad_m) + return _convert_ad_to_external(ad_m) - def update_task_details(self, task_detail): - return self._run_in_session(self._update_task_details, td=task_detail) + def update_atom_details(self, atom_detail): + return self._run_in_session(self._update_atom_details, ad=atom_detail) def _update_flow_details(self, session, fd): # Must already exist since a flow details has a strong connection to @@ -432,12 +431,63 @@ class Connection(base.Connection): ### +def _atomdetails_merge(ad_m, ad): + atom_type = logbook.atom_detail_type(ad) + if atom_type != ad_m.atom_type: + raise exc.StorageError("Can not merge differing atom types (%s != %s)" + % (atom_type, ad_m.atom_type)) + ad_d = ad.to_dict() + ad_m.state = ad_d['state'] + ad_m.intention = ad_d['intention'] + ad_m.results = ad_d['results'] + ad_m.version = ad_d['version'] + ad_m.failure = ad_d['failure'] + ad_m.meta = ad_d['meta'] + ad_m.name = ad_d['name'] + return ad_m + + +def _flowdetails_merge(fd_m, fd): + fd_d = fd.to_dict() + fd_m.state = fd_d['state'] + fd_m.name = fd_d['name'] + fd_m.meta = fd_d['meta'] + for ad in fd: + existing_ad = False + for ad_m in fd_m.atomdetails: + if ad_m.uuid == ad.uuid: + existing_ad = True + ad_m = _atomdetails_merge(ad_m, ad) + break + if not existing_ad: + ad_m = _convert_ad_to_internal(ad, fd_m.uuid) + fd_m.atomdetails.append(ad_m) + return fd_m + + +def _logbook_merge(lb_m, lb): + lb_d = lb.to_dict() + lb_m.meta = lb_d['meta'] + lb_m.name = lb_d['name'] + lb_m.created_at = lb_d['created_at'] + lb_m.updated_at = lb_d['updated_at'] + for fd in lb: + existing_fd = False + for fd_m in lb_m.flowdetails: + if fd_m.uuid == fd.uuid: + existing_fd = True + fd_m = _flowdetails_merge(fd_m, fd) + if not existing_fd: + lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid)) + return lb_m + + def _convert_fd_to_external(fd): fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid) fd_c.meta = fd.meta fd_c.state = fd.state - for td in fd.taskdetails: - fd_c.add(_convert_td_to_external(td)) + for ad_m in fd.atomdetails: + fd_c.add(_convert_ad_to_external(ad_m)) return fd_c @@ -445,47 +495,40 @@ def _convert_fd_to_internal(fd, parent_uuid): fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid, parent_uuid=parent_uuid, meta=fd.meta, state=fd.state) - fd_m.taskdetails = [] - for td in fd: - fd_m.taskdetails.append(_convert_td_to_internal(td, fd_m.uuid)) + fd_m.atomdetails = [] + for ad in fd: + fd_m.atomdetails.append(_convert_ad_to_internal(ad, fd_m.uuid)) return fd_m -def _convert_td_to_internal(td, parent_uuid): - results = td.results - if td.atom_type == logbook.RETRY_DETAIL: - results = persistence_utils.encode_retry_results(results) - return models.TaskDetail(name=td.name, uuid=td.uuid, - atom_type=td.atom_type, - intention=td.intention, - state=td.state, results=results, - failure=td.failure, meta=td.meta, - version=td.version, parent_uuid=parent_uuid) +def _convert_ad_to_internal(ad, parent_uuid): + converted = ad.to_dict() + converted['atom_type'] = logbook.atom_detail_type(ad) + converted['parent_uuid'] = parent_uuid + return models.AtomDetail(**converted) -def _convert_td_to_external(td): +def _convert_ad_to_external(ad): # Convert from sqlalchemy model -> external model, this allows us # to change the internal sqlalchemy model easily by forcing a defined # interface (that isn't the sqlalchemy model itself). - results = td.results - if td.atom_type == logbook.RETRY_DETAIL: - results = persistence_utils.decode_retry_results(results) - atom_cls = logbook.get_atom_detail_class(td.atom_type) - td_c = atom_cls(td.name, uuid=td.uuid) - td_c.state = td.state - td_c.intention = td.intention - td_c.results = results - td_c.failure = td.failure - td_c.meta = td.meta - td_c.version = td.version - return td_c + atom_cls = logbook.atom_detail_class(ad.atom_type) + return atom_cls.from_dict({ + 'state': ad.state, + 'intention': ad.intention, + 'results': ad.results, + 'failure': ad.failure, + 'meta': ad.meta, + 'version': ad.version, + 'name': ad.name, + 'uuid': ad.uuid, + }) def _convert_lb_to_external(lb_m): - """Don't expose the internal sqlalchemy ORM model to the external api.""" - lb_c = logbook.LogBook(lb_m.name, lb_m.uuid, - updated_at=lb_m.updated_at, - created_at=lb_m.created_at) + lb_c = logbook.LogBook(lb_m.name, lb_m.uuid) + lb_c.updated_at = lb_m.updated_at + lb_c.created_at = lb_m.created_at lb_c.meta = lb_m.meta for fd_m in lb_m.flowdetails: lb_c.add(_convert_fd_to_external(fd_m)) @@ -493,7 +536,6 @@ def _convert_lb_to_external(lb_m): def _convert_lb_to_internal(lb_c): - """Don't expose the external model to the sqlalchemy ORM model.""" lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name) lb_m.flowdetails = [] for fd_c in lb_c: @@ -508,48 +550,15 @@ def _logbook_get_model(lb_id, session): return entry -def _flow_details_get_model(f_id, session): - entry = session.query(models.FlowDetail).filter_by(uuid=f_id).first() +def _flow_details_get_model(flow_id, session): + entry = session.query(models.FlowDetail).filter_by(uuid=flow_id).first() if entry is None: - raise exc.NotFound("No flow details found with id: %s" % f_id) + raise exc.NotFound("No flow details found with id: %s" % flow_id) return entry -def _task_details_get_model(t_id, session): - entry = session.query(models.TaskDetail).filter_by(uuid=t_id).first() +def _atom_details_get_model(atom_id, session): + entry = session.query(models.AtomDetail).filter_by(uuid=atom_id).first() if entry is None: - raise exc.NotFound("No task details found with id: %s" % t_id) + raise exc.NotFound("No atom details found with id: %s" % atom_id) return entry - - -def _logbook_merge(lb_m, lb): - lb_m = persistence_utils.logbook_merge(lb_m, lb) - for fd in lb: - existing_fd = False - for fd_m in lb_m.flowdetails: - if fd_m.uuid == fd.uuid: - existing_fd = True - fd_m = _flowdetails_merge(fd_m, fd) - if not existing_fd: - lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid)) - return lb_m - - -def _flowdetails_merge(fd_m, fd): - fd_m = persistence_utils.flow_details_merge(fd_m, fd) - for td in fd: - existing_td = False - for td_m in fd_m.taskdetails: - if td_m.uuid == td.uuid: - existing_td = True - td_m = _taskdetails_merge(td_m, td) - break - if not existing_td: - td_m = _convert_td_to_internal(td, fd_m.uuid) - fd_m.taskdetails.append(td_m) - return fd_m - - -def _taskdetails_merge(td_m, td): - td_i = _convert_td_to_internal(td, td_m.parent_uuid) - return persistence_utils.task_details_merge(td_m, td_i) diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index 667379cf..8f42374c 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -26,7 +26,6 @@ from taskflow.persistence.backends import base from taskflow.persistence import logbook from taskflow.utils import kazoo_utils as k_utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) @@ -87,7 +86,7 @@ class ZkConnection(base.Connection): self._client = client self._book_path = paths.join(self._backend.path, "books") self._flow_path = paths.join(self._backend.path, "flow_details") - self._task_path = paths.join(self._backend.path, "task_details") + self._atom_path = paths.join(self._backend.path, "atom_details") with self._exc_wrapper(): # NOOP if already started. self._client.start() @@ -113,8 +112,8 @@ class ZkConnection(base.Connection): return self._flow_path @property - def task_path(self): - return self._task_path + def atom_path(self): + return self._atom_path def close(self): pass @@ -122,7 +121,7 @@ class ZkConnection(base.Connection): def upgrade(self): """Creates the initial paths (if they already don't exist).""" with self._exc_wrapper(): - for path in (self.book_path, self.flow_path, self.task_path): + for path in (self.book_path, self.flow_path, self.atom_path): self._client.ensure_path(path) @contextlib.contextmanager @@ -144,56 +143,61 @@ class ZkConnection(base.Connection): except (k_exc.KazooException, k_exc.ZookeeperError) as e: raise exc.StorageFailure("Storage backend internal error", e) - def update_task_details(self, td): - """Update a task_detail transactionally.""" + def update_atom_details(self, ad): + """Update a atom detail transactionally.""" with self._exc_wrapper(): with self._client.transaction() as txn: - return self._update_task_details(td, txn) + return self._update_atom_details(ad, txn) - def _update_task_details(self, td, txn, create_missing=False): + def _update_atom_details(self, ad, txn, create_missing=False): # Determine whether the desired data exists or not. - td_path = paths.join(self.task_path, td.uuid) + ad_path = paths.join(self.atom_path, ad.uuid) + e_ad = None try: - td_data, _zstat = self._client.get(td_path) + ad_data, _zstat = self._client.get(ad_path) except k_exc.NoNodeError: # Not-existent: create or raise exception. - if create_missing: - txn.create(td_path) - e_td = logbook.TaskDetail(name=td.name, uuid=td.uuid) - else: - raise exc.NotFound("No task details found with id: %s" - % td.uuid) + raise exc.NotFound("No atom details found with id: %s" % ad.uuid) else: # Existent: read it out. - e_td = p_utils.unformat_task_detail(td.uuid, - misc.decode_json(td_data)) + try: + ad_data = misc.decode_json(ad_data) + ad_cls = logbook.atom_detail_class(ad_data['type']) + e_ad = ad_cls.from_dict(ad_data['atom']) + except KeyError: + pass # Update and write it back - e_td = p_utils.task_details_merge(e_td, td) - td_data = p_utils.format_task_detail(e_td) - txn.set_data(td_path, misc.binary_encode(jsonutils.dumps(td_data))) - return e_td + if e_ad: + e_ad = e_ad.merge(ad) + else: + e_ad = ad + ad_data = base._format_atom(e_ad) + txn.set_data(ad_path, + misc.binary_encode(jsonutils.dumps(ad_data))) + return e_ad - def get_task_details(self, td_uuid): - """Read a taskdetail. + def get_atom_details(self, ad_uuid): + """Read a atom detail. *Read-only*, so no need of zk transaction. """ with self._exc_wrapper(): - return self._get_task_details(td_uuid) + return self._get_atom_details(ad_uuid) - def _get_task_details(self, td_uuid): - td_path = paths.join(self.task_path, td_uuid) + def _get_atom_details(self, ad_uuid): + ad_path = paths.join(self.atom_path, ad_uuid) try: - td_data, _zstat = self._client.get(td_path) + ad_data, _zstat = self._client.get(ad_path) except k_exc.NoNodeError: - raise exc.NotFound("No task details found with id: %s" % td_uuid) + raise exc.NotFound("No atom details found with id: %s" % ad_uuid) else: - return p_utils.unformat_task_detail(td_uuid, - misc.decode_json(td_data)) + ad_data = misc.decode_json(ad_data) + ad_cls = logbook.atom_detail_class(ad_data['type']) + return ad_cls.from_dict(ad_data['atom']) def update_flow_details(self, fd): - """Update a flowdetail transactionally.""" + """Update a flow detail transactionally.""" with self._exc_wrapper(): with self._client.transaction() as txn: return self._update_flow_details(fd, txn) @@ -213,25 +217,24 @@ class ZkConnection(base.Connection): % fd.uuid) else: # Existent: read it out - e_fd = p_utils.unformat_flow_detail(fd.uuid, - misc.decode_json(fd_data)) + e_fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data)) # Update and write it back - e_fd = p_utils.flow_details_merge(e_fd, fd) - fd_data = p_utils.format_flow_detail(e_fd) + e_fd = e_fd.merge(fd) + fd_data = e_fd.to_dict() txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data))) - for td in fd: - td_path = paths.join(fd_path, td.uuid) + for ad in fd: + ad_path = paths.join(fd_path, ad.uuid) # NOTE(harlowja): create an entry in the flow detail path - # for the provided task detail so that a reference exists - # from the flow detail to its task details. - if not self._client.exists(td_path): - txn.create(td_path) - e_fd.add(self._update_task_details(td, txn, create_missing=True)) + # for the provided atom detail so that a reference exists + # from the flow detail to its atom details. + if not self._client.exists(ad_path): + txn.create(ad_path) + e_fd.add(self._update_atom_details(ad, txn, create_missing=True)) return e_fd def get_flow_details(self, fd_uuid): - """Read a flowdetail. + """Read a flow detail. *Read-only*, so no need of zk transaction. """ @@ -245,16 +248,16 @@ class ZkConnection(base.Connection): except k_exc.NoNodeError: raise exc.NotFound("No flow details found with id: %s" % fd_uuid) - fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(fd_data)) - for td_uuid in self._client.get_children(fd_path): - fd.add(self._get_task_details(td_uuid)) + fd = logbook.FlowDetail.from_dict(misc.decode_json(fd_data)) + for ad_uuid in self._client.get_children(fd_path): + fd.add(self._get_atom_details(ad_uuid)) return fd def save_logbook(self, lb): """Save (update) a log_book transactionally.""" def _create_logbook(lb_path, txn): - lb_data = p_utils.format_logbook(lb, created_at=None) + lb_data = lb.to_dict(marshal_time=True) txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data))) for fd in lb: # NOTE(harlowja): create an entry in the logbook path @@ -262,22 +265,24 @@ class ZkConnection(base.Connection): # from the logbook to its flow details. txn.create(paths.join(lb_path, fd.uuid)) fd_path = paths.join(self.flow_path, fd.uuid) - fd_data = jsonutils.dumps(p_utils.format_flow_detail(fd)) + fd_data = jsonutils.dumps(fd.to_dict()) txn.create(fd_path, misc.binary_encode(fd_data)) - for td in fd: + for ad in fd: # NOTE(harlowja): create an entry in the flow detail path - # for the provided task detail so that a reference exists - # from the flow detail to its task details. - txn.create(paths.join(fd_path, td.uuid)) - td_path = paths.join(self.task_path, td.uuid) - td_data = jsonutils.dumps(p_utils.format_task_detail(td)) - txn.create(td_path, misc.binary_encode(td_data)) + # for the provided atom detail so that a reference exists + # from the flow detail to its atom details. + txn.create(paths.join(fd_path, ad.uuid)) + ad_path = paths.join(self.atom_path, ad.uuid) + ad_data = base._format_atom(ad) + txn.create(ad_path, + misc.binary_encode(jsonutils.dumps(ad_data))) return lb def _update_logbook(lb_path, lb_data, txn): - e_lb = p_utils.unformat_logbook(lb.uuid, misc.decode_json(lb_data)) - e_lb = p_utils.logbook_merge(e_lb, lb) - lb_data = p_utils.format_logbook(e_lb, created_at=lb.created_at) + e_lb = logbook.LogBook.from_dict(misc.decode_json(lb_data), + unmarshal_time=True) + e_lb = e_lb.merge(lb) + lb_data = e_lb.to_dict(marshal_time=True) txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data))) for fd in lb: fd_path = paths.join(lb_path, fd.uuid) @@ -312,8 +317,8 @@ class ZkConnection(base.Connection): except k_exc.NoNodeError: raise exc.NotFound("No logbook found with id: %s" % lb_uuid) else: - lb = p_utils.unformat_logbook(lb_uuid, - misc.decode_json(lb_data)) + lb = logbook.LogBook.from_dict(misc.decode_json(lb_data), + unmarshal_time=True) for fd_uuid in self._client.get_children(lb_path): lb.add(self._get_flow_details(fd_uuid)) return lb @@ -338,21 +343,21 @@ class ZkConnection(base.Connection): def destroy_logbook(self, lb_uuid): """Destroy (delete) a log_book transactionally.""" - def _destroy_task_details(td_uuid, txn): - td_path = paths.join(self.task_path, td_uuid) - if not self._client.exists(td_path): - raise exc.NotFound("No task details found with id: %s" - % td_uuid) - txn.delete(td_path) + def _destroy_atom_details(ad_uuid, txn): + ad_path = paths.join(self.atom_path, ad_uuid) + if not self._client.exists(ad_path): + raise exc.NotFound("No atom details found with id: %s" + % ad_uuid) + txn.delete(ad_path) def _destroy_flow_details(fd_uuid, txn): fd_path = paths.join(self.flow_path, fd_uuid) if not self._client.exists(fd_path): raise exc.NotFound("No flow details found with id: %s" % fd_uuid) - for td_uuid in self._client.get_children(fd_path): - _destroy_task_details(td_uuid, txn) - txn.delete(paths.join(fd_path, td_uuid)) + for ad_uuid in self._client.get_children(fd_path): + _destroy_atom_details(ad_uuid, txn) + txn.delete(paths.join(fd_path, ad_uuid)) txn.delete(fd_path) def _destroy_logbook(lb_uuid, txn): @@ -380,20 +385,20 @@ class ZkConnection(base.Connection): txn.delete(paths.join(lb_path, fd_uuid)) txn.delete(lb_path) - # Delete all data under flowdetail path. + # Delete all data under flow detail path. for fd_uuid in self._client.get_children(self.flow_path): fd_path = paths.join(self.flow_path, fd_uuid) - for td_uuid in self._client.get_children(fd_path): - txn.delete(paths.join(fd_path, td_uuid)) + for ad_uuid in self._client.get_children(fd_path): + txn.delete(paths.join(fd_path, ad_uuid)) txn.delete(fd_path) - # Delete all data under taskdetail path. - for td_uuid in self._client.get_children(self.task_path): - td_path = paths.join(self.task_path, td_uuid) - txn.delete(td_path) + # Delete all data under atom detail path. + for ad_uuid in self._client.get_children(self.atom_path): + ad_path = paths.join(self.atom_path, ad_uuid) + txn.delete(ad_path) # Delete containing directories. if delete_dirs: txn.delete(self.book_path) - txn.delete(self.task_path) + txn.delete(self.atom_path) txn.delete(self.flow_path) diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/589dccdf2b6e_rename_taskdetails_to_atomdetails.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/589dccdf2b6e_rename_taskdetails_to_atomdetails.py new file mode 100644 index 00000000..079aaa23 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/589dccdf2b6e_rename_taskdetails_to_atomdetails.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Rename taskdetails to atomdetails + +Revision ID: 589dccdf2b6e +Revises: 14b227d79a87 +Create Date: 2014-03-19 11:49:16.533227 + +""" + +# revision identifiers, used by Alembic. +revision = '589dccdf2b6e' +down_revision = '14b227d79a87' + +from alembic import op + + +def upgrade(): + op.rename_table("taskdetails", "atomdetails") + + +def downgrade(): + op.rename_table("atomdetails", "taskdetails") diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py index 1e47583b..d03b6528 100644 --- a/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/84d6e888850_add_task_detail_type.py @@ -33,8 +33,7 @@ from taskflow.persistence import logbook def upgrade(): atom_types = sa.Enum(*logbook.ATOM_TYPES, name='atom_types') - column = sa.Column('atom_type', atom_types, - server_default=logbook.TASK_DETAIL) + column = sa.Column('atom_type', atom_types) bind = op.get_bind() impl = atom_types.dialect_impl(bind.dialect) impl.create(bind, checkfirst=True) diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py index e06e8a39..cad86628 100644 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ b/taskflow/persistence/backends/sqlalchemy/models.py @@ -28,7 +28,6 @@ from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states -from taskflow.utils import persistence_utils BASE = declarative_base() @@ -49,27 +48,6 @@ class Json(types.TypeDecorator): return jsonutils.loads(value) -class Failure(types.TypeDecorator): - """Put misc.Failure object into database column. - - We convert Failure object to dict, serialize that dict into - JSON and save it. None is stored as NULL. - - The conversion is lossy since we cannot save exc_info. - """ - impl = types.Text - - def process_bind_param(self, value, dialect): - if value is None: - return None - return jsonutils.dumps(persistence_utils.failure_to_dict(value)) - - def process_result_value(self, value, dialect): - if value is None: - return None - return persistence_utils.failure_from_dict(jsonutils.loads(value)) - - class ModelBase(TimestampMixin): """Base model for all taskflow objects.""" uuid = Column(String, default=uuidutils.generate_uuid, @@ -98,23 +76,23 @@ class FlowDetail(BASE, ModelBase): # Relationships parent_uuid = Column(String, ForeignKey('logbooks.uuid')) - taskdetails = relationship("TaskDetail", + atomdetails = relationship("AtomDetail", single_parent=True, backref=backref("flowdetails", cascade="save-update, delete, " "merge")) -class TaskDetail(BASE, ModelBase): - __tablename__ = 'taskdetails' +class AtomDetail(BASE, ModelBase): + __tablename__ = 'atomdetails' - atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types')) # Member variables + atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types')) state = Column(String) intention = Column(Enum(*states.INTENTIONS, name='intentions')) results = Column(Json) - failure = Column(Failure) - version = Column(String) + failure = Column(Json) + version = Column(Json) # Relationships parent_uuid = Column(String, ForeignKey('flowdetails.uuid')) diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index d60fdac0..4d17dd45 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -15,20 +15,37 @@ # License for the specific language governing permissions and limitations # under the License. +import abc +import copy import logging -import abc import six +from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow import states +from taskflow.utils import misc LOG = logging.getLogger(__name__) -TASK_DETAIL = 'TASK_DETAIL' -RETRY_DETAIL = 'RETRY_DETAIL' -ATOM_TYPES = [TASK_DETAIL, RETRY_DETAIL] +def _copy_function(deep_copy): + if deep_copy: + return copy.deepcopy + else: + return lambda x: x + + +def _safe_marshal_time(when): + if not when: + return None + return timeutils.marshall_now(now=when) + + +def _safe_unmarshal_time(when): + if not when: + return None + return timeutils.unmarshall_time(when) class LogBook(object): @@ -41,35 +58,75 @@ class LogBook(object): storage in real time. The data in this class will only be guaranteed to be persisted when a save occurs via some backend connection. """ - def __init__(self, name, uuid=None, updated_at=None, created_at=None): + def __init__(self, name, uuid=None): if uuid: self._uuid = uuid else: self._uuid = uuidutils.generate_uuid() self._name = name self._flowdetails_by_id = {} - self._updated_at = updated_at - self._created_at = created_at + self.created_at = timeutils.utcnow() + self.updated_at = None self.meta = None - @property - def created_at(self): - return self._created_at - - @property - def updated_at(self): - return self._updated_at - def add(self, fd): """Adds a new entry to the underlying logbook. Does not *guarantee* that the details will be immediately saved. """ self._flowdetails_by_id[fd.uuid] = fd + self.updated_at = timeutils.utcnow() def find(self, flow_uuid): return self._flowdetails_by_id.get(flow_uuid, None) + def merge(self, lb, deep_copy=False): + """Merges the current object state with the given ones state. + + NOTE(harlowja): Does not merge the flow details contained in either. + """ + if lb is self: + return self + copy_fn = _copy_function(deep_copy) + if self.meta != lb.meta: + self.meta = copy_fn(lb.meta) + if lb.created_at != self.created_at: + self.created_at = copy_fn(lb.created_at) + if lb.updated_at != self.updated_at: + self.updated_at = copy_fn(lb.updated_at) + return self + + def to_dict(self, marshal_time=False): + """Translates the internal state of this object to a dictionary. + + NOTE(harlowja): Does not include the contained flow details. + """ + if not marshal_time: + marshal_fn = lambda x: x + else: + marshal_fn = _safe_marshal_time + data = { + 'name': self.name, + 'meta': self.meta, + 'uuid': self.uuid, + 'updated_at': marshal_fn(self.updated_at), + 'created_at': marshal_fn(self.created_at), + } + return data + + @classmethod + def from_dict(cls, data, unmarshal_time=False): + """Translates the given data into an instance of this class.""" + if not unmarshal_time: + unmarshal_fn = lambda x: x + else: + unmarshal_fn = _safe_unmarshal_time + obj = cls(data['name'], uuid=data['uuid']) + obj.updated_at = unmarshal_fn(data['updated_at']) + obj.created_at = unmarshal_fn(data['created_at']) + obj.meta = data.get('meta') + return obj + @property def uuid(self): return self._uuid @@ -87,7 +144,7 @@ class LogBook(object): class FlowDetail(object): - """This class contains a dict of task detail entries for a given + """This class contains a dict of atom detail entries for a given flow along with any metadata associated with that flow. The data contained within this class need *not* be backed by the backend @@ -97,7 +154,7 @@ class FlowDetail(object): def __init__(self, name, uuid): self._uuid = uuid self._name = name - self._taskdetails_by_id = {} + self._atomdetails_by_id = {} self.state = None # Any other metadata to include about this flow while storing. For # example timing information could be stored here, other misc. flow @@ -107,16 +164,52 @@ class FlowDetail(object): def update(self, fd): """Updates the objects state to be the same as the given one.""" if fd is self: - return - self._taskdetails_by_id = dict(fd._taskdetails_by_id) + return self + self._atomdetails_by_id = dict(fd._atomdetails_by_id) self.state = fd.state self.meta = fd.meta + return self - def add(self, td): - self._taskdetails_by_id[td.uuid] = td + def merge(self, fd, deep_copy=False): + """Merges the current object state with the given ones state. - def find(self, td_uuid): - return self._taskdetails_by_id.get(td_uuid) + NOTE(harlowja): Does not merge the atom details contained in either. + """ + if fd is self: + return self + copy_fn = _copy_function(deep_copy) + if self.meta != fd.meta: + self.meta = copy_fn(fd.meta) + if self.state != fd.state: + # NOTE(imelnikov): states are just strings, no need to copy. + self.state = fd.state + return self + + def to_dict(self): + """Translates the internal state of this object to a dictionary. + + NOTE(harlowja): Does not include the contained atom details. + """ + return { + 'name': self.name, + 'meta': self.meta, + 'state': self.state, + 'uuid': self.uuid, + } + + @classmethod + def from_dict(cls, data): + """Translates the given data into an instance of this class.""" + obj = cls(data['name'], data['uuid']) + obj.state = data.get('state') + obj.meta = data.get('meta') + return obj + + def add(self, ad): + self._atomdetails_by_id[ad.uuid] = ad + + def find(self, ad_uuid): + return self._atomdetails_by_id.get(ad_uuid) @property def uuid(self): @@ -127,11 +220,11 @@ class FlowDetail(object): return self._name def __iter__(self): - for td in six.itervalues(self._taskdetails_by_id): - yield td + for ad in six.itervalues(self._atomdetails_by_id): + yield ad def __len__(self): - return len(self._taskdetails_by_id) + return len(self._atomdetails_by_id) @six.add_metaclass(abc.ABCMeta) @@ -172,16 +265,71 @@ class AtomDetail(object): # information can be associated with. self.version = None - def update(self, td): + def update(self, ad): """Updates the objects state to be the same as the given one.""" - if td is self: - return - self.state = td.state - self.intention = td.intention - self.meta = td.meta - self.failure = td.failure - self.results = td.results - self.version = td.version + if ad is self: + return self + self.state = ad.state + self.intention = ad.intention + self.meta = ad.meta + self.failure = ad.failure + self.results = ad.results + self.version = ad.version + return self + + @abc.abstractmethod + def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state.""" + copy_fn = _copy_function(deep_copy) + # NOTE(imelnikov): states and intentions are just strings, + # so there is no need to copy them (strings are immutable in python). + self.state = other.state + self.intention = other.intention + if self.failure != other.failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if other.failure: + if deep_copy: + self.failure = other.failure.copy() + else: + self.failure = other.failure + else: + self.failure = None + if self.meta != other.meta: + self.meta = copy_fn(other.meta) + if self.version != other.version: + self.version = copy_fn(other.version) + return self + + @abc.abstractmethod + def to_dict(self): + """Translates the internal state of this object to a dictionary.""" + + def _to_dict_shared(self): + if self.failure: + failure = self.failure.to_dict() + else: + failure = None + return { + 'failure': failure, + 'meta': self.meta, + 'name': self.name, + 'results': self.results, + 'state': self.state, + 'version': self.version, + 'intention': self.intention, + 'uuid': self.uuid, + } + + def _from_dict_shared(self, data): + self.state = data.get('state') + self.intention = data.get('intention') + self.results = data.get('results') + self.version = data.get('version') + self.meta = data.get('meta') + failure = data.get('failure') + if failure: + self.failure = misc.Failure.from_dict(failure) @property def uuid(self): @@ -191,10 +339,6 @@ class AtomDetail(object): def name(self): return self._name - @abc.abstractproperty - def atom_type(self): - """Identifies atom type represented by this detail.""" - @abc.abstractmethod def reset(self, state): """Resets detail results ans failures.""" @@ -205,16 +349,34 @@ class TaskDetail(AtomDetail): def __init__(self, name, uuid): super(TaskDetail, self).__init__(name, uuid) - @property - def atom_type(self): - return TASK_DETAIL - def reset(self, state): self.results = None self.failure = None self.state = state self.intention = states.EXECUTE + @classmethod + def from_dict(cls, data): + """Translates the given data into an instance of this class.""" + obj = cls(data['name'], data['uuid']) + obj._from_dict_shared(data) + return obj + + def to_dict(self): + """Translates the internal state of this object to a dictionary.""" + return self._to_dict_shared() + + def merge(self, other, deep_copy=False): + if not isinstance(other, TaskDetail): + raise NotImplemented("Can only merge with other task details") + if other is self: + return self + super(TaskDetail, self).merge(other, deep_copy=deep_copy) + copy_fn = _copy_function(deep_copy) + if self.results != other.results: + self.results = copy_fn(other.results) + return self + class RetryDetail(AtomDetail): """This class represents a retry detail for retry controller object.""" @@ -222,21 +384,89 @@ class RetryDetail(AtomDetail): super(RetryDetail, self).__init__(name, uuid) self.results = [] - @property - def atom_type(self): - return RETRY_DETAIL - def reset(self, state): self.results = [] self.failure = None self.state = state self.intention = states.EXECUTE + @classmethod + def from_dict(cls, data): + """Translates the given data into an instance of this class.""" -def get_atom_detail_class(atom_type): - if atom_type == TASK_DETAIL: - return TaskDetail - elif atom_type == RETRY_DETAIL: - return RetryDetail - else: - raise TypeError("Unknown atom type") + def decode_results(results): + if not results: + return [] + new_results = [] + for (data, failures) in results: + new_failures = {} + for (key, failure_data) in six.iteritems(failures): + new_failures[key] = misc.Failure.from_dict(failure_data) + new_results.append((data, new_failures)) + return new_results + + obj = cls(data['name'], data['uuid']) + obj._from_dict_shared(data) + obj.results = decode_results(obj.results) + return obj + + def to_dict(self): + """Translates the internal state of this object to a dictionary.""" + + def encode_results(results): + if not results: + return [] + new_results = [] + for (data, failures) in results: + new_failures = {} + for (key, failure) in six.iteritems(failures): + new_failures[key] = failure.to_dict() + new_results.append((data, new_failures)) + return new_results + + base = self._to_dict_shared() + base['results'] = encode_results(base.get('results')) + return base + + def merge(self, other, deep_copy=False): + if not isinstance(other, RetryDetail): + raise NotImplemented("Can only merge with other retry details") + if other is self: + return self + super(RetryDetail, self).merge(other, deep_copy=deep_copy) + results = [] + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + for (data, failures) in other.results: + copied_failures = {} + for (key, failure) in six.iteritems(failures): + if deep_copy: + copied_failures[key] = failure.copy() + else: + copied_failures[key] = failure + results.append((data, copied_failures)) + self.results = results + return self + + +_DETAIL_TO_NAME = { + RetryDetail: 'RETRY_DETAIL', + TaskDetail: 'TASK_DETAIL', +} +_NAME_TO_DETAIL = dict((name, cls) + for (cls, name) in six.iteritems(_DETAIL_TO_NAME)) +ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL)) + + +def atom_detail_class(atom_type): + try: + return _NAME_TO_DETAIL[atom_type] + except KeyError: + raise TypeError("Unknown atom type: %s" % (atom_type)) + + +def atom_detail_type(atom_detail): + try: + return _DETAIL_TO_NAME[type(atom_detail)] + except KeyError: + raise TypeError("Unknown atom type: %s" % type(atom_detail)) diff --git a/taskflow/storage.py b/taskflow/storage.py index 8875eef5..3689da99 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -36,10 +36,11 @@ STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) class Storage(object): """Interface between engines and logbook. - This class provides a simple interface to save tasks of a given flow and + This class provides a simple interface to save atoms of a given flow and associated activity and results to persistence layer (logbook, - task_details, flow_details) for use by engines, making it easier to - interact with the underlying storage & backend mechanism. + atom_details, flow_details) for use by engines. This makes it easier to + interact with the underlying storage & backend mechanism through this + interface rather than accessing those objects directly. """ injector_name = '_TaskFlow_INJECTOR' @@ -54,15 +55,17 @@ class Storage(object): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in task name -> misc.Failure mapping. self._failures = {} - for td in self._flowdetail: - if td.failure is not None: - self._failures[td.name] = td.failure + for ad in self._flowdetail: + if ad.failure is not None: + self._failures[ad.name] = ad.failure - self._task_name_to_uuid = dict((td.name, td.uuid) - for td in self._flowdetail) + self._atom_name_to_uuid = dict((ad.name, ad.uuid) + for ad in self._flowdetail) try: - injector_td = self._taskdetail_by_name(self.injector_name) + injector_td = self._atomdetail_by_name( + self.injector_name, + expected_type=logbook.TaskDetail) except exceptions.NotFound: pass else: @@ -100,16 +103,16 @@ class Storage(object): """ with self._lock.write_lock(): try: - task_id = self._task_name_to_uuid[task_name] + task_id = self._atom_name_to_uuid[task_name] except KeyError: task_id = uuidutils.generate_uuid() self._create_atom_detail(logbook.TaskDetail, task_name, task_id, task_version) else: - td = self._flowdetail.find(task_id) - if td.atom_type != logbook.TASK_DETAIL: + ad = self._flowdetail.find(task_id) + if not isinstance(ad, logbook.TaskDetail): raise exceptions.Duplicate( - "Task detail %s already exists in flow detail %s." % + "Atom detail %s already exists in flow detail %s." % (task_name, self._flowdetail.name)) self._set_result_mapping(task_name, result_mapping) return task_id @@ -128,16 +131,16 @@ class Storage(object): """ with self._lock.write_lock(): try: - retry_id = self._task_name_to_uuid[retry_name] + retry_id = self._atom_name_to_uuid[retry_name] except KeyError: retry_id = uuidutils.generate_uuid() self._create_atom_detail(logbook.RetryDetail, retry_name, retry_id, retry_version) else: - td = self._flowdetail.find(retry_id) - if td.atom_type != logbook.RETRY_DETAIL: + ad = self._flowdetail.find(retry_id) + if not isinstance(ad, logbook.RetryDetail): raise exceptions.Duplicate( - "Task detail %s already exists in flow detail %s." % + "Atom detail %s already exists in flow detail %s." % (retry_name, self._flowdetail.name)) self._set_result_mapping(retry_name, result_mapping) return retry_id @@ -153,7 +156,7 @@ class Storage(object): ad.version = task_version self._flowdetail.add(ad) self._with_connection(self._save_flow_detail) - self._task_name_to_uuid[ad.name] = ad.uuid + self._atom_name_to_uuid[ad.name] = ad.uuid @property def flow_name(self): @@ -171,73 +174,93 @@ class Storage(object): # added item to the flow detail). self._flowdetail.update(conn.update_flow_details(self._flowdetail)) - def _taskdetail_by_name(self, task_name): + def _atomdetail_by_name(self, atom_name, expected_type=None): try: - return self._flowdetail.find(self._task_name_to_uuid[task_name]) + ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name]) except KeyError: - raise exceptions.NotFound("Unknown task name: %s" % task_name) + raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + else: + # TODO(harlowja): we need to figure out how to get away from doing + # these kinds of type checks in general (since they likely mean + # we aren't doing something right). + if expected_type and not isinstance(ad, expected_type): + raise TypeError("Atom %s is not of the expected type: %s" + % (atom_name, + reflection.get_class_name(expected_type))) + return ad - def _save_task_detail(self, conn, task_detail): - # NOTE(harlowja): we need to update our contained task detail if + def _save_atom_detail(self, conn, atom_detail): + # NOTE(harlowja): we need to update our contained atom detail if # the result of the update actually added more (aka another process - # is also modifying the task detail). - task_detail.update(conn.update_task_details(task_detail)) + # is also modifying the task detail), since python is by reference + # and the contained atom detail will reflect the old state if we don't + # do this update. + atom_detail.update(conn.update_atom_details(atom_detail)) - def get_task_uuid(self, task_name): - """Get task uuid by given name.""" + def get_atom_uuid(self, atom_name): + """Gets an atoms uuid given a atoms name.""" with self._lock.read_lock(): - td = self._taskdetail_by_name(task_name) - return td.uuid + ad = self._atomdetail_by_name(atom_name) + return ad.uuid - def set_task_state(self, task_name, state): - """Set task or retry state.""" + def set_atom_state(self, atom_name, state): + """Sets an atoms state.""" with self._lock.write_lock(): - td = self._taskdetail_by_name(task_name) - td.state = state - self._with_connection(self._save_task_detail, td) + ad = self._atomdetail_by_name(atom_name) + ad.state = state + self._with_connection(self._save_atom_detail, ad) - def get_task_state(self, task_name): - """Get state of task with given name.""" + def get_atom_state(self, atom_name): + """Gets the state of an atom given an atoms name.""" with self._lock.read_lock(): - td = self._taskdetail_by_name(task_name) - return td.state + ad = self._atomdetail_by_name(atom_name) + return ad.state def set_atom_intention(self, atom_name, intention): - """Set intention for atom with given name.""" - td = self._taskdetail_by_name(atom_name) - td.intention = intention - self._with_connection(self._save_task_detail, task_detail=td) + """Sets the intention of an atom given an atoms name.""" + ad = self._atomdetail_by_name(atom_name) + ad.intention = intention + self._with_connection(self._save_atom_detail, ad) def get_atom_intention(self, atom_name): - """Get intention of atom with given name.""" - return self._taskdetail_by_name(atom_name).intention + """Gets the intention of an atom given an atoms name.""" + ad = self._atomdetail_by_name(atom_name) + return ad.intention - def get_tasks_states(self, task_names): - """Gets all task states.""" + def get_atoms_states(self, atom_names): + """Gets all atoms states given a set of names.""" with self._lock.read_lock(): - return dict((name, (self.get_task_state(name), + return dict((name, (self.get_atom_state(name), self.get_atom_intention(name))) - for name in task_names) + for name in atom_names) - def update_task_metadata(self, task_name, update_with): - """Updates a tasks metadata.""" + def _update_atom_metadata(self, atom_name, update_with, + expected_type=None): if not update_with: - return + update_with = {} with self._lock.write_lock(): - td = self._taskdetail_by_name(task_name) - if not td.meta: - td.meta = {} - td.meta.update(update_with) - self._with_connection(self._save_task_detail, td) + ad = self._atomdetail_by_name(atom_name, + expected_type=expected_type) + if not ad.meta: + ad.meta = {} + ad.meta.update(update_with) + self._with_connection(self._save_atom_detail, ad) + + def update_atom_metadata(self, atom_name, update_with): + """Updates a atoms metadata given another dictionary or a list of + (key, value) pairs to include in the updated metadata (newer keys will + overwrite older keys). + """ + self._update_atom_metadata(atom_name, update_with) def set_task_progress(self, task_name, progress, details=None): - """Set task progress. + """Set a tasks progress. :param task_name: task name - :param progress: task progress - :param details: task specific progress information + :param progress: tasks progress (0.0 <-> 1.0) + :param details: any task specific progress details """ - metadata_update = { + update_with = { 'progress': progress, } if details is not None: @@ -245,109 +268,118 @@ class Storage(object): # updating details (e.g. automatically from engine) # we save progress value with details, too. if details: - metadata_update['progress_details'] = { + update_with['progress_details'] = { 'at_progress': progress, 'details': details, } else: - metadata_update['progress_details'] = None - self.update_task_metadata(task_name, metadata_update) + update_with['progress_details'] = None + self._update_atom_metadata(task_name, update_with, + expected_type=logbook.TaskDetail) def get_task_progress(self, task_name): - """Get progress of task with given name. + """Get the progress of a task given a tasks name. - :param task_name: task name + :param task_name: tasks name :returns: current task progress value """ with self._lock.read_lock(): - td = self._taskdetail_by_name(task_name) - if not td.meta: + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress'] + except (TypeError, KeyError): return 0.0 - return td.meta.get('progress', 0.0) def get_task_progress_details(self, task_name): - """Get progress details of task with given name. + """Get the progress details of a task given a tasks name. :param task_name: task name :returns: None if progress_details not defined, else progress_details dict """ with self._lock.read_lock(): - td = self._taskdetail_by_name(task_name) - if not td.meta: + ad = self._atomdetail_by_name(task_name, + expected_type=logbook.TaskDetail) + try: + return ad.meta['progress_details'] + except (TypeError, KeyError): return None - return td.meta.get('progress_details') - def _check_all_results_provided(self, task_name, data): - """Warn if task did not provide some of results. + def _check_all_results_provided(self, atom_name, data): + """Warn if an atom did not provide some of its expected results. - This may happen if task returns shorter tuple or list or dict - without all needed keys. It may also happen if task returns + This may happen if atom returns shorter tuple or list or dict + without all needed keys. It may also happen if atom returns result of wrong type. """ - result_mapping = self._result_mappings.get(task_name) + result_mapping = self._result_mappings.get(atom_name) if not result_mapping: return for name, index in six.iteritems(result_mapping): try: misc.item_from(data, index, name=name) except exceptions.NotFound: - LOG.warning("Task %s did not supply result " - "with index %r (name %s)", task_name, index, name) + LOG.warning("Atom %s did not supply result " + "with index %r (name %s)", atom_name, index, name) - def save(self, task_name, data, state=states.SUCCESS): - """Put result for task with id 'uuid' to storage.""" + def save(self, atom_name, data, state=states.SUCCESS): + """Put result for atom with id 'uuid' to storage.""" with self._lock.write_lock(): - td = self._taskdetail_by_name(task_name) - td.state = state + ad = self._atomdetail_by_name(atom_name) + ad.state = state if state == states.FAILURE and isinstance(data, misc.Failure): + # FIXME(harlowja): this seems like it should be internal logic + # in the atom detail object and not in here. Fix that soon... + # # Do not clean retry history - if td.atom_type != logbook.RETRY_DETAIL: - td.results = None - td.failure = data - self._failures[td.name] = data + if not isinstance(ad, logbook.RetryDetail): + ad.results = None + ad.failure = data + self._failures[ad.name] = data else: - if td.atom_type == logbook.RETRY_DETAIL: - td.results.append((data, {})) + # FIXME(harlowja): this seems like it should be internal logic + # in the atom detail object and not in here. Fix that soon... + if isinstance(ad, logbook.RetryDetail): + ad.results.append((data, {})) else: - td.results = data - td.failure = None - self._check_all_results_provided(td.name, data) - self._with_connection(self._save_task_detail, task_detail=td) + ad.results = data + ad.failure = None + self._check_all_results_provided(ad.name, data) + self._with_connection(self._save_atom_detail, ad) def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" with self._lock.write_lock(): - td = self._taskdetail_by_name(retry_name) - if td.atom_type != logbook.RETRY_DETAIL: - raise TypeError( - "Atom %s is not a retry controller." % retry_name) - failures = td.results[-1][1] + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + failures = ad.results[-1][1] if failed_atom_name not in failures: failures[failed_atom_name] = failure - self._with_connection(self._save_task_detail, task_detail=td) + self._with_connection(self._save_atom_detail, ad) def cleanup_retry_history(self, retry_name, state): - """Cleanup history of retry with given name.""" + """Cleanup history of retry atom with given name.""" with self._lock.write_lock(): - td = self._taskdetail_by_name(retry_name) - td.state = state - td.results = [] - self._with_connection(self._save_task_detail, task_detail=td) + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + ad.state = state + ad.results = [] + self._with_connection(self._save_atom_detail, ad) - def get(self, task_name): - """Get result for task with name 'task_name' to storage.""" + def get(self, atom_name): + """Gets the result for an atom with a given name from storage.""" with self._lock.read_lock(): - td = self._taskdetail_by_name(task_name) - if td.failure is not None: - cached = self._failures.get(task_name) - if td.failure.matches(cached): + ad = self._atomdetail_by_name(atom_name) + if ad.failure is not None: + cached = self._failures.get(atom_name) + if ad.failure.matches(cached): return cached - return td.failure - if td.state not in STATES_WITH_RESULTS: - raise exceptions.NotFound("Result for task %s is not known" - % task_name) - return td.results + return ad.failure + if ad.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for atom %s is not currently" + " known" % atom_name) + return ad.results def get_failures(self): """Get list of failures that happened with this flow. @@ -362,21 +394,21 @@ class Storage(object): with self._lock.read_lock(): return bool(self._failures) - def _reset_task(self, td, state): - if td.name == self.injector_name: + def _reset_atom(self, ad, state): + if ad.name == self.injector_name: return False - if td.state == state: + if ad.state == state: return False - td.reset(state) - self._failures.pop(td.name, None) + ad.reset(state) + self._failures.pop(ad.name, None) return True - def reset(self, task_name, state=states.PENDING): - """Remove result for task with id 'uuid' from storage.""" + def reset(self, atom_name, state=states.PENDING): + """Reset atom with given name (if the task is in a given state).""" with self._lock.write_lock(): - td = self._taskdetail_by_name(task_name) - if self._reset_task(td, state): - self._with_connection(self._save_task_detail, td) + ad = self._atomdetail_by_name(atom_name) + if self._reset_atom(ad, state): + self._with_connection(self._save_atom_detail, ad) def inject(self, pairs): """Add values into storage. @@ -386,23 +418,26 @@ class Storage(object): """ with self._lock.write_lock(): try: - td = self._taskdetail_by_name(self.injector_name) + ad = self._atomdetail_by_name( + self.injector_name, + expected_type=logbook.TaskDetail) except exceptions.NotFound: uuid = uuidutils.generate_uuid() self._create_atom_detail(logbook.TaskDetail, self.injector_name, uuid) - td = self._taskdetail_by_name(self.injector_name) - td.results = dict(pairs) - td.state = states.SUCCESS + ad = self._atomdetail_by_name(self.injector_name, + expected_type=logbook.TaskDetail) + ad.results = dict(pairs) + ad.state = states.SUCCESS else: - td.results.update(pairs) - self._with_connection(self._save_task_detail, td) - names = six.iterkeys(td.results) + ad.results.update(pairs) + self._with_connection(self._save_atom_detail, ad) + names = six.iterkeys(ad.results) self._set_result_mapping(self.injector_name, dict((name, name) for name in names)) - def _set_result_mapping(self, task_name, mapping): - """Set mapping for naming task results. + def _set_result_mapping(self, atom_name, mapping): + """Sets the result mapping for an atom. The result saved with given name would be accessible by names defined in mapping. Mapping is a dict name => index. If index @@ -411,42 +446,42 @@ class Storage(object): """ if not mapping: return - self._result_mappings[task_name] = mapping + self._result_mappings[atom_name] = mapping for name, index in six.iteritems(mapping): entries = self._reverse_mapping.setdefault(name, []) # NOTE(imelnikov): We support setting same result mapping for - # the same task twice (e.g when we are injecting 'a' and then + # the same atom twice (e.g when we are injecting 'a' and then # injecting 'a' again), so we should not log warning below in # that case and we should have only one item for each pair - # (task_name, index) in entries. It should be put to the end of + # (atom_name, index) in entries. It should be put to the end of # entries list because order matters on fetching. try: - entries.remove((task_name, index)) + entries.remove((atom_name, index)) except ValueError: pass - entries.append((task_name, index)) + entries.append((atom_name, index)) if len(entries) > 1: LOG.warning("Multiple provider mappings being created for %r", name) def fetch(self, name): - """Fetch named task result.""" + """Fetch a named atoms result.""" with self._lock.read_lock(): try: indexes = self._reverse_mapping[name] except KeyError: raise exceptions.NotFound("Name %r is not mapped" % name) # Return the first one that is found. - for (task_name, index) in reversed(indexes): + for (atom_name, index) in reversed(indexes): try: - result = self.get(task_name) - td = self._taskdetail_by_name(task_name) + result = self.get(atom_name) + ad = self._atomdetail_by_name(atom_name) # If it is a retry's result then fetch values from the - # latest retry run. - if td.atom_type == logbook.RETRY_DETAIL: + # latest retry run only. + if isinstance(ad, logbook.RetryDetail): if result: result = result[-1][0] else: @@ -457,7 +492,7 @@ class Storage(object): raise exceptions.NotFound("Unable to find result %r" % name) def fetch_all(self): - """Fetch all named task results known so far. + """Fetch all named atom results known so far. Should be used for debugging and testing purposes mostly. """ @@ -471,7 +506,7 @@ class Storage(object): return results def fetch_mapped_args(self, args_mapping): - """Fetch arguments for the task using arguments mapping.""" + """Fetch arguments for an atom using an atoms arguments mapping.""" with self._lock.read_lock(): return dict((key, self.fetch(name)) for key, name in six.iteritems(args_mapping)) @@ -490,19 +525,20 @@ class Storage(object): state = states.PENDING return state - def get_retry_history(self, name): + def get_retry_history(self, retry_name): """Fetch retry results history.""" with self._lock.read_lock(): - td = self._taskdetail_by_name(name) - if td.failure is not None: - cached = self._failures.get(name) - history = list(td.results) - if td.failure.matches(cached): + ad = self._atomdetail_by_name(retry_name, + expected_type=logbook.RetryDetail) + if ad.failure is not None: + cached = self._failures.get(retry_name) + history = list(ad.results) + if ad.failure.matches(cached): history.append((cached, {})) else: - history.append((td.failure, {})) + history.append((ad.failure, {})) return history - return td.results + return ad.results class MultiThreadedStorage(Storage): diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index ccc2b449..3d28695c 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -102,13 +102,13 @@ class PersistenceTestMixin(object): # their parent existing). with contextlib.closing(self._get_connection()) as conn: self.assertRaises(exc.NotFound, conn.update_flow_details, fd) - self.assertRaises(exc.NotFound, conn.update_task_details, td) + self.assertRaises(exc.NotFound, conn.update_atom_details, td) # Ok now we should be able to save them. with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(td) def test_task_detail_meta_update(self): lb_id = uuidutils.generate_uuid() @@ -123,18 +123,18 @@ class PersistenceTestMixin(object): with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(td) td.meta['test'] = 43 with contextlib.closing(self._get_connection()) as conn: - conn.update_task_details(td) + conn.update_atom_details(td) with contextlib.closing(self._get_connection()) as conn: lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) self.assertEqual(td2.meta.get('test'), 43) - self.assertEqual(td2.atom_type, logbook.TASK_DETAIL) + self.assertIsInstance(td2, logbook.TaskDetail) def test_task_detail_with_failure(self): lb_id = uuidutils.generate_uuid() @@ -154,7 +154,7 @@ class PersistenceTestMixin(object): with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(td) # Read failure back with contextlib.closing(self._get_connection()) as conn: @@ -165,7 +165,7 @@ class PersistenceTestMixin(object): self.assertEqual(failure.exception_str, 'Woot!') self.assertIs(failure.check(RuntimeError), RuntimeError) self.assertEqual(failure.traceback_str, td.failure.traceback_str) - self.assertEqual(td2.atom_type, logbook.TASK_DETAIL) + self.assertIsInstance(td2, logbook.TaskDetail) def test_logbook_merge_flow_detail(self): lb_id = uuidutils.generate_uuid() @@ -246,21 +246,21 @@ class PersistenceTestMixin(object): lb = logbook.LogBook(name=lb_name, uuid=lb_id) fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid()) - td.intention = states.REVERT - fd.add(td) + rd = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid()) + rd.intention = states.REVERT + fd.add(rd) with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(rd) with contextlib.closing(self._get_connection()) as conn: lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) - td2 = fd2.find(td.uuid) - self.assertEqual(td2.atom_type, logbook.RETRY_DETAIL) - self.assertEqual(td2.intention, states.REVERT) + rd2 = fd2.find(rd.uuid) + self.assertEqual(rd2.intention, states.REVERT) + self.assertIsInstance(rd2, logbook.RetryDetail) def test_retry_detail_save_with_task_failure(self): lb_id = uuidutils.generate_uuid() @@ -268,23 +268,24 @@ class PersistenceTestMixin(object): lb = logbook.LogBook(name=lb_name, uuid=lb_id) fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) + rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) fail = misc.Failure.from_exception(RuntimeError('fail')) - td.results.append((42, {'some-task': fail})) - fd.add(td) + rd.results.append((42, {'some-task': fail})) + fd.add(rd) # save it with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(rd) # now read it back with contextlib.closing(self._get_connection()) as conn: lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) - td2 = fd2.find(td.uuid) - fail2 = td2.results[0][1].get('some-task') + rd2 = fd2.find(rd.uuid) + self.assertIsInstance(rd2, logbook.RetryDetail) + fail2 = rd2.results[0][1].get('some-task') self.assertIsInstance(fail2, misc.Failure) self.assertTrue(fail.matches(fail2)) @@ -294,23 +295,24 @@ class PersistenceTestMixin(object): lb = logbook.LogBook(name=lb_name, uuid=lb_id) fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) - td = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) - fd.add(td) + rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) + fd.add(rd) # save it with contextlib.closing(self._get_connection()) as conn: conn.save_logbook(lb) conn.update_flow_details(fd) - conn.update_task_details(td) + conn.update_atom_details(rd) # change intention and save - td.intention = states.REVERT + rd.intention = states.REVERT with contextlib.closing(self._get_connection()) as conn: - conn.update_task_details(td) + conn.update_atom_details(rd) # now read it back with contextlib.closing(self._get_connection()) as conn: lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) - td2 = fd2.find(td.uuid) - self.assertEqual(td2.intention, states.REVERT) + rd2 = fd2.find(rd.uuid) + self.assertEqual(rd2.intention, states.REVERT) + self.assertIsInstance(rd2, logbook.RetryDetail) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 4218571c..ad69ec59 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -263,7 +263,7 @@ class EngineParallelFlowTest(utils.EngineTestBase): with contextlib.closing(self.backend.get_connection()) as conn: fd.update(conn.update_flow_details(fd)) - td.update(conn.update_task_details(td)) + td.update(conn.update_atom_details(td)) engine = self._make_engine(flow, fd) engine.run() diff --git a/taskflow/tests/unit/test_duration.py b/taskflow/tests/unit/test_duration.py index b2b636c5..2e5ab84b 100644 --- a/taskflow/tests/unit/test_duration.py +++ b/taskflow/tests/unit/test_duration.py @@ -55,7 +55,7 @@ class TestDuration(test.TestCase): e = self.make_engine(flo, fd, be) with timing.TimingListener(e): e.run() - t_uuid = e.storage.get_task_uuid("test-1") + t_uuid = e.storage.get_atom_uuid("test-1") td = fd.find(t_uuid) self.assertIsNotNone(td) self.assertIsNotNone(td.meta) diff --git a/taskflow/tests/unit/test_progress.py b/taskflow/tests/unit/test_progress.py index c74aeab0..987c237c 100644 --- a/taskflow/tests/unit/test_progress.py +++ b/taskflow/tests/unit/test_progress.py @@ -100,7 +100,7 @@ class TestProgress(test.TestCase): e.run() end_progress = e.storage.get_task_progress("test") self.assertEqual(1.0, end_progress) - task_uuid = e.storage.get_task_uuid("test") + task_uuid = e.storage.get_atom_uuid("test") td = fd.find(task_uuid) self.assertEqual(1.0, td.meta['progress']) self.assertFalse(td.meta['progress_details']) @@ -135,7 +135,7 @@ class TestProgress(test.TestCase): end_progress = e.storage.get_task_progress("test") self.assertEqual(1.0, end_progress) - task_uuid = e.storage.get_task_uuid("test") + task_uuid = e.storage.get_atom_uuid("test") td = fd.find(task_uuid) self.assertEqual(1.0, td.meta['progress']) self.assertFalse(td.meta['progress_details']) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index d09944f5..97d33e77 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -395,10 +395,10 @@ class RetryTest(utils.EngineTestBase): engine = self._make_engine(flow) engine.compile() utils.register_notifiers(engine, self.values) - engine.storage.set_task_state('r1', st.RETRYING) - engine.storage.set_task_state('t1', st.PENDING) - engine.storage.set_task_state('t2', st.REVERTED) - engine.storage.set_task_state('t3', st.REVERTED) + engine.storage.set_atom_state('r1', st.RETRYING) + engine.storage.set_atom_state('t1', st.PENDING) + engine.storage.set_atom_state('t2', st.REVERTED) + engine.storage.set_atom_state('t3', st.REVERTED) engine.run() expected = ['flow RUNNING', @@ -427,9 +427,9 @@ class RetryTest(utils.EngineTestBase): engine.compile() utils.register_notifiers(engine, self.values) engine.storage.set_atom_intention('r1', st.RETRY) - engine.storage.set_task_state('r1', st.SUCCESS) - engine.storage.set_task_state('t1', st.REVERTED) - engine.storage.set_task_state('t2', st.REVERTED) + engine.storage.set_atom_state('r1', st.SUCCESS) + engine.storage.set_atom_state('t1', st.REVERTED) + engine.storage.set_atom_state('t2', st.REVERTED) engine.run() expected = ['flow RUNNING', @@ -575,7 +575,7 @@ class RetryTest(utils.EngineTestBase): if when == 'task updated': return engine # we schedule task1 for reversion - engine.storage.set_task_state('task1', st.REVERTING) + engine.storage.set_atom_state('task1', st.REVERTING) if when == 'revert scheduled': return engine raise ValueError('Invalid crash point: %s' % when) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 32d4d492..eb088190 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -61,7 +61,7 @@ class StorageTestMixin(object): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) s = storage.SingleThreadedStorage(flow_detail=flow_detail) s.ensure_task('my_task') - self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my_task'))) + self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task'))) def test_flow_name_and_uuid(self): flow_detail = logbook.FlowDetail(name='test-fd', uuid='aaaa') @@ -72,8 +72,8 @@ class StorageTestMixin(object): def test_ensure_task(self): s = self._get_storage() s.ensure_task('my task') - self.assertEqual(s.get_task_state('my task'), states.PENDING) - self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task'))) + self.assertEqual(s.get_atom_state('my task'), states.PENDING) + self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task'))) def test_get_tasks_states(self): s = self._get_storage() @@ -84,13 +84,13 @@ class StorageTestMixin(object): 'my task': (states.SUCCESS, states.EXECUTE), 'my task2': (states.PENDING, states.EXECUTE), } - self.assertEqual(s.get_tasks_states(['my task', 'my task2']), expected) + self.assertEqual(s.get_atoms_states(['my task', 'my task2']), expected) def test_ensure_task_flow_detail(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) s = self._get_storage(flow_detail) s.ensure_task('my task', '3.11') - td = flow_detail.find(s.get_task_uuid('my task')) + td = flow_detail.find(s.get_atom_uuid('my task')) self.assertIsNotNone(td) self.assertEqual(td.name, 'my task') self.assertEqual(td.version, '3.11') @@ -101,7 +101,7 @@ class StorageTestMixin(object): td = logbook.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) s = self._get_storage(flow_detail) - self.assertEqual('42', s.get_task_uuid('my_task')) + self.assertEqual('42', s.get_atom_uuid('my_task')) def test_ensure_existing_task(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) @@ -109,7 +109,7 @@ class StorageTestMixin(object): flow_detail.add(td) s = self._get_storage(flow_detail) s.ensure_task('my_task') - self.assertEqual('42', s.get_task_uuid('my_task')) + self.assertEqual('42', s.get_atom_uuid('my_task')) def test_save_and_get(self): s = self._get_storage() @@ -117,22 +117,22 @@ class StorageTestMixin(object): s.save('my task', 5) self.assertEqual(s.get('my task'), 5) self.assertEqual(s.fetch_all(), {}) - self.assertEqual(s.get_task_state('my task'), states.SUCCESS) + self.assertEqual(s.get_atom_state('my task'), states.SUCCESS) def test_save_and_get_other_state(self): s = self._get_storage() s.ensure_task('my task') s.save('my task', 5, states.FAILURE) self.assertEqual(s.get('my task'), 5) - self.assertEqual(s.get_task_state('my task'), states.FAILURE) + self.assertEqual(s.get_atom_state('my task'), states.FAILURE) - def test_save_and_get_failure(self): + def test_save_and_get_cached_failure(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') s.save('my task', failure, states.FAILURE) self.assertEqual(s.get('my task'), failure) - self.assertEqual(s.get_task_state('my task'), states.FAILURE) + self.assertEqual(s.get_atom_state('my task'), states.FAILURE) self.assertTrue(s.has_failures()) self.assertEqual(s.get_failures(), {'my task': failure}) @@ -143,18 +143,19 @@ class StorageTestMixin(object): s.save('my task', failure, states.FAILURE) self.assertEqual(s.get('my task'), failure) s._failures['my task'] = None - self.assertEqual(s.get('my task'), failure) + self.assertTrue(failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) + s = self._get_storage() s.ensure_task('my task') s.save('my task', failure, states.FAILURE) - s.set_task_state('my task', states.REVERTING) + s.set_atom_state('my task', states.REVERTING) self.assertEqual(s.get('my task'), failure) - s.set_task_state('my task', states.REVERTED) + s.set_atom_state('my task', states.REVERTED) self.assertEqual(s.get('my task'), failure) def test_get_failure_after_reload(self): @@ -163,10 +164,10 @@ class StorageTestMixin(object): s.ensure_task('my task') s.save('my task', failure, states.FAILURE) s2 = self._get_storage(s._flowdetail) - self.assertIs(s2.has_failures(), True) - self.assertEqual(s2.get_failures(), {'my task': failure}) - self.assertEqual(s2.get('my task'), failure) - self.assertEqual(s2.get_task_state('my task'), states.FAILURE) + self.assertTrue(s2.has_failures()) + self.assertEqual(1, len(s2.get_failures())) + self.assertTrue(failure.matches(s2.get('my task'))) + self.assertEqual(s2.get_atom_state('my task'), states.FAILURE) def test_get_non_existing_var(self): s = self._get_storage() @@ -178,7 +179,7 @@ class StorageTestMixin(object): s.ensure_task('my task') s.save('my task', 5) s.reset('my task') - self.assertEqual(s.get_task_state('my task'), states.PENDING) + self.assertEqual(s.get_atom_state('my task'), states.PENDING) self.assertRaises(exceptions.NotFound, s.get, 'my task') def test_reset_unknown_task(self): @@ -203,11 +204,11 @@ class StorageTestMixin(object): def test_task_metadata_update_with_none(self): s = self._get_storage() s.ensure_task('my task') - s.update_task_metadata('my task', None) + s.update_atom_metadata('my task', None) self.assertEqual(s.get_task_progress('my task'), 0.0) s.set_task_progress('my task', 0.5) self.assertEqual(s.get_task_progress('my task'), 0.5) - s.update_task_metadata('my task', None) + s.update_atom_metadata('my task', None) self.assertEqual(s.get_task_progress('my task'), 0.5) def test_default_task_progress(self): @@ -357,24 +358,24 @@ class StorageTestMixin(object): s = self._get_storage() state = states.PENDING s.ensure_task('my task') - s.set_task_state('my task', state) - self.assertEqual(s.get_task_state('my task'), state) + s.set_atom_state('my task', state) + self.assertEqual(s.get_atom_state('my task'), state) def test_get_state_of_unknown_task(self): s = self._get_storage() self.assertRaisesRegexp(exceptions.NotFound, '^Unknown', - s.get_task_state, 'my task') + s.get_atom_state, 'my task') def test_task_by_name(self): s = self._get_storage() s.ensure_task('my task') - self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task'))) + self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task'))) def test_unknown_task_by_name(self): s = self._get_storage() self.assertRaisesRegexp(exceptions.NotFound, - '^Unknown task name:', - s.get_task_uuid, '42') + '^Unknown atom', + s.get_atom_uuid, '42') def test_initial_flow_state(self): s = self._get_storage() @@ -451,7 +452,7 @@ class StorageTestMixin(object): s = self._get_storage() s.ensure_task('my retry') self.assertRaisesRegexp(exceptions.Duplicate, - '^Task detail', s.ensure_retry, 'my retry') + '^Atom detail', s.ensure_retry, 'my retry') def test_save_retry_results(self): s = self._get_storage() @@ -481,21 +482,21 @@ class StorageTestMixin(object): self.assertEqual(history, []) self.assertEqual(s.fetch_all(), {}) - def test_retry_failure(self): - fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + def test_cached_retry_failure(self): + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_retry('my retry', result_mapping={'x': 0}) s.save('my retry', 'a') - s.save('my retry', fail, states.FAILURE) + s.save('my retry', failure, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (fail, {})]) + self.assertEqual(history, [('a', {}), (failure, {})]) self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my retry': fail}) + self.assertEqual(s.get_failures(), {'my retry': failure}) def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, - 'Unknown atom type', - logbook.get_atom_detail_class, 'some_detail') + 'Unknown atom', + logbook.atom_detail_class, 'some_detail') def test_save_task_intention(self): s = self._get_storage() diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 85ccf753..75092003 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -26,7 +26,6 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.tests import utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as pu class TestWorkerTaskExecutor(test.MockTestCase): @@ -113,7 +112,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): def test_on_message_response_state_failure(self): failure = misc.Failure.from_exception(Exception('test')) - failure_dict = pu.failure_to_dict(failure) + failure_dict = failure.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() ex._requests_cache.set(self.task_uuid, self.request_inst_mock) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index c0709ba7..27d6e00d 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -22,7 +22,6 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.tests import utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as pu class TestProtocol(test.TestCase): @@ -83,15 +82,14 @@ class TestProtocol(test.TestCase): def test_to_dict_with_result_failure(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) - expected = self.request_to_dict( - result=('failure', pu.failure_to_dict(failure))) + expected = self.request_to_dict(result=('failure', failure.to_dict())) self.assertEqual(self.request(result=failure).to_dict(), expected) def test_to_dict_with_failures(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) request = self.request(failures={self.task.name: failure}) expected = self.request_to_dict( - failures={self.task.name: pu.failure_to_dict(failure)}) + failures={self.task.name: failure.to_dict()}) self.assertEqual(request.to_dict(), expected) @mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock') diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 93234c31..a4eab7a8 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -274,12 +274,15 @@ class TestServer(test.MockTestCase): self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) - @mock.patch('taskflow.engines.worker_based.server.pu') - def test_process_request_parse_request_failure(self, pu_mock): - failure_dict = 'failure_dict' + @mock.patch.object(misc.Failure, 'from_dict') + @mock.patch.object(misc.Failure, 'to_dict') + def test_process_request_parse_request_failure(self, to_mock, from_mock): + failure_dict = { + 'failure': 'failure', + } failure = misc.Failure.from_exception(RuntimeError('Woot!')) - pu_mock.failure_to_dict.return_value = failure_dict - pu_mock.failure_from_dict.side_effect = ValueError('Woot!') + to_mock.return_value = failure_dict + from_mock.side_effect = ValueError('Woot!') request = self.make_request(result=failure) # create server and process request @@ -293,12 +296,14 @@ class TestServer(test.MockTestCase): self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.assertEqual(master_mock_calls, self.master_mock.mock_calls) - @mock.patch('taskflow.engines.worker_based.server.pu') - def test_process_request_endpoint_not_found(self, pu_mock): - failure_dict = 'failure_dict' - pu_mock.failure_to_dict.return_value = failure_dict + @mock.patch.object(misc.Failure, 'to_dict') + def test_process_request_endpoint_not_found(self, to_mock): + failure_dict = { + 'failure': 'failure', + } + to_mock.return_value = failure_dict request = self.make_request(task=mock.MagicMock(name='')) # create server and process request @@ -314,10 +319,12 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch('taskflow.engines.worker_based.server.pu') - def test_process_request_execution_failure(self, pu_mock): - failure_dict = 'failure_dict' - pu_mock.failure_to_dict.return_value = failure_dict + @mock.patch.object(misc.Failure, 'to_dict') + def test_process_request_execution_failure(self, to_mock): + failure_dict = { + 'failure': 'failure', + } + to_mock.return_value = failure_dict request = self.make_request() request['action'] = '' @@ -337,10 +344,12 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch('taskflow.engines.worker_based.server.pu') - def test_process_request_task_failure(self, pu_mock): - failure_dict = 'failure_dict' - pu_mock.failure_to_dict.return_value = failure_dict + @mock.patch.object(misc.Failure, 'to_dict') + def test_process_request_task_failure(self, to_mock): + failure_dict = { + 'failure': 'failure', + } + to_mock.return_value = failure_dict request = self.make_request(task=utils.TaskWithFailure(), arguments={}) # create server and process request diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index ee8d7e2b..8ca0f74a 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -531,6 +531,7 @@ class Failure(object): Failure objects encapsulate exception information so that it can be re-used later to re-raise or inspect. """ + DICT_VERSION = 1 def __init__(self, exc_info=None, **kwargs): if not kwargs: @@ -663,6 +664,23 @@ class Failure(object): for et in self._exc_type_names: yield et + @classmethod + def from_dict(cls, data): + data = dict(data) + version = data.pop('version', None) + if version != cls.DICT_VERSION: + raise ValueError('Invalid dict version of failure object: %r' + % version) + return cls(**data) + + def to_dict(self): + return { + 'exception_str': self.exception_str, + 'traceback_str': self.traceback_str, + 'exc_type_names': list(self), + 'version': self.DICT_VERSION, + } + def copy(self): return Failure(exc_info=copy_exc_info(self.exc_info), exception_str=self.exception_str, diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 0f14f383..f58cea09 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -15,11 +15,8 @@ # under the License. import contextlib -import copy import logging -import six - from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook @@ -92,130 +89,6 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): return flow_detail -def _copy_function(deep_copy): - if deep_copy: - return copy.deepcopy - else: - return lambda x: x - - -def task_details_merge(td_e, td_new, deep_copy=False): - """Merges an existing task details with a new task details object. - - The new task details fields, if they differ will replace the existing - objects fields (except name, version, uuid which can not be replaced). - - If 'deep_copy' is True, fields are copied deeply (by value) if possible. - """ - if td_e is td_new: - return td_e - - copy_fn = _copy_function(deep_copy) - # NOTE(imelnikov): states and intentions are just strings, no need to copy. - td_e.state = td_new.state - td_e.intention = td_new.intention - if td_e.results != td_new.results: - td_e.results = copy_fn(td_new.results) - if td_e.failure != td_new.failure: - # NOTE(imelnikov): we can't just deep copy Failures, as they - # contain tracebacks, which are not copyable. - if deep_copy: - td_e.failure = td_new.failure.copy() - else: - td_e.failure = td_new.failure - if td_e.meta != td_new.meta: - td_e.meta = copy_fn(td_new.meta) - if td_e.version != td_new.version: - td_e.version = copy_fn(td_new.version) - return td_e - - -def flow_details_merge(fd_e, fd_new, deep_copy=False): - """Merges an existing flow details with a new flow details object. - - The new flow details fields, if they differ will replace the existing - objects fields (except name and uuid which can not be replaced). - - If 'deep_copy' is True, fields are copied deeply (by value) if possible. - """ - if fd_e is fd_new: - return fd_e - - copy_fn = _copy_function(deep_copy) - if fd_e.meta != fd_new.meta: - fd_e.meta = copy_fn(fd_new.meta) - if fd_e.state != fd_new.state: - # NOTE(imelnikov): states are just strings, no need to copy. - fd_e.state = fd_new.state - return fd_e - - -def logbook_merge(lb_e, lb_new, deep_copy=False): - """Merges an existing logbook with a new logbook object. - - The new logbook fields, if they differ will replace the existing - objects fields (except name and uuid which can not be replaced). - - If 'deep_copy' is True, fields are copied deeply (by value) if possible. - """ - if lb_e is lb_new: - return lb_e - - copy_fn = _copy_function(deep_copy) - if lb_e.meta != lb_new.meta: - lb_e.meta = copy_fn(lb_new.meta) - return lb_e - - -def failure_to_dict(failure): - """Convert misc.Failure object to JSON-serializable dict.""" - if not failure: - return None - if not isinstance(failure, misc.Failure): - raise TypeError('Failure object expected, but got %r' - % failure) - return { - 'exception_str': failure.exception_str, - 'traceback_str': failure.traceback_str, - 'exc_type_names': list(failure), - 'version': 1 - } - - -def failure_from_dict(data): - """Restore misc.Failure object from dict. - - The dict should be similar to what failure_to_dict() function produces. - """ - if not data: - return None - version = data.pop('version', None) - if version != 1: - raise ValueError('Invalid version of saved Failure object: %r' - % version) - return misc.Failure(**data) - - -def encode_retry_results(results): - new_results = [] - for (data, failures) in results: - new_failures = {} - for key, value in six.iteritems(failures): - new_failures[key] = failure_to_dict(value) - new_results.append((data, new_failures)) - return new_results - - -def decode_retry_results(results): - new_results = [] - for (data, failures) in results: - new_failures = {} - for key, value in six.iteritems(failures): - new_failures[key] = failure_from_dict(value) - new_results.append((data, new_failures)) - return new_results - - def _format_meta(metadata, indent): """Format the common metadata dictionary in the same manner.""" if not metadata: @@ -245,18 +118,18 @@ def _format_shared(obj, indent): return lines -def pformat_task_detail(task_detail, indent=0): - """Pretty formats a task detail.""" - detail_type = task_detail.atom_type - lines = ["%s%s: '%s'" % (" " * (indent), detail_type, task_detail.name)] - lines.extend(_format_shared(task_detail, indent=indent + 1)) +def pformat_atom_detail(atom_detail, indent=0): + """Pretty formats a atom detail.""" + detail_type = logbook.atom_detail_type(atom_detail) + lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)] + lines.extend(_format_shared(atom_detail, indent=indent + 1)) lines.append("%s- version = %s" - % (" " * (indent + 1), misc.get_version_string(task_detail))) + % (" " * (indent + 1), misc.get_version_string(atom_detail))) lines.append("%s- results = %s" - % (" " * (indent + 1), task_detail.results)) + % (" " * (indent + 1), atom_detail.results)) lines.append("%s- failure = %s" % (" " * (indent + 1), - bool(task_detail.failure))) - lines.extend(_format_meta(task_detail.meta, indent=indent + 1)) + bool(atom_detail.failure))) + lines.extend(_format_meta(atom_detail.meta, indent=indent + 1)) return "\n".join(lines) @@ -266,7 +139,7 @@ def pformat_flow_detail(flow_detail, indent=0): lines.extend(_format_shared(flow_detail, indent=indent + 1)) lines.extend(_format_meta(flow_detail.meta, indent=indent + 1)) for task_detail in flow_detail: - lines.append(pformat_task_detail(task_detail, indent=indent + 1)) + lines.append(pformat_atom_detail(task_detail, indent=indent + 1)) return "\n".join(lines) @@ -286,86 +159,3 @@ def pformat(book, indent=0): for flow_detail in book: lines.append(pformat_flow_detail(flow_detail, indent=indent + 1)) return "\n".join(lines) - - -def _str_2_datetime(text): - """Converts an iso8601 string/text into a datetime object (or none).""" - if text is None: - return None - if not isinstance(text, six.string_types): - raise ValueError("Can only convert strings into a datetime object and" - " not %r" % (text)) - if not len(text): - return None - return timeutils.parse_isotime(text) - - -def format_task_detail(td): - results = td.results - if td.atom_type == logbook.RETRY_DETAIL: - results = encode_retry_results(results) - - return { - 'failure': failure_to_dict(td.failure), - 'meta': td.meta, - 'name': td.name, - 'results': results, - 'state': td.state, - 'version': td.version, - 'atom_type': td.atom_type, - 'intention': td.intention, - } - - -def unformat_task_detail(uuid, td_data): - results = td_data.get('results') - if td_data['atom_type'] == logbook.RETRY_DETAIL: - results = decode_retry_results(results) - - atom_cls = logbook.get_atom_detail_class(td_data['atom_type']) - td = atom_cls(name=td_data['name'], uuid=uuid) - td.state = td_data.get('state') - td.failure = failure_from_dict(td_data.get('failure')) - td.intention = td_data.get('intention') - td.results = results - td.meta = td_data.get('meta') - td.version = td_data.get('version') - return td - - -def format_flow_detail(fd): - return { - 'name': fd.name, - 'meta': fd.meta, - 'state': fd.state, - } - - -def unformat_flow_detail(uuid, fd_data): - fd = logbook.FlowDetail(name=fd_data['name'], uuid=uuid) - fd.state = fd_data.get('state') - fd.meta = fd_data.get('meta') - return fd - - -def format_logbook(lb, created_at=None): - lb_data = { - 'name': lb.name, - 'meta': lb.meta, - } - if created_at: - lb_data['created_at'] = timeutils.isotime(at=created_at) - lb_data['updated_at'] = timeutils.isotime() - else: - lb_data['created_at'] = timeutils.isotime() - lb_data['updated_at'] = None - return lb_data - - -def unformat_logbook(uuid, lb_data): - lb = logbook.LogBook(name=lb_data['name'], - uuid=uuid, - updated_at=_str_2_datetime(lb_data['updated_at']), - created_at=_str_2_datetime(lb_data['created_at'])) - lb.meta = lb_data.get('meta') - return lb