Added Memory Persistence API and Generic Datatypes

Added a generic persistence API for JobBoards,
Jobs, LogBooks, Workflows, FlowDetails, Tasks,
and TaskDetails. The currently implemented
backends are in-memory. Also implemented generic
return types (listed above) for the generic API.

Change-Id: I6d09860ee08a900faf2c213a230429bf9e0dec01
This commit is contained in:
kchenweijie
2013-08-02 09:18:32 -05:00
parent be5762a333
commit 4cee45e2e3
34 changed files with 2143 additions and 2165 deletions

View File

@@ -3,6 +3,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain

493
taskflow/backends/api.py Normal file
View File

@@ -0,0 +1,493 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Backend persistence API"""
from oslo.config import cfg
from taskflow.common import config
from taskflow import utils
# Backend-selection options: one knob for the database backend, one for
# the in-memory backend.
db_opts = [cfg.StrOpt('db_backend',
                      default='sqlalchemy',
                      help='The backend to use for db')]
mem_opts = [cfg.StrOpt('mem_backend',
                       default='memory',
                       help='The backend to use for in-memory')]

CONF = cfg.CONF
CONF.register_opts(db_opts)
CONF.register_opts(mem_opts)

# Lazily-resolved backend module; configure() picks which pivot is used.
IMPL = utils.LazyPluggable('mem_backend',
                           memory='taskflow.backends.memory.api',
                           sqlalchemy='taskflow.backends.sqlalchemy.api')
def configure(pivot='mem_backend'):
    """Select the backend that IMPL dispatches to.

    pivot: 'mem_backend' (default) or 'db_backend'.  When the database
    backend is chosen, the SQL options are registered and cached in
    module-level globals.
    """
    IMPL.set_pivot(pivot)
    if pivot != 'db_backend':
        return
    global SQL_CONNECTION
    global SQL_IDLE_TIMEOUT
    config.register_db_opts()
    SQL_CONNECTION = cfg.CONF.sql_connection
    SQL_IDLE_TIMEOUT = cfg.CONF.sql_idle_timeout
"""
LOGBOOK
"""
def logbook_create(name, lb_id=None):
    """Create a logbook representation in the selected backend.

    name: name of the logbook.
    lb_id: optional uuid; the backend generates one when None.  Must be
        unique — the backend raises if a logbook with this uuid exists.
    """
    return IMPL.logbook_create(name, lb_id)
def logbook_destroy(lb_id):
    """Destroy the logbook representation with uuid lb_id.

    The backend raises an exception when no matching logbook exists.
    """
    return IMPL.logbook_destroy(lb_id)
def logbook_save(lb):
    """Persist a generic LogBook to the selected backend.

    lb: taskflow.generics.LogBook.  A backend representation is created
        if missing, then updated to match this object's attributes.
    """
    return IMPL.logbook_save(lb)
def logbook_delete(lb):
    """Delete a logbook representation from the currently selected backend.

    lb: taskflow.generics.LogBook whose representation is destroyed.
        The backend raises an exception when no representation exists.
    """
    return IMPL.logbook_delete(lb)
def logbook_get(lb_id):
    """Fetch a generic LogBook from the selected backend.

    lb_id: uuid of the logbook representation; the backend raises when
        it does not exist.
    Returns a taskflow.generics.LogBook mirroring the stored state.
    """
    return IMPL.logbook_get(lb_id)
def logbook_add_flow_detail(lb_id, fd_id):
    """Attach flowdetail fd_id to logbook lb_id in the backend.

    The backend raises an exception if either the logbook or the
    flowdetail with the given uuid does not exist.
    """
    return IMPL.logbook_add_flow_detail(lb_id, fd_id)
def logbook_remove_flowdetail(lb_id, fd_id):
    """Detach flowdetail fd_id from logbook lb_id in the backend.

    The backend raises an exception if the logbook with uuid lb_id does
    not exist.
    """
    return IMPL.logbook_remove_flowdetail(lb_id, fd_id)
def logbook_get_ids_names():
    """Return a dict of uuid -> name for every stored logbook."""
    return IMPL.logbook_get_ids_names()
"""
FLOWDETAIL
"""
def flowdetail_create(name, wf, fd_id=None):
    """Create a flowdetail representation in the selected backend.

    name: name of the flowdetail.
    wf: taskflow.generics.Flow this flowdetail represents.
    fd_id: optional uuid; the backend generates one when None.  Must be
        unique — the backend raises if a flowdetail with this uuid exists.
    """
    return IMPL.flowdetail_create(name, wf, fd_id)
def flowdetail_destroy(fd_id):
    """Destroy the flowdetail representation with uuid fd_id.

    The backend raises an exception when no matching flowdetail exists.
    """
    return IMPL.flowdetail_destroy(fd_id)
def flowdetail_save(fd):
    """Persist a generic FlowDetail to the selected backend.

    fd: taskflow.generics.FlowDetail.  A backend representation is
        created if missing, then updated to match this object.
    """
    return IMPL.flowdetail_save(fd)
def flowdetail_delete(fd):
    """Delete a flowdetail representation from the selected backend.

    fd: taskflow.generics.FlowDetail whose representation is destroyed.
        The backend raises an exception when no representation exists.
    """
    return IMPL.flowdetail_delete(fd)
def flowdetail_get(fd_id):
    """Fetch a generic FlowDetail from the selected backend.

    fd_id: uuid of the flowdetail representation; the backend raises
        when it does not exist.
    Returns a taskflow.generics.FlowDetail mirroring the stored state.
    """
    return IMPL.flowdetail_get(fd_id)
def flowdetail_add_task_detail(fd_id, td_id):
    """Attach taskdetail td_id to flowdetail fd_id in the backend.

    The backend raises an exception if either the flowdetail or the
    taskdetail with the given uuid does not exist.
    """
    return IMPL.flowdetail_add_task_detail(fd_id, td_id)
def flowdetail_remove_taskdetail(fd_id, td_id):
    """Detach taskdetail td_id from flowdetail fd_id in the backend.

    The backend raises an exception if the flowdetail with uuid fd_id
    does not exist.
    """
    return IMPL.flowdetail_remove_taskdetail(fd_id, td_id)
def flowdetail_get_ids_names():
    """Return a dict of uuid -> name for every stored flowdetail."""
    return IMPL.flowdetail_get_ids_names()
"""
TASKDETAIL
"""
def taskdetail_create(name, tsk, td_id=None):
    """Create a taskdetail representation in the selected backend.

    name: name of the taskdetail.
    tsk: taskflow.generics.Task this taskdetail represents.
    td_id: optional uuid; the backend generates one when None.  Must be
        unique — the backend raises if a taskdetail with this uuid exists.
    """
    return IMPL.taskdetail_create(name, tsk, td_id)
def taskdetail_destroy(td_id):
    """Destroy the taskdetail representation with uuid td_id.

    The backend raises an exception when no matching taskdetail exists.
    """
    return IMPL.taskdetail_destroy(td_id)
def taskdetail_save(td):
    """Persist a generic TaskDetail to the selected backend.

    td: taskflow.generics.TaskDetail.  A backend representation is
        created if missing, then updated to match this object.
    """
    return IMPL.taskdetail_save(td)
def taskdetail_delete(td):
    """Delete a taskdetail representation from the selected backend.

    td: taskflow.generics.TaskDetail whose representation is destroyed.
        The backend raises an exception when no representation exists.
    """
    return IMPL.taskdetail_delete(td)
def taskdetail_get(td_id):
    """Fetch a generic TaskDetail from the selected backend.

    td_id: uuid of the taskdetail representation; the backend raises
        when it does not exist.
    Returns a taskflow.generics.TaskDetail mirroring the stored state.
    """
    return IMPL.taskdetail_get(td_id)
def taskdetail_update(td_id, values):
    """Update attributes of the taskdetail representation td_id.

    td_id: uuid of the taskdetail; the backend raises an exception when
        it does not exist.
    values: dict of attribute name -> new value to apply.
    """
    return IMPL.taskdetail_update(td_id, values)
def taskdetail_get_ids_names():
    """Return a dict of uuid -> name for every stored taskdetail."""
    return IMPL.taskdetail_get_ids_names()

View File

@@ -1,243 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import functools
import logging
import threading
import weakref
from taskflow import catalog
from taskflow import exceptions as exc
from taskflow import job as t_job
from taskflow import jobboard
from taskflow import logbook
from taskflow import states
from taskflow import utils
LOG = logging.getLogger(__name__)
def check_not_closed(meth):
    """Decorator: reject method calls on objects whose _closed flag is set."""
    @functools.wraps(meth)
    def wrapper(self, *args, **kwargs):
        if self._closed:  # pylint: disable=W0212
            raise exc.ClosedException("Unable to call %s on closed object"
                                      % (meth.__name__))
        return meth(self, *args, **kwargs)
    return wrapper
class MemoryClaimer(t_job.Claimer):
    """Claimer that records ownership directly on the job object."""

    def claim(self, job, owner):
        # Mark the job as owned.
        job.owner = owner

    def unclaim(self, job, owner):
        # Clear ownership; owner is unused but kept for the interface.
        job.owner = None
class MemoryCatalog(catalog.Catalog):
    """In-memory catalog mapping jobs to their logbooks.

    Entries are (job, logbook) pairs guarded by a reentrant lock.
    """

    def __init__(self):
        super(MemoryCatalog, self).__init__()
        self._catalogs = []
        self._closed = False
        self._lock = threading.RLock()

    def __len__(self):
        with self._lock:
            return len(self._catalogs)

    def __contains__(self, job):
        with self._lock:
            return any(entry == job for (entry, _book) in self._catalogs)

    def close(self):
        # Subsequent create_or_fetch/erase calls will raise.
        self._closed = True

    @check_not_closed
    def create_or_fetch(self, job):
        """Return the logbook for job, creating one on first access."""
        with self._lock:
            for (entry, book) in self._catalogs:
                if entry == job:
                    return book
            book = MemoryLogBook()
            self._catalogs.append((job, book))
            return book

    @check_not_closed
    def erase(self, job):
        """Drop any logbook association for the given job."""
        with self._lock:
            self._catalogs = [(j, b) for (j, b) in self._catalogs
                              if j != job]
class MemoryFlowDetail(logbook.FlowDetail):
    """Flow detail that keeps its task details in a plain dict."""

    def __init__(self, book, name, task_cls=logbook.TaskDetail):
        super(MemoryFlowDetail, self).__init__(book, name)
        self._tasks = {}
        self._task_cls = task_cls

    def __iter__(self):
        return iter(self._tasks.values())

    def __contains__(self, task_name):
        return task_name in self._tasks

    def __getitem__(self, task_name):
        return self._tasks[task_name]

    def __len__(self):
        return len(self._tasks)

    def add_task(self, task_name, metadata=None):
        """Create, store and return a task detail for task_name."""
        detail = self._task_cls(task_name, metadata)
        self._tasks[task_name] = detail
        return detail

    def __delitem__(self, task_name):
        # Deleting a missing task is deliberately a no-op.
        self._tasks.pop(task_name, None)
class MemoryLogBook(logbook.LogBook):
    """Logbook keeping flow details in insertion order.

    A side set of names gives O(1) membership checks; the list keeps
    ordering for iteration.
    """

    def __init__(self):
        super(MemoryLogBook, self).__init__()
        self._flows = []
        self._flow_names = set()
        self._closed = False

    @check_not_closed
    def add_flow(self, flow_name):
        """Add a new flow detail; raises AlreadyExists on a duplicate name."""
        if flow_name in self._flow_names:
            raise exc.AlreadyExists()
        detail = MemoryFlowDetail(self, flow_name)
        self._flows.append(detail)
        self._flow_names.add(flow_name)
        return detail

    @check_not_closed
    def __getitem__(self, flow_name):
        if flow_name not in self._flow_names:
            raise exc.NotFound()
        for detail in self._flows:
            if detail.name == flow_name:
                return detail

    @check_not_closed
    def __iter__(self):
        for detail in self._flows:
            yield detail

    def close(self):
        self._closed = True

    @check_not_closed
    def __contains__(self, flow_name):
        return flow_name in self._flow_names

    def __delitem__(self, flow_name):
        detail = self[flow_name]
        self._flow_names.remove(flow_name)
        self._flows.remove(detail)

    def __len__(self):
        return len(self._flows)
class MemoryJobBoard(jobboard.JobBoard):
def __init__(self):
super(MemoryJobBoard, self).__init__()
self._event = threading.Event()
# Locking to ensure that if there are multiple
# users posting to the backing board that we only
# have 1 writer modifying it at a time, but we can
# have X readers.
self._lock = utils.ReaderWriterLock()
self._board = []
self._closed = False
def close(self):
self._closed = True
def _select_posts(self, date_functor):
for (d, j) in self._board:
if date_functor(d):
yield j
def repost(self, job):
# Let people know a job is here
self._notify_posted(job)
self._event.set()
# And now that they are notified, reset for another posting.
self._event.clear()
@check_not_closed
def post(self, job):
with self._lock.acquire(read=False):
self._board.append((datetime.datetime.utcnow(), job))
# Ensure the job tracks that we posted it
job.posted_on.append(weakref.proxy(self))
# Let people know a job is here
self._notify_posted(job)
self._event.set()
# And now that they are notified, reset for another posting.
self._event.clear()
@check_not_closed
def posted_before(self, date_posted=None):
date_functor = lambda d: True
if date_posted is not None:
date_functor = lambda d: d < date_posted
with self._lock.acquire(read=True):
return [j for j in self._select_posts(date_functor)]
@check_not_closed
def erase(self, job):
with self._lock.acquire(read=False):
# Ensure that we even have said job in the first place.
exists = False
for (d, j) in self._board:
if j == job:
exists = True
break
if not exists:
raise exc.JobNotFound()
if job.state not in (states.SUCCESS, states.FAILURE):
raise exc.InvalidStateException("Can not delete a job in "
"state %s" % (job.state))
self._board = [(d, j) for (d, j) in self._board if j != job]
self._notify_erased(job)
@check_not_closed
def posted_after(self, date_posted=None):
date_functor = lambda d: True
if date_posted is not None:
date_functor = lambda d: d >= date_posted
with self._lock.acquire(read=True):
return [j for j in self._select_posts(date_functor)]
@check_not_closed
def await(self, timeout=None):
self._event.wait(timeout)

View File

@@ -0,0 +1,453 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of in-memory backend."""
import logging
from taskflow.backends.memory import memory
from taskflow import exceptions as exception
from taskflow.generics import flowdetail
from taskflow.generics import logbook
from taskflow.generics import taskdetail
from taskflow.utils import LockingDict
LOG = logging.getLogger(__name__)
logbooks = LockingDict()
flowdetails = LockingDict()
taskdetails = LockingDict()
"""
LOGBOOK
"""
def logbook_create(name, lb_id=None):
    """Create and store a new LogBook model.

    name: display name of the logbook.
    lb_id: optional uuid; when None the model generates its own —
        TODO confirm against MemoryLogBook.
    Returns the stored model.
    """
    lb = memory.MemoryLogBook(name, lb_id)
    # Key by the model's uuid rather than the raw lb_id argument:
    # when lb_id is None every logbook would otherwise collide on the
    # single None key and be unreachable via logbook_get(uuid).
    logbooks[lb.uuid] = lb
    return lb
