diff --git a/taskflow/backends/celery/.celeryapp.py.swp b/taskflow/backends/celery/.celeryapp.py.swp
new file mode 100644
index 00000000..09e4e67a
Binary files /dev/null and b/taskflow/backends/celery/.celeryapp.py.swp differ
diff --git a/taskflow/backends/celery/.celeryconfig.py.swp b/taskflow/backends/celery/.celeryconfig.py.swp
index 7aac29e4..db942e2d 100644
Binary files a/taskflow/backends/celery/.celeryconfig.py.swp and b/taskflow/backends/celery/.celeryconfig.py.swp differ
diff --git a/taskflow/common/.config.py.swp b/taskflow/common/.config.py.swp
new file mode 100644
index 00000000..26ac485a
Binary files /dev/null and b/taskflow/common/.config.py.swp differ
diff --git a/taskflow/common/config.py b/taskflow/common/config.py
index 27d2cd3c..ab075725 100644
--- a/taskflow/common/config.py
+++ b/taskflow/common/config.py
@@ -25,7 +25,8 @@ from oslo.config import cfg
 
 db_opts = [
     cfg.StrOpt('sql_connection',
-               default='mysql://task:flow@localhost/taskflow',
+               #default='mysql://task:flow@localhost/taskflow',
+               default='sqlite:///test.db',
                help='The SQLAlchemy connection string used to connect to the '
                     'database'),
     cfg.IntOpt('sql_idle_timeout',
diff --git a/taskflow/db/sqlalchemy/.api.py.swp b/taskflow/db/sqlalchemy/.api.py.swp
new file mode 100644
index 00000000..b3138489
Binary files /dev/null and b/taskflow/db/sqlalchemy/.api.py.swp differ
diff --git a/taskflow/db/sqlalchemy/.models.py.swp b/taskflow/db/sqlalchemy/.models.py.swp
new file mode 100644
index 00000000..74b81cca
Binary files /dev/null and b/taskflow/db/sqlalchemy/.models.py.swp differ
diff --git a/taskflow/db/sqlalchemy/api.py b/taskflow/db/sqlalchemy/api.py
index c6163e5c..63305d53 100644
--- a/taskflow/db/sqlalchemy/api.py
+++ b/taskflow/db/sqlalchemy/api.py
@@ -33,3 +33,171 @@ def model_query(context, *args, **kwargs):
     query = session.query(*args)
 
     return query
+
+"""
+LOGBOOK
+"""
+
+def logbook_get(context, lb_id, session=None):
+    """Return a logbook with matching lb_id"""
+    query = model_query(context, models.LogBook, session=session).\
+                    filter_by(logbook_id=lb_id)
+
+    if not query:
+        raise exception.NotFound("No LogBook found with id "
+                                 "%s." % (lb_id,))
+
+    return query.first()
+
+def logbook_get_by_name(context, lb_name):
+    """Return all logbooks with matching name"""
+    query = model_query(context, models.LogBook).\
+                    filter_by(name=lb_name)
+
+    if not query:
+        raise exception.NotFound("LogBook %s not found."
+                                 % (lb_name,))
+
+    return query.all()
+
+def logbook_create(context, name, lb_id=None):
+    """Create a new logbook"""
+    lb_ref = models.LogBook()
+    lb_ref.name = name
+    if lb_id:
+        lb_ref.logbook_id = lb_id
+    lb_ref.save()
+
+    return lb_ref
+
+def logbook_get_workflows(context, lb_id):
+    """Return all workflows associated with a logbook"""
+    lb = logbook_get(context, lb_id)
+
+    return lb.workflows
+
+def logbook_add_workflow(context, lb_id, wf_name):
+    """Add Workflow to given LogBook"""
+    session = get_session()
+    with session.begin():
+        wf = workflow_get(context, wf_name, session=session)
+        lb = logbook_get(context, lb_id, session=session)
+
+        lb.workflows.append(wf)
+
+        return lb.workflows
+
+"""
+JOB
+"""
+
+def job_get(context, job_id, session=None):
+    """Return Job with matching job_id"""
+    query = model_query(context, models.Job, session=session).\
+                    filter_by(job_id=job_id)
+
+    if not query:
+        raise exception.NotFound("No Job with id %s found"
+                                 % (job_id,))
+
+    return query.first()
+
+"""
+WORKFLOW
+"""
+
+def workflow_get(context, wf_name, session=None):
+    """Return one workflow with matching name"""
+    query = model_query(context, models.Workflow, session=session).\
+                    filter_by(name=wf_name)
+
+    if not query:
+        raise exception.NotFound("Workflow %s not found." % (wf_name,))
+
+    return query.first()
+
+def workflow_get_all(context):
+    """Return all workflows"""
+    results = model_query(context, models.Workflow).all()
+
+    if not results:
+        raise exception.NotFound("No Workflows were found.")
+
+    return results
+
+def workflow_get_names(context):
+    """Return all workflow names"""
+    results = model_query(context, models.Workflow.name).all()
+
+    return zip(*results)
+
+def workflow_get_tasks(context, wf_name):
+    """Return all tasks for a given Workflow"""
+    wf = workflow_get(context, wf_name)
+
+    return wf.tasks
+
+def workflow_add_task(context, wf_id, task_id):
+    """Add a task to a given workflow"""
+    session = get_session()
+    with session.begin():
+        task = task_get(context, task_id, session=session)
+        wf = workflow_get(context, wf_id, session=session)
+        wf.tasks.append(task)
+        return wf.tasks
+
+def workflow_create(context, workflow_name):
+    """Create new workflow with given name"""
+    workflow_ref = models.Workflow()
+    workflow_ref.name = workflow_name
+    workflow_ref.save()
+
+    return workflow_ref
+
+def workflow_destroy(context, wf_id):
+    """Delete a given Workflow"""
+    session = get_session()
+    with session.begin():
+        wf = workflow_get(context, wf_id, session=session)
+        wf.delete()
+
+"""
+TASK
+"""
+
+def task_get(context, task_id, session=None):
+    """Return Task with task_id"""
+    result = model_query(context, models.Task, session=session).\
+                    filter_by(task_id=task_id)
+
+    if not result:
+        raise exception.NotFound("No Task found with id "
+                                 "%s." % (task_id,))
+
+    return result.first()
+
+def task_create(context, task_name, wf_id, task_id=None):
+    """Create task associated with given workflow"""
+    task_ref = models.Task()
+    task_ref.name = task_name
+    task_ref.wf_id = wf_id
+    if task_id:
+        task_ref.task_id = task_id
+    task_ref.save()
+
+    return task_ref
+
+def task_update(context, task_id, values):
+    """Update Task with given values"""
+    session = get_session()
+    with session.begin():
+        task = task_get(context, task_id, session=session)
+
+        task.update(values)
+        task.save()
+
+def task_destroy(context, task_id):
+    """Delete an existing Task"""
+    session = get_session()
+    with session.begin():
+        task = task_get(context, task_id, session=session)
+        task.delete()
diff --git a/taskflow/db/sqlalchemy/models.py b/taskflow/db/sqlalchemy/models.py
index 6db81578..a7f754b5 100644
--- a/taskflow/db/sqlalchemy/models.py
+++ b/taskflow/db/sqlalchemy/models.py
@@ -22,7 +22,7 @@ SQLAlchemy models for taskflow data.
 
 from oslo.config import cfg
 
-from sqlalchemy import Column, Integer, String, Table
+from sqlalchemy import Column, Integer, String, Table, MetaData
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import object_mapper, relationship, backref
@@ -31,13 +31,12 @@ from sqlalchemy import types as types
 
 from json import dumps, loads
 
-from taskflow.db.sqlalchemy.session import get_session
+from taskflow.db.sqlalchemy.session import get_session, get_engine
 from taskflow.openstack.common import timeutils, uuidutils
 
 CONF = cfg.CONF
 
 BASE = declarative_base()
-
 class Json(types.TypeDecorator, types.MutableType):
     impl = types.Text
 
@@ -128,6 +127,8 @@ class LogBook(BASE, TaskFlowBase):
     __tablename__ = 'logbook'
 
     id = Column(Integer, primary_key=True)
+    logbook_id = Column(String, default=uuidutils.generate_uuid,
+                        unique=True)
     name = Column(String)
     workflows = relationship("Workflow", secondary=workflow_logbook_assoc)
 
@@ -138,7 +139,8 @@ class Job(BASE, TaskFlowBase):
     __tablename__ = 'job'
 
     id = Column(Integer, primary_key=True)
-    job_id = Column(String, default=uuidutils.generate_uuid)
+    job_id = Column(String, default=uuidutils.generate_uuid,
+                    unique=True)
     name = Column(String)
     owner = Column(String)
     state = Column(String)
@@ -152,8 +154,7 @@ class Workflow(BASE, TaskFlowBase):
     __tablename__ = 'workflow'
 
     id = Column(Integer, primary_key=True)
-    workflow_id = Column(String, default=uuidutils.generate_uuid)
-    name = Column(String)
+    name = Column(String, unique=True)
     tasks = relationship("Task", backref="workflow")
 
 class Task(BASE, TaskFlowBase):
@@ -168,3 +169,6 @@ class Task(BASE, TaskFlowBase):
     exception = Column(String)
     stacktrace = Column(String)
     workflow_id = Column(String, ForeignKey('workflow.id'))
+
+def create_tables():
+    BASE.metadata.create_all(get_engine())
diff --git a/taskflow/db/sqlalchemy/session.py b/taskflow/db/sqlalchemy/session.py
index 477565a7..b95238ad 100644
--- a/taskflow/db/sqlalchemy/session.py
+++ b/taskflow/db/sqlalchemy/session.py
@@ -78,17 +78,17 @@ def get_engine():
         "convert_unicode": True
     }
 
-    if "sqlite" in connection_dict.drivername:
-        engine_args['poolclass'] = NullPool
+    if "sqlite" in connection_dict.drivername:
+        engine_args['poolclass'] = NullPool
 
-    _ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
-                                       **engine_args)
+    _ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
+                                       **engine_args)
 
-    if 'mysql' in connection_dict.drivername:
-        sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
-    if 'sqlite' in connection_dict.drivername:
-        sqlalchemy.event.listen(_ENGINE, 'connect',
-                                synchronous_switch_listener)
+    if 'mysql' in connection_dict.drivername:
+        sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
+    if 'sqlite' in connection_dict.drivername:
+        sqlalchemy.event.listen(_ENGINE, 'connect',
+                                synchronous_switch_listener)
 
     #TODO: Check to make sure engine connected
 
diff --git a/taskflow/test.db b/taskflow/test.db
new file mode 100644
index 00000000..7aa4e1b6
Binary files /dev/null and b/taskflow/test.db differ
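
The patch adds a full CRUD layer but no example of how the pieces fit together. Below is a rough usage sketch only, not part of the diff: it assumes `context` may be passed as None (none of the new functions read it), that the unchanged portion of models.py keeps a `task_id` column on Task, and that TaskFlowBase.save() flushes new rows so generated ids such as `logbook_id` and `wf.id` are populated.

    # Illustrative only -- exercises the new db api end to end under the
    # assumptions stated above.
    from taskflow.db.sqlalchemy import api, models

    models.create_tables()                        # build the schema via BASE.metadata

    wf = api.workflow_create(None, 'example_wf')  # workflows are looked up by name
    task = api.task_create(None, 'example_task', wf.id)
    api.workflow_add_task(None, 'example_wf', task.task_id)

    lb = api.logbook_create(None, 'example_book')
    api.logbook_add_workflow(None, lb.logbook_id, 'example_wf')

    for workflow in api.logbook_get_workflows(None, lb.logbook_id):
        print(workflow.name)

With the sqlite default introduced in config.py, such a run would write to the local test.db file.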