Merge pull request #20 from jessicalucci/sql

DB API / SQL Backend / Util files / Configs / Basic Celery
This commit is contained in:
Joshua Harlow
2013-05-22 17:42:59 -07:00
20 changed files with 1203 additions and 1 deletions

View File

@@ -10,7 +10,7 @@ def read_requires(base):
if not os.path.isfile(path): if not os.path.isfile(path):
return requires return requires
with open(path, 'rb') as h: with open(path, 'rb') as h:
for line in h.read.splitlines(): for line in h.read().splitlines():
line = line.strip() line = line.strip()
if len(line) == 0 or line.startswith("#"): if len(line) == 0 or line.startswith("#"):
continue continue

View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import traceback as tb
from celery.signals import task_failure, task_success
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@task_failure.connect
def task_error_handler(signal=None, sender=None, task_id=None,
exception=None, args=None, kwargs=None,
traceback=None, einfo=None):
""" If a task errors out, log all error info """
LOG.error('Task %s, id: %s, called with args: %s, and kwargs: %s'
'failed with exception: %s' % (sender.name, task_id,
args, kwargs, exception))
LOG.error('Trackeback: %s' % (tb.print_tb(traceback), ))
wf = sender.name.split('.')[0]
task = ('.').join(n for n in (sender.name.split('.')[1:]) if n)
#logbook.update_task(wf, task, status="ERROR", args=args, kwargs=kwargs,
# exception=exception, traceback=(tb.print_tb(traceback)))
# TODO: Auto-initiate rollback from failed task
@task_success.connect
def task_success_handler(singal=None, sender=None, result=None):
""" Save task results to WF """
wf = sender.name.split('.')[0]
task = ('.').join(n for n in (sender.name.split('.')[1:]) if n)
#logbook.update_task(wf, task, status="SUCCESS", result=result)

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Celery Configuration File """
from taskflow.common import config
config.register_celery_opts()
from oslo.cfg import cfg
LOG = logging.getLogger(__name__)
BROKER_URL = cfg.CFG('celery_mq')
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = cfg.CFG('celery_backend')

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

50
taskflow/common/config.py Normal file
View File

@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routines for configuring TaskFlow
"""
from oslo.config import cfg
db_opts = [
cfg.StrOpt('sql_connection',
default='mysql://task:flow@localhost/taskflow',
#default='sqlite:///test.db',
help='The SQLAlchemy connection string used to connect to the '
'database'),
cfg.IntOpt('sql_idle_timeout',
default=3600,
help='timeout before idle sql connections are reaped')]
celery_opts = [
cfg.StrOpt('celery_backend',
default='mysql://task:flow@localhost/taskflow',
help='The SQLAlchemy connection string used to connect to the '
'celery backend'),
cfg.StrOpt('celery_MQ',
default='mongodb://task:flow@localhost:27017/taskflow',
help='The MongoDB connection string used to connect to the '
'celery message queue')]
def register_db_opts():
cfg.CONF.register_opts(db_opts)
def register_celery_opts():
cfg.CONF.register_opts(celery_opts)

18
taskflow/db/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

131
taskflow/db/api.py Normal file
View File