def logbook_destroy(lb_id):
    """Remove the stored LogBook model whose uuid matches lb_id.

    Raises NotFound when no such logbook is stored.
    """
    try:
        del logbooks[lb_id]
    except KeyError:
        raise exception.NotFound("No Logbook found with id "
                                 "%s." % (lb_id,))
def logbook_save(lb):
    """Persist a generic LogBook, including its flow details.

    A backing model is created on first save; flow details present on
    lb but not yet linked to the stored model are linked.
    """
    if not _logbook_exists(lb.uuid):
        logbook_create(lb.name, lb.uuid)
    stored = logbook_get(lb.uuid)
    for fd in lb:
        # Persist each flow detail, then link any that are new.
        fd.save()
        if fd not in stored:
            logbook_add_flow_detail(lb.uuid, fd.uuid)
def logbook_delete(lb):
    """Delete the stored model for a generic LogBook.

    Raises NotFound when absent and Error when the logbook still
    contains flow details.
    """
    try:
        stored = logbooks[lb.uuid]
    except KeyError:
        raise exception.NotFound("No Logbook found with id "
                                 "%s." % (lb.uuid,))
    # Refuse to delete while dependents remain.
    if len(stored):
        raise exception.Error("Logbook <%s> still has "
                              "dependents." % (lb.uuid,))
    logbook_destroy(lb.uuid)
def logbook_get(lb_id):
    """Return a generic LogBook mirroring the stored model for lb_id.

    Raises NotFound when no such logbook is stored.
    """
    try:
        stored = logbooks[lb_id]
    except KeyError:
        raise exception.NotFound("No Logbook found with id "
                                 "%s." % (lb_id,))
    # Read-lock the model while copying it into a generic LogBook.
    with stored.acquire_lock(read=True):
        result = logbook.LogBook(stored.name, stored.uuid)
        for fd in stored:
            result.add_flow_detail(flowdetail_get(fd.uuid))
        return result
def logbook_add_flow_detail(lb_id, fd_id):
    """Attach the stored FlowDetail fd_id to the stored LogBook lb_id.

    Raises NotFound when either model is missing.
    """
    try:
        book = logbooks[lb_id]
    except KeyError:
        raise exception.NotFound("No Logbook found with id "
                                 "%s." % (lb_id,))
    try:
        detail = flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
    # Mutating the logbook requires the write lock.
    with book.acquire_lock(read=False):
        book.add_flow_detail(detail)
def logbook_remove_flowdetail(lb_id, fd_id):
    """Detach the stored FlowDetail fd_id from the stored LogBook lb_id.

    Raises NotFound when either model is missing.
    """
    try:
        book = logbooks[lb_id]
    except KeyError:
        raise exception.NotFound("No Logbook found with id "
                                 "%s." % (lb_id,))
    try:
        detail = flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
    # Mutating the logbook requires the write lock.
    with book.acquire_lock(read=False):
        book.remove_flow_detail(detail)
def logbook_get_ids_names():
    """Return a dict mapping each stored LogBook's uuid to its name."""
    return dict((uuid, book.name) for (uuid, book) in logbooks.items())
def _logbook_exists(lb_id, session=None):
    """Return True when a LogBook with lb_id is stored (session unused)."""
    return lb_id in logbooks.keys()
"""
FLOWDETAIL
"""
def flowdetail_create(name, wf, fd_id=None):
    """Create and store a new FlowDetail model.

    name: display name of the flowdetail.
    wf: the flow object this flowdetail represents.
    fd_id: optional uuid; when None the model generates its own —
        TODO confirm against MemoryFlowDetail.
    Returns the stored model.
    """
    fd = memory.MemoryFlowDetail(name, wf, fd_id)
    # Key by the model's uuid rather than the raw fd_id argument:
    # when fd_id is None every flowdetail would otherwise collide on
    # the single None key and be unreachable via flowdetail_get(uuid).
    flowdetails[fd.uuid] = fd
    return fd
def flowdetail_destroy(fd_id):
    """Remove the stored FlowDetail model whose uuid matches fd_id.

    Raises NotFound when no such flowdetail is stored.
    """
    try:
        del flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
def flowdetail_save(fd):
    """Persist a generic FlowDetail, including its task details.

    A backing model is created on first save; task details present on
    fd but not yet linked to the stored model are linked.
    """
    if not _flowdetail_exists(fd.uuid):
        flowdetail_create(fd.name, fd.flow, fd.uuid)
    stored = flowdetail_get(fd.uuid)
    for td in fd:
        # Persist each task detail, then link any that are new.
        td.save()
        if td not in stored:
            flowdetail_add_task_detail(fd.uuid, td.uuid)
def flowdetail_delete(fd):
    """Delete the stored model for a generic FlowDetail.

    Raises NotFound when absent and Error when the flowdetail still
    contains task details.
    """
    try:
        stored = flowdetails[fd.uuid]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd.uuid,))
    # Refuse to delete while dependents remain.
    if len(stored):
        raise exception.Error("FlowDetail <%s> still has "
                              "dependents." % (fd.uuid,))
    flowdetail_destroy(fd.uuid)
def flowdetail_get(fd_id):
    """Return a generic FlowDetail mirroring the stored model for fd_id.

    Raises NotFound when no such flowdetail is stored.
    """
    try:
        stored = flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
    # Read-lock the model while copying it into a generic FlowDetail.
    with stored.acquire_lock(read=True):
        result = flowdetail.FlowDetail(stored.name, stored.flow,
                                       stored.uuid)
        # Mirror the timestamp and the contained task details.
        result.updated_at = stored.updated_at
        for td in stored:
            result.add_task_detail(taskdetail_get(td.uuid))
        return result
def flowdetail_add_task_detail(fd_id, td_id):
    """Attach the stored TaskDetail td_id to the stored FlowDetail fd_id.

    Raises NotFound when either model is missing.
    """
    try:
        detail = flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
    try:
        task_detail = taskdetails[td_id]
    except KeyError:
        raise exception.NotFound("No TaskDetail found with id "
                                 "%s." % (td_id,))
    # Mutating the flowdetail requires the write lock.
    with detail.acquire_lock(read=False):
        detail.add_task_detail(task_detail)
def flowdetail_remove_taskdetail(fd_id, td_id):
    """Detach the stored TaskDetail td_id from the stored FlowDetail fd_id.

    Raises NotFound when either model is missing.
    """
    try:
        detail = flowdetails[fd_id]
    except KeyError:
        raise exception.NotFound("No FlowDetail found with id "
                                 "%s." % (fd_id,))
    try:
        task_detail = taskdetails[td_id]
    except KeyError:
        raise exception.NotFound("No TaskDetail found with id "
                                 "%s." % (td_id,))
    # Mutating the flowdetail requires the write lock.
    with detail.acquire_lock(read=False):
        detail.remove_task_detail(task_detail)
def flowdetail_get_ids_names():
    """Return a dict mapping each stored FlowDetail's uuid to its name."""
    return dict((uuid, fd.name) for (uuid, fd) in flowdetails.items())
def _flowdetail_exists(fd_id):
    """Return True when a FlowDetail with fd_id is stored."""
    return fd_id in flowdetails.keys()
"""
TASKDETAIL
"""
def taskdetail_create(name, tsk, td_id=None):
    """Create and store a new TaskDetail model.

    name: display name of the taskdetail.
    tsk: the task object this taskdetail represents.
    td_id: optional uuid; when None the model generates its own —
        TODO confirm against MemoryTaskDetail.
    Returns the stored model.
    """
    td = memory.MemoryTaskDetail(name, tsk, td_id)
    # Key by the model's uuid rather than the raw td_id argument:
    # when td_id is None every taskdetail would otherwise collide on
    # the single None key and be unreachable via taskdetail_get(uuid).
    taskdetails[td.uuid] = td
    return td
def taskdetail_destroy(td_id):
    """Remove the stored TaskDetail model whose uuid matches td_id.

    Raises NotFound when no such taskdetail is stored.
    """
    try:
        del taskdetails[td_id]
    except KeyError:
        raise exception.NotFound("No TaskDetail found with id "
                                 "%s." % (td_id,))
def taskdetail_save(td):
    """Persist a generic TaskDetail to the in-memory store.

    A backing model is created on first save, then the generic
    object's mutable attributes are pushed into it.
    """
    if not _taskdetail_exists(td.uuid):
        taskdetail_create(td.name, td.task, td.uuid)
    taskdetail_update(td.uuid, dict(state=td.state,
                                    results=td.results,
                                    exception=td.exception,
                                    stacktrace=td.stacktrace,
                                    meta=td.meta))
def taskdetail_delete(td):
    """Delete the stored model for a generic TaskDetail.

    Task details hold no dependents, so this destroys directly.
    """
    taskdetail_destroy(td.uuid)
def taskdetail_get(td_id):
    """Return a generic TaskDetail mirroring the stored model for td_id.

    Raises NotFound when no such taskdetail is stored.
    """
    try:
        stored = taskdetails[td_id]
    except KeyError:
        raise exception.NotFound("No TaskDetail found with id "
                                 "%s." % (td_id,))
    # Read-lock the model while copying it into a generic TaskDetail.
    with stored.acquire_lock(read=True):
        result = taskdetail.TaskDetail(stored.name, stored.task,
                                       stored.uuid)
        # Copy over every mutable attribute.
        result.updated_at = stored.updated_at
        result.state = stored.state
        result.results = stored.results
        result.exception = stored.exception
        result.stacktrace = stored.stacktrace
        result.meta = stored.meta
        return result
def taskdetail_update(td_id, values):
    """Apply a dict of attribute updates to the stored TaskDetail td_id.

    td_id: uuid of the taskdetail; raises NotFound when absent.
    values: dict of attribute name -> new value, set via setattr under
        the model's write lock.
    """
    try:
        ld_td = taskdetails[td_id]
    except KeyError:
        raise exception.NotFound("No TaskDetail found with id "
                                 "%s." % (td_id,))
    with ld_td.acquire_lock(read=False):
        # items() instead of the Python-2-only iteritems() so this
        # module also runs under Python 3; behavior is identical.
        for k, v in values.items():
            setattr(ld_td, k, v)
def taskdetail_get_ids_names():
    """Return a dict mapping each stored TaskDetail's uuid to its name."""
    return dict((uuid, td.name) for (uuid, td) in taskdetails.items())
def _taskdetail_exists(td_id, session=None):
    """Return True when a TaskDetail with td_id is stored (session unused)."""
    return td_id in taskdetails.keys()

