Get a memory test example working

This commit is contained in:
Joshua Harlow
2013-05-15 17:59:26 -07:00
parent e19f67c418
commit a606032655
8 changed files with 263 additions and 46 deletions

View File

@@ -60,6 +60,10 @@ class MemoryCatalog(catalog.Catalog):
self._closed = False self._closed = False
self._lock = threading.RLock() self._lock = threading.RLock()
def __len__(self):
with self._lock:
return len(self._catalogs)
def __contains__(self, job): def __contains__(self, job):
with self._lock: with self._lock:
for (j, b) in self._catalogs: for (j, b) in self._catalogs:
@@ -77,7 +81,7 @@ class MemoryCatalog(catalog.Catalog):
if j == job: if j == job:
return b return b
b = MemoryLogBook() b = MemoryLogBook()
self._catalogs.append((j, b)) self._catalogs.append((job, b))
return b return b
@check_not_closed @check_not_closed
@@ -95,12 +99,15 @@ class MemoryChapter(logbook.Chapter):
for p in self._pages: for p in self._pages:
yield p yield p
def __contains__(self, name): def __contains__(self, page_name):
for p in self._pages: for p in self._pages:
if p.name == name: if p.name == page_name:
return True return True
return False return False
def fetch_pages(self, page_name):
return [p for p in self._pages if p.name == page_name]
def __len__(self): def __len__(self):
return len(self._pages) return len(self._pages)
@@ -118,10 +125,19 @@ class MemoryLogBook(logbook.LogBook):
@check_not_closed @check_not_closed
def add_chapter(self, chapter_name): def add_chapter(self, chapter_name):
if chapter_name in self._chapter_names: if chapter_name in self._chapter_names:
raise exc.ChapterAlreadyExists() raise exc.ChapterAlreadyExists("Chapter %s already exists" %
(chapter_name))
self._chapters.append(MemoryChapter(self, chapter_name)) self._chapters.append(MemoryChapter(self, chapter_name))
self._chapter_names.add(chapter_name) self._chapter_names.add(chapter_name)
@check_not_closed
def fetch_chapter(self, chapter_name):
if chapter_name not in self._chapter_names:
raise exc.ChapterNotFound("No chapter named %s" % (chapter_name))
for c in self._chapters:
if c.name == chapter_name:
return c
@check_not_closed @check_not_closed
def __iter__(self): def __iter__(self):
for c in self._chapters: for c in self._chapters:
@@ -133,7 +149,7 @@ class MemoryLogBook(logbook.LogBook):
@check_not_closed @check_not_closed
def __contains__(self, chapter_name): def __contains__(self, chapter_name):
for c in self: for c in self:
if c.name == name: if c.name == chapter_name:
return True return True
return False return False

View File

@@ -29,6 +29,10 @@ class Catalog(object):
"""Checks if the given catalog has a logbook for a job.""" """Checks if the given catalog has a logbook for a job."""
raise NotImplementedError() raise NotImplementedError()
def __len__(self):
"""Gets how many logbooks are in this catalog."""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def create_or_fetch(self, job): def create_or_fetch(self, job):
"""Creates a new logbook for a job or gives back an old one.""" """Creates a new logbook for a job or gives back an old one."""

View File

@@ -17,7 +17,12 @@
# under the License. # under the License.
class TaskException(Exception): class TaskFlowException(Exception):
"""Base class for exceptions emitted from this library."""
pass
class TaskException(TaskFlowException):
"""When a task failure occurs the following object will be given to revert """When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.""" and can be used to interrogate what caused the failure."""
@@ -28,27 +33,32 @@ class TaskException(Exception):
self.cause = cause self.cause = cause
class ChapterAlreadyExists(Exception): class ChapterNotFound(TaskFlowException):
"""Raised when a chapter of a logbook doesn't exist."""
pass
class ChapterAlreadyExists(TaskFlowException):
"""Raised when a chapter of a logbook already exists.""" """Raised when a chapter of a logbook already exists."""
pass pass
class ClosedException(Exception): class ClosedException(TaskFlowException):
"""Raised when an access on a closed object occurs.""" """Raised when an access on a closed object occurs."""
pass pass
class InvalidStateException(Exception): class InvalidStateException(TaskFlowException):
"""Raised when a task/job/workflow is in an invalid state when an """Raised when a task/job/workflow is in an invalid state when an
operation is attempting to apply to said task/job/workflow.""" operation is attempting to apply to said task/job/workflow."""
pass pass
class UnclaimableJobException(Exception): class UnclaimableJobException(TaskFlowException):
"""Raised when a job can not be claimed.""" """Raised when a job can not be claimed."""
pass pass
class JobNotFound(Exception): class JobNotFound(TaskFlowException):
"""Raised when a job entry can not be found.""" """Raised when a job entry can not be found."""
pass pass