@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy Backend"""
from oslo.config import cfg
from taskflow.common import config
from taskflow import utils
db_opts = [
cfg.StrOpt('db_backend',
default='sqlalchemy',
help='The backend to use for db')]
CONF = cfg.CONF
CONF.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='taskflow.db.sqlalchemy.api')
def configure():
global SQL_CONNECTION
global SQL_IDLE_TIMEOUT
config.register_db_opts()
SQL_CONNECTION = cfg.CONF.sql_connection
SQL_IDLE_TIMEOUT = cfg.CONF.sql_idle_timeout
"""
LOGBOOK
"""
def logbook_get(context, lb_id):
return IMPL.logbook_get(context, lb_id)
def logbook_get_by_name(context, lb_name):
return IMPL.logbook_get_by_name(context, lb_name)
def logbook_create(context, lb_name, lb_id=None):
return IMPL.logbook_create(context, lb_name, lb_id)
def logbook_get_workflows(context, lb_id):
return IMPL.logbook_get_workflows(context, lb_id)
def logbook_add_workflow(context, lb_id, wf_name):
return IMPL.logbook_add_workflow(context, lb_id, wf_name)
def logbook_destroy(context, lb_id):
return IMPL.logbook_destroy(context, lb_id)
"""
JOB
"""
def job_get(context, job_id):
return IMPL.job_get(context, job_id)
def job_update(context, job_id, values):
return IMPL.job_update(context, job_id, values)
def job_add_workflow(context, job_id, wf_id):
return IMPL.job_add_workflow(context, job_id, wf_id)
def job_get_owner(context, job_id):
return IMPL.job_get_owner(context, job_id)
def job_get_state(context, job_id):
return IMPL.job_get_state(context, job_id)
def job_get_logbook(context, job_id):
return IMPL.job_get_logbook(context, job_id)
def job_destroy(context, job_id):
return IMPL.job_destroy(context, job_id)
"""
WORKFLOW
"""
def workflow_get(context, wf_name):
return IMPL.workflow_get(context, wf_name)
def workflow_get_all(context):
return IMPL.workflow_get_all(context)
def workflow_get_names(context):
return IMPL.workflow_get_names(context)
def workflow_get_tasks(context, wf_name):
return IMPL.workflow_get_tasks(context, wf_name)
def workflow_add_task(context, wf_name, task_id):
return IMPL.workflow_add_task(context, wf_name, task_id)
def workflow_create(context, wf_name):
return IMPL.workflow_create(context, wf_name)
def workflow_destroy(context, wf_name):
return IMPL.workflow_destroy(context, wf_name)
"""
TASK
"""
def task_get(context, task_id):
return IMPL.task_get(context, task_id)
def task_create(context, task_name, wf_id, task_id=None):
return IMPL.task_create(context, task_name, wf_id, task_id)
def task_update(context, task_id, values):
return IMPL.task_update(context, task_id, values)
def task_destroy(context, task_id):
return IMPL.task_destroy(context, task_id)

40
taskflow/db/base.py Normal file
View File

@@ -0,0 +1,40 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for classes that need modular database access."""
from oslo.config import cfg
from taskflow.openstack.common import importutils
db_driver_opt = cfg.StrOpt('db_driver',
default='taskflow.db',
help='driver to use for database access')
CONF = cfg.CONF
CONF.register_opt(db_driver_opt)
class Base(object):
"""DB driver is injected in the init method."""
def __init__(self, db_driver=None):
if not db_driver:
db_driver = CONF.db_driver
self.db = importutils.import_module(db_driver) # pylint: disable=C0103