View File

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from taskflow.generics import flow
from taskflow.generics import flowdetail
from taskflow.generics import job
from taskflow.generics import jobboard
from taskflow.generics import logbook
from taskflow.generics import task
from taskflow.generics import taskdetail
from taskflow import utils
class MemoryJobBoard(jobboard.JobBoard):
    """Generic job board guarded by a reader/writer lock."""

    def __init__(self, name, jb_id=None):
        super(MemoryJobBoard, self).__init__(name, jb_id)
        self._lock = utils.ReaderWriterLock()

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Hold the lock for the with-block (shared when read=True)."""
        try:
            self._lock.acquire(read)
            yield self._lock
        finally:
            self._lock.release()
class MemoryJob(job.Job):
    """In-memory Job guarded by a reader/writer lock."""

    def __init__(self, name, job_id=None):
        super(MemoryJob, self).__init__(name, job_id)
        self._lock = utils.ReaderWriterLock()

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()
class MemoryLogBook(logbook.LogBook):
    """In-memory LogBook guarded by a reader/writer lock."""

    def __init__(self, name, lb_id=None):
        super(MemoryLogBook, self).__init__(name, lb_id)
        self._lock = utils.ReaderWriterLock()

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()
class MemoryFlow(flow.Flow):
    """In-memory Flow guarded by a reader/writer lock."""

    def __init__(self, name, wf_id=None):
        super(MemoryFlow, self).__init__(name, wf_id)
        self._lock = utils.ReaderWriterLock()
        # Mapping of FlowDetail uuid -> FlowDetail for this flow.
        self.flowdetails = {}

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()
class MemoryFlowDetail(flowdetail.FlowDetail):
    """In-memory FlowDetail guarded by a reader/writer lock."""

    def __init__(self, name, wf, fd_id=None):
        super(MemoryFlowDetail, self).__init__(name, wf, fd_id)
        self._lock = utils.ReaderWriterLock()

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()
class MemoryTask(task.Task):
    """In-memory Task guarded by a reader/writer lock."""

    def __init__(self, name, task_id=None):
        super(MemoryTask, self).__init__(name, task_id)
        self._lock = utils.ReaderWriterLock()
        # Mapping of TaskDetail uuid -> TaskDetail for this task.
        self.taskdetails = {}

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()
class MemoryTaskDetail(taskdetail.TaskDetail):
    """In-memory TaskDetail guarded by a reader/writer lock."""

    def __init__(self, name, task, td_id=None):
        super(MemoryTaskDetail, self).__init__(name, task, td_id)
        self._lock = utils.ReaderWriterLock()

    @contextlib.contextmanager
    def acquire_lock(self, read=True):
        """Yield this object's lock held in read (default) or write mode.

        Acquire before the try block: a failed acquire must not cause a
        release of a lock that was never taken.
        """
        self._lock.acquire(read)
        try:
            yield self._lock
        finally:
            self._lock.release()

View File

@@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Catalog(object):
    """A catalog can [create, fetch, erase] logbooks for jobs"""

    # py2-style abstract base class declaration.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __contains__(self, job):
        """Checks if the given catalog has a logbook for a job."""
        raise NotImplementedError()

    def __len__(self):
        """Gets how many logbooks are in this catalog."""
        # NOTE(review): not marked @abc.abstractmethod, unlike the
        # neighbors -- confirm subclasses are allowed to skip it.
        raise NotImplementedError()

    @abc.abstractmethod
    def create_or_fetch(self, job):
        """Creates a new logbook for a job or gives back an old one."""
        raise NotImplementedError()

    @abc.abstractmethod
    def erase(self, job):
        """Erases an existing logbook for a job."""
        raise NotImplementedError()

    def close(self):
        """Allows the catalog to free any resources that it has."""
        pass

View File

@@ -1,163 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy Backend"""
from oslo.config import cfg
from taskflow.common import config
from taskflow import utils
db_opts = [
cfg.StrOpt('db_backend',
default='sqlalchemy',
help='The backend to use for db')]
CONF = cfg.CONF
CONF.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='taskflow.db.sqlalchemy.api')
def configure():
    """Register DB options and cache the SQL connection settings.

    Populates the module-level SQL_CONNECTION / SQL_IDLE_TIMEOUT
    globals that the sqlalchemy session module reads.
    """
    global SQL_CONNECTION
    global SQL_IDLE_TIMEOUT
    config.register_db_opts()
    SQL_CONNECTION = cfg.CONF.sql_connection
    SQL_IDLE_TIMEOUT = cfg.CONF.sql_idle_timeout
"""
LOGBOOK
"""
def logbook_get(context, lb_id):
return IMPL.logbook_get(context, lb_id)
def logbook_get_by_name(context, lb_name):
return IMPL.logbook_get_by_name(context, lb_name)
def logbook_create(context, lb_name, lb_id=None):
return IMPL.logbook_create(context, lb_name, lb_id)
def logbook_get_workflows(context, lb_id):
return IMPL.logbook_get_workflows(context, lb_id)
def logbook_add_workflow(context, lb_id, wf_name):
return IMPL.logbook_add_workflow(context, lb_id, wf_name)
def logbook_destroy(context, lb_id):
return IMPL.logbook_destroy(context, lb_id)
"""
JOB
"""
def job_get(context, job_id):
return IMPL.job_get(context, job_id)
def job_update(context, job_id, values):
return IMPL.job_update(context, job_id, values)
def job_add_workflow(context, job_id, wf_id):
return IMPL.job_add_workflow(context, job_id, wf_id)
def job_get_owner(context, job_id):
return IMPL.job_get_owner(context, job_id)
def job_get_state(context, job_id):
return IMPL.job_get_state(context, job_id)
def job_get_logbook(context, job_id):
return IMPL.job_get_logbook(context, job_id)
def job_create(context, job_name, job_id=None):
return IMPL.job_create(context, job_name, job_id)
def job_destroy(context, job_id):
return IMPL.job_destroy(context, job_id)
"""
WORKFLOW
"""
def workflow_get(context, wf_name):
return IMPL.workflow_get(context, wf_name)
def workflow_get_all(context):
return IMPL.workflow_get_all(context)
def workflow_get_names(context):
return IMPL.workflow_get_names(context)
def workflow_get_tasks(context, wf_name):
return IMPL.workflow_get_tasks(context, wf_name)
def workflow_add_task(context, wf_name, task_id):
return IMPL.workflow_add_task(context, wf_name, task_id)
def workflow_create(context, wf_name):
return IMPL.workflow_create(context, wf_name)
def workflow_destroy(context, wf_name):
return IMPL.workflow_destroy(context, wf_name)
"""
TASK
"""
def task_get(context, task_id):
return IMPL.task_get(context, task_id)
def task_create(context, task_name, wf_id, task_id=None):
return IMPL.task_create(context, task_name, wf_id, task_id)
def task_update(context, task_id, values):
return IMPL.task_update(context, task_id, values)
def task_destroy(context, task_id):
return IMPL.task_destroy(context, task_id)

View File

@@ -1,299 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import logging
from taskflow import exceptions
from taskflow import states
from taskflow.db.sqlalchemy import models
from taskflow.db.sqlalchemy import session as sql_session
LOG = logging.getLogger(__name__)
def model_query(context, *args, **kwargs):
    """Build a query over the given model/columns.

    Uses the 'session' kwarg when supplied, otherwise a new session
    from the session factory. 'context' is currently unused.
    """
    session = kwargs.get('session') or sql_session.get_session()
    query = session.query(*args)
    return query
"""
LOGBOOK
"""
def logbook_get(context, lb_id, session=None):
    """Return the LogBook with matching lb_id.

    :raises NotFound: if no such logbook exists.
    """
    query = model_query(context, models.LogBook, session=session).\
        filter_by(logbook_id=lb_id)
    # Execute the query once; each .first() call issues its own SELECT,
    # so the previous check-then-return ran the query twice.
    result = query.first()
    if not result:
        raise exceptions.NotFound("No LogBook found with id "
                                  "%s." % (lb_id,))
    return result
def logbook_get_by_name(context, lb_name):
    """Return all logbooks with the given name.

    :raises NotFound: if no logbook has that name.
    """
    query = model_query(context, models.LogBook).\
        filter_by(name=lb_name)
    # Execute the query once; each .all() call issues its own SELECT,
    # so the previous check-then-return ran the query twice.
    results = query.all()
    if not results:
        raise exceptions.NotFound("LogBook %s not found."
                                  % (lb_name,))
    return results
def logbook_create(context, name, lb_id=None):
    """Create a new logbook"""
    lb_ref = models.LogBook()
    lb_ref.name = name
    # Honor a caller-supplied uuid; otherwise the model default applies.
    if lb_id:
        lb_ref.logbook_id = lb_id
    lb_ref.save()
    return lb_ref


def logbook_get_workflows(context, lb_id):
    """Return all workflows associated with a logbook"""
    session = sql_session.get_session()
    with session.begin():
        lb = logbook_get(context, lb_id, session=session)
        return lb.workflows


def logbook_add_workflow(context, lb_id, wf_name):
    """Add Workflow to given LogBook; returns the updated workflow list"""
    session = sql_session.get_session()
    with session.begin():
        wf = workflow_get(context, wf_name, session=session)
        lb = logbook_get(context, lb_id, session=session)
        lb.workflows.append(wf)
        return lb.workflows


def logbook_destroy(context, lb_id):
    """Delete a given LogBook"""
    session = sql_session.get_session()
    with session.begin():
        lb = logbook_get(context, lb_id, session=session)
        lb.delete(session=session)
"""
JOB
"""
def job_get(context, job_id, session=None):
    """Return the Job with matching job_id.

    :raises NotFound: if no such job exists.
    """
    query = model_query(context, models.Job, session=session).\
        filter_by(job_id=job_id)
    # Execute the query once; each .first() call issues its own SELECT,
    # so the previous check-then-return ran the query twice.
    result = query.first()
    if not result:
        raise exceptions.NotFound("No Job with id %s found"
                                  % (job_id,))
    return result
def job_update(context, job_id, values):
    """Update job with given values"""
    session = sql_session.get_session()
    with session.begin():
        job = job_get(context, job_id, session=session)
        job.update(values)
        job.save(session=session)


def job_add_workflow(context, job_id, wf_id):
    """Add a Workflow to given job; returns the updated workflow list"""
    session = sql_session.get_session()
    with session.begin():
        job = job_get(context, job_id, session=session)
        # NOTE(review): workflow_get filters by workflow *name* --
        # confirm 'wf_id' here is really the name.
        wf = workflow_get(context, wf_id, session=session)
        job.workflows.append(wf)
        return job.workflows


def job_get_owner(context, job_id):
    """Return a job's current owner"""
    job = job_get(context, job_id)
    return job.owner
def job_get_state(context, job_id):
    """Return a job's current state"""
    job = job_get(context, job_id)
    return job.state
def job_get_logbook(context, job_id):
    """Return the logbook associated with the given job"""
    session = sql_session.get_session()
    with session.begin():
        job = job_get(context, job_id, session=session)
        return job.logbook


def job_create(context, name, job_id=None):
    """Create a new Job in the UNCLAIMED state."""
    job_ref = models.Job()
    job_ref.name = name
    job_ref.state = states.UNCLAIMED
    if job_id:
        job_ref.job_id = job_id
        # NOTE(review): the logbook foreign key is seeded with the *job*
        # uuid -- confirm the paired logbook is created with the same id.
        job_ref.logbook_id = job_id
    job_ref.save()
    return job_ref


def job_destroy(context, job_id):
    """Delete a given Job"""
    session = sql_session.get_session()
    with session.begin():
        job = job_get(context, job_id, session=session)
        job.delete(session=session)
"""
WORKFLOW
"""
def workflow_get(context, wf_name, session=None):
    """Return the workflow with the given name.

    :raises NotFound: if no such workflow exists.
    """
    query = model_query(context, models.Workflow, session=session).\
        filter_by(name=wf_name)
    # Execute the query once; each .first() call issues its own SELECT,
    # so the previous check-then-return ran the query twice.
    result = query.first()
    if not result:
        raise exceptions.NotFound("Workflow %s not found." % (wf_name,))
    return result
def workflow_get_all(context):
    """Return all workflows"""
    results = model_query(context, models.Workflow).all()
    if not results:
        raise exceptions.NotFound("No Workflows were found.")
    return results


def workflow_get_names(context):
    """Return all workflow names"""
    # zip(*rows) transposes the single-column rows into one tuple of
    # names; it yields [] when there are no workflows.
    results = model_query(context, models.Workflow.name).all()
    return zip(*results)


def workflow_get_tasks(context, wf_name):
    """Return all tasks for a given Workflow"""
    session = sql_session.get_session()
    with session.begin():
        wf = workflow_get(context, wf_name, session=session)
        return wf.tasks


def workflow_add_task(context, wf_id, task_id):
    """Add a task to a given workflow; returns the updated task list"""
    # NOTE(review): 'wf_id' is passed to workflow_get, which filters by
    # workflow *name* -- confirm callers pass the name here.
    session = sql_session.get_session()
    with session.begin():
        task = task_get(context, task_id, session=session)
        wf = workflow_get(context, wf_id, session=session)
        wf.tasks.append(task)
        return wf.tasks


def workflow_create(context, workflow_name):
    """Create new workflow with workflow_id"""
    workflow_ref = models.Workflow()
    workflow_ref.name = workflow_name
    workflow_ref.save()
    return workflow_ref


def workflow_destroy(context, wf_name):
    """Delete a given Workflow"""
    session = sql_session.get_session()
    with session.begin():
        wf = workflow_get(context, wf_name, session=session)
        wf.delete(session=session)
"""
TASK
"""
def task_get(context, task_id, session=None):
    """Return the Task with the given task_id.

    :raises NotFound: if no such task exists.
    """
    query = model_query(context, models.Task, session=session).\
        filter_by(task_id=task_id)
    # Execute the query once; each .first() call issues its own SELECT,
    # so the previous check-then-return ran the query twice.
    result = query.first()
    if not result:
        raise exceptions.NotFound("No Task found with id "
                                  "%s." % (task_id,))
    return result
def task_create(context, task_name, wf_id, task_id=None):
    """Create task associated with given workflow"""
    task_ref = models.Task()
    task_ref.name = task_name
    task_ref.wf_id = wf_id
    # Honor a caller-supplied uuid; otherwise the model default applies.
    if task_id:
        task_ref.task_id = task_id
    task_ref.save()
    return task_ref
def task_update(context, task_id, values):
    """Update the Task with task_id using the given attribute dict."""
    session = sql_session.get_session()
    with session.begin():
        # Use the transaction's session so the read and the subsequent
        # save happen in one unit of work (task_destroy does the same);
        # previously the lookup used a separate session.
        task = task_get(context, task_id, session=session)
        task.update(values)
        task.save(session=session)
