From a2c8930b52e63c6739d503e2f3eb273bf676a272 Mon Sep 17 00:00:00 2001 From: Victor Rodionov Date: Thu, 12 Sep 2013 22:15:56 +0400 Subject: [PATCH] Task progress Add update progress method to base task class Implements blueprint: task-progress Change-Id: Id7d554ce6c31778f1f1a084a3b268257e66a5fea --- .gitignore | 3 ++ taskflow/engines/action_engine/task_action.py | 18 +++++++ taskflow/storage.py | 41 ++++++++++++++++ taskflow/task.py | 49 +++++++++++++++++++ taskflow/tests/unit/test_action_engine.py | 10 ++++ 5 files changed, 121 insertions(+) diff --git a/.gitignore b/.gitignore index e731d131d..52bed0ef3 100644 --- a/.gitignore +++ b/.gitignore @@ -49,3 +49,6 @@ nosetests.xml build AUTHORS ChangeLog + +.idea +env diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 31341c0bd..4944debc6 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -51,6 +51,10 @@ class TaskAction(base.Action): def _change_state(self, engine, state): """Check and update state of task.""" + if state in (states.RUNNING, states.REVERTING, states.PENDING): + self._task.update_progress(0.0) + elif state in (states.SUCCESS, states.REVERTED): + self._task.update_progress(1.0) engine.storage.set_task_state(self.uuid, state) engine.on_task_state_change(self, state) @@ -62,9 +66,16 @@ class TaskAction(base.Action): engine.storage.save(self.uuid, result, state) engine.on_task_state_change(self, state, result) + def _update_progress(self, task, event_data, progress, **kwargs): + """Update task progress value that stored in engine.""" + engine = event_data['engine'] + engine.storage.set_task_progress(self.uuid, progress, **kwargs) + def execute(self, engine): if engine.storage.get_task_state(self.uuid) == states.SUCCESS: return + self._task.bind('update_progress', self._update_progress, + engine=engine) try: kwargs = engine.storage.fetch_mapped_args(self._args_mapping) self._change_state(engine, states.RUNNING) @@ -75,6 +86,8 @@ class TaskAction(base.Action): failure.reraise() else: self._update_result(engine, states.SUCCESS, result) + finally: + self._task.unbind('update_progress', self._update_progress) def revert(self, engine): if engine.storage.get_task_state(self.uuid) == states.PENDING: @@ -82,7 +95,10 @@ class TaskAction(base.Action): # execution was at least attempted, so we should give # task a chance for cleanup return + self._task.bind('update_progress', self._update_progress, + engine=engine) kwargs = engine.storage.fetch_mapped_args(self._args_mapping) + self._change_state(engine, states.REVERTING) try: self._task.revert(result=engine.storage.get(self._id), @@ -93,3 +109,5 @@ class TaskAction(base.Action): self._change_state(engine, states.FAILURE) else: self._update_result(engine, states.PENDING) + finally: + self._task.unbind('update_progress', self._update_progress) diff --git a/taskflow/storage.py b/taskflow/storage.py index 5e8837906..b87f860b5 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -120,6 +120,47 @@ class Storage(object): """Get state of task with given uuid""" return self._taskdetail_by_uuid(uuid).state + def set_task_progress(self, uuid, progress, **kwargs): + """Set task progress. + + :param uuid: task uuid + :param progress: task progress + :param kwargs: task specific progress information + """ + td = self._taskdetail_by_uuid(uuid) + if not td.meta: + td.meta = {} + td.meta['progress'] = progress + if kwargs: + td.meta['progress_details'] = kwargs + else: + if 'progress_details' in td.meta: + td.meta.pop('progress_details') + self._with_connection(self._save_task_detail, task_detail=td) + + def get_task_progress(self, uuid): + """Get progress of task with given uuid. + + :param uuid: task uuid + :returns: current task progress value + """ + meta = self._taskdetail_by_uuid(uuid).meta + if not meta: + return 0.0 + return meta.get('progress', 0.0) + + def get_task_progress_details(self, uuid): + """Get progress details of task with given uuid. + + :param uuid: task uuid + :returns: None if progress_details not defined, else progress_details + dict + """ + meta = self._taskdetail_by_uuid(uuid).meta + if not meta: + return None + return meta.get('progress_details') + def _check_all_results_provided(self, uuid, task_name, data): """Warn if task did not provide some of results diff --git a/taskflow/task.py b/taskflow/task.py index 787a346fa..073adbff0 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -100,6 +100,8 @@ class BaseTask(object): """ __metaclass__ = abc.ABCMeta + TASK_EVENTS = ('update_progress', ) + def __init__(self, name, provides=None): self._name = name # An *immutable* input 'resource' name mapping this task depends @@ -116,6 +118,9 @@ class BaseTask(object): # can be useful in resuming older versions of tasks. Standard # major, minor version semantics apply. self.version = (1, 0) + # List of callback functions to invoke when progress updated. + self._on_update_progress_notify = [] + self._events_listeners = {} @property def name(self): @@ -148,6 +153,50 @@ class BaseTask(object): def requires(self): return set(self.rebind.values()) + def update_progress(self, progress, **kwargs): + """Update task progress and notify all registered listeners. + + :param progress: task progress float value between 0 and 1 + :param kwargs: task specific progress information + """ + self._trigger('update_progress', progress, **kwargs) + + def _trigger(self, event, *args, **kwargs): + """Execute all handlers for the given event type.""" + if event in self._events_listeners: + for handler in self._events_listeners[event]: + event_data = self._events_listeners[event][handler] + handler(self, event_data, *args, **kwargs) + + def bind(self, event, handler, **kwargs): + """Attach a handler to an event for the task. + + :param event: event type + :param handler: function to execute each time event is triggered + :param kwargs: optional named parameters that will be passed to the + event handler + :raises ValueError: if invalid event type passed + """ + if event not in self.TASK_EVENTS: + raise ValueError("Unknown task event %s" % event) + if event not in self._events_listeners: + self._events_listeners[event] = {} + self._events_listeners[event][handler] = kwargs + + def unbind(self, event, handler=None): + """Remove a previously-attached event handler from the task. If handler + function not passed, then unbind all event handlers. + + :param event: event type + :param handler: previously attached to event function + """ + if event in self._events_listeners: + if not handler: + self._events_listeners[event] = {} + else: + if handler in self._events_listeners[event]: + self._events_listeners[event].pop(handler) + class Task(BaseTask): """Base class for user-defined tasks diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 4e46777fa..507805bcb 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -47,23 +47,29 @@ class TestTask(task.Task): self._sleep = sleep def execute(self, **kwargs): + self.update_progress(0.0) if self._sleep: time.sleep(self._sleep) self.values.append(self.name) + self.update_progress(1.0) return 5 def revert(self, **kwargs): + self.update_progress(0) if self._sleep: time.sleep(self._sleep) self.values.append(self.name + ' reverted(%s)' % kwargs.get('result')) + self.update_progress(1.0) class FailingTask(TestTask): def execute(self, **kwargs): + self.update_progress(0) if self._sleep: time.sleep(self._sleep) + self.update_progress(0.99) raise RuntimeError('Woot!') @@ -95,9 +101,13 @@ class MultiargsTask(task.Task): class MultiDictTask(task.Task): def execute(self): + self.update_progress(0) output = {} + total = len(sorted(self.provides)) for i, k in enumerate(sorted(self.provides)): output[k] = i + self.update_progress(i / total) + self.update_progress(1.0) return output