Almost done with sqlalchemy api

This commit is contained in:
Jessica Lucci
2013-05-21 22:52:59 -05:00
parent 229855aa1a
commit 941a48d58b
10 changed files with 189 additions and 16 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -25,7 +25,8 @@ from oslo.config import cfg
db_opts = [
cfg.StrOpt('sql_connection',
default='mysql://task:flow@localhost/taskflow',
#default='mysql://task:flow@localhost/taskflow',
default='sqlite:///test.db',
help='The SQLAlchemy connection string used to connect to the '
'database'),
cfg.IntOpt('sql_idle_timeout',

Binary file not shown.

Binary file not shown.

View File

@@ -33,3 +33,171 @@ def model_query(context, *args, **kwargs):
query = session.query(*args)
return query
"""
LOGBOOK
"""
def logbook_get(context, lb_id, session=None):
"""Return a logbook with matching lb_id"""
query = model_query(context, models.LogBook, session=session).\
filter_by(logbook_id=lb_id)
if not query:
raise exception.NotFound("No LogBook found with id "
"%s." % (lb_id,))
return query.first()
def logbook_get_by_name(context, lb_name):
"""Return all logbooks with matching name"""
query = model_query(context, models.LogBook).\
filter_by(name=lb_name)
if not query:
raise exception.NotFound("LogBook %s not found."
% (lb_name,))
return query.all()
def logbook_create(context, name, lb_id=None):
"""Create a new logbook"""
lb_ref = models.LogBook()
lb_ref.name = name
if lb_id:
lb_ref.logbook_id = lb_id
lb_ref.save()
return lb_ref
def logbook_get_workflows(context, lb_id):
"""Return all workflows associated with a logbook"""
lb = logbook_get(context, lb_id)
return lb.workflows
def logbook_add_workflow(context, lb_id, wf_name):
"""Add Workflow to given LogBook"""
session = get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
lb = logbook_get(context, lb_id, session=session)
lb.workflows.append(wf)
return lb.workflows
"""
JOB
"""
def job_get(context, job_id, session=None):
"""Return Job with matching job_id"""
query = model_query(context, models.Workflow, session=session).\
filter_by(job_id=job_id)
if not query:
raise exception.NotFound("No Job with id %s found"
% (job_id,))
"""
WORKFLOW
"""
def workflow_get(context, wf_name, session=None):
"""Return one workflow with matching workflow_id"""
query = model_query(context, models.Workflow, session=session).\
filter_by(name=wf_name)
if not query:
raise exception.NotFound("Workflow %s not found." % (wf_name,))
return query.first()
def workflow_get_all(context):
"""Return all workflows"""
results = model_query(context, models.Workflow).all()
if not results:
raise exception.NotFound("No Workflows were found.")
return results
def workflow_get_names(context):
"""Return all workflow names"""
results = model_query(context, models.Workflow.name).all()
return zip(*results)
def workflow_get_tasks(context, wf_name):
"""Return all tasks for a given Workflow"""
wf = workflow_get(context, wf_name)
return wf.tasks
def workflow_add_task(context, wf_id, task_id):
"""Add a task to a given workflow"""
session = get_session()
with session.begin():
task = task_get(context, task_id, session=session)
wf = workflow_get(context, wf_id, session=session)
wf.tasks.append(task)
return wf.tasks
def workflow_create(context, workflow_name):
"""Create new workflow with workflow_id"""
workflow_ref = models.Workflow()
workflow_ref.name = workflow_name
workflow_ref.save()
return workflow_ref
def workflow_destroy(context, wf_id):
"""Delete a given Workflow"""
session = get_session()
with session.begin():
wf = workflow_get(context, wf_id, session=session)
wf.delete()
"""
TASK
"""
def task_get(context, task_id, session=None):
"""Return Task with task_id"""
result = model_query(context, models.Task, session=session).\
filter_by(task_id=task_id)
if not result:
raise exception.NotFound("No Task found with id "
"%s." % (task_id,))
return result
def task_create(context, task_name, wf_id, task_id=None):
"""Create task associated with given workflow"""
task_ref = models.Task()
task_ref.name = task_name
task_ref.wf_id = wf_id
if task_id:
task_ref.task_id = task_id
task_ref.save()
return task_ref
def task_update(context, task_id, values):
"""Update Task with given values"""
session = get_session()
with session.begin():
task = task_get(context, task_id)
task.update(values)
task.save()
def task_destroy(context, task_id):
"""Delete an existing Task"""
session = get_session()
with session.begin():
task = task_get(context, task_id, session=session)
task.delete()

View File

@@ -22,7 +22,7 @@ SQLAlchemy models for taskflow data.
from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy import Column, Integer, String, Table, MetaData
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_mapper, relationship, backref
@@ -31,13 +31,12 @@ from sqlalchemy import types as types
from json import dumps, loads
from taskflow.db.sqlalchemy.session import get_session
from taskflow.db.sqlalchemy.session import get_session, get_engine
from taskflow.openstack.common import timeutils, uuidutils
CONF = cfg.CONF
BASE = declarative_base()
class Json(types.TypeDecorator, types.MutableType):
impl = types.Text
@@ -128,6 +127,8 @@ class LogBook(BASE, TaskFlowBase):
__tablename__ = 'logbook'
id = Column(Integer, primary_key=True)
logbook_id = Column(String, default=uuidutils.generate_uuid,
unique=True)
name = Column(String)
workflows = relationship("Workflow",
secondary=workflow_logbook_assoc)
@@ -138,7 +139,8 @@ class Job(BASE, TaskFlowBase):
__tablename__ = 'job'
id = Column(Integer, primary_key=True)
job_id = Column(String, default=uuidutils.generate_uuid)
job_id = Column(String, default=uuidutils.generate_uuid,
unique=True)
name = Column(String)
owner = Column(String)
state = Column(String)
@@ -152,8 +154,7 @@ class Workflow(BASE, TaskFlowBase):
__tablename__ = 'workflow'
id = Column(Integer, primary_key=True)
workflow_id = Column(String, default=uuidutils.generate_uuid)
name = Column(String)
name = Column(String, unique=True)
tasks = relationship("Task", backref="workflow")
class Task(BASE, TaskFlowBase):
@@ -168,3 +169,6 @@ class Task(BASE, TaskFlowBase):
exception = Column(String)
stacktrace = Column(String)
workflow_id = Column(String, ForeignKey('workflow.id'))
def create_tables():
BASE.metadata.create_all(get_engine())

View File

@@ -78,17 +78,17 @@ def get_engine():
"convert_unicode": True
}
if "sqlite" in connection_dict.drivername:
engine_args['poolclass'] = NullPool
if "sqlite" in connection_dict.drivername:
engine_args['poolclass'] = NullPool
_ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
**engine_args)
_ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
**engine_args)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
if 'sqlite' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'connect',
synchronous_switch_listener)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
if 'sqlite' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'connect',
synchronous_switch_listener)
#TODO: Check to make sure engine connected

BIN
taskflow/test.db Normal file

Binary file not shown.