def task_destroy(context, task_id):
    """Delete an existing Task"""
    session = sql_session.get_session()
    with session.begin():
        task = task_get(context, task_id, session=session)
        task.delete(session=session)

View File

@@ -1,185 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for taskflow data.
"""
import json
from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_mapper, relationship
from sqlalchemy import DateTime, ForeignKey
from sqlalchemy import types as types
from taskflow.db.sqlalchemy import session as sql_session
from taskflow import exceptions
from taskflow.openstack.common import timeutils
from taskflow.openstack.common import uuidutils
CONF = cfg.CONF
BASE = declarative_base()
class Json(types.TypeDecorator, types.MutableType):
    """Column type that stores arbitrary values as JSON text."""

    impl = types.Text

    def process_bind_param(self, value, dialect):
        # Serialize on the way into the database.
        return json.dumps(value)

    def process_result_value(self, value, dialect):
        # Deserialize on the way out.
        return json.loads(value)
class TaskFlowBase(object):
    """Base class for TaskFlow Models.

    Adds save/delete helpers plus dict-like access to mapped columns.
    """

    __table_args__ = {'mysql_engine': 'InnoDB'}
    # NOTE(review): similar model bases spell this
    # '__table_initialized__' -- confirm the missing trailing
    # underscores are intentional.
    __table_initialized = False
    created_at = Column(DateTime, default=timeutils.utcnow)
    updated_at = Column(DateTime, default=timeutils.utcnow)

    def save(self, session=None):
        """Save this object."""
        if not session:
            session = sql_session.get_session()
        session.add(self)
        try:
            session.flush()
        # py2 except syntax (file predates py3 support).
        except IntegrityError, e:
            # Translate unique-constraint violations to a domain error.
            if str(e).endswith('is not unique'):
                raise exceptions.Duplicate(str(e))
            else:
                raise

    def delete(self, session=None):
        """Delete this object."""
        # NOTE(review): sets soft-delete markers but then issues a hard
        # session.delete() -- confirm both effects are wanted.
        self.deleted = True
        self.deleted_at = timeutils.utcnow()
        if not session:
            session = sql_session.get_session()
        session.delete(self)
        session.flush()

    def __setitem__(self, key, value):
        # Dict-style attribute write.
        setattr(self, key, value)

    def __getitem__(self, key):
        # Dict-style attribute read.
        return getattr(self, key)

    def get(self, key, default=None):
        # Dict-style read with a default.
        return getattr(self, key, default)

    def __iter__(self):
        # Iterate over (column name, value) pairs; see next().
        self._i = iter(object_mapper(self).columns)
        return self

    def next(self):
        # py2 iterator protocol ('next', not '__next__').
        n = self._i.next().name
        return n, getattr(self, n)

    def update(self, values):
        """Make the model object behave like a dict"""
        for k, v in values.iteritems():
            setattr(self, k, v)

    def iteritems(self):
        """Make the model object behave like a dict.
        Includes attributes from joins.
        """
        local = dict(self)
        joined = dict([k, v] for k, v in self.__dict__.iteritems()
                      if not k[0] == '_')
        local.update(joined)
        return local.iteritems()
# Many-to-many link between workflows and logbooks.
workflow_logbook_assoc = Table(
    'wf_lb_assoc', BASE.metadata,
    Column('workflow_id', Integer, ForeignKey('workflow.id')),
    Column('logbook_id', Integer, ForeignKey('logbook.id')),
    Column('id', Integer, primary_key=True)
)

# Many-to-many link between workflows and jobs.
workflow_job_assoc = Table(
    'wf_job_assoc', BASE.metadata,
    Column('workflow_id', Integer, ForeignKey('workflow.id')),
    Column('job_id', Integer, ForeignKey('job.id')),
    Column('id', Integer, primary_key=True)
)


class LogBook(BASE, TaskFlowBase):
    """Represents a logbook for a set of workflows"""

    __tablename__ = 'logbook'

    id = Column(Integer, primary_key=True)
    # External uuid, distinct from the integer primary key.
    logbook_id = Column(String, default=uuidutils.generate_uuid,
                        unique=True)
    name = Column(String)
    workflows = relationship("Workflow",
                             secondary=workflow_logbook_assoc)
    # One-to-one backref: job.logbook resolves to this logbook.
    job = relationship("Job", uselist=False, backref="logbook")


class Job(BASE, TaskFlowBase):
    """Represents a Job"""

    __tablename__ = 'job'

    id = Column(Integer, primary_key=True)
    # External uuid, distinct from the integer primary key.
    job_id = Column(String, default=uuidutils.generate_uuid,
                    unique=True)
    name = Column(String)
    owner = Column(String)
    state = Column(String)
    workflows = relationship("Workflow",
                             secondary=workflow_job_assoc)
    logbook_id = Column(String, ForeignKey('logbook.logbook_id'))


class Workflow(BASE, TaskFlowBase):
    """Represents Workflow detail objects"""

    __tablename__ = 'workflow'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    tasks = relationship("Task", backref="workflow")


class Task(BASE, TaskFlowBase):
    """Represents Task detail objects"""

    __tablename__ = 'task'

    id = Column(Integer, primary_key=True)
    task_id = Column(String, default=uuidutils.generate_uuid)
    name = Column(String)
    # Serialized (JSON) task results.
    results = Column(Json)
    exception = Column(String)
    stacktrace = Column(String)
    workflow_id = Column(String, ForeignKey('workflow.id'))


def create_tables():
    """Create all model tables on the configured engine."""
    BASE.metadata.create_all(sql_session.get_engine())

View File

@@ -1,111 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend."""
import logging
import sqlalchemy
import sqlalchemy.engine
import sqlalchemy.interfaces
import sqlalchemy.orm
from sqlalchemy.pool import NullPool
from sqlalchemy import exc
from taskflow.db import api as db_api
LOG = logging.getLogger(__name__)
_ENGINE = None
_MAKER = None
def get_session(autocommit=True, expire_on_commit=True):
    """Return a SQLAlchemy session."""
    global _MAKER

    # Lazily build (and cache) the sessionmaker on first use.
    if _MAKER is None:
        _MAKER = get_maker(get_engine(), autocommit, expire_on_commit)
    return _MAKER()


def synchronous_switch_listener(dbapi_conn, connection_rec):
    """Switch sqlite connections to non-synchronous mode"""
    dbapi_conn.execute("PRAGMA synchronous = OFF")


def ping_listener(dbapi_conn, connection_rec, connection_proxy):
    """Ensures that MySQL connections checked out of the
    pool are alive.
    Borrowed from:
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
    """
    try:
        dbapi_conn.cursor().execute('select 1')
    # py2 except syntax; OperationalError is looked up on the DBAPI
    # connection object itself.
    except dbapi_conn.OperationalError, ex:
        # MySQL "server has gone away" family of error codes.
        if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
            # NOTE(review): '_' (gettext) is not imported in this
            # module -- confirm it is installed as a builtin.
            LOG.warn(_('Got mysql server has gone away: %s'), ex)
            raise exc.DisconnectionError("Database server went away")
        else:
            raise
def get_engine():
    """Return a SQLAlchemy engine."""
    global _ENGINE

    # Lazily create (and cache) the engine on first use.
    if _ENGINE is None:
        connection_dict = sqlalchemy.engine.url.make_url(_get_sql_connection())
        engine_args = {
            "pool_recycle": _get_sql_idle_timeout(),
            "echo": False,
            "convert_unicode": True
        }
        # sqlite does not pool well; disable pooling for it entirely.
        if "sqlite" in connection_dict.drivername:
            engine_args['poolclass'] = NullPool
        _ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
                                           **engine_args)
        # Keep MySQL connections alive across idle periods.
        if 'mysql' in connection_dict.drivername:
            sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
        if 'sqlite' in connection_dict.drivername:
            sqlalchemy.event.listen(_ENGINE, 'connect',
                                    synchronous_switch_listener)
        # TODO(jharlow): Check to make sure engine connected
    return _ENGINE


def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker using the given engine."""
    return sqlalchemy.orm.sessionmaker(bind=engine,
                                       autocommit=autocommit,
                                       expire_on_commit=expire_on_commit)


def _get_sql_connection():
    # Set by taskflow.db.api.configure().
    return db_api.SQL_CONNECTION


def _get_sql_idle_timeout():
    # Set by taskflow.db.api.configure().
    return db_api.SQL_IDLE_TIMEOUT

View File

@@ -2,7 +2,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@@ -78,7 +78,7 @@ class Flow(object):
if parents:
self.parents = tuple(parents)
else:
self.parents = ()
self.parents = tuple([])
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
@@ -102,8 +102,7 @@ class Flow(object):
@property
def uuid(self):
"""Uniquely identifies this flow"""
return "f-%s" % (self._id)
return self._id
@property
def state(self):

View File

@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
class FlowDetail(object):
    """Generic backend class that contains TaskDetail snapshots from when
    the FlowDetail is gotten. The data contained within this class is
    not backed by the backend storage in real time.
    The data in this class will only be persisted by the backend when
    the save() method is called.
    """

    def __init__(self, name, wf, fd_id=None):
        # Honor a caller-supplied uuid; otherwise generate a fresh one.
        if fd_id:
            self._uuid = fd_id
        else:
            self._uuid = uuidutils.generate_uuid()
        self._name = name
        self._flow = wf
        self.updated_at = datetime.now()
        self._taskdetails = []

    def save(self):
        """Persist this FlowDetail through the backend API."""
        b_api.flowdetail_save(self)

    def delete(self):
        """Remove this FlowDetail from the backend."""
        b_api.flowdetail_delete(self)

    def add_task_detail(self, td):
        """Attach a TaskDetail snapshot (in memory only)."""
        self._taskdetails.append(td)

    def remove_task_detail(self, td):
        """Detach any TaskDetail with the same uuid (in memory only)."""
        self._taskdetails = [d for d in self._taskdetails
                             if d.uuid != td.uuid]

    def __contains__(self, td):
        # Compare by uuid: TaskDetail exposes 'uuid' (there is no
        # 'taskdetail_id' attribute) and remove_task_detail already
        # matches on uuid.
        for self_td in self:
            if self_td.uuid == td.uuid:
                return True
        return False

    @property
    def uuid(self):
        return self._uuid

    @property
    def name(self):
        return self._name

    def __iter__(self):
        for td in self._taskdetails:
            yield td

    def __len__(self):
        return len(self._taskdetails)

59
taskflow/generics/job.py Normal file
View File

@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
class Job(object):
    """Generic unit of work: tracks its owner, state, associated
    logbook and the flows that make it up.
    """

    def __init__(self, name, uuid=None):
        # Honor a caller-supplied uuid; otherwise generate a fresh one.
        if uuid:
            self._uuid = uuid
        else:
            self._uuid = uuidutils.generate_uuid()
        self._name = name
        self.owner = None
        self.state = None
        self._flows = []
        self.logbook = None

    def add_flow(self, wf):
        """Attach a flow to this job."""
        self._flows.append(wf)

    def remove_flow(self, wf):
        """Detach any flow with the same uuid."""
        self._flows = [f for f in self._flows
                       if f.uuid != wf.uuid]

    def __contains__(self, wf):
        # Flows are identified by 'uuid' (remove_flow matches on it);
        # the previous code read a nonexistent 'flow_id' attribute.
        for self_wf in self.flows:
            if self_wf.uuid == wf.uuid:
                return True
        return False

    @property
    def uuid(self):
        return self._uuid

    @property
    def name(self):
        return self._name

    @property
    def flows(self):
        return self._flows

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
# TODO(kchenweijie): THIS ENTIRE CLASS
class JobBoard(object):
    """Generic board that holds jobs waiting to be worked on."""
    # TODO(kchenweijie): THIS ENTIRE CLASS

    def __init__(self, name, jb_id=None):
        # Honor a caller-supplied uuid; otherwise generate a fresh one.
        if jb_id:
            self._uuid = jb_id
        else:
            self._uuid = uuidutils.generate_uuid()
        self._name = name
        self._jobs = []

    def add_job(self, job):
        """Add a job to this board."""
        self._jobs.append(job)

    def get_job(self, job_id):
        """Return the job whose uuid matches job_id.

        The previous code indexed the job list positionally with the
        uuid (a TypeError for string ids); look the job up by uuid.

        :raises KeyError: if no job with that uuid is posted.
        """
        for job in self._jobs:
            if job.uuid == job_id:
                return job
        raise KeyError(job_id)

    def remove_job(self, job):
        """Remove any job with the same uuid from this board."""
        self._jobs = [j for j in self._jobs
                      if j.uuid != job.uuid]

    def __contains__(self, job):
        # Jobs are identified by 'uuid' (remove_job matches on it);
        # the previous code read a nonexistent 'job_id' attribute.
        for self_job in self.jobs:
            if self_job.uuid == job.uuid:
                return True
        return False

    @property
    def name(self):
        return self._name

    @property
    def uuid(self):
        return self._uuid

    @property
    def jobs(self):
        return self._jobs

View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
class LogBook(object):
    """Generic backend class that contains FlowDetail snapshots from when
    the LogBook is gotten. The data contained within this class is
    not backed by the backend storage in real time.
    The data in this class will only be persisted by the backend when
    the save method is called.
    """

    def __init__(self, name, lb_id=None):
        # Honor a caller-supplied uuid; otherwise generate a fresh one.
        if lb_id:
            self._uuid = lb_id
        else:
            self._uuid = uuidutils.generate_uuid()
        self._name = name
        self.updated_at = datetime.now()
        self._flowdetails = []

    def save(self):
        """Persist this LogBook through the backend API."""
        b_api.logbook_save(self)

    def delete(self):
        """Remove this LogBook from the backend."""
        b_api.logbook_delete(self)

    def add_flow_detail(self, fd):
        """Attach a FlowDetail snapshot (in memory only)."""
        self._flowdetails.append(fd)

    def remove_flow_detail(self, fd):
        """Detach any FlowDetail with the same uuid (in memory only)."""
        self._flowdetails = [d for d in self._flowdetails
                             if d.uuid != fd.uuid]

    def __contains__(self, fd):
        # Compare by uuid: FlowDetail exposes 'uuid' (there is no
        # 'flowdetail_id' attribute) and remove_flow_detail already
        # matches on uuid.
        for self_fd in self:
            if self_fd.uuid == fd.uuid:
                return True
        return False

    @property
    def uuid(self):
        return self._uuid

    @property
    def name(self):
        return self._name

    def __iter__(self):
        for fd in self._flowdetails:
            yield fd

    def __len__(self):
        return len(self._flowdetails)

View File

@@ -2,7 +2,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -18,6 +18,8 @@
import abc
# from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
from taskflow import utils
@@ -27,8 +29,12 @@ class Task(object):
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name):
self.name = name
def __init__(self, name, task_id=None):
if task_id:
self._uuid = task_id
else:
self._uuid = uuidutils.generate_uuid()
self._name = name
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
@@ -44,6 +50,14 @@ class Task(object):
# major, minor version semantics apply.
self.version = (1, 0)
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
def __str__(self):
return "%s==%s" % (self.name, utils.join(self.version, with_what="."))

View File

@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from taskflow.backends import api as b_api
from taskflow.openstack.common import uuidutils
class TaskDetail(object):
    """Mutable record of a single task run (state, results and failure
    information) that is persisted through the backend API on save().
    """

    def __init__(self, name, task, td_id=None):
        # Honor a caller-supplied uuid; otherwise generate a fresh one.
        self._uuid = td_id if td_id else uuidutils.generate_uuid()
        self._name = name
        self.updated_at = datetime.now()
        # Run outcome fields; populated as the task executes.
        self.state = None
        self.results = None
        self.exception = None
        self.stacktrace = None
        self.meta = None
        self.task = task

    def save(self):
        """Persist this TaskDetail through the backend API."""
        b_api.taskdetail_save(self)

    def delete(self):
        """Remove this TaskDetail from the backend."""
        b_api.taskdetail_delete(self)

    @property
    def uuid(self):
        return self._uuid

    @property
    def name(self):
        return self._name

View File

@@ -1,159 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
from taskflow.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
class Claimer(object):
    """Abstract interface for entities that take (and later release)
    ownership of jobs so that said jobs can be worked on.
    """

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def claim(self, job, owner):
        """Attempt to claim the given job for *owner*.

        Implementations must either succeed or throw an exception
        signaling that the job can not be claimed.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def unclaim(self, job, owner):
        """Attempt to release the given job from *owner*.

        Implementations must either succeed or throw an exception
        signaling that the job can not be unclaimed.
        """
        raise NotImplementedError()
class Job(object):
    """A job is a connection to some set of work to be done by some agent.

    Basic information is provided about said work to be able to attempt to
    fulfill said work.
    """

    __metaclass__ = abc.ABCMeta  # Python 2 style ABC declaration.

    def __init__(self, name, context, catalog, claimer, uuid=None):
        self.name = name
        self.context = context
        # Assigned by the claimer when this job is claimed.
        self.owner = None
        # Job boards this job has been posted on (consumed by erase()).
        self.posted_on = []
        self._catalog = catalog
        self._claimer = claimer
        # Created lazily via the ``logbook`` property.
        self._logbook = None
        if not uuid:
            self._id = uuidutils.generate_uuid()
        else:
            self._id = str(uuid)
        self._state = states.UNCLAIMED

    def __str__(self):
        lines = ['Job: %s' % (self.name)]
        lines.append("%s" % (self.uuid))
        lines.append("%s" % (self.state))
        return "; ".join(lines)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        self._change_state(new_state)

    def _change_state(self, new_state):
        # Only record an actual transition.
        if self.state != new_state:
            self._state = new_state
            # TODO(harlowja): add logbook info?

    @property
    def logbook(self):
        """Fetches (or creates) a logbook entry for this job."""
        if self._logbook is None:
            self._logbook = self._catalog.create_or_fetch(self)
        return self._logbook

    def claim(self, owner):
        """This can be used to attempt transition this job from unclaimed
        to claimed.

        This must be done in a way that likely uses some type of locking or
        ownership transfer so that only a single entity gets this job to work
        on. This will avoid multi-job ownership, which can lead to
        inconsistent state.
        """
        if self.state != states.UNCLAIMED:
            raise exc.UnclaimableJobException("Unable to claim job when job is"
                                              " in state %s" % (self.state))
        # Let the claimer acquire ownership first; the state change is
        # only recorded after it succeeds (it may raise).
        self._claimer.claim(self, owner)
        self._change_state(states.CLAIMED)

    def run(self, flow, *args, **kwargs):
        # Runs the given flow in this job's context; the flow must not
        # have been started already.
        if flow.state != states.PENDING:
            raise exc.InvalidStateException("Unable to run %s when in"
                                            " state %s" % (flow, flow.state))
        return flow.run(self.context, *args, **kwargs)

    def unclaim(self):
        """Atomically transitions this job from claimed to unclaimed."""
        if self.state == states.UNCLAIMED:
            # Already unclaimed; nothing to do.
            return
        self._claimer.unclaim(self, self.owner)
        self._change_state(states.UNCLAIMED)

    def erase(self):
        """Erases any traces of this job from its associated resources."""
        for b in self.posted_on:
            b.erase(self)
        self._catalog.erase(self)
        if self._logbook is not None:
            self._logbook.close()
            self._logbook = None
        # Release ownership (if any) before this job disappears.
        if self.state != states.UNCLAIMED:
            self._claimer.unclaim(self, self.owner)

    def await(self, timeout=None):
        """Awaits until either the job fails or succeeds or the provided
        timeout is reached.

        NOTE(review): ``await`` became a reserved keyword in Python 3.7,
        so this method name is only valid under Python 2.
        """

        def check_functor():
            # Done only once a terminal state has been reached.
            if self.state not in (states.FAILURE, states.SUCCESS):
                return False
            return True

        return utils.await(check_functor, timeout)

    @property
    def uuid(self):
        """Returns a tracking *unique* identifier that can be used to identify
        this job among other jobs.
        """
        return "j-%s" % (self._id)

View File

@@ -1,98 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
# Sent as what happened to listeners
POSTED = 'POSTED'
ERASED = 'ERASED'
class JobBoard(object):
    """Base class for job boards.

    A job board holds posted jobs and notifies subscribed listeners when
    jobs are posted or erased.
    """

    __metaclass__ = abc.ABCMeta  # Python 2 style ABC declaration.

    def __init__(self):
        # Callables invoked as listener(job, event) on post/erase events.
        self._listeners = []

    @abc.abstractmethod
    def post(self, job):
        """Posts a job to the job board."""
        raise NotImplementedError()

    @abc.abstractmethod
    def repost(self, job):
        """Reposts a job that already exists on the job board."""
        raise NotImplementedError()

    def _notify_posted(self, job):
        """When a job is received, by whichever mechanism the underlying
        implementation provides, the job should be given to said listeners
        for them to know that a job has arrived.
        """
        for f in self._listeners:
            f(job, POSTED)

    def _notify_erased(self, job):
        """When a job is erased, by whichever mechanism the underlying
        implementation provides, the job should be given to said listeners
        for them to know that a job has been erased.
        """
        for f in self._listeners:
            f(job, ERASED)

    @abc.abstractmethod
    def posted_after(self, date_posted=None):
        """Gets the jobs posted after (or equal to) the given datetime object
        (or all jobs if none).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def posted_before(self, date_posted=None):
        """Gets the jobs posted before the given datetime object
        (or all jobs if none).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def erase(self, job):
        """Erases the given job from this job board."""
        raise NotImplementedError()

    @abc.abstractmethod
    def await(self, timeout=None):
        """Blocks the current thread until a new job has arrived.

        NOTE(review): ``await`` became a reserved keyword in Python 3.7,
        so this method name is only valid under Python 2.
        """
        raise NotImplementedError()

    def subscribe(self, listener):
        """Adds a new listener who will be notified on job updates/postings."""
        if listener not in self._listeners:
            self._listeners.append(listener)

    def unsubscribe(self, listener):
        """Removes a given listener from notifications about job
        updates/postings.
        """
        if listener in self._listeners:
            self._listeners.remove(listener)

    def close(self):
        """Allows the job board to free any resources that it has."""
        pass

View File

@@ -1,138 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
import weakref
class TaskDetail(object):
    """Minimal record of a single task: its name, metadata and timestamps."""

    def __init__(self, name, metadata=None):
        # Creation time is captured eagerly; updates are tracked separately.
        self.date_created = datetime.datetime.utcnow()
        self.name = name
        self.metadata = metadata
        self.date_updated = None

    def __str__(self):
        details = (self.name, self.date_created, self.metadata)
        return "TaskDetail (%s, %s): %s" % details
class FlowDetail(object):
    """Flow details have the bare minimum of these fields/methods."""
    __metaclass__ = abc.ABCMeta  # Python 2 style ABC declaration.

    def __init__(self, book, name):
        # A weak proxy avoids a strong reference cycle between the owning
        # logbook and this flow detail.
        self.book = weakref.proxy(book)
        self.name = name

    @abc.abstractmethod
    def __iter__(self):
        """Iterates over all task details.

        The order will be in the same order that they were added.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __contains__(self, task_name):
        """Determines if any task details with the given name exists in this
        flow details.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __getitem__(self, task_name):
        """Fetch any task details that match the given task name."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_task(self, task_name, metadata=None):
        """Atomically creates a new task detail entry to this flows details and
        returns it for further use.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __delitem__(self, task_name):
        """Deletes any task details that match the given task name."""
        raise NotImplementedError()

    @abc.abstractmethod
    def __len__(self):
        """Returns how many task details objects the flow contains."""
        raise NotImplementedError()

    def __str__(self):
        # Relies on the concrete subclass implementing __len__.
        return "FlowDetail (%s): %s entries" % (self.name, len(self))
class LogBook(object):
    """Base class for what a logbook should provide.

    A logbook is a named collection of flow details, keyed by flow name.
    """
    __metaclass__ = abc.ABCMeta  # Python 2 style ABC declaration.

    @abc.abstractmethod
    def add_flow(self, flow_name):
        """Atomically adds and returns a new flow details object to the given
        logbook or raises an exception if that flow (or a flow with that name)
        already exists.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __getitem__(self, flow_name):
        """Fetches the given flow details object for the given flow
        name or raises an exception if that flow name does not exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __contains__(self, flow_name):
        """Determines if a flow details object with the given flow name
        exists in this logbook.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __delitem__(self, flow_name):
        """Removes the given flow details object that matches the provided
        flow name or raises an exception if that flow name does not
        exist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __iter__(self):
        """Iterates over all the contained flow details.

        The order will be in the same order that they were added.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def __len__(self):
        """Returns how many flow details the logbook contains."""
        raise NotImplementedError()

    def close(self):
        """Allows the logbook to free any resources that it has."""
        pass

View File

@@ -26,12 +26,12 @@ from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
from taskflow.patterns import base
from taskflow.generics import flow
LOG = logging.getLogger(__name__)
class Flow(base.Flow):
class Flow(flow.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.

View File

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.backends import api as b_api
def setUpModule():
    # Point the persistence API at the in-memory backend for every test
    # in this package.
    b_api.configure('mem_backend')


def tearDownModule():
    # The in-memory backend holds no external resources to release.
    pass

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Import required libraries"""
import unittest2
from taskflow.backends import api as b_api
from taskflow import exceptions as exception
from taskflow.generics import flowdetail
from taskflow.openstack.common import uuidutils
from taskflow.patterns import graph_flow as flow
from taskflow.tests import utils
class FlowDetailTest(unittest2.TestCase):
    """This class is to test the functionality of the backend API's flowdetail
    methods.
    """
    # Class-level registries of what has been stored in the backend;
    # setUp/tearDown keep them in sync with the backend contents.
    wfs = []
    fd_names = []
    fd_ids = []
    tsks = []
    td_names = []
    td_ids = []

    @classmethod
    def setUpClass(cls):
        # Create a workflow for flowdetails to use
        wf_id = uuidutils.generate_uuid()
        wf_name = 'wf-%s' % (wf_id)
        wf = flow.Flow(wf_name, None, wf_id)
        cls.wfs.append(wf)

        # Create a task for taskdetails to use
        task_id = uuidutils.generate_uuid()
        task_name = 'task-%s' % (task_id)
        tsk = utils.DummyTask(task_name, task_id)
        cls.tsks.append(tsk)

    @classmethod
    def tearDownClass(cls):
        # Clear out the lists of workflows and tasks
        utils.drain(cls.wfs)
        utils.drain(cls.tsks)

    def setUp(self):
        # Create a flowdetail and record its uuid and name
        fd_id = uuidutils.generate_uuid()
        fd_name = 'fd-%s' % (fd_id)
        b_api.flowdetail_create(fd_name, self.wfs[0], fd_id)
        self.fd_names.append(fd_name)
        self.fd_ids.append(fd_id)

        # Create a taskdetail and record its uuid and name
        td_id = uuidutils.generate_uuid()
        td_name = 'td-%s' % (td_id)
        b_api.taskdetail_create(td_name, self.tsks[0], td_id)
        self.td_names.append(td_name)
        self.td_ids.append(td_id)

    def tearDown(self):
        # Destroy all taskdetails and flowdetails in the backend.
        # (Named ``uuid`` to avoid shadowing the builtin ``id``.)
        for uuid in self.td_ids:
            b_api.taskdetail_destroy(uuid)
        for uuid in self.fd_ids:
            b_api.flowdetail_destroy(uuid)

        # Drain the lists of taskdetail and flowdetail uuids and names
        utils.drain(self.fd_names)
        utils.drain(self.fd_ids)
        utils.drain(self.td_names)
        utils.drain(self.td_ids)

    def test_flowdetail_create(self):
        # Create a flowdetail and record its uuid and name
        fd_id = uuidutils.generate_uuid()
        fd_name = 'fd-%s' % (fd_id)
        b_api.flowdetail_create(fd_name, self.wfs[0], fd_id)
        self.fd_names.append(fd_name)
        self.fd_ids.append(fd_id)

        # Check to see that the created flowdetail is there
        actual = b_api.flowdetail_get(fd_id)
        self.assertIsNotNone(actual)

    def test_flowdetail_destroy(self):
        # Destroy the last added flowdetail
        fd_id = self.fd_ids.pop()
        b_api.flowdetail_destroy(fd_id)
        self.fd_names.pop()

        # Check to make sure the removed flowdetail is no longer there
        self.assertRaises(exception.NotFound, b_api.flowdetail_get,
                          fd_id)

    def test_flowdetail_save(self):
        # Create a generic flowdetail to save
        fd_id = uuidutils.generate_uuid()
        fd_name = 'fd-%s' % (fd_id)
        wf = self.wfs[0]
        fd = flowdetail.FlowDetail(fd_name, wf, fd_id)

        # Save the generic flowdetail to the backend and record its uuid/name
        b_api.flowdetail_save(fd)
        self.fd_names.append(fd_name)
        self.fd_ids.append(fd_id)

        # Check that the saved flowdetail is in the backend
        actual = b_api.flowdetail_get(fd_id)
        self.assertIsNotNone(actual)
        # Check that the saved flowdetail has no taskdetails
        self.assertEqual(len(actual), 0)

        # Add a generic taskdetail to the flowdetail
        td = b_api.taskdetail_get(self.td_ids[0])
        fd.add_task_detail(td)
        # Save the updated flowdetail
        b_api.flowdetail_save(fd)

        # Check that the saved flowdetail is still there
        actual = b_api.flowdetail_get(fd_id)
        self.assertIsNotNone(actual)
        # Check that the addition of a taskdetail was recorded
        self.assertEqual(len(actual), 1)

    def test_flowdetail_delete(self):
        # Get the flowdetail to delete
        fd_id = self.fd_ids.pop()
        fd = b_api.flowdetail_get(fd_id)

        # Delete the flowdetail
        b_api.flowdetail_delete(fd)
        self.fd_names.pop()

        # Make sure it is not there anymore
        self.assertRaises(exception.NotFound, b_api.flowdetail_get,
                          fd_id)

    def test_flowdetail_get(self):
        # Get the first flowdetail
        actual = b_api.flowdetail_get(self.fd_ids[0])

        # Check that it is a flowdetail
        self.assertIsInstance(actual, flowdetail.FlowDetail)
        # Check that its name matches what is expected
        self.assertEqual(actual.name, self.fd_names[0])

    def test_flowdetail_add_task_detail(self):
        # Get the first flowdetail
        actual = b_api.flowdetail_get(self.fd_ids[0])
        # Make sure it has no taskdetails
        self.assertEqual(len(actual), 0)

        # Add a taskdetail to the flowdetail
        b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0])

        # Get the flowdetail again
        actual = b_api.flowdetail_get(self.fd_ids[0])
        # Check that the flowdetail has one taskdetail
        self.assertEqual(len(actual), 1)

    def test_flowdetail_remove_taskdetail(self):
        # Add a taskdetail to the first flowdetail
        b_api.flowdetail_add_task_detail(self.fd_ids[0], self.td_ids[0])

        # Get the first flowdetail
        actual = b_api.flowdetail_get(self.fd_ids[0])
        # Check that the first flowdetail has exactly one taskdetail
        self.assertEqual(len(actual), 1)

        # Remove the taskdetail from the first flowdetail
        b_api.flowdetail_remove_taskdetail(self.fd_ids[0], self.td_ids[0])

        # Get the first flowdetail
        actual = b_api.flowdetail_get(self.fd_ids[0])
        # Check that the first flowdetail no longer has any taskdetails
        self.assertEqual(len(actual), 0)

    def test_flowdetail_get_ids_names(self):
        # Get a list of all uuids and names for flowdetails
        actual = b_api.flowdetail_get_ids_names()

        # Match it to our in-memory records. Sort both sides (forcing
        # plain lists) so the comparison does not depend on backend dict
        # ordering and also works with Python 3 dict views.
        self.assertEqual(sorted(actual.values()), sorted(self.fd_names))
        self.assertEqual(sorted(actual.keys()), sorted(self.fd_ids))

View File

@@ -0,0 +1,202 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Import required libraries"""
import unittest2
from taskflow.backends import api as b_api
from taskflow import exceptions as exception
from taskflow.generics import logbook
from taskflow.openstack.common import uuidutils
from taskflow.patterns import graph_flow as flow
from taskflow.tests import utils
class LogBookTest(unittest2.TestCase):
    """This class is designed to test the functionality of the backend API's
    logbook methods.
    """
    # Class-level registries of what has been stored in the backend;
    # setUp/tearDown keep them in sync with the backend contents.
    lb_names = []
    lb_ids = []
    wfs = []
    fd_names = []
    fd_ids = []

    @classmethod
    def setUpClass(cls):
        # Create a workflow to create flowdetails with
        wf_id = uuidutils.generate_uuid()
        wf_name = 'wf-%s' % (wf_id)
        wf = flow.Flow(wf_name, None, wf_id)
        cls.wfs.append(wf)

    @classmethod
    def tearDownClass(cls):
        # Empty the list of workflows
        utils.drain(cls.wfs)

    def setUp(self):
        # Create a logbook and record its uuid and name
        lb_id = uuidutils.generate_uuid()
        lb_name = 'lb-%s' % (lb_id)
        b_api.logbook_create(lb_name, lb_id)
        self.lb_names.append(lb_name)
        self.lb_ids.append(lb_id)

        # Create a flowdetail and record its uuid and name
        fd_id = uuidutils.generate_uuid()
        fd_name = 'fd-%s' % (fd_id)
        b_api.flowdetail_create(fd_name, self.wfs[0], fd_id)
        self.fd_names.append(fd_name)
        self.fd_ids.append(fd_id)

    def tearDown(self):
        # Destroy all flowdetails and logbooks in the backend.
        # (Named ``uuid`` to avoid shadowing the builtin ``id``.)
        for uuid in self.fd_ids:
            b_api.flowdetail_destroy(uuid)
        for uuid in self.lb_ids:
            b_api.logbook_destroy(uuid)

        # Clear the lists of logbook and flowdetail uuids and names
        utils.drain(self.lb_names)
        utils.drain(self.lb_ids)
        utils.drain(self.fd_names)
        utils.drain(self.fd_ids)

    def test_logbook_create(self):
        # Create a logbook and record its uuid and name
        lb_id = uuidutils.generate_uuid()
        lb_name = 'lb-%s' % (lb_id)
        b_api.logbook_create(lb_name, lb_id)
        self.lb_names.append(lb_name)
        self.lb_ids.append(lb_id)

        # Check that the created logbook exists in the backend
        actual = b_api.logbook_get(lb_id)
        self.assertIsNotNone(actual)

    def test_logbook_destroy(self):
        # Delete the last added logbook
        lb_id = self.lb_ids.pop()
        b_api.logbook_destroy(lb_id)
        self.lb_names.pop()

        # Check that the deleted logbook is no longer there
        self.assertRaises(exception.NotFound, b_api.logbook_get,
                          lb_id)

    def test_logbook_save(self):
        # Create a generic logbook to save
        lb_id = uuidutils.generate_uuid()
        lb_name = 'lb-%s' % (lb_id)
        lb = logbook.LogBook(lb_name, lb_id)

        # Save the logbook and record its uuid and name
        b_api.logbook_save(lb)
        self.lb_names.append(lb_name)
        self.lb_ids.append(lb_id)

        # Check that the saved logbook exists in the backend
        actual = b_api.logbook_get(lb_id)
        self.assertIsNotNone(actual)
        # Check that the saved logbook has no flowdetails
        self.assertEqual(len(actual), 0)

        # Add a flowdetail to the logbook
        fd = b_api.flowdetail_get(self.fd_ids[0])
        lb.add_flow_detail(fd)
        # Save the updated logbook
        b_api.logbook_save(lb)

        # Check that the updated logbook is still in the backend
        actual = b_api.logbook_get(lb_id)
        self.assertIsNotNone(actual)
        # Check that the added flowdetail was recorded
        self.assertEqual(len(actual), 1)

    def test_logbook_delete(self):
        # Get the logbook to delete
        lb_id = self.lb_ids.pop()
        lb = b_api.logbook_get(lb_id)

        # Delete the logbook from the backend
        b_api.logbook_delete(lb)
        self.lb_names.pop()

        # Check that the deleted logbook is no longer present
        self.assertRaises(exception.NotFound, b_api.logbook_get,
                          lb_id)

    def test_logbook_get(self):
        # Get the logbook from the backend
        actual = b_api.logbook_get(self.lb_ids[0])

        # Check that it is actually a logbook
        self.assertIsInstance(actual, logbook.LogBook)
        # Check that the name is correct
        self.assertEqual(actual.name, self.lb_names[0])

    def test_logbook_add_flow_detail(self):
        # Get the logbook from the backend
        actual = b_api.logbook_get(self.lb_ids[0])
        # Check that it has no flowdetails
        self.assertEqual(len(actual), 0)

        # Add a flowdetail to the logbook
        b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0])

        # Get the logbook again
        actual = b_api.logbook_get(self.lb_ids[0])
        # Check that the logbook has exactly one flowdetail
        self.assertEqual(len(actual), 1)

    def test_logbook_remove_flowdetail(self):
        # Add a flowdetail to the first logbook
        b_api.logbook_add_flow_detail(self.lb_ids[0], self.fd_ids[0])

        # Get the first logbook
        actual = b_api.logbook_get(self.lb_ids[0])
        # Check that it has exactly one flowdetail
        self.assertEqual(len(actual), 1)

        # Remove the flowdetail from the logbook
        b_api.logbook_remove_flowdetail(self.lb_ids[0], self.fd_ids[0])

        # Get the logbook again
        actual = b_api.logbook_get(self.lb_ids[0])
        # Check that the logbook now has no flowdetails
        self.assertEqual(len(actual), 0)

    def test_logbook_get_ids_names(self):
        # Get the dict of uuids and names
        actual = b_api.logbook_get_ids_names()

        # Match it to our in-memory records. Sort both sides (forcing
        # plain lists) so the comparison does not depend on backend dict
        # ordering and also works with Python 3 dict views.
        self.assertEqual(sorted(actual.values()), sorted(self.lb_names))
        self.assertEqual(sorted(actual.keys()), sorted(self.lb_ids))

View File

@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Import required libraries"""
import unittest2
from taskflow.backends import api as b_api
from taskflow import exceptions as exception
from taskflow.generics import taskdetail
from taskflow.openstack.common import uuidutils
from taskflow.tests import utils
class TaskDetailTest(unittest2.TestCase):
    """This class is designed to test the functionality of the backend API's
    taskdetail methods.
    """
    # Class-level registries of what has been stored in the backend;
    # setUp/tearDown keep them in sync with the backend contents.
    tsks = []
    td_names = []
    td_ids = []

    @classmethod
    def setUpClass(cls):
        # Create a task for taskdetails to be made from
        task_id = uuidutils.generate_uuid()
        task_name = 'task-%s' % (task_id)
        tsk = utils.DummyTask(task_name, task_id)
        # NOTE: set.update() takes an iterable, so each single-character
        # string below adds that one character as an element.
        tsk.requires.update('r')
        tsk.optional.update('o')
        tsk.provides.update('p')
        cls.tsks.append(tsk)

    @classmethod
    def tearDownClass(cls):
        # Clear the tasks
        utils.drain(cls.tsks)

    def setUp(self):
        # Create a taskdetail and record its uuid and name
        td_id = uuidutils.generate_uuid()
        td_name = 'td-%s' % (td_id)
        b_api.taskdetail_create(td_name, self.tsks[0], td_id)
        self.td_names.append(td_name)
        self.td_ids.append(td_id)

    def tearDown(self):
        # Destroy all taskdetails from the backend.
        # (Named ``uuid`` to avoid shadowing the builtin ``id``.)
        for uuid in self.td_ids:
            b_api.taskdetail_destroy(uuid)

        # Clear the list of taskdetail names and uuids
        utils.drain(self.td_names)
        utils.drain(self.td_ids)

    def test_taskdetail_create(self):
        # Create a taskdetail and record its uuid and name
        td_id = uuidutils.generate_uuid()
        td_name = 'td-%s' % (td_id)
        b_api.taskdetail_create(td_name, self.tsks[0], td_id)
        self.td_names.append(td_name)
        self.td_ids.append(td_id)

        # Check that the taskdetail is there
        actual = b_api.taskdetail_get(td_id)
        self.assertIsNotNone(actual)

    def test_taskdetail_destroy(self):
        # Destroy the last added taskdetail
        td_id = self.td_ids.pop()
        b_api.taskdetail_destroy(td_id)
        self.td_names.pop()

        # Check that the deleted taskdetail is no longer there
        self.assertRaises(exception.NotFound, b_api.taskdetail_get,
                          td_id)

    def test_taskdetail_save(self):
        # Create a generic taskdetail to save
        td_id = uuidutils.generate_uuid()
        td_name = 'td-%s' % (td_id)
        tsk = self.tsks[0]
        td = taskdetail.TaskDetail(td_name, tsk, td_id)

        # Save the generic taskdetail to the backend and record uuid/name
        b_api.taskdetail_save(td)
        self.td_names.append(td_name)
        self.td_ids.append(td_id)

        # Get the created taskdetail and check for default attributes
        actual = b_api.taskdetail_get(td_id)
        self.assertIsNotNone(actual)
        self.assertIsNone(actual.state)
        self.assertIsNone(actual.results)
        self.assertIsNone(actual.exception)
        self.assertIsNone(actual.stacktrace)
        self.assertIsNone(actual.meta)

        # Change the generic taskdetail's attributes
        td.state = 'SUCCESS'
        td.exception = 'ERROR'
        td.stacktrace = 'STACKTRACE'
        td.meta = 'META'

        # Save the changed taskdetail
        b_api.taskdetail_save(td)

        # Get the updated taskdetail and check for updated attributes
        actual = b_api.taskdetail_get(td_id)
        self.assertEqual(actual.state, 'SUCCESS')
        self.assertIsNone(actual.results)
        self.assertEqual(actual.exception, 'ERROR')
        self.assertEqual(actual.stacktrace, 'STACKTRACE')
        self.assertEqual(actual.meta, 'META')

    def test_taskdetail_delete(self):
        # Get the taskdetail to delete
        td_id = self.td_ids.pop()
        td = b_api.taskdetail_get(td_id)

        # Delete the desired taskdetail
        b_api.taskdetail_delete(td)
        self.td_names.pop()

        # Check that the deleted taskdetail is no longer there
        self.assertRaises(exception.NotFound, b_api.taskdetail_get,
                          td_id)

    def test_taskdetail_get(self):
        # Get the first taskdetail
        actual = b_api.taskdetail_get(self.td_ids[0])

        # Check that it is actually a taskdetail
        self.assertIsInstance(actual, taskdetail.TaskDetail)
        # Check that its name is what we expect
        self.assertEqual(actual.name, self.td_names[0])

    def test_taskdetail_update(self):
        # Get the first taskdetail and check for default attributes
        actual = b_api.taskdetail_get(self.td_ids[0])
        self.assertIsNone(actual.state)
        self.assertIsNone(actual.results)
        self.assertIsNone(actual.exception)
        self.assertIsNone(actual.stacktrace)
        self.assertIsNone(actual.meta)

        # Prepare attributes for updating
        values = dict(state='SUCCESS', exception='ERROR',
                      stacktrace='STACKTRACE', meta='META')

        # Update attributes
        b_api.taskdetail_update(self.td_ids[0], values)

        # Get the updated taskdetail and check for updated attributes
        actual = b_api.taskdetail_get(self.td_ids[0])
        self.assertEqual(actual.state, 'SUCCESS')
        self.assertIsNone(actual.results)
        self.assertEqual(actual.exception, 'ERROR')
        self.assertEqual(actual.stacktrace, 'STACKTRACE')
        self.assertEqual(actual.meta, 'META')

    def test_taskdetail_get_ids_names(self):
        # Get dict of uuids and names
        actual = b_api.taskdetail_get_ids_names()

        # Match it to our in-memory records. Sort both sides (forcing
        # plain lists) so the comparison does not depend on backend dict
        # ordering and also works with Python 3 dict views.
        self.assertEqual(sorted(actual.values()), sorted(self.td_names))
        self.assertEqual(sorted(actual.keys()), sorted(self.td_ids))

View File

@@ -215,6 +215,7 @@ class LinearFlowTest(unittest2.TestCase):
wf.reset()
wf.run({})
@unittest2.skip('')
def test_interrupt_flow(self):
wf = lw.Flow("the-int-action")

View File

@@ -1,242 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import functools
import threading
import unittest2
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import job
from taskflow import states
from taskflow.backends import memory
from taskflow.patterns import linear_flow as lw
from taskflow.patterns.resumption import logbook as lr
from taskflow.tests import utils
class MemoryBackendTest(unittest2.TestCase):
def _create_memory_impl(self, cons=1):
worker_group = []
poisons = []
for _i in range(0, cons):
poisons.append(threading.Event())
def killer():
for p in poisons:
p.set()
for t in worker_group:
t.join()
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
job_board = memory.MemoryJobBoard()
def runner(my_name, poison):
while not poison.isSet():
my_jobs = []
job_board.await(0.05)
for j in job_board.posted_after():
if j.owner is not None:
# Someone else took it.
continue
try:
j.claim(my_name)
my_jobs.append(j)
except exc.UnclaimableJobException:
pass
if my_jobs and poison.isSet():
# Oh crap, we need to unclaim and repost the jobs.
for j in my_jobs:
j.unclaim()
job_board.repost(j)
else:
# Set all jobs to pending before starting them
for j in my_jobs:
j.state = states.PENDING
for j in my_jobs:
# Create some dummy flow for the job
wf = lw.Flow('dummy')
for _i in range(0, 5):
wf.add(utils.null_functor)
tracker = lr.Resumption(j.logbook)
tracker.record_for(wf)
wf.resumer = tracker
j.state = states.RUNNING
wf.run(j.context)
j.state = states.SUCCESS
j.erase()
for i in range(0, cons):
t_name = "Thread-%s" % (i + 1)
t_runner = functools.partial(runner, t_name, poisons[i])
c = threading.Thread(name=t_name, target=t_runner)
c.daemon = True
worker_group.append(c)
c.start()
return (job_board, job_claimer, book_catalog, killer)
def test_job_working(self):
killer = None
job_board = None
book_catalog = None
try:
(job_board, job_claimer,
book_catalog, killer) = self._create_memory_impl()
j = job.Job("blah", {}, book_catalog, job_claimer)
job_board.post(j)
j.await()
self.assertEquals(0, len(job_board.posted_after()))
finally:
if killer:
killer()
utils.close_all(book_catalog, job_board)
def test_working_job_interrupted(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
j = job.Job("the-int-job", {}, book_catalog, job_claimer)
self.assertEquals(states.UNCLAIMED, j.state)
j.claim("me")
self.assertEquals(states.CLAIMED, j.state)
self.assertEquals('me', j.owner)
wf = lw.Flow("the-int-action")
tracker = lr.Resumption(j.logbook)
tracker.record_for(wf)
wf.resumer = tracker
self.assertEquals(states.PENDING, wf.state)
call_log = []
@decorators.task
def do_1(context, *args, **kwargs):
call_log.append(1)
@decorators.task
def do_2(context, *args, **kwargs):
call_log.append(2)
@decorators.task
def do_interrupt(context, *args, **kwargs):
wf.interrupt()
task_1 = do_1
task_1_5 = do_interrupt
task_2 = do_2
wf.add(task_1)
wf.add(task_1_5) # Interrupt it after task_1 finishes
wf.add(task_2)
wf.run(j.context)
self.assertEquals(1, len(j.logbook))
self.assertEquals(2, len(j.logbook[wf.uuid]))
self.assertEquals(1, len(call_log))
wf.reset()
self.assertEquals(states.PENDING, wf.state)
tracker.record_for(wf)
wf.resumer = tracker
wf.run(j.context)
self.assertEquals(1, len(j.logbook))
self.assertEquals(3, len(j.logbook[wf.uuid]))
self.assertEquals(2, len(call_log))
self.assertEquals(states.SUCCESS, wf.state)
def test_working_job(self):
    """Run a simple two-task flow under a claimed job and verify success."""
    claimer = memory.MemoryClaimer()
    catalog = memory.MemoryCatalog()
    the_job = job.Job("the-line-job", {}, catalog, claimer)
    self.assertEquals(states.UNCLAIMED, the_job.state)
    the_job.claim("me")
    self.assertEquals(states.CLAIMED, the_job.state)
    self.assertEquals('me', the_job.owner)

    flow = lw.Flow('the-line-action')
    self.assertEquals(states.PENDING, flow.state)
    tracker = lr.Resumption(the_job.logbook)
    tracker.record_for(flow)
    flow.resumer = tracker

    call_log = []

    @decorators.task
    def do_1(context, *args, **kwargs):
        call_log.append(1)

    @decorators.task
    def do_2(context, *args, **kwargs):
        call_log.append(2)

    flow.add(do_1)
    flow.add(do_2)
    flow.run(the_job.context)

    # Both tasks ran and both runs were recorded in the job's logbook.
    self.assertEquals(1, len(the_job.logbook))
    self.assertEquals(2, len(the_job.logbook[flow.uuid]))
    self.assertEquals(2, len(call_log))
    self.assertEquals(states.SUCCESS, flow.state)
def test_post_receive_job(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
j = job.Job("test", {}, book_catalog, job_claimer)
# Hook up some simulated workers to said job-board.
job_board = memory.MemoryJobBoard()
receiver_awake = threading.Event()
work_items = []
def post_job():
job_board.post(j)
def work_on_job(j):
owner = 'me'
j.claim(owner)
def receive_job():
start = datetime.datetime.utcnow()
receiver_awake.set()
new_jobs = []
while not new_jobs:
job_board.await(0.5)
new_jobs = job_board.posted_after(start)
work_items.extend(new_jobs)
for j in work_items:
work_on_job(j)
poster = threading.Thread(target=post_job)
receiver = threading.Thread(target=receive_job)
receiver.start()
while not receiver_awake.isSet():
receiver_awake.wait()
poster.start()
for t in [poster, receiver]:
t.join()
self.assertEquals(1, len(work_items))
self.assertEquals(j.owner, 'me')

View File

@@ -1,459 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Import required libraries"""
import os
import unittest2
from os import path
from taskflow.db import api as db_api
from taskflow.db.sqlalchemy import models
from taskflow import exceptions
from taskflow import states
# Point the persistence API at a throwaway sqlite file for these tests.
db_api.configure()
db_api.SQL_CONNECTION = 'sqlite:///test.db'
def setUpModule():
    """Create the sqlite schema once, before any test in this module runs."""
    schema_missing = not path.isfile('test.db')
    if schema_missing:
        models.create_tables()
def tearDownModule():
os.remove('test.db')
"""
JobTest
"""
class JobTest(unittest2.TestCase):
    """Exercises the job operations of the generic persistence API.

    setUpClass seeds one logbook, one job and nine workflows attached to
    both; tearDownClass destroys everything (including rows added by the
    tests themselves, which register their ids/names on the class lists).
    """

    wf_ids = []
    wf_names = []
    lb_ids = []
    lb_names = []
    job_ids = []
    job_names = []

    @classmethod
    def setUpClass(cls):
        wf_fmt = u'workflow_{0}'
        db_api.logbook_create('', u'logbook_1', 1)
        cls.lb_ids.append(1)
        cls.lb_names.append(u'logbook_1')
        db_api.job_create('', u'job_1', 1)
        cls.job_ids.append(1)
        cls.job_names.append(u'job_1')
        for i in range(1, 10):
            db_api.workflow_create('', wf_fmt.format(i))
            db_api.logbook_add_workflow('', 1, wf_fmt.format(i))
            db_api.job_add_workflow('', 1, wf_fmt.format(i))
            cls.wf_ids.append(i)
            cls.wf_names.append(wf_fmt.format(i))

    @classmethod
    def tearDownClass(cls):
        # Destroy everything created by setUpClass and the tests, then
        # reset the class-level bookkeeping lists.
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)
        for lb_id in cls.lb_ids:
            db_api.logbook_destroy('', lb_id)
        for job_id in cls.job_ids:
            db_api.job_destroy('', job_id)
        cls.wf_ids = []
        cls.wf_names = []
        cls.lb_ids = []
        cls.lb_names = []
        cls.job_ids = []
        cls.job_names = []

    def test_job_get(self):
        expected = self.job_names[0]
        actual = db_api.job_get('', self.job_ids[0]).name
        self.assertEquals(expected, actual)
        # Unknown ids must raise NotFound.
        self.assertRaises(exceptions.NotFound, db_api.job_get, '', 9001)

    def test_job_update(self):
        db_api.job_update('', 1, dict(owner='OwnerTest', state=states.CLAIMED))
        job = db_api.job_get('', 1)
        expected = 'OwnerTest'
        actual = job.owner
        self.assertEquals(expected, actual)
        expected = states.CLAIMED
        actual = job.state
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.job_update, '', 9001,
                          dict(owner='OwnerTest', state=states.CLAIMED))

    def test_job_add_workflow(self):
        db_api.workflow_create('', u'workflow_10')
        self.wf_ids.append(10)
        self.wf_names.append(u'workflow_10')
        expected = self.wf_ids
        actual = []
        temp = db_api.job_add_workflow('', 1, u'workflow_10')
        for workflow in temp:
            actual.append(workflow.id)
        self.assertEquals(expected, actual)
        # Both an unknown job and an unknown workflow must raise NotFound.
        self.assertRaises(exceptions.NotFound, db_api.job_add_workflow, '',
                          9001, u'workflow_10')
        self.assertRaises(exceptions.NotFound, db_api.job_add_workflow, '',
                          1, u'workflow_9001')

    def test_job_get_owner(self):
        # The seeded job is unclaimed, so it has no owner yet.
        actual = db_api.job_get_owner('', 1)
        self.assertIsNone(actual)
        self.assertRaises(exceptions.NotFound, db_api.job_get_owner, '', 9001)

    def test_job_get_state(self):
        expected = states.UNCLAIMED
        actual = db_api.job_get_state('', 1)
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.job_get_state, '', 9001)

    def test_job_get_logbook(self):
        expected = self.lb_names[0]
        actual = db_api.job_get_logbook('', 1).name
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.job_get_logbook,
                          '', 9001)

    def test_job_create(self):
        # Pick the smallest id not already used by this test class.
        # (Renamed from 'id', which shadowed the builtin.)
        job_id = 1
        while job_id in self.job_ids:
            job_id = job_id + 1
        db_api.job_create('', u'job_{0}'.format(job_id), job_id)
        self.job_ids.append(job_id)
        self.job_names.append(u'job_{0}'.format(job_id))
        actual = db_api.job_get('', job_id)
        self.assertIsNotNone(actual)

    def test_job_destroy(self):
        job_id = self.job_ids.pop()
        db_api.job_destroy('', job_id)
        self.job_names.pop()
        self.assertRaises(exceptions.NotFound, db_api.job_get, '', job_id)
"""
LogBookTest
"""
class LogBookTest(unittest2.TestCase):
    """Exercises the logbook operations of the generic persistence API.

    setUpClass seeds one logbook with nine attached workflows; the tests
    register any extra rows they create on the class lists so that
    tearDownClass can destroy everything.
    """

    wf_ids = []
    wf_names = []
    lb_ids = []
    lb_names = []

    @classmethod
    def setUpClass(cls):
        wf_fmt = u'workflow_{0}'
        db_api.logbook_create('', u'logbook_1', 1)
        cls.lb_ids.append(1)
        cls.lb_names.append(u'logbook_1')
        for i in range(1, 10):
            db_api.workflow_create('', wf_fmt.format(i))
            db_api.logbook_add_workflow('', 1, wf_fmt.format(i))
            cls.wf_ids.append(i)
            cls.wf_names.append(wf_fmt.format(i))

    @classmethod
    def tearDownClass(cls):
        # Destroy all seeded rows and reset the class bookkeeping lists.
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)
        for lb_id in cls.lb_ids:
            db_api.logbook_destroy('', lb_id)
        cls.wf_ids = []
        cls.wf_names = []
        cls.lb_ids = []
        cls.lb_names = []

    def test_logbook_get(self):
        expected = self.lb_names[0]
        actual = db_api.logbook_get('', self.lb_ids[0]).name
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.logbook_get, '', 9001)

    def test_logbook_get_by_name(self):
        expected = [self.lb_ids[0]]
        actual = []
        for logbook in db_api.logbook_get_by_name('', self.lb_names[0]):
            actual.append(logbook.id)
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.logbook_get_by_name, '',
                          u'logbook_9001')

    def test_logbook_create(self):
        # Pick the smallest id not already used by this test class.
        # (Renamed from 'id', which shadowed the builtin.)
        lb_id = 1
        while lb_id in self.lb_ids:
            lb_id = lb_id + 1
        db_api.logbook_create('', u'logbook_{0}'.format(lb_id), lb_id)
        self.lb_ids.append(lb_id)
        self.lb_names.append(u'logbook_{0}'.format(lb_id))
        actual = db_api.logbook_get('', lb_id)
        self.assertIsNotNone(actual)

    def test_logbook_get_workflows(self):
        expected = self.wf_ids
        actual = []
        wfs = db_api.logbook_get_workflows('', self.lb_ids[0])
        for workflow in wfs:
            actual.append(workflow.id)
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.logbook_get_workflows,
                          '', 9001)

    def test_logbook_add_workflow(self):
        db_api.workflow_create('', u'workflow_10')
        self.wf_ids.append(10)
        self.wf_names.append(u'workflow_10')
        expected = self.wf_ids
        actual = []
        temp = db_api.logbook_add_workflow('', 1, u'workflow_10')
        for workflow in temp:
            actual.append(workflow.id)
        self.assertEquals(expected, actual)
        # Both an unknown logbook and an unknown workflow raise NotFound.
        self.assertRaises(exceptions.NotFound, db_api.logbook_add_workflow, '',
                          9001, u'workflow_10')
        self.assertRaises(exceptions.NotFound, db_api.logbook_add_workflow, '',
                          1, u'workflow_9001')

    def test_logbook_destroy(self):
        lb_id = self.lb_ids.pop()
        db_api.logbook_destroy('', lb_id)
        self.lb_names.pop()
        self.assertRaises(exceptions.NotFound, db_api.logbook_get, '', lb_id)
"""
WorkflowTest
"""
class WorkflowTest(unittest2.TestCase):
    """Exercises the workflow operations of the generic persistence API.

    setUpClass seeds nine workflows, each with one attached task.
    """

    tsk_ids = []
    tsk_names = []
    wf_ids = []
    wf_names = []

    @classmethod
    def setUpClass(cls):
        wf_fmt = u'workflow_{0}'
        tsk_fmt = u'task_{0}'
        for i in range(1, 10):
            db_api.workflow_create('', wf_fmt.format(i))
            db_api.task_create('', tsk_fmt.format(i), i, i)
            db_api.workflow_add_task('', wf_fmt.format(i), i)
            cls.tsk_ids.append(i)
            cls.tsk_names.append(tsk_fmt.format(i))
            cls.wf_ids.append(i)
            cls.wf_names.append(wf_fmt.format(i))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: this hook was spelled 'teardownClass', which unittest2
        # never calls, so the rows created above leaked between classes.
        for tsk_id in cls.tsk_ids:
            db_api.task_destroy('', tsk_id)
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)
        cls.tsk_ids = []
        cls.tsk_names = []
        cls.wf_ids = []
        cls.wf_names = []

    def test_workflow_get(self):
        expected = self.wf_ids[0]
        actual = db_api.workflow_get('', self.wf_names[0]).id
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.workflow_get, '',
                          u'workflow_9001')

    def test_workflow_get_all(self):
        expected = self.wf_ids
        actual = []
        temp = db_api.workflow_get_all('')
        for workflow in temp:
            actual.append(workflow.id)
        self.assertEquals(expected, actual)

    def test_workflow_get_names(self):
        # The API returns the names as a list containing one tuple.
        expected = [tuple(self.wf_names)]
        actual = db_api.workflow_get_names('')
        self.assertEquals(expected, actual)

    def test_workflow_get_tasks(self):
        # Relies on test_workflow_add_task (alphabetically earlier) having
        # appended task index 9 and attached it to workflow_1.
        expected = [self.tsk_names[0], self.tsk_names[9]]
        actual = []
        temp = db_api.workflow_get_tasks('', u'workflow_1')
        for task in temp:
            actual.append(task.name)
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.workflow_get_tasks, '',
                          u'workflow_9001')

    def test_workflow_add_task(self):
        db_api.task_create('', u'task_10', 1, 10)
        db_api.workflow_add_task('', u'workflow_1', 10)
        self.tsk_ids.append(10)
        self.tsk_names.append('task_10')
        expected = [self.tsk_names[0], self.tsk_names[9]]
        tsks = db_api.workflow_get_tasks('', u'workflow_1')
        actual = [tsks[0].name, tsks[1].name]
        self.assertEquals(expected, actual)
        # Both an unknown workflow and an unknown task raise NotFound.
        self.assertRaises(exceptions.NotFound, db_api.workflow_add_task, '',
                          u'workflow_9001', 10)
        self.assertRaises(exceptions.NotFound, db_api.workflow_add_task, '',
                          u'workflow_1', 9001)

    def test_workflow_create(self):
        # Pick the smallest id not already used by this test class.
        # (Renamed from 'id', which shadowed the builtin.)
        wf_id = 0
        while wf_id in self.wf_ids:
            wf_id = wf_id + 1
        db_api.workflow_create('', u'workflow_{0}'.format(wf_id))
        self.wf_ids.append(wf_id)
        self.wf_names.append(u'workflow_{0}'.format(wf_id))
        self.assertIsNotNone(db_api.workflow_get('',
                                                 u'workflow_{0}'.format(wf_id)))

    def test_workflow_destroy(self):
        wf_name = self.wf_names.pop()
        db_api.workflow_destroy('', wf_name)
        self.wf_ids.pop()
        self.assertRaises(exceptions.NotFound, db_api.workflow_get, '',
                          wf_name)
"""
TaskTest
"""
class TaskTest(unittest2.TestCase):
    """Exercises the task operations of the generic persistence API."""

    tsk_ids = []
    tsk_names = []

    @classmethod
    def setUpClass(cls):
        tsk_fmt = u'task_{0}'
        for i in range(1, 10):
            db_api.task_create('', tsk_fmt.format(i), i, i)
            cls.tsk_ids.append(i)
            cls.tsk_names.append(tsk_fmt.format(i))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: this hook was spelled 'teardownClass', which unittest2
        # never calls, so the created tasks leaked between test classes.
        for tsk_id in cls.tsk_ids:
            db_api.task_destroy('', tsk_id)
        cls.tsk_ids = []
        cls.tsk_names = []

    def test_task_get(self):
        expected = self.tsk_names[0]
        actual = db_api.task_get('', self.tsk_ids[0])
        self.assertEquals(expected, actual.name)
        self.assertRaises(exceptions.NotFound, db_api.task_get, '', 9001)

    def test_task_create(self):
        # Pick the smallest id not already used by this test class.
        # (Renamed from 'id', which shadowed the builtin.)
        tsk_id = 1
        while tsk_id in self.tsk_ids:
            tsk_id = tsk_id + 1
        db_api.task_create('', u'task_{0}'.format(tsk_id), 1, tsk_id)
        self.tsk_ids.append(tsk_id)
        self.tsk_names.append(u'task_{0}'.format(tsk_id))
        self.assertIsNotNone(db_api.task_get('', tsk_id))

    def test_task_update(self):
        db_api.task_update('', 1, dict(exception='ExceptionTest',
                                       stacktrace='StacktraceTest'))
        task = db_api.task_get('', 1)
        expected = 'ExceptionTest'
        actual = task.exception
        self.assertEquals(expected, actual)
        expected = 'StacktraceTest'
        actual = task.stacktrace
        self.assertEquals(expected, actual)
        self.assertRaises(exceptions.NotFound, db_api.task_update, '', 9001,
                          dict(exception='ExceptionTest',
                               stacktrace='StacktraceTest'))

    def test_task_destroy(self):
        tsk_id = self.tsk_ids.pop()
        db_api.task_destroy('', tsk_id)
        self.tsk_names.pop()
        self.assertRaises(exceptions.NotFound, db_api.task_get, '', tsk_id)

View File

@@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import task
from taskflow.generics import task
ARGS_KEY = '__args__'
KWARGS_KEY = '__kwargs__'
@@ -34,6 +34,11 @@ def null_functor(*args, **kwargs): # pylint: disable=W0613
return None
def drain(lst):
while len(lst):
lst.pop()
class ProvidesRequiresTask(task.Task):
def __init__(self, name, provides, requires):
super(ProvidesRequiresTask, self).__init__(name)
@@ -51,3 +56,11 @@ class ProvidesRequiresTask(task.Task):
for v in self.provides:
outs[v] = True
return outs
class DummyTask(task.Task):
def __init__(self, name, task_id=None):
super(DummyTask, self).__init__(name, task_id)
def __call__(self, context, *args, **kwargs):
pass

View File

@@ -3,6 +3,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -446,19 +447,83 @@ class LazyPluggable(object):
self.__backend = None
def __get_backend(self):
if not self.__backend:
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
backend_name = 'memory'
self.__backend = __import__(name, None, None, fromlist)
if self.__pivot == 'db_backend':
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def set_pivot(self, pivot):
self.__pivot = pivot
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)
class LockingDict(object):
    """A dictionary wrapper that serializes access with a reader/writer lock.

    Reads share the lock; mutations take it exclusively, so the container
    can safely be shared between threads. The keys()/values()/items()
    accessors return snapshot lists taken under the read lock.
    """

    def __init__(self):
        self._container = {}
        self._lock = ReaderWriterLock()

    def __getitem__(self, key):
        """Return the item referenced by key."""
        with self._lock.acquire(read=True):
            return self._container[key]

    def __setitem__(self, key, value):
        """Set the item referenced by key to value."""
        with self._lock.acquire(read=False):
            self._container[key] = value

    def __delitem__(self, key):
        """Delete the item referenced by key."""
        with self._lock.acquire(read=False):
            del self._container[key]

    def __contains__(self, item):
        """Check if the given key is contained in this dict."""
        with self._lock.acquire(read=True):
            return item in self._container

    def keys(self):
        """Return a snapshot list of the keys in a threadsafe manner."""
        # The original had an unreachable 'return retVal' after the with
        # block and used the Python-2-only iterkeys(); iterating the dict
        # directly gives the same list on both Python 2 and 3.
        with self._lock.acquire(read=True):
            return list(self._container)

    def values(self):
        """Return a snapshot list of the values in a threadsafe manner."""
        with self._lock.acquire(read=True):
            return list(self._container.values())

    def items(self):
        """Return a snapshot list of the (key, value) pairs."""
        with self._lock.acquire(read=True):
            return list(self._container.items())