diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py index be1d0dc6..5ac1a662 100644 --- a/taskflow/backends/memory.py +++ b/taskflow/backends/memory.py @@ -60,6 +60,10 @@ class MemoryCatalog(catalog.Catalog): self._closed = False self._lock = threading.RLock() + def __len__(self): + with self._lock: + return len(self._catalogs) + def __contains__(self, job): with self._lock: for (j, b) in self._catalogs: @@ -77,7 +81,7 @@ class MemoryCatalog(catalog.Catalog): if j == job: return b b = MemoryLogBook() - self._catalogs.append((j, b)) + self._catalogs.append((job, b)) return b @check_not_closed @@ -95,12 +99,15 @@ class MemoryChapter(logbook.Chapter): for p in self._pages: yield p - def __contains__(self, name): + def __contains__(self, page_name): for p in self._pages: - if p.name == name: + if p.name == page_name: return True return False + def fetch_pages(self, page_name): + return [p for p in self._pages if p.name == page_name] + def __len__(self): return len(self._pages) @@ -118,10 +125,19 @@ class MemoryLogBook(logbook.LogBook): @check_not_closed def add_chapter(self, chapter_name): if chapter_name in self._chapter_names: - raise exc.ChapterAlreadyExists() + raise exc.ChapterAlreadyExists("Chapter %s already exists" % + (chapter_name)) self._chapters.append(MemoryChapter(self, chapter_name)) self._chapter_names.add(chapter_name) + @check_not_closed + def fetch_chapter(self, chapter_name): + if chapter_name not in self._chapter_names: + raise exc.ChapterNotFound("No chapter named %s" % (chapter_name)) + for c in self._chapters: + if c.name == chapter_name: + return c + @check_not_closed def __iter__(self): for c in self._chapters: @@ -133,7 +149,7 @@ class MemoryLogBook(logbook.LogBook): @check_not_closed def __contains__(self, chapter_name): for c in self: - if c.name == name: + if c.name == chapter_name: return True return False diff --git a/taskflow/catalog.py b/taskflow/catalog.py index 55124f80..2f64438d 100644 --- a/taskflow/catalog.py +++ b/taskflow/catalog.py @@ -29,6 +29,10 @@ class Catalog(object): """Checks if the given catalog has a logbook for a job.""" raise NotImplementedError() + def __len__(self): + """Gets how many logbooks are in this catalog.""" + raise NotImplementedError() + @abc.abstractmethod def create_or_fetch(self, job): """Creates a new logbook for a job or gives back an old one.""" diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 0c9e9199..16866f80 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -17,7 +17,12 @@ # under the License. -class TaskException(Exception): +class TaskFlowException(Exception): + """Base class for exceptions emitted from this library.""" + pass + + +class TaskException(TaskFlowException): """When a task failure occurs the following object will be given to revert and can be used to interrogate what caused the failure.""" @@ -28,27 +33,32 @@ class TaskException(Exception): self.cause = cause -class ChapterAlreadyExists(Exception): +class ChapterNotFound(TaskFlowException): + """Raised when a chapter of a logbook doesn't exist.""" + pass + + +class ChapterAlreadyExists(TaskFlowException): """Raised when a chapter of a logbook already exists.""" pass -class ClosedException(Exception): +class ClosedException(TaskFlowException): """Raised when an access on a closed object occurs.""" pass -class InvalidStateException(Exception): +class InvalidStateException(TaskFlowException): """Raised when a task/job/workflow is in an invalid state when an operation is attempting to apply to said task/job/workflow.""" pass -class UnclaimableJobException(Exception): +class UnclaimableJobException(TaskFlowException): """Raised when a job can not be claimed.""" pass -class JobNotFound(Exception): +class JobNotFound(TaskFlowException): """Raised when a job entry can not be found.""" pass diff --git a/taskflow/job.py b/taskflow/job.py index 31c1b3ed..02e531ec 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -42,25 +42,33 @@ class Job(object): information is provided about said work to be able to attempt to fullfill said work.""" + __metaclass__ = abc.ABCMeta + def __init__(self, name, context, catalog, claimer): self.name = name self.context = context - self.state = states.UNCLAIMED self.owner = None self.posted_on = [] self._catalog = catalog self._claimer = claimer self._logbook = None self._id = str(uuid.uuid4().hex) + self._state = states.UNCLAIMED + + @property + def state(self): + return self._state + + def _change_state(self, new_state): + if self.state != new_state: + self._state = new_state + # TODO(harlowja): add logbook info? @property def logbook(self): """Fetches (or creates) a logbook entry for this job.""" if self._logbook is None: - if self in self._catalog: - self._logbook = self._catalog.fetch(self) - else: - self._logbook = self._catalog.create(self) + self._logbook = self._catalog.create_or_fetch(self) return self._logbook def claim(self, owner): @@ -78,14 +86,6 @@ class Job(object): self.owner = owner self._change_state(states.CLAIMED) - def _change_state(self, new_state): - self.state = new_state - # TODO(harlowja): update the logbook - - def await(self, blocking=True, timeout=None): - """Attempts to wait until the job fails or finishes.""" - raise NotImplementedError() - def erase(self): """Erases any traces of this job from its associated resources.""" for b in self.posted_on: diff --git a/taskflow/logbook.py b/taskflow/logbook.py index d65128b9..e6c9c503 100644 --- a/taskflow/logbook.py +++ b/taskflow/logbook.py @@ -30,6 +30,10 @@ class Page(object): self.name = name self.metadata = metadata + def __str__(self): + return "Page (%s, %s): %s" % (self.name, self.date_created, + self.metadata) + class Chapter(object): """Base class for what a chapter of a logbook should provide.""" @@ -48,11 +52,16 @@ class Chapter(object): raise NotImplementedError() @abc.abstractmethod - def __contains__(self, name): + def __contains__(self, page_name): """Determines if any page with the given name exists in this chapter.""" raise NotImplementedError() + @abc.abstractmethod + def fetch_pages(self, page_name): + """Fetch any pages that match the given page name.""" + raise NotImplementedError() + @abc.abstractmethod def add_page(self, page): """Adds a page to the underlying chapter.""" @@ -63,6 +72,9 @@ class Chapter(object): """Returns how many pages the underlying chapter has.""" raise NotImplementedError() + def __str__(self): + return "Chapter (%s): %s pages" % (self.name, len(self)) + class LogBook(object): """Base class for what a logbook should provide""" @@ -77,6 +89,7 @@ class LogBook(object): """ raise NotImplementedError() + @abc.abstractmethod def fetch_chapter(self, chapter_name): """Fetches the given chapter or raises an exception if that chapter does not exist.""" diff --git a/taskflow/patterns/ordered_workflow.py b/taskflow/patterns/ordered_workflow.py index 3942a418..2408ac95 100644 --- a/taskflow/patterns/ordered_workflow.py +++ b/taskflow/patterns/ordered_workflow.py @@ -18,6 +18,7 @@ import abc import copy +import functools import logging from taskflow.openstack.common import excutils @@ -54,17 +55,22 @@ class Workflow(object): # the contract we have with tasks that they will be given the value # they returned if reversion is triggered. self.result_fetcher = None - # Any objects that want to listen when a task starts/stops/completes + # Any objects that want to listen when a wf/task starts/stops/completes # or errors should be registered here. This can be used to monitor # progress and record tasks finishing (so that it becomes possible to # store the result of a task in some persistent or semi-persistent # storage backend). + self.task_listeners = [] self.listeners = [] # The state of this flow. - self.state = states.PENDING + self._state = states.PENDING # Tasks results are stored here... self.results = [] + @property + def state(self): + return self._state + @abc.abstractmethod def add(self, task): raise NotImplementedError() @@ -108,8 +114,8 @@ class Workflow(object): self._change_state(context, states.RESUMING) last_task = 0 if result_fetcher: - for (i, task) in enumerate(self.tasks): - (has_result, result) = result_fetcher(context, self, task) + for (i, task) in enumerate(self.order()): + (has_result, result) = result_fetcher(self, task) if not has_result: break # Fake running the task so that we trigger the same @@ -129,12 +135,16 @@ class Workflow(object): do_rollback_for(task, ex) self._change_state(context, states.RUNNING) + was_interrupted = False for task in self.order(): + if self.state == states.INTERRUPTED: + was_interrupted = True + break try: has_result = False result = None if result_fetcher: - (has_result, result) = result_fetcher(context, self, task) + (has_result, result) = result_fetcher(self, task) self._on_task_start(context, task) if not has_result: inputs = self._fetch_inputs(task) @@ -150,34 +160,44 @@ class Workflow(object): except Exception as ex: do_rollback_for(task, ex) - # Only gets here if everything went successfully. - self._change_state(context, states.SUCCESS) + if not was_interrupted: + # Only gets here if everything went successfully. + self._change_state(context, states.SUCCESS) + + def reset(self): + self._state = states.PENDING + self.results = [] + self._reversions = [] + + def interrupt(self): + self._change_state(None, states.INTERRUPTED) def _change_state(self, context, new_state): if self.state != new_state: - self.state = new_state - self._on_flow_state_change(context) + old_state = self.state + self._state = new_state + self._on_flow_state_change(context, old_state) - def _on_flow_state_change(self, context): + def _on_flow_state_change(self, context, old_state): # Notify any listeners that the internal state has changed. - for i in self.listeners: - i.notify(context, self) + for f in self.listeners: + f(context, self, old_state) def _on_task_error(self, context, task): # Notify any listeners that the task has errored. - for i in self.listeners: - i.notify(context, states.FAILURE, self, task) + for f in self.task_listeners: + f(context, states.FAILURE, self, task) def _on_task_start(self, context, task): # Notify any listeners that we are about to start the given task. - for i in self.listeners: - i.notify(context, states.STARTED, self, task) + for f in self.task_listeners: + f(context, states.STARTED, self, task) def _on_task_finish(self, context, task, result): # Notify any listeners that we are finishing the given task. self._reversions.append((task, result)) - for i in self.listeners: - i.notify(context, states.SUCCESS, self, task, result=result) + for f in self.task_listeners: + f(context, states.SUCCESS, self, task, result=result) def rollback(self, context, cause): for (i, (task, result)) in enumerate(reversed(self._reversions)): diff --git a/taskflow/states.py b/taskflow/states.py index d894f0cb..861a957e 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -27,12 +27,13 @@ RESUMING = 'RESUMING' # Flow states. FAILURE = FAILURE +INTERRUPTED = 'INTERRUPTED' PENDING = PENDING +RESUMING = RESUMING REVERTING = REVERTING +RUNNING = 'RUNNING' STARTED = 'STARTED' SUCCESS = SUCCESS -RESUMING = RESUMING -RUNNING = 'RUNNING' # Task states. FAILURE = FAILURE diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index 3ca3d084..0a567473 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -18,15 +18,168 @@ from datetime import datetime -import time +import functools +import inspect import threading +import time import unittest +from taskflow import exceptions as exc from taskflow import job +from taskflow import logbook +from taskflow import states +from taskflow import task + from taskflow.backends import memory +from taskflow.patterns import linear_workflow as lw + + +class FunctorTask(task.Task): + def __init__(self, functor): + super(FunctorTask, self).__init__(functor.__name__) + self._functor = functor + + def apply(self, context, *args, **kwargs): + return self._functor(context, *args, **kwargs) + + def revert(self, context, result, cause): + pass class MemoryBackendTest(unittest.TestCase): + def testWorkJobLinearInterrupted(self): + job_claimer = memory.MemoryClaimer() + book_catalog = memory.MemoryCatalog() + + j = job.Job("the-big-action-job", {}, book_catalog, job_claimer) + self.assertEquals(states.UNCLAIMED, j.state) + j.claim("me") + self.assertEquals(states.CLAIMED, j.state) + self.assertEquals('me', j.owner) + + def wf_state_change_listener(context, wf, old_state): + if wf.name in j.logbook: + return + j.logbook.add_chapter(wf.name) + + stop_after = [] + + def task_state_change_listener(context, state, wf, task, result=None): + metadata = None + chp = j.logbook.fetch_chapter(wf.name) + if state in (states.SUCCESS,): + metadata = { + 'result': result, + } + if task.name in stop_after: + wf.interrupt() + stop_after.remove(task.name) + page_name = "%s:%s" % (task.name, state) + if page_name not in chp: + chp.add_page(logbook.Page(page_name, metadata)) + + def task_result_fetcher(context, wf, task): + chp = j.logbook.fetch_chapter(wf.name) + # Attempt to find the results page for the given workflow + # and task. + results_page = "%s:%s" % (task.name, states.SUCCESS) + if results_page in chp: + page = chp.fetch_pages(results_page)[0] + return (True, page.metadata['result']) + return (False, None) + + wf = lw.Workflow("the-big-action") + self.assertEquals(states.PENDING, wf.state) + + call_log = [] + + def do_1(context, *args, **kwargs): + call_log.append(1) + + def do_2(context, *args, **kwargs): + call_log.append(2) + + task_1 = FunctorTask(do_1) + task_2 = FunctorTask(do_2) + wf.add(task_1) + wf.add(task_2) + wf.task_listeners.append(task_state_change_listener) + wf.listeners.append(wf_state_change_listener) + wf.result_fetcher = task_result_fetcher + + # Interrupt it after task_1 finishes + stop_after.append(task_1.name) + wf.run({}) + + self.assertEquals(1, len(j.logbook)) + self.assertEquals(2, len(j.logbook.fetch_chapter("the-big-action"))) + self.assertEquals(1, len(call_log)) + + wf.reset() + self.assertEquals(states.PENDING, wf.state) + wf.run({}) + + self.assertEquals(1, len(j.logbook)) + self.assertEquals(4, len(j.logbook.fetch_chapter("the-big-action"))) + self.assertEquals(2, len(call_log)) + self.assertEquals(states.SUCCESS, wf.state) + + def testWorkJobLinearClean(self): + job_claimer = memory.MemoryClaimer() + book_catalog = memory.MemoryCatalog() + + j = job.Job("the-big-action-job", {}, book_catalog, job_claimer) + self.assertEquals(states.UNCLAIMED, j.state) + j.claim("me") + self.assertEquals(states.CLAIMED, j.state) + self.assertEquals('me', j.owner) + + def wf_state_change_listener(context, wf, old_state): + if wf.name in j.logbook: + return + j.logbook.add_chapter(wf.name) + + def task_state_change_listener(context, state, wf, task, result=None): + metadata = None + chp = j.logbook.fetch_chapter(wf.name) + if state in (states.SUCCESS,): + metadata = { + 'result': result, + } + page_name = "%s:%s" % (task.name, state) + chp.add_page(logbook.Page(page_name, metadata)) + + def task_result_fetcher(context, wf, task): + chp = j.logbook.fetch_chapter(wf.name) + results_page = "%s:%s" % (task.name, states.SUCCESS) + if results_page in chp: + page = chp.fetch_pages(results_page)[0] + return (True, page.metadata['result']) + return (False, None) + + wf = lw.Workflow("the-big-action") + self.assertEquals(states.PENDING, wf.state) + + call_log = [] + + def do_1(context, *args, **kwargs): + call_log.append(1) + + def do_2(context, *args, **kwargs): + call_log.append(2) + + wf.add(FunctorTask(do_1)) + wf.add(FunctorTask(do_2)) + wf.task_listeners.append(task_state_change_listener) + wf.listeners.append(wf_state_change_listener) + wf.result_fetcher = task_result_fetcher + wf.run({}) + + self.assertEquals(1, len(j.logbook)) + self.assertEquals(4, len(j.logbook.fetch_chapter("the-big-action"))) + self.assertEquals(2, len(call_log)) + self.assertEquals(states.SUCCESS, wf.state) + def testPostRecvJob(self): job_claimer = memory.MemoryClaimer() book_catalog = memory.MemoryCatalog()