18
taskflow/db/migration.py Normal file
View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import logging
from taskflow.db.sqlalchemy import models
from taskflow.db.sqlalchemy.session import get_session
from taskflow.openstack.common import exception
LOG = logging.getLogger(__name__)
def model_query(context, *args, **kwargs):
session = kwargs.get('session') or get_session()
query = session.query(*args)
return query
"""
LOGBOOK
"""
def logbook_get(context, lb_id, session=None):
"""Return a logbook with matching lb_id"""
query = model_query(context, models.LogBook, session=session).\
filter_by(logbook_id=lb_id)
if not query:
raise exception.NotFound("No LogBook found with id "
"%s." % (lb_id,))
return query.first()
def logbook_get_by_name(context, lb_name):
"""Return all logbooks with matching name"""
query = model_query(context, models.LogBook).\
filter_by(name=lb_name)
if not query:
raise exception.NotFound("LogBook %s not found."
% (lb_name,))
return query.all()
def logbook_create(context, name, lb_id=None):
"""Create a new logbook"""
lb_ref = models.LogBook()
lb_ref.name = name
if lb_id:
lb_ref.logbook_id = lb_id
lb_ref.save()
return lb_ref
def logbook_get_workflows(context, lb_id):
"""Return all workflows associated with a logbook"""
lb = logbook_get(context, lb_id)
return lb.workflows
def logbook_add_workflow(context, lb_id, wf_name):
"""Add Workflow to given LogBook"""
session = get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
lb = logbook_get(context, lb_id, session=session)
lb.workflows.append(wf)
return lb.workflows
def logbook_destroy(context, lb_id):
"""Delete a given LogBook"""
session = get_session()
with session.begin():
lb = logbook_get(context, lb_id, session=session)
lb.delete()
"""
JOB
"""
def job_get(context, job_id, session=None):
"""Return Job with matching job_id"""
query = model_query(context, models.Workflow, session=session).\
filter_by(job_id=job_id)
if not query:
raise exception.NotFound("No Job with id %s found"
% (job_id,))
return query.first()
def job_update(context, job_id, values):
"""Update job with given values"""
session = get_session()
with session.begin():
job = job_get(context, job_id, session=session)
job.update(values)
job.save(session=session)
def job_add_workflow(context, job_id, wf_id):
"""Add a Workflow to given job"""
session = get_session()
with session.begin():
job = job_get(context, job_id)
wf = workflow_get(context, wf_id)
job.workflows.append(wf)
return job.workflows
def job_get_owner(context, job_id):
"""Return a job's current owner"""
job = job_get(context, job_id)
return job.owner
def job_get_state(context, job_id):
"""Return a job's current owner"""
job = job_get(context, job_id)
return job.state
def job_get_logbook(context, job_id):
"""Return the logbook associated with the given job"""
job = job_get(context, job_id)
return job.logbook
def job_destroy(context, job_id):
"""Delete a given Job"""
session = get_session()
with session.begin():
job = job_get(context, job_id, session=session)
job.delete()
"""
WORKFLOW
"""
def workflow_get(context, wf_name, session=None):
"""Return one workflow with matching workflow_id"""
query = model_query(context, models.Workflow, session=session).\
filter_by(name=wf_name)
if not query:
raise exception.NotFound("Workflow %s not found." % (wf_name,))
return query.first()
def workflow_get_all(context):
"""Return all workflows"""
results = model_query(context, models.Workflow).all()
if not results:
raise exception.NotFound("No Workflows were found.")
return results
def workflow_get_names(context):
"""Return all workflow names"""
results = model_query(context, models.Workflow.name).all()
return zip(*results)
def workflow_get_tasks(context, wf_name):
"""Return all tasks for a given Workflow"""
wf = workflow_get(context, wf_name)
return wf.tasks
def workflow_add_task(context, wf_id, task_id):
"""Add a task to a given workflow"""
session = get_session()
with session.begin():
task = task_get(context, task_id, session=session)
wf = workflow_get(context, wf_id, session=session)
wf.tasks.append(task)
return wf.tasks
def workflow_create(context, workflow_name):
"""Create new workflow with workflow_id"""
workflow_ref = models.Workflow()
workflow_ref.name = workflow_name
workflow_ref.save()
return workflow_ref
def workflow_destroy(context, wf_name):
"""Delete a given Workflow"""
session = get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
wf.delete()
"""
TASK
"""
def task_get(context, task_id, session=None):
"""Return Task with task_id"""
result = model_query(context, models.Task, session=session).\
filter_by(task_id=task_id)
if not result:
raise exception.NotFound("No Task found with id "
"%s." % (task_id,))
return result
def task_create(context, task_name, wf_id, task_id=None):
"""Create task associated with given workflow"""
task_ref = models.Task()
task_ref.name = task_name
task_ref.wf_id = wf_id
if task_id:
task_ref.task_id = task_id
task_ref.save()
return task_ref
def task_update(context, task_id, values):
"""Update Task with given values"""
session = get_session()
with session.begin():
task = task_get(context, task_id)
task.update(values)
task.save(session=session)
def task_destroy(context, task_id):
"""Delete an existing Task"""
session = get_session()
with session.begin():
task = task_get(context, task_id, session=session)
task.delete()

View File

