Add SQLAlchemy models and access methods

* Workflow execution
 * Workbook
 * Task
 * Fix controller tests

Change-Id: I1ec952837fb31451440375fc4a32ad2846ce345b
This commit is contained in:
Nikolay Mahotkin 2013-12-09 15:04:04 +04:00
parent 9403a52bd5
commit 711e6c1731
7 changed files with 381 additions and 18 deletions

View File

@ -39,76 +39,79 @@ def drop_db():
# Workbook
def workbook_get(name):
return {}
return IMPL.workbook_get(name)
def workbook_create(values):
return values
return IMPL.workbook_create(values)
def workbook_update(name, values):
return values
return IMPL.workbook_update(name, values)
def workbook_delete(name):
pass
IMPL.workbook_delete(name)
def workbooks_get():
return [{}]
return IMPL.workbooks_get_all()
def workbook_definition_get(workbook_name):
return ""
return IMPL.workbook_get(workbook_name)['doc']
def workbook_definition_put(workbook_name, text):
return text
workbook = IMPL.workbook_update(workbook_name, {'doc': text})
IMPL.create_associated_events(workbook)
return workbook
# Executions
def execution_get(workbook_name, id):
return {}
return IMPL.execution_get(workbook_name, id)
def execution_create(workbook_name, values):
return values
return IMPL.execution_create(workbook_name, values)
def execution_update(workbook_name, id, values):
return values
return IMPL.execution_update(workbook_name, id, values)
def execution_delete(workbook_name, id):
pass
return IMPL.execution_delete(workbook_name, id)
def executions_get(workbook_name):
return [{}]
return IMPL.executions_get_all(workbook_name=workbook_name)
# Tasks
def task_get(workbook_name, execution_id, id):
return {}
return IMPL.task_get(workbook_name, execution_id, id)
def task_create(workbook_name, execution_id, values):
return values
return IMPL.task_create(workbook_name, execution_id, values)
def task_update(workbook_name, execution_id, id, values):
return values
return IMPL.task_update(workbook_name, execution_id, id, values)
def task_delete(workbook_name, execution_id, id):
pass
return IMPL.task_delete(workbook_name, execution_id, id)
def tasks_get(workbook_name, execution_id):
return [{}]
return IMPL.tasks_get_all(workbook_name=workbook_name,
execution_id=execution_id)
# Listeners

View File