View File

@@ -42,25 +42,33 @@ class Job(object):
information is provided about said work to be able to attempt to information is provided about said work to be able to attempt to
fullfill said work.""" fullfill said work."""
__metaclass__ = abc.ABCMeta
def __init__(self, name, context, catalog, claimer): def __init__(self, name, context, catalog, claimer):
self.name = name self.name = name
self.context = context self.context = context
self.state = states.UNCLAIMED
self.owner = None self.owner = None
self.posted_on = [] self.posted_on = []
self._catalog = catalog self._catalog = catalog
self._claimer = claimer self._claimer = claimer
self._logbook = None self._logbook = None
self._id = str(uuid.uuid4().hex) self._id = str(uuid.uuid4().hex)
self._state = states.UNCLAIMED
@property
def state(self):
return self._state
def _change_state(self, new_state):
if self.state != new_state:
self._state = new_state
# TODO(harlowja): add logbook info?
@property @property
def logbook(self): def logbook(self):
"""Fetches (or creates) a logbook entry for this job.""" """Fetches (or creates) a logbook entry for this job."""
if self._logbook is None: if self._logbook is None:
if self in self._catalog: self._logbook = self._catalog.create_or_fetch(self)
self._logbook = self._catalog.fetch(self)
else:
self._logbook = self._catalog.create(self)
return self._logbook return self._logbook
def claim(self, owner): def claim(self, owner):
@@ -78,14 +86,6 @@ class Job(object):
self.owner = owner self.owner = owner
self._change_state(states.CLAIMED) self._change_state(states.CLAIMED)
def _change_state(self, new_state):
self.state = new_state
# TODO(harlowja): update the logbook
def await(self, blocking=True, timeout=None):
"""Attempts to wait until the job fails or finishes."""
raise NotImplementedError()
def erase(self): def erase(self):
"""Erases any traces of this job from its associated resources.""" """Erases any traces of this job from its associated resources."""
for b in self.posted_on: for b in self.posted_on:

View File

@@ -30,6 +30,10 @@ class Page(object):
self.name = name self.name = name
self.metadata = metadata self.metadata = metadata
def __str__(self):
return "Page (%s, %s): %s" % (self.name, self.date_created,
self.metadata)
class Chapter(object): class Chapter(object):
"""Base class for what a chapter of a logbook should provide.""" """Base class for what a chapter of a logbook should provide."""
@@ -48,11 +52,16 @@ class Chapter(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def __contains__(self, name): def __contains__(self, page_name):
"""Determines if any page with the given name exists in this """Determines if any page with the given name exists in this
chapter.""" chapter."""
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def fetch_pages(self, page_name):
"""Fetch any pages that match the given page name."""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def add_page(self, page): def add_page(self, page):
"""Adds a page to the underlying chapter.""" """Adds a page to the underlying chapter."""
@@ -63,6 +72,9 @@ class Chapter(object):
"""Returns how many pages the underlying chapter has.""" """Returns how many pages the underlying chapter has."""
raise NotImplementedError() raise NotImplementedError()
def __str__(self):
return "Chapter (%s): %s pages" % (self.name, len(self))
class LogBook(object): class LogBook(object):
"""Base class for what a logbook should provide""" """Base class for what a logbook should provide"""
@@ -77,6 +89,7 @@ class LogBook(object):
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def fetch_chapter(self, chapter_name): def fetch_chapter(self, chapter_name):
"""Fetches the given chapter or raises an exception if that chapter """Fetches the given chapter or raises an exception if that chapter
does not exist.""" does not exist."""

View File

@@ -18,6 +18,7 @@
import abc import abc
import copy import copy
import functools
import logging import logging
from taskflow.openstack.common import excutils from taskflow.openstack.common import excutils
@@ -54,17 +55,22 @@ class Workflow(object):
# the contract we have with tasks that they will be given the value # the contract we have with tasks that they will be given the value
# they returned if reversion is triggered. # they returned if reversion is triggered.
self.result_fetcher = None self.result_fetcher = None
# Any objects that want to listen when a task starts/stops/completes # Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor # or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to # progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent # store the result of a task in some persistent or semi-persistent
# storage backend). # storage backend).
self.task_listeners = []
self.listeners = [] self.listeners = []
# The state of this flow. # The state of this flow.
self.state = states.PENDING self._state = states.PENDING
# Tasks results are stored here... # Tasks results are stored here...
self.results = [] self.results = []
@property
def state(self):
return self._state
@abc.abstractmethod @abc.abstractmethod
def add(self, task): def add(self, task):
raise NotImplementedError() raise NotImplementedError()
@@ -108,8 +114,8 @@ class Workflow(object):
self._change_state(context, states.RESUMING) self._change_state(context, states.RESUMING)
last_task = 0 last_task = 0
if result_fetcher: if result_fetcher:
for (i, task) in enumerate(self.tasks): for (i, task) in enumerate(self.order()):
(has_result, result) = result_fetcher(context, self, task) (has_result, result) = result_fetcher(self, task)
if not has_result: if not has_result:
break break
# Fake running the task so that we trigger the same # Fake running the task so that we trigger the same
@@ -129,12 +135,16 @@ class Workflow(object):
do_rollback_for(task, ex) do_rollback_for(task, ex)
self._change_state(context, states.RUNNING) self._change_state(context, states.RUNNING)
was_interrupted = False
for task in self.order(): for task in self.order():
if self.state == states.INTERRUPTED:
was_interrupted = True
break
try: try:
has_result = False has_result = False
result = None result = None
if result_fetcher: if result_fetcher:
(has_result, result) = result_fetcher(context, self, task) (has_result, result) = result_fetcher(self, task)
self._on_task_start(context, task) self._on_task_start(context, task)
if not has_result: if not has_result:
inputs = self._fetch_inputs(task) inputs = self._fetch_inputs(task)
@@ -150,34 +160,44 @@ class Workflow(object):
except Exception as ex: except Exception as ex:
do_rollback_for(task, ex) do_rollback_for(task, ex)
# Only gets here if everything went successfully. if not was_interrupted:
self._change_state(context, states.SUCCESS) # Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
def reset(self):
self._state = states.PENDING
self.results = []
self._reversions = []
def interrupt(self):
self._change_state(None, states.INTERRUPTED)
def _change_state(self, context, new_state): def _change_state(self, context, new_state):
if self.state != new_state: if self.state != new_state:
self.state = new_state old_state = self.state
self._on_flow_state_change(context) self._state = new_state
self._on_flow_state_change(context, old_state)
def _on_flow_state_change(self, context): def _on_flow_state_change(self, context, old_state):
# Notify any listeners that the internal state has changed. # Notify any listeners that the internal state has changed.
for i in self.listeners: for f in self.listeners:
i.notify(context, self) f(context, self, old_state)
def _on_task_error(self, context, task): def _on_task_error(self, context, task):
# Notify any listeners that the task has errored. # Notify any listeners that the task has errored.
for i in self.listeners: for f in self.task_listeners:
i.notify(context, states.FAILURE, self, task) f(context, states.FAILURE, self, task)
def _on_task_start(self, context, task): def _on_task_start(self, context, task):
# Notify any listeners that we are about to start the given task. # Notify any listeners that we are about to start the given task.
for i in self.listeners: for f in self.task_listeners:
i.notify(context, states.STARTED, self, task) f(context, states.STARTED, self, task)
def _on_task_finish(self, context, task, result): def _on_task_finish(self, context, task, result):
# Notify any listeners that we are finishing the given task. # Notify any listeners that we are finishing the given task.
self._reversions.append((task, result)) self._reversions.append((task, result))
for i in self.listeners: for f in self.task_listeners:
i.notify(context, states.SUCCESS, self, task, result=result) f(context, states.SUCCESS, self, task, result=result)
def rollback(self, context, cause): def rollback(self, context, cause):
for (i, (task, result)) in enumerate(reversed(self._reversions)): for (i, (task, result)) in enumerate(reversed(self._reversions)):

View File

@@ -27,12 +27,13 @@ RESUMING = 'RESUMING'
# Flow states. # Flow states.
FAILURE = FAILURE FAILURE = FAILURE
INTERRUPTED = 'INTERRUPTED'
PENDING = PENDING PENDING = PENDING
RESUMING = RESUMING
REVERTING = REVERTING REVERTING = REVERTING
RUNNING = 'RUNNING'
STARTED = 'STARTED' STARTED = 'STARTED'
SUCCESS = SUCCESS SUCCESS = SUCCESS
RESUMING = RESUMING
RUNNING = 'RUNNING'
# Task states. # Task states.
FAILURE = FAILURE FAILURE = FAILURE

View File

@@ -18,15 +18,168 @@
from datetime import datetime from datetime import datetime
import time import functools
import inspect
import threading import threading
import time
import unittest import unittest
from taskflow import exceptions as exc
from taskflow import job from taskflow import job
from taskflow import logbook
from taskflow import states
from taskflow import task
from taskflow.backends import memory from taskflow.backends import memory
from taskflow.patterns import linear_workflow as lw
class FunctorTask(task.Task):
def __init__(self, functor):
super(FunctorTask, self).__init__(functor.__name__)
self._functor = functor
def apply(self, context, *args, **kwargs):
return self._functor(context, *args, **kwargs)
def revert(self, context, result, cause):
pass
class MemoryBackendTest(unittest.TestCase): class MemoryBackendTest(unittest.TestCase):
def testWorkJobLinearInterrupted(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
j = job.Job("the-big-action-job", {}, book_catalog, job_claimer)
self.assertEquals(states.UNCLAIMED, j.state)
j.claim("me")
self.assertEquals(states.CLAIMED, j.state)
self.assertEquals('me', j.owner)
def wf_state_change_listener(context, wf, old_state):
if wf.name in j.logbook:
return
j.logbook.add_chapter(wf.name)
stop_after = []
def task_state_change_listener(context, state, wf, task, result=None):
metadata = None
chp = j.logbook.fetch_chapter(wf.name)
if state in (states.SUCCESS,):
metadata = {
'result': result,
}
if task.name in stop_after:
wf.interrupt()
stop_after.remove(task.name)
page_name = "%s:%s" % (task.name, state)
if page_name not in chp:
chp.add_page(logbook.Page(page_name, metadata))
def task_result_fetcher(context, wf, task):
chp = j.logbook.fetch_chapter(wf.name)
# Attempt to find the results page for the given workflow
# and task.
results_page = "%s:%s" % (task.name, states.SUCCESS)
if results_page in chp:
page = chp.fetch_pages(results_page)[0]
return (True, page.metadata['result'])
return (False, None)
wf = lw.Workflow("the-big-action")
self.assertEquals(states.PENDING, wf.state)
call_log = []
def do_1(context, *args, **kwargs):
call_log.append(1)
def do_2(context, *args, **kwargs):
call_log.append(2)
task_1 = FunctorTask(do_1)
task_2 = FunctorTask(do_2)
wf.add(task_1)
wf.add(task_2)
wf.task_listeners.append(task_state_change_listener)
wf.listeners.append(wf_state_change_listener)
wf.result_fetcher = task_result_fetcher
# Interrupt it after task_1 finishes
stop_after.append(task_1.name)
wf.run({})
self.assertEquals(1, len(j.logbook))
self.assertEquals(2, len(j.logbook.fetch_chapter("the-big-action")))
self.assertEquals(1, len(call_log))
wf.reset()
self.assertEquals(states.PENDING, wf.state)
wf.run({})
self.assertEquals(1, len(j.logbook))
self.assertEquals(4, len(j.logbook.fetch_chapter("the-big-action")))
self.assertEquals(2, len(call_log))
self.assertEquals(states.SUCCESS, wf.state)
def testWorkJobLinearClean(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
j = job.Job("the-big-action-job", {}, book_catalog, job_claimer)
self.assertEquals(states.UNCLAIMED, j.state)
j.claim("me")
self.assertEquals(states.CLAIMED, j.state)
self.assertEquals('me', j.owner)
def wf_state_change_listener(context, wf, old_state):
if wf.name in j.logbook:
return
j.logbook.add_chapter(wf.name)
def task_state_change_listener(context, state, wf, task, result=None):
metadata = None
chp = j.logbook.fetch_chapter(wf.name)
if state in (states.SUCCESS,):
metadata = {
'result': result,
}
page_name = "%s:%s" % (task.name, state)
chp.add_page(logbook.Page(page_name, metadata))
def task_result_fetcher(context, wf, task):
chp = j.logbook.fetch_chapter(wf.name)
results_page = "%s:%s" % (task.name, states.SUCCESS)
if results_page in chp:
page = chp.fetch_pages(results_page)[0]
return (True, page.metadata['result'])
return (False, None)
wf = lw.Workflow("the-big-action")
self.assertEquals(states.PENDING, wf.state)
call_log = []
def do_1(context, *args, **kwargs):
call_log.append(1)
def do_2(context, *args, **kwargs):
call_log.append(2)
wf.add(FunctorTask(do_1))
wf.add(FunctorTask(do_2))
wf.task_listeners.append(task_state_change_listener)
wf.listeners.append(wf_state_change_listener)
wf.result_fetcher = task_result_fetcher
wf.run({})
self.assertEquals(1, len(j.logbook))
self.assertEquals(4, len(j.logbook.fetch_chapter("the-big-action")))
self.assertEquals(2, len(call_log))
self.assertEquals(states.SUCCESS, wf.state)
def testPostRecvJob(self): def testPostRecvJob(self):
job_claimer = memory.MemoryClaimer() job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog() book_catalog = memory.MemoryCatalog()