@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for taskflow data.
"""
from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table, MetaData
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_mapper, relationship, backref
from sqlalchemy import DateTime, ForeignKey
from sqlalchemy import types as types
from json import dumps, loads
from taskflow.db.sqlalchemy.session import get_session, get_engine
from taskflow.openstack.common import timeutils, uuidutils
CONF = cfg.CONF
BASE = declarative_base()
class Json(types.TypeDecorator, types.MutableType):
impl = types.Text
def process_bind_param(self, value, dialect):
return dumps(value)
def process_result_value(self, value, dialect):
return loads(value)
class TaskFlowBase(object):
"""Base class for TaskFlow Models."""
__table_args__ = {'mysql_engine':'InnoDB'}
__table_initialized = False
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, default=timeutils.utcnow)
def save(self, session=None):
"""Save this object."""
if not session:
session = get_session()
session.add(self)
try:
session.flush()
except IntegrityError, e:
if str(e).endswith('is not unique'):
raise exception.Duplicate(str(e))
else:
raise
def delete(self, session=None):
"""Delete this object."""
self.deleted = True
self.deleted_at = timeutils.utcnow()
if not session:
session = get_session()
session.delete(self)
session.flush()
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def __iter__(self):
self._i = iter(object_mapper(self).columns)
return self
def next(self):
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict"""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict
Includes attributes from joins."""
local = dict(self)
joined = dict([k, v] for k, v in self.__dict__.iteritems()
if not k[0] == '_')
local.update(joined)
return local.iteritems()
workflow_logbook_assoc = Table('wf_lb_assoc', BASE.metadata,
Column('workflow_id', Integer, ForeignKey('workflow.id')),
Column('logbook_id', Integer, ForeignKey('logbook.id')),
Column('id', Integer, primary_key=True)
)
workflow_job_assoc = Table('wf_job_assoc', BASE.metadata,
Column('workflow_id', Integer, ForeignKey('workflow.id')),
Column('job_id', Integer, ForeignKey('job.id')),
Column('id', Integer, primary_key=True)
)
class LogBook(BASE, TaskFlowBase):
"""Represents a logbook for a set of workflows"""
__tablename__ = 'logbook'
id = Column(Integer, primary_key=True)
logbook_id = Column(String, default=uuidutils.generate_uuid,
unique=True)
name = Column(String)
workflows = relationship("Workflow",
secondary=workflow_logbook_assoc)
job = relationship("Job", uselist=False, backref="logbook")
class Job(BASE, TaskFlowBase):
"""Represents a Job"""
__tablename__ = 'job'
id = Column(Integer, primary_key=True)
job_id = Column(String, default=uuidutils.generate_uuid,
unique=True)
name = Column(String)
owner = Column(String)
state = Column(String)
workflows = relationship("Workflow",
secondary=workflow_job_assoc)
logbook_id = Column(String, ForeignKey('logbook.logbook_id')
class Workflow(BASE, TaskFlowBase):
"""Represents Workflow detail objects"""
__tablename__ = 'workflow'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
tasks = relationship("Task", backref="workflow")
class Task(BASE, TaskFlowBase):
"""Represents Task detail objects"""
__tablename__ = 'task'
id = Column(Integer, primary_key=True)
task_id = Column(String, default=uuidutils.generate_uuid)
name = Column(String)
results = Column(Json)
exception = Column(String)
stacktrace = Column(String)
workflow_id = Column(String, ForeignKey('workflow.id'))
def create_tables():
BASE.metadata.create_all(get_engine())

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend."""
import logging
import sqlalchemy.orm
import sqlalchemy.engine
import sqlalchemy.interfaces
import sqlalchemy
from sqlalchemy.pool import NullPool
from taskflow.db import api as db_api
LOG = logging.getLogger(__name__)
_ENGINE = None
_MAKER = None
def get_session(autocommit=True, expire_on_commit=True):
"""Return a SQLAlchemy session."""
global _MAKER
if _MAKER is None:
_MAKER = get_maker(get_engine(), autocommit, expire_on_commit)
return _MAKER()
def synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode"""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError, ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise DisconnectionError("Database server went away")
else:
raise
def get_engine():
"""Return a SQLAlchemy engine."""
global _ENGINE
if _ENGINE is None:
connection_dict = sqlalchemy.engine.url.make_url(_get_sql_connection())
engine_args = {
"pool_recycle": _get_sql_idle_timeout(),
"echo": False,
"convert_unicode": True
}
if "sqlite" in connection_dict.drivername:
engine_args['poolclass'] = NullPool
_ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
**engine_args)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
if 'sqlite' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'connect',
synchronous_switch_listener)
#TODO: Check to make sure engine connected
return _ENGINE
def get_maker(engine, autocommit=True, expire_on_commit=False):
"Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
autocommit=autocommit,
expire_on_commit=expire_on_commit)
def _get_sql_connection():
return db_api.SQL_CONNECTION
def _get_sql_idle_timeout():
return db_api.SQL_IDLE_TIMEOUT

View File

@@ -0,0 +1,31 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exceptions common to OpenStack projects
"""
import logging
class Error(Exception):
def __init__(self, message=None):
super(Error, self).__init__(message)
class NotFound(Error):
pass