@ -129,3 +129,201 @@ def _event_get(event_id, session):
@to_dict
def event_get(event_id):
return _event_get(event_id, get_session())
def create_associated_events(workbook):
if not workbook.doc:
return
#TODO(nmakhotkin) should be implemented (need DSLParser)
pass
def workbook_create(values):
values = values.copy()
workbook = m.Workbook()
workbook.update(values)
session = get_session()
with session.begin():
try:
workbook.save(session=session)
except db_exc.DBDuplicateEntry as e:
LOG.exception("Database registration exception: %s", e)
##TODO(akuznetsov) create special exception for this case
raise Exception
return workbook_get(workbook.name)
def workbook_update(workbook_name, values):
values = values.copy()
session = get_session()
with session.begin():
workbook = _workbook_get(workbook_name, session)
if workbook is None:
##TODO(akuznetsov) create special exception for this case
raise Exception
workbook.update(values)
return workbook
def workbook_delete(workbook_name):
session = get_session()
with session.begin():
workbook = _workbook_get(workbook_name, session)
if not workbook:
raise Exception
session.delete(workbook)
@to_dict
def workbook_get(workbook_name):
return _workbook_get(workbook_name, get_session())
@to_dict
def workbooks_get_all(**kwargs):
return _workbooks_get_all(get_session(), **kwargs)
def _workbooks_get_all(session, **kwargs):
query = model_query(m.Workbook, session)
return query.filter_by(**kwargs).all()
def _workbook_get(workbook_name, session):
query = model_query(m.Workbook, session)
return query.filter_by(name=workbook_name).first()
@to_dict
def execution_get(workbook_name, execution_id):
return _execution_get(workbook_name, execution_id, get_session())
@to_dict
def executions_get_all(**kwargs):
return _executions_get_all(get_session(), **kwargs)
def _executions_get_all(session, **kwargs):
query = model_query(m.WorkflowExecution, session)
return query.filter_by(**kwargs).all()
def _execution_get(workbook_name, execution_id, session):
query = model_query(m.WorkflowExecution, session)
return query.filter_by(id=execution_id,
workbook_name=workbook_name).first()
def execution_update(workbook_name, execution_id, values):
values = values.copy()
session = get_session()
with session.begin():
execution = _execution_get(workbook_name, execution_id, session)
if execution is None:
##TODO(akuznetsov) create special exception for this case
raise Exception
execution.update(values)
return execution
def execution_delete(workbook_name, execution_id):
session = get_session()
with session.begin():
execution = _execution_get(workbook_name, execution_id, session)
if not execution:
raise Exception
session.delete(execution)
def execution_create(workbook_name, values):
values = values.copy()
execution = m.WorkflowExecution()
execution.update(values)
execution.update({'workbook_name': workbook_name})
session = get_session()
with session.begin():
try:
execution.save(session=session)
except db_exc.DBDuplicateEntry as e:
LOG.exception("Database registration exception: %s", e)
##TODO(akuznetsov) create special exception for this case
raise Exception
return execution_get(workbook_name, execution.id)
@to_dict
def task_get(workbook_name, execution_id, task_id):
return _task_get(workbook_name, execution_id, task_id, get_session())
def _task_get(workbook_name, execution_id, task_id, session):
query = model_query(m.Task, session)
return query.filter_by(id=task_id,
workbook_name=workbook_name,
execution_id=execution_id).first()
@to_dict
def tasks_get_all(**kwargs):
return _executions_get_all(get_session(), **kwargs)
def _tasks_get_all(session, **kwargs):
query = model_query(m.Task, session)
return query.filter_by(**kwargs).all()
def task_update(workbook_name, execution_id, task_id, values):
values = values.copy()
session = get_session()
with session.begin():
task = _task_get(workbook_name, execution_id, task_id, session)
if task is None:
##TODO(akuznetsov) create special exception for this case
raise Exception
task.update(values)
return task
def task_delete(workbook_name, execution_id, task_id):
session = get_session()
with session.begin():
task = _task_get(workbook_name, execution_id, task_id, session)
if not task:
raise Exception
session.delete(task)
def task_create(workbook_name, execution_id, values):
values = values.copy()
task = m.Task()
task.update(values)
task.update({
'workbook_name': workbook_name,
'execution_id': execution_id
})
session = get_session()
with session.begin():
try:
task.save(session=session)
except db_exc.DBDuplicateEntry as e:
LOG.exception("Database registration exception: %s", e)
##TODO(akuznetsov) create special exception for this case
raise Exception
return task_get(workbook_name, execution_id, task.id)

View File

@ -18,6 +18,7 @@ import sqlalchemy as sa
import uuid
from mistral.db.sqlalchemy import model_base as mb
from mistral.db.sqlalchemy import types as st
## Helpers
@ -45,3 +46,51 @@ class Event(mb.MistralBase):
name = sa.Column(sa.String(80), nullable=False)
pattern = sa.Column(sa.String(20), nullable=False)
next_execution_time = sa.Column(sa.DateTime, nullable=False)
class WorkflowExecution(mb.MistralBase):
"""Contains info about particular workflow execution"""
__tablename__ = 'workflow_executions'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = _id_column()
name = sa.Column(sa.String(80))
workbook_name = sa.Column(sa.String(80))
target_task = sa.Column(sa.String(80))
workflow_state = sa.Column(sa.String(20))
class Workbook(mb.MistralBase):
"""Contains info about all DSL (workbook) content"""
__tablename__ = 'workbooks'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = _id_column()
name = sa.Column(sa.String(80), primary_key=True)
doc = sa.Column(sa.String(), nullable=True)
description = sa.Column(sa.String())
tags = sa.Column(st.JsonListType())
scope = sa.Column(sa.String())
class Task(mb.MistralBase):
"""Contains info about particular task"""
__tablename__ = 'tasks'
id = _id_column()
name = sa.Column(sa.String(80))
workbook_name = sa.Column(sa.String(80))
execution_id = sa.Column(sa.String(36))
description = sa.Column(sa.String())
action = sa.Column(sa.String(80))
state = sa.Column(sa.String(20))
tags = sa.Column(st.JsonListType())

