diff --git a/taskflow/__init__.py b/taskflow/__init__.py index 355c6409b..830dd2e7c 100644 --- a/taskflow/__init__.py +++ b/taskflow/__init__.py @@ -15,20 +15,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - - -# Useful to know when other tasks are being activated and finishing. -STARTING = 'STARTING' -COMPLETED = 'COMPLETED' -ERRORED = 'ERRORED' - - -class Failure(object): - """When a task failure occurs the following object will be given to revert - and can be used to interrogate what caused the failure.""" - - def __init__(self, task, name, workflow, exception): - self.task = task - self.name = name - self.workflow = workflow - self.exception = exception diff --git a/taskflow/backends/__init__.py b/taskflow/backends/__init__.py new file mode 100644 index 000000000..830dd2e7c --- /dev/null +++ b/taskflow/backends/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py new file mode 100644 index 000000000..a5e29135a --- /dev/null +++ b/taskflow/backends/memory.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from taskflow import exceptions as exc +from taskflow import job +from taskflow import jobboard +from taskflow import logbook + +LOG = logging.getLogger(__name__) + + +class MemoryJob(job.Job): + pass + + +class MemoryLogBook(logbook.LogBook): + def __init__(self, resource_uri): + super(MemoryLogBook, self).__init__(resource_uri) + self._entries = {} + + def add_record(self, name, metadata): + if name in self._entries: + raise exc.RecordAlreadyExists() + self._entries[name] = metadata + + def fetch_record(self, name): + if name not in self._entries: + raise exc.RecordNotFound() + return self._entries[name] + + +class MemoryJobBoard(jobboard.JobBoard): + pass diff --git a/taskflow/catalog.py b/taskflow/catalog.py new file mode 100644 index 000000000..217b47634 --- /dev/null +++ b/taskflow/catalog.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class Catalog(object): + """A catalog can create, fetch, erase a logbook for a jobs""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def fetch(self, job): + """Fetches a logbook for a job.""" + raise NotImplementedError() + + @abc.abstractmethod + def __contains__(self, job): + """Checks if the given catalog has a logbook for a job.""" + raise NotImplementedError() + + @abc.abstractmethod + def create(self, job): + """Creates a new logbook for a job.""" + raise NotImplementedError() + + @abc.abstractmethod + def erase(self, job): + """Erases a existing logbook for a job.""" + raise NotImplementedError() + + def close(self): + """Allows the catalog to free any resources that it has.""" + pass diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py new file mode 100644 index 000000000..f58534f11 --- /dev/null +++ b/taskflow/exceptions.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class TaskException(Exception): + """When a task failure occurs the following object will be given to revert + and can be used to interrogate what caused the failure.""" + + def __init__(self, task, name, workflow, cause=None): + super(TaskException, self).__init__() + self.task = task + self.name = name + self.workflow = workflow + self.cause = cause + + +class UnclaimableJobException(Exception): + """Raised when a job can not be claimed.""" + pass + + +class UnknownJobException(Exception): + """Raised when a job board does not know about desired job.""" + pass + + +class RecordAlreadyExists(Exception): + """Raised when a logbook entry already exists.""" + pass + + +class RecordNotFound(Exception): + """Raised when a logbook entry can not be found.""" + pass diff --git a/taskflow/job.py b/taskflow/job.py index 27e62b48f..ed1d4c1f2 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -19,48 +19,77 @@ import abc import uuid -CLAIMED = 'claimed' -UNCLAIMED = 'unclaimed' +from taskflow import exceptions as exc +from taskflow import states + + +class JobClaimer(object): + """A base class for objects that can attempt to claim a given claim a given + job, so that said job can be worked on.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def claim(self, job, owner): + """This method will attempt to claim said job and must + either succeed at this or throw an exception signaling the job can not + be claimed.""" + raise NotImplementedError() class Job(object): + """A job is connection to some set of work to be done by some agent. Basic + information is provided about said work to be able to attempt to + fullfill said work.""" + __metaclass__ = abc.ABCMeta - def __init__(self, name, type, context): + def __init__(self, name, context, catalog, claimer): self.name = name - # TBD - likely more details about this job - self.details = None - self.state = UNCLAIMED - self.owner = None - self.tracking_id = str(uuid.uuid4()) self.context = context + self.state = states.UNCLAIMED + self.owner = None + self.posted_on = [] + self._catalog = catalog + self._claimer = claimer + self._logbook = None + self._id = str(uuid.uuid4().hex) - def uri(self): - return "%s://%s/%s" % (self.type, self.name, - self.tracking_id) + @property + def logbook(self): + if self._logbook is None: + if self in self._catalog: + self._logbook = self._catalog.fetch(self) + else: + self._logbook = self._catalog.create(self) + return self._logbook - @abc.abstractproperty - def type(self): - # Returns which type of job this is. - # - # For example, a 'run_instance' job, or a 'delete_instance' job could - # be possible types. - raise NotImplementedError() - - @abc.abstractmethod def claim(self, owner): - # This can be used to transition this job from unclaimed to claimed. - # - # This must be done in a way that likely uses some type of locking or - # ownership transfer so that only a single entity gets this job to work - # on. This will avoid multi-job ownership, which can lead to - # inconsistent state. - raise NotImplementedError() + """ This can be used to attempt transition this job from unclaimed + to claimed. - @abc.abstractmethod - def consume(self): - # This can be used to transition this job from active to finished. - # - # During said transition the job and any details of it may be removed - # from some backing storage (if applicable). - raise NotImplementedError() + This must be done in a way that likely uses some type of locking or + ownership transfer so that only a single entity gets this job to work + on. This will avoid multi-job ownership, which can lead to + inconsistent state.""" + if self.state != states.UNCLAIMED: + raise exc.UnclaimableJobException("Unable to claim job when job is" + " in state %s" % (self.state)) + self._claimer.claim(self, owner) + self.owner = owner + self._change_state(states.CLAIMED) + + def _change_state(self, new_state): + self.state = new_state + # TODO(harlowja): update the logbook + + def erase(self): + """Erases any traces of this job from its associated resources.""" + for b in self.posted_on: + b.erase(self) + self._catalog.erase(self) + self._logbook = None + + @property + def tracking_id(self): + return "j-%s-%s" % (self.name, self._id) diff --git a/taskflow/jobboard.py b/taskflow/jobboard.py index 13541d66d..e9fac22b9 100644 --- a/taskflow/jobboard.py +++ b/taskflow/jobboard.py @@ -29,30 +29,48 @@ class JobBoard(object): @abc.abstractmethod def post(self, job): + """Posts a job to the underlying job board implementation + and returns a promise object which can be used to query the + state of said job.""" raise NotImplementedError() def _notify_posted(self, job): + """When a job is received, by whichever mechanism the underlying + implementation provides, the job should be given to said listeners + for them to know that a job has arrived.""" for i in self._listeners: i.notify_posted(job) + def _notify_erased(self, job): + """When a job is erased, by whichever mechanism the underlying + implementation provides, the job should be given to said listeners + for them to know that a job has been erased.""" + for i in self._listeners: + i.notify_erased(job) + @abc.abstractmethod - def await(self, blocking=True): + def erase(self, job): + """Erases the given job from this job board.""" + for i in self._listeners: + i._notify_erased(job) + + @abc.abstractmethod + def await(self, block=True, timeout=None): + """Blocks the current [thread, greenthread, process] until a new job + has arrived.""" raise NotImplementedError() def subscribe(self, listener): - self._listeners.append(listener) + """Adds a new listener who will be notified on job updates/postings.""" + if listener not in self._listeners: + self._listeners.append(listener) def unsubscribe(self, listener): + """Removes a given listener from notifications about job + updates/postings.""" if listener in self._listeners: self._listeners.remove(listener) def close(self): - """Allows the job board provider to free any resources that it has.""" + """Allows the job board to free any resources that it has.""" pass - - -class ProxyJobBoard(JobBoard): - def post(self, context, job): - - raise NotImplementedError() - diff --git a/taskflow/logbook.py b/taskflow/logbook.py index b9161f8fd..e74ca40c4 100644 --- a/taskflow/logbook.py +++ b/taskflow/logbook.py @@ -17,15 +17,15 @@ # under the License. import abc -import logging -"""Define APIs for the logbook providers.""" - -LOG = logging.getLogger(__name__) +from datetime import datetime -class RecordNotFound(Exception): - pass +class LogEntry(object): + def __init__(self, name, metadata=None): + self.created_on = datetime.utcnow() + self.name = name + self.metadata = metadata class LogBook(object): @@ -34,20 +34,17 @@ class LogBook(object): __metaclass__ = abc.ABCMeta - def __init__(self, resource_uri): - self.uri = resource_uri - @abc.abstractmethod - def add_record(self, name, metadata=None): - """Atomically adds a new entry to the given logbook with the supplied - metadata (if any).""" + def add(self, entry): + """Atomically adds a new entry to the given logbook.""" raise NotImplementedError() - @abc.abstractmethod - def fetch_record(self, name): - """Fetchs a record with the given name and returns any metadata about - said record.""" - raise NotImplementedError() + def search_by_name(self, name): + """Yields entries with the given name. The order will be in the same + order that they were added.""" + for e in self: + if e.name == name: + yield e @abc.abstractmethod def __contains__(self, name): @@ -55,38 +52,17 @@ class LogBook(object): logbook.""" raise NotImplementedError() - @abc.abstractmethod - def mark(self, name, metadata, merge_functor=None): - """Marks the given logbook entry (which must exist) with the given - metadata, if said entry already exists then the provided merge functor - or a default function, will be activated to merge the existing metadata - with the supplied metadata.""" - raise NotImplementedError() - @abc.abstractmethod def __iter__(self): - """Iterates over all names and metadata and provides back both of these - via a (name, metadata) tuple. The order will be in the same order that + """Iterates over all entries. The order will be in the same order that they were added.""" raise NotImplementedError() - def close(self): - """Allows the job board provider to free any resources that it has.""" - pass - - -class DBLogBook(LogBook): - """Base class for a logbook impl that uses a backing database.""" - - def __init__(self, context, job): - super(DBLogBook, self).__init__(job.uri) - self.context = context - self.job = job + @abc.abstractmethod + def erase(self, name): + """Erases any entries with given name.""" + raise NotImplementedError() def close(self): - # Free the db connection + """Allows the logbook to free any resources that it has.""" pass - - -class MemoryLogBook(LogBook): - pass \ No newline at end of file diff --git a/taskflow/patterns/linear_workflow.py b/taskflow/patterns/linear_workflow.py index bd6e2ae5e..98de8febe 100644 --- a/taskflow/patterns/linear_workflow.py +++ b/taskflow/patterns/linear_workflow.py @@ -25,6 +25,8 @@ if not hasattr(dict_provider, 'OrderedDict'): import ordereddict as dict_provider from taskflow.openstack.common import excutils +from taskflow import exceptions as exc +from taskflow import states LOG = logging.getLogger(__name__) @@ -51,8 +53,7 @@ class Workflow(object): # to call the parents... self.parents = parents # This should be a functor that returns whether a given task has - # already ran by returning the return value of the task or returning - # 'None' if the task has not ran. + # already ran by returning a pair of (has_result, result). # # NOTE(harlowja): This allows for resumption by skipping tasks which # have already occurred. The previous return value is needed due to @@ -65,6 +66,8 @@ class Workflow(object): # store the result of a task in some persistent or semi-persistent # storage backend). self.listeners = [] + # The state of this flow. + self.state = states.PENDING def __setitem__(self, name, task): self.tasks[name] = task @@ -73,47 +76,52 @@ class Workflow(object): return self.results[name] def run(self, context, *args, **kwargs): + self.state = states.STARTED for (name, task) in self.tasks.iteritems(): try: self._on_task_start(context, task, name) # See if we have already ran this... result = None + has_result = False if self.result_fetcher: - result = self.result_fetcher(context, name, self) - if result is None: + (has_result, result) = self.result_fetcher(context, + name, self) + if not has_result: result = task.apply(context, *args, **kwargs) # Keep a pristine copy of the result in the results table # so that if said result is altered by other further states # the one here will not be. self.results[name] = copy.deepcopy(result) self._on_task_finish(context, task, name, result) + self.state = states.SUCCESS except Exception as ex: with excutils.save_and_reraise_exception(): + self.state = states.FAILURE try: self._on_task_error(context, task, name) except Exception: - LOG.exception(_("Dropping exception catched when" - " notifying about existing task" - " exception.")) + LOG.exception("Dropping exception catched when" + " notifying about existing task" + " exception.") + self.state = states.REVERTING self.rollback(context, - workflow.Failure(task, name, self, ex)) + exc.TaskException(task, name, self, ex)) def _on_task_error(self, context, task, name): # Notify any listeners that the task has errored. for i in self.listeners: - i.notify(context, workflow.ERRORED, self, task, name) + i.notify(context, states.FAILURE, self, task, name) def _on_task_start(self, context, task, name): # Notify any listeners that we are about to start the given task. for i in self.listeners: - i.notify(context, workflow.STARTING, self, task, name) + i.notify(context, states.STARTED, self, task, name) def _on_task_finish(self, context, task, name, result): # Notify any listeners that we are finishing the given task. self.reversions.append((name, task)) for i in self.listeners: - i.notify(context, workflow.COMPLETED, self, task, - name, result=result) + i.notify(context, states.SUCCESS, self, task, name, result=result) def rollback(self, context, cause): for (i, (name, task)) in enumerate(reversed(self.reversions)): @@ -122,8 +130,8 @@ class Workflow(object): except Exception: # Ex: WARN: Failed rolling back stage 1 (validate_request) of # chain validation due to Y exception. - msg = _("Failed rolling back stage %(index)s (%(name)s)" - " of workflow %(workflow)s, due to inner exception.") + msg = ("Failed rolling back stage %(index)s (%(name)s)" + " of workflow %(workflow)s, due to inner exception.") LOG.warn(msg % {'index': (i + 1), 'stage': name, 'workflow': self.name}) if not self.tolerant: @@ -135,4 +143,3 @@ class Workflow(object): # Rollback any parents workflows if they exist... for p in self.parents: p.rollback(context, cause) - diff --git a/taskflow/promise.py b/taskflow/promise.py new file mode 100644 index 000000000..dfce9c75a --- /dev/null +++ b/taskflow/promise.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class Promise(object): + """This is an abstraction of a promise to perform some type of job which + can be returned to a user/function... for later lookup on the progress of + the underlying job.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, job): + self._job = job + + @property + def tracking_id(self): + return "p-%s" % (self._job.tracking_id) + + def await(self, timeout=None): + """Attempts to wait until the job fails or finishess.""" + raise NotImplementedError() + + @abc.abstractproperty + def state(self): + """Returns the state of the underlying job.""" + raise NotImplementedError() + + @abc.abstractproperty + def owner(self): + """Returns the owner of the job, or None if no owner + has been assigned.""" + raise NotImplementedError() diff --git a/taskflow/reservation.py b/taskflow/states.py similarity index 65% rename from taskflow/reservation.py rename to taskflow/states.py index bb0a27944..c34084a51 100644 --- a/taskflow/reservation.py +++ b/taskflow/states.py @@ -16,15 +16,22 @@ # License for the specific language governing permissions and limitations # under the License. +# Job states. +CLAIMED = 'CLAIMED' +FAILURE = 'FAILURE' +PENDING = 'PENDING' +REVERTING = 'REVERTING' +SUCCESS = 'SUCCESS' +UNCLAIMED = 'UNCLAIMED' -class Reservation(object): - """This is an abstraction of a promise to complete some type of job which - can be returned to a user for later lookup on the progress of said - promise""" +# Flow states. +FAILURE = FAILURE +PENDING = PENDING +REVERTING = REVERTING +STARTED = 'STARTED' +SUCCESS = SUCCESS - def __init__(self, for_who, id): - self.for_who = for_who - self.id = id - - def __str__(self): - return "Reservation: '%s' for '%s'" % (self.id, self.for_who) +# Task states. +FAILURE = FAILURE +STARTED = STARTED +SUCCESS = SUCCESS diff --git a/taskflow/utils.py b/taskflow/utils.py new file mode 100644 index 000000000..830dd2e7c --- /dev/null +++ b/taskflow/utils.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.