View File

@@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Import related utilities and helper functions.
"""
import sys
import traceback
def import_class(import_str):
"""Returns a class from a string including module and class"""
mod_str, _sep, class_str = import_str.rpartition('.')
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))
def import_object(import_str, *args, **kwargs):
"""Import a class and return an instance of it."""
return import_class(import_str)(*args, **kwargs)
def import_object_ns(name_space, import_str, *args, **kwargs):
"""
Import a class and return an instance of it, first by trying
to find the class in a default namespace, then failing back to
a full path if not found in the default namespace.
"""
import_value = "%s.%s" % (name_space, import_str)
try:
return import_class(import_value)(*args, **kwargs)
except ImportError:
return import_class(import_str)(*args, **kwargs)
def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@@ -0,0 +1,33 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import datetime
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None

View File

@@ -0,0 +1,27 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
UUID related utilities and helper functions.
"""
import uuid
def generate_uuid():
return str(uuid.uuid4())

View File

@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow import logbook
from celery import chord
LOG = logging.getLogger(__name__)
class Flow(object):
"""A linear chain of independent tasks that can be applied as one unit or
rolled back as one unit."""
def __init__(self, name, tolerant=False, parents=None):
self.name = name
self.root = None
self._tasks = []
logbook.add_workflow(name)
def chain_listeners(self, context, initial_task, callback_task):
""" Register one listener for a task """
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
self._tasks.append(initial_task)
LOG.info('WF %s root task set to %s' % (self.name, initial_task.name))
callback_task.name = '%s.%s' % (self.name, callback_task.name)
self._tasks.append(callback_task)
initial_task.link(callback_task.s(context))
def split_listeners(self, context, initial_task, callback_tasks):
""" Register multiple listeners for one task """
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
self._tasks.append(initial_task)
LOG.info('WF %s root task set to %s' % (self.name, initial_task.name))
for task in callback_tasks:
task.name = '%s.%s' % (self.name, task.name)
self._tasks.append(task)
initial_task.link(task.s(context))
def merge_listeners(self, context, inital_tasks, callback_task):
""" Register one listener for multiple tasks """
header = []
if self.root is None:
self.root = []
for task in initial_tasks:
task.name = '%s.%s' % (self.name, task.name)
self._tasks.append(task)
header.append(task.s(context))
if isinstance(self.root, list):
self.root.append(task.s(context))
LOG.info('WF %s added root task %s' %
(self.name, task.name))
callback_task.name = '%s.%s' % (self.name, callback_task.name)
self._tasks.append(callback_task)
#TODO: Need to set up chord so that it's not executed immediately
c = chord(header, body=callback_task)
def run(self, context, *args, **kwargs):
""" Start root task and kick off workflow """
self.root(context)
LOG.info('WF %s has been started' % (self.name,))

View File

@@ -113,3 +113,30 @@ class ReaderWriterLock(object):
self.readers_ok.acquire() self.readers_ok.acquire()
self.readers_ok.notifyAll() self.readers_ok.notifyAll()
self.readers_ok.release() self.readers_ok.release()
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
def __get_backend(self):
if not self.__backend:
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)