View File

@ -0,0 +1,111 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This module implements SQLAlchemy-based types for dict and list
# expressed by json-strings
#
import sqlalchemy as sa
from sqlalchemy.ext import mutable
from mistral.openstack.common import jsonutils
class JsonEncoded(sa.TypeDecorator):
"""Represents an immutable structure as a json-encoded string."""
impl = sa.Text
def process_bind_param(self, value, dialect):
if value is not None:
value = jsonutils.dumps(value)
return value
def process_result_value(self, value, dialect):
if value is not None:
value = jsonutils.loads(value)
return value
class MutableDict(mutable.Mutable, dict):
@classmethod
def coerce(cls, key, value):
"""Convert plain dictionaries to MutableDict."""
if not isinstance(value, MutableDict):
if isinstance(value, dict):
return MutableDict(value)
# this call will raise ValueError
return mutable.Mutable.coerce(key, value)
return value
def update(self, e=None, **f):
"""Detect dictionary update events and emit change events."""
dict.update(self, e, **f)
self.changed()
def __setitem__(self, key, value):
"""Detect dictionary set events and emit change events."""
dict.__setitem__(self, key, value)
self.changed()
def __delitem__(self, key):
"""Detect dictionary del events and emit change events."""
dict.__delitem__(self, key)
self.changed()
class MutableList(mutable.Mutable, list):
@classmethod
def coerce(cls, key, value):
"""Convert plain lists to MutableList."""
if not isinstance(value, MutableList):
if isinstance(value, list):
return MutableList(value)
# this call will raise ValueError
return mutable.Mutable.coerce(key, value)
return value
def __add__(self, value):
"""Detect list add events and emit change events."""
list.__add__(self, value)
self.changed()
def append(self, value):
"""Detect list add events and emit change events."""
list.append(self, value)
self.changed()
def __setitem__(self, key, value):
"""Detect list set events and emit change events."""
list.__setitem__(self, key, value)
self.changed()
def __delitem__(self, i):
"""Detect list del events and emit change events."""
list.__delitem__(self, i)
self.changed()
def JsonDictType():
"""Returns an SQLAlchemy Column Type suitable to store a Json dict."""
return MutableDict.as_mutable(JsonEncoded)
def JsonListType():
"""Returns an SQLAlchemy Column Type suitable to store a Json array."""
return MutableList.as_mutable(JsonEncoded)

View File

@ -62,6 +62,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertDictEqual(EXECS[0], resp.json)
def test_delete(self):
db_api.execution_delete = mock.MagicMock(return_value=None)
resp = self.app.delete('/v1/workbooks/my_workbook/executions/123')
self.assertEqual(resp.status_int, 204)

View File

@ -38,7 +38,7 @@ class TestWorkbookDefinitionController(base.FunctionalTest):
def test_put(self):
new_definition = "new definition"
db_api.workbook_definition_update =\
db_api.workbook_definition_put =\
mock.MagicMock(return_value=new_definition)
resp = self.app.put('/v1/workbooks/my_workbook/definition',

View File

@ -60,6 +60,7 @@ class TestWorkbooksController(base.FunctionalTest):
self.assertDictEqual(WORKBOOKS[0], resp.json)
def test_delete(self):
db_api.workbook_delete = mock.MagicMock(return_value=None)
resp = self.app.delete('/v1/workbooks/my_workbook')
self.assertEqual(resp.status_int, 204)