Continue on getting ready for the memory impl. to be useful.

This commit is contained in:
Joshua Harlow 2013-05-10 19:19:05 -07:00
parent 950947b0f4
commit 394ca17981
12 changed files with 381 additions and 130 deletions

View File

@ -15,20 +15,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Useful to know when other tasks are being activated and finishing.
STARTING = 'STARTING'
COMPLETED = 'COMPLETED'
ERRORED = 'ERRORED'
class Failure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure."""
def __init__(self, task, name, workflow, exception):
self.task = task
self.name = name
self.workflow = workflow
self.exception = exception

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow import exceptions as exc
from taskflow import job
from taskflow import jobboard
from taskflow import logbook
LOG = logging.getLogger(__name__)
class MemoryJob(job.Job):
pass
class MemoryLogBook(logbook.LogBook):
def __init__(self, resource_uri):
super(MemoryLogBook, self).__init__(resource_uri)
self._entries = {}
def add_record(self, name, metadata):
if name in self._entries:
raise exc.RecordAlreadyExists()
self._entries[name] = metadata
def fetch_record(self, name):
if name not in self._entries:
raise exc.RecordNotFound()
return self._entries[name]
class MemoryJobBoard(jobboard.JobBoard):
pass

49
taskflow/catalog.py Normal file
View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Catalog(object):
"""A catalog can create, fetch, erase a logbook for a jobs"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def fetch(self, job):
"""Fetches a logbook for a job."""
raise NotImplementedError()
@abc.abstractmethod
def __contains__(self, job):
"""Checks if the given catalog has a logbook for a job."""
raise NotImplementedError()
@abc.abstractmethod
def create(self, job):
"""Creates a new logbook for a job."""
raise NotImplementedError()
@abc.abstractmethod
def erase(self, job):
"""Erases a existing logbook for a job."""
raise NotImplementedError()
def close(self):
"""Allows the catalog to free any resources that it has."""
pass

49
taskflow/exceptions.py Normal file
View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskException(Exception):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure."""
def __init__(self, task, name, workflow, cause=None):
super(TaskException, self).__init__()
self.task = task
self.name = name
self.workflow = workflow
self.cause = cause
class UnclaimableJobException(Exception):
"""Raised when a job can not be claimed."""
pass
class UnknownJobException(Exception):
"""Raised when a job board does not know about desired job."""
pass
class RecordAlreadyExists(Exception):
"""Raised when a logbook entry already exists."""
pass
class RecordNotFound(Exception):
"""Raised when a logbook entry can not be found."""
pass

View File

@ -19,48 +19,77 @@
import abc
import uuid
CLAIMED = 'claimed'
UNCLAIMED = 'unclaimed'
from taskflow import exceptions as exc
from taskflow import states
class JobClaimer(object):
"""A base class for objects that can attempt to claim a given claim a given
job, so that said job can be worked on."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def claim(self, job, owner):
"""This method will attempt to claim said job and must
either succeed at this or throw an exception signaling the job can not
be claimed."""
raise NotImplementedError()
class Job(object):
"""A job is connection to some set of work to be done by some agent. Basic
information is provided about said work to be able to attempt to
fullfill said work."""
__metaclass__ = abc.ABCMeta
def __init__(self, name, type, context):
def __init__(self, name, context, catalog, claimer):
self.name = name
# TBD - likely more details about this job
self.details = None
self.state = UNCLAIMED
self.owner = None
self.tracking_id = str(uuid.uuid4())
self.context = context
self.state = states.UNCLAIMED
self.owner = None
self.posted_on = []
self._catalog = catalog
self._claimer = claimer
self._logbook = None
self._id = str(uuid.uuid4().hex)
def uri(self):
return "%s://%s/%s" % (self.type, self.name,
self.tracking_id)
@property
def logbook(self):
if self._logbook is None:
if self in self._catalog:
self._logbook = self._catalog.fetch(self)
else:
self._logbook = self._catalog.create(self)
return self._logbook
@abc.abstractproperty
def type(self):
# Returns which type of job this is.
#
# For example, a 'run_instance' job, or a 'delete_instance' job could
# be possible types.
raise NotImplementedError()
@abc.abstractmethod
def claim(self, owner):
# This can be used to transition this job from unclaimed to claimed.
#
# This must be done in a way that likely uses some type of locking or
# ownership transfer so that only a single entity gets this job to work
# on. This will avoid multi-job ownership, which can lead to
# inconsistent state.
raise NotImplementedError()
""" This can be used to attempt transition this job from unclaimed
to claimed.
@abc.abstractmethod
def consume(self):
# This can be used to transition this job from active to finished.
#
# During said transition the job and any details of it may be removed
# from some backing storage (if applicable).
raise NotImplementedError()
This must be done in a way that likely uses some type of locking or
ownership transfer so that only a single entity gets this job to work
on. This will avoid multi-job ownership, which can lead to
inconsistent state."""
if self.state != states.UNCLAIMED:
raise exc.UnclaimableJobException("Unable to claim job when job is"
" in state %s" % (self.state))
self._claimer.claim(self, owner)
self.owner = owner
self._change_state(states.CLAIMED)
def _change_state(self, new_state):
self.state = new_state
# TODO(harlowja): update the logbook
def erase(self):
"""Erases any traces of this job from its associated resources."""
for b in self.posted_on:
b.erase(self)
self._catalog.erase(self)
self._logbook = None
@property
def tracking_id(self):
return "j-%s-%s" % (self.name, self._id)

View File

@ -29,30 +29,48 @@ class JobBoard(object):
@abc.abstractmethod
def post(self, job):
"""Posts a job to the underlying job board implementation
and returns a promise object which can be used to query the
state of said job."""
raise NotImplementedError()
def _notify_posted(self, job):
"""When a job is received, by whichever mechanism the underlying
implementation provides, the job should be given to said listeners
for them to know that a job has arrived."""
for i in self._listeners:
i.notify_posted(job)
def _notify_erased(self, job):
"""When a job is erased, by whichever mechanism the underlying
implementation provides, the job should be given to said listeners
for them to know that a job has been erased."""
for i in self._listeners:
i.notify_erased(job)
@abc.abstractmethod
def await(self, blocking=True):
def erase(self, job):
"""Erases the given job from this job board."""
for i in self._listeners:
i._notify_erased(job)
@abc.abstractmethod
def await(self, block=True, timeout=None):
"""Blocks the current [thread, greenthread, process] until a new job
has arrived."""
raise NotImplementedError()
def subscribe(self, listener):
self._listeners.append(listener)
"""Adds a new listener who will be notified on job updates/postings."""
if listener not in self._listeners:
self._listeners.append(listener)
def unsubscribe(self, listener):
"""Removes a given listener from notifications about job
updates/postings."""
if listener in self._listeners:
self._listeners.remove(listener)
def close(self):
"""Allows the job board provider to free any resources that it has."""
"""Allows the job board to free any resources that it has."""
pass
class ProxyJobBoard(JobBoard):
def post(self, context, job):
raise NotImplementedError()

View File

@ -17,15 +17,15 @@
# under the License.
import abc
import logging
"""Define APIs for the logbook providers."""
LOG = logging.getLogger(__name__)
from datetime import datetime
class RecordNotFound(Exception):
pass
class LogEntry(object):
def __init__(self, name, metadata=None):
self.created_on = datetime.utcnow()
self.name = name
self.metadata = metadata
class LogBook(object):
@ -34,20 +34,17 @@ class LogBook(object):
__metaclass__ = abc.ABCMeta
def __init__(self, resource_uri):
self.uri = resource_uri
@abc.abstractmethod
def add_record(self, name, metadata=None):
"""Atomically adds a new entry to the given logbook with the supplied
metadata (if any)."""
def add(self, entry):
"""Atomically adds a new entry to the given logbook."""
raise NotImplementedError()
@abc.abstractmethod
def fetch_record(self, name):
"""Fetchs a record with the given name and returns any metadata about
said record."""
raise NotImplementedError()
def search_by_name(self, name):
"""Yields entries with the given name. The order will be in the same
order that they were added."""
for e in self:
if e.name == name:
yield e
@abc.abstractmethod
def __contains__(self, name):
@ -55,38 +52,17 @@ class LogBook(object):
logbook."""
raise NotImplementedError()
@abc.abstractmethod
def mark(self, name, metadata, merge_functor=None):
"""Marks the given logbook entry (which must exist) with the given
metadata, if said entry already exists then the provided merge functor
or a default function, will be activated to merge the existing metadata
with the supplied metadata."""
raise NotImplementedError()
@abc.abstractmethod
def __iter__(self):
"""Iterates over all names and metadata and provides back both of these
via a (name, metadata) tuple. The order will be in the same order that
"""Iterates over all entries. The order will be in the same order that
they were added."""
raise NotImplementedError()
def close(self):
"""Allows the job board provider to free any resources that it has."""
pass
class DBLogBook(LogBook):
"""Base class for a logbook impl that uses a backing database."""
def __init__(self, context, job):
super(DBLogBook, self).__init__(job.uri)
self.context = context
self.job = job
@abc.abstractmethod
def erase(self, name):
"""Erases any entries with given name."""
raise NotImplementedError()
def close(self):
# Free the db connection
"""Allows the logbook to free any resources that it has."""
pass
class MemoryLogBook(LogBook):
pass

View File

@ -25,6 +25,8 @@ if not hasattr(dict_provider, 'OrderedDict'):
import ordereddict as dict_provider
from taskflow.openstack.common import excutils
from taskflow import exceptions as exc
from taskflow import states
LOG = logging.getLogger(__name__)
@ -51,8 +53,7 @@ class Workflow(object):
# to call the parents...
self.parents = parents
# This should be a functor that returns whether a given task has
# already ran by returning the return value of the task or returning
# 'None' if the task has not ran.
# already ran by returning a pair of (has_result, result).
#
# NOTE(harlowja): This allows for resumption by skipping tasks which
# have already occurred. The previous return value is needed due to
@ -65,6 +66,8 @@ class Workflow(object):
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.listeners = []
# The state of this flow.
self.state = states.PENDING
def __setitem__(self, name, task):
self.tasks[name] = task
@ -73,47 +76,52 @@ class Workflow(object):
return self.results[name]
def run(self, context, *args, **kwargs):
self.state = states.STARTED
for (name, task) in self.tasks.iteritems():
try:
self._on_task_start(context, task, name)
# See if we have already ran this...
result = None
has_result = False
if self.result_fetcher:
result = self.result_fetcher(context, name, self)
if result is None:
(has_result, result) = self.result_fetcher(context,
name, self)
if not has_result:
result = task.apply(context, *args, **kwargs)
# Keep a pristine copy of the result in the results table
# so that if said result is altered by other further states
# the one here will not be.
self.results[name] = copy.deepcopy(result)
self._on_task_finish(context, task, name, result)
self.state = states.SUCCESS
except Exception as ex:
with excutils.save_and_reraise_exception():
self.state = states.FAILURE
try:
self._on_task_error(context, task, name)
except Exception:
LOG.exception(_("Dropping exception catched when"
" notifying about existing task"
" exception."))
LOG.exception("Dropping exception catched when"
" notifying about existing task"
" exception.")
self.state = states.REVERTING
self.rollback(context,
workflow.Failure(task, name, self, ex))
exc.TaskException(task, name, self, ex))
def _on_task_error(self, context, task, name):
# Notify any listeners that the task has errored.
for i in self.listeners:
i.notify(context, workflow.ERRORED, self, task, name)
i.notify(context, states.FAILURE, self, task, name)
def _on_task_start(self, context, task, name):
# Notify any listeners that we are about to start the given task.
for i in self.listeners:
i.notify(context, workflow.STARTING, self, task, name)
i.notify(context, states.STARTED, self, task, name)
def _on_task_finish(self, context, task, name, result):
# Notify any listeners that we are finishing the given task.
self.reversions.append((name, task))
for i in self.listeners:
i.notify(context, workflow.COMPLETED, self, task,
name, result=result)
i.notify(context, states.SUCCESS, self, task, name, result=result)
def rollback(self, context, cause):
for (i, (name, task)) in enumerate(reversed(self.reversions)):
@ -122,8 +130,8 @@ class Workflow(object):
except Exception:
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
# chain validation due to Y exception.
msg = _("Failed rolling back stage %(index)s (%(name)s)"
" of workflow %(workflow)s, due to inner exception.")
msg = ("Failed rolling back stage %(index)s (%(name)s)"
" of workflow %(workflow)s, due to inner exception.")
LOG.warn(msg % {'index': (i + 1), 'stage': name,
'workflow': self.name})
if not self.tolerant:
@ -135,4 +143,3 @@ class Workflow(object):
# Rollback any parents workflows if they exist...
for p in self.parents:
p.rollback(context, cause)

49
taskflow/promise.py Normal file
View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Promise(object):
"""This is an abstraction of a promise to perform some type of job which
can be returned to a user/function... for later lookup on the progress of
the underlying job."""
__metaclass__ = abc.ABCMeta
def __init__(self, job):
self._job = job
@property
def tracking_id(self):
return "p-%s" % (self._job.tracking_id)
def await(self, timeout=None):
"""Attempts to wait until the job fails or finishess."""
raise NotImplementedError()
@abc.abstractproperty
def state(self):
"""Returns the state of the underlying job."""
raise NotImplementedError()
@abc.abstractproperty
def owner(self):
"""Returns the owner of the job, or None if no owner
has been assigned."""
raise NotImplementedError()

View File

@ -16,15 +16,22 @@
# License for the specific language governing permissions and limitations
# under the License.
# Job states.
CLAIMED = 'CLAIMED'
FAILURE = 'FAILURE'
PENDING = 'PENDING'
REVERTING = 'REVERTING'
SUCCESS = 'SUCCESS'
UNCLAIMED = 'UNCLAIMED'
class Reservation(object):
"""This is an abstraction of a promise to complete some type of job which
can be returned to a user for later lookup on the progress of said
promise"""
# Flow states.
FAILURE = FAILURE
PENDING = PENDING
REVERTING = REVERTING
STARTED = 'STARTED'
SUCCESS = SUCCESS
def __init__(self, for_who, id):
self.for_who = for_who
self.id = id
def __str__(self):
return "Reservation: '%s' for '%s'" % (self.id, self.for_who)
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS

17
taskflow/utils.py Normal file
View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.