Modify action_factory to store actions in DB

* Action factory register actions in DB at starting
 * Added script tools/sync_db.sh for sync with db
   (Updates system actions, it is required before
   first Mistral launch)
 * Changed init Mistral in devstack: we must do
   sync_db before the launching.

 * Since we have to write all actions in DB, unit
   test are also modified:
    * Now in tearDown() we don't drop db at all,
      but just delete all things except Actions.
    * Created heavy_init() method as a replacement
      setUpClass(), see comments in tests/base.py
    * Register actions operation is too much expensive
      operation so we create db and init actions in
      heavy_init() method.

TODO:
 * Provide executor info to construct action class in
   runtime using action_factory
 * Modify/write an instruction 'How to start Mistral'

Change-Id: If4416c4da5c05189126c109aa613a0303c1b7ef0
This commit is contained in:
Nikolay Mahotkin 2014-08-22 18:00:43 +04:00
parent 5c90f9eac4
commit c5d442e6e4
14 changed files with 407 additions and 64 deletions

View File

@ -136,6 +136,7 @@ function configure_mistral {
function init_mistral {
# (re)create Mistral database
recreate_database mistral utf8
python $MISTRAL_DIR/tools/sync_db.py --config-file $MISTRAL_CONF_FILE
}

View File

@ -19,9 +19,12 @@ from stevedore import extension
from mistral.actions import base
from mistral.actions import generator_factory
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.openstack.common import importutils
from mistral.openstack.common import log as logging
from mistral.utils import inspect_utils as i_utils
from mistral.workbook import parser as spec_parser
@ -31,6 +34,7 @@ _ACTION_CTX_PARAM = 'action_context'
_NAMESPACES = {}
# TODO(nmakhotkin): It's not used anywhere.
def _find_or_create_namespace(name):
ns = _NAMESPACES.get(name)
@ -41,32 +45,68 @@ def _find_or_create_namespace(name):
return ns
def get_registered_namespaces():
return _NAMESPACES.copy()
def get_registered_actions(**kwargs):
return db_api.get_actions(**kwargs)
def _register_action_in_db(name, action_class, attributes,
description=None):
values = {
'name': name,
'action_class': action_class,
'attributes': attributes,
'description': description,
'is_system': True
}
try:
LOG.debug("Registering action in DB: %s" % name)
db_api.create_action(values)
except exc.DBDuplicateEntry:
LOG.debug("Action %s already exists in DB." % name)
def _clear_system_action_db():
db_api.delete_actions(is_system=True)
def sync_db():
_clear_system_action_db()
register_action_classes()
def _register_dynamic_action_classes():
all_generators = generator_factory.all_generators()
for generator in all_generators:
ns = _find_or_create_namespace(generator.action_namespace)
action_classes = generator.create_action_classes()
module = generator.base_action_class.__module__
class_name = generator.base_action_class.__name__
action_class_str = "%s.%s" % (module, class_name)
for action_name, action in action_classes.items():
ns.add(action_name, action)
attrs = i_utils.get_public_fields(action)
full_action_name = "%s.%s" % (generator.action_namespace,
action_name)
_register_action_in_db(full_action_name,
action_class_str,
attrs)
def _register_action_classes():
def register_action_classes():
mgr = extension.ExtensionManager(
namespace='mistral.actions',
invoke_on_load=False)
for name in mgr.names():
ns = _find_or_create_namespace(name.split('.')[0])
ns.add(name.split('.')[1], mgr[name].plugin)
with db_api.transaction():
for name in mgr.names():
action_class_str = mgr[name].entry_point_target.replace(':', '.')
attrs = i_utils.get_public_fields(mgr[name].plugin)
for ns in _NAMESPACES:
_NAMESPACES[ns].log()
_register_action_in_db(name, action_class_str, attrs)
_register_dynamic_action_classes()
_register_dynamic_action_classes()
def get_action_class(action_full_name):
@ -75,18 +115,23 @@ def get_action_class(action_full_name):
:param action_full_name: Full action name (that includes namespace).
:return: Action class or None if not found.
"""
arr = action_full_name.split('.')
if len(arr) != 2:
# TODO(nmakhotkin) Validate action_name.
if action_full_name.find('.') == -1:
raise exc.ActionException('Invalid action name: %s' %
action_full_name)
ns = _NAMESPACES.get(arr[0])
if not ns:
# TODO(nmakhotkin) Temporary hack to return None if action not found
try:
action_db = db_api.get_action(action_full_name)
except exc.NotFoundException:
return None
return ns.get_action_class(arr[1])
# Rebuild action class and restore attributes.
action_class = importutils.import_class(action_db.action_class)
for name, value in action_db.attributes.items():
setattr(action_class, name, value)
return action_class
def _get_action_context(db_task, openstack_context):
@ -220,6 +265,3 @@ def convert_adhoc_action_result(workbook, action_name, result):
# Use base action result as a context for evaluating expressions.
return expr.evaluate_recursively(transformer, result)
# Registering actions on module load.
_register_action_classes()

View File

@ -99,6 +99,10 @@ def workbook_definition_put(workbook_name, text):
return IMPL.workbook_update(workbook_name, {'definition': text})
def workbooks_delete(**kwargs):
return IMPL.workbooks_delete(**kwargs)
# Executions
@ -126,6 +130,10 @@ def executions_get(**kwargs):
return IMPL.executions_get(**kwargs)
def executions_delete(**kwargs):
return IMPL.executions_delete(**kwargs)
# Tasks
def task_get(id):
@ -144,6 +152,10 @@ def task_delete(id):
return IMPL.task_delete(id)
def tasks_delete(**kwargs):
return IMPL.tasks_delete(**kwargs)
def tasks_get(**kwargs):
return IMPL.tasks_get(**kwargs)
@ -189,3 +201,7 @@ def trigger_update(trigger_id, values):
def get_next_triggers(time):
return IMPL.get_next_triggers(time)
def triggers_delete(**kwargs):
return IMPL.triggers_delete(**kwargs)

View File

@ -88,6 +88,11 @@ def transaction():
end_tx()
def _delete_all(model, session=None, **kwargs):
query = b.model_query(model)
query.filter_by(**kwargs).delete()
# Triggers.
@b.session_aware()
@ -157,6 +162,11 @@ def triggers_get_all(**kwargs):
return _triggers_get_all(**kwargs)
@b.session_aware()
def triggers_delete(**kwargs):
return _delete_all(models.Trigger, **kwargs)
# Workbooks.
@b.session_aware()
@ -230,6 +240,11 @@ def _workbook_get(workbook_name, session=None):
project_id=context.ctx().project_id).first()
@b.session_aware()
def workbooks_delete(**kwargs):
return _delete_all(models.Workbook, **kwargs)
# Workflow executions.
@ -270,6 +285,11 @@ def execution_delete(execution_id, session=None):
session.delete(execution)
@b.session_aware()
def executions_delete(**kwargs):
return _delete_all(models.WorkflowExecution, **kwargs)
def execution_get(execution_id):
execution = _execution_get(execution_id)
@ -339,6 +359,11 @@ def task_delete(task_id, session=None):
session.delete(task)
@b.session_aware()
def tasks_delete(**kwargs):
return _delete_all(models.Task, **kwargs)
def task_get(task_id):
task = _task_get(task_id)
if not task:

View File

@ -92,6 +92,9 @@ def delete_workbook(name):
IMPL.delete_workbook(name)
def delete_workbooks(**kwargs):
IMPL.delete_workbooks(**kwargs)
# Workflows.
@ -124,6 +127,10 @@ def delete_workflow(name):
IMPL.delete_workflow(name)
def delete_workflows(**kwargs):
IMPL.delete_workflows(**kwargs)
# Executions.
def get_execution(id):
@ -159,6 +166,10 @@ def delete_execution(id):
return IMPL.delete_execution(id)
def delete_executions(**kwargs):
IMPL.delete_executions(**kwargs)
# Tasks.
def get_task(id):
@ -190,6 +201,10 @@ def delete_task(id):
return IMPL.delete_task(id)
def delete_tasks(**kwargs):
return IMPL.delete_tasks(**kwargs)
# Delayed calls.
@ -203,3 +218,29 @@ def delete_delayed_call(id):
def get_delayed_calls_to_start(time):
return IMPL.get_delayed_calls_to_start(time)
# Actions.
def get_action(name):
return IMPL.get_action(name)
def get_actions(**kwargs):
return IMPL.get_actions(**kwargs)
def create_action(values):
return IMPL.create_action(values)
def update_action(name, values):
return IMPL.update_action(name, values)
def delete_action(name):
return IMPL.delete_action(name)
def delete_actions(**kwargs):
return IMPL.delete_actions(**kwargs)

View File

@ -88,6 +88,11 @@ def transaction():
end_tx()
def _delete_all(model, session=None, **kwargs):
query = b.model_query(model).filter_by(**kwargs)
query.delete()
# Workbooks.
def get_workbook(name):
@ -175,6 +180,11 @@ def _get_workbook(name):
project_id=context.ctx().project_id).first()
@b.session_aware()
def delete_workbooks(**kwargs):
return _delete_all(models.Workbook, **kwargs)
# Workflows.
def get_workflow(name):
@ -246,6 +256,11 @@ def delete_workflow(name, session=None):
session.delete(wf)
@b.session_aware()
def delete_workflows(**kwargs):
return _delete_all(models.Workflow, **kwargs)
def _get_workflows(**kwargs):
query = b.model_query(models.Workflow)
@ -339,6 +354,12 @@ def delete_execution(id, session=None):
session.delete(execution)
@b.session_aware()
def delete_executions(**kwargs):
_delete_all(models.Task)
return _delete_all(models.Execution, **kwargs)
def _get_executions(**kwargs):
query = b.model_query(models.Execution)
@ -420,6 +441,11 @@ def delete_task(id, session=None):
session.delete(task)
@b.session_aware()
def delete_tasks(**kwargs):
return _delete_all(models.Task, **kwargs)
def _get_task(id):
query = b.model_query(models.Task)
@ -472,3 +498,75 @@ def _get_delayed_call(delayed_call_id, session=None):
query = b.model_query(models.DelayedCall)
return query.filter_by(id=delayed_call_id).first()
# Actions.
def get_action(name):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
return action
def get_actions(**kwargs):
return _get_actions(**kwargs)
@b.session_aware()
def create_action(values, session=None):
action = models.Action()
action.update(values)
try:
action.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for action %s: %s"
% (action.name, e.columns))
return action
@b.session_aware()
def delete_actions(**kwargs):
return _delete_all(models.Action, **kwargs)
@b.session_aware()
def update_action(name, values, session=None):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
action.update(values.copy())
return action
@b.session_aware()
def delete_action(name, session=None):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
session.delete(action)
def _get_action(name):
query = b.model_query(models.Action)
return query.filter_by(name=name).first()
def _get_actions(**kwargs):
query = b.model_query(models.Action)
return query.filter_by(**kwargs).all()

View File

@ -127,3 +127,23 @@ class DelayedCall(mb.MistralModelBase):
method_arguments = sa.Column(st.JsonDictType())
auth_context = sa.Column(st.JsonDictType())
execution_time = sa.Column(sa.DateTime, nullable=False)
class Action(mb.MistralModelBase):
"""Contains info about registered Actions."""
__tablename__ = 'actions_v2'
__table_args__ = (
sa.UniqueConstraint('name'),
)
# Main properties.
id = mb.id_column()
name = sa.Column(sa.String(200))
description = sa.Column(sa.Text())
# Service properties.
action_class = sa.Column(sa.String(200))
attributes = sa.Column(st.JsonDictType())
is_system = sa.Column(sa.Boolean())

View File

@ -2,7 +2,7 @@
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@ -16,6 +16,7 @@
import pkg_resources as pkg
import sys
import time
from oslo.config import cfg
from oslo import messaging
@ -24,8 +25,7 @@ from oslotest import base
from stevedore import driver
import testtools.matchers as ttm
import time
from mistral.actions import action_factory
from mistral import context as auth_context
from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.v1 import api as db_api_v1
@ -169,14 +169,50 @@ class BaseTest(base.BaseTestCase):
class DbTestCase(BaseTest):
def setUp(self):
super(DbTestCase, self).setUp()
is_heavy_init_called = False
@classmethod
def __heavy_init(cls):
"""Make this method private to prevent extending this one.
It runs heavy_init() only once.
Note: setUpClass() can be used, but it magically is not invoked
from child class in another module.
"""
if not cls.is_heavy_init_called:
cls.heavy_init()
cls.is_heavy_init_called = True
@classmethod
def heavy_init(cls):
"""Runs a long initialization (runs once by class)
and can be extended by child classes.
"""
cfg.CONF.set_default('connection', 'sqlite://', group='database')
db_api_v1.setup_db()
db_api_v2.setup_db()
action_factory.sync_db()
self.addCleanup(db_api_v1.drop_db)
def _clean_db(self):
with db_api_v1.transaction():
db_api_v1.workbooks_delete()
db_api_v1.executions_delete()
db_api_v1.triggers_delete()
db_api_v1.tasks_delete()
with db_api_v2.transaction():
db_api_v2.delete_workbooks()
db_api_v2.delete_executions()
db_api_v2.delete_workflows()
db_api_v2.delete_tasks()
def setUp(self):
super(DbTestCase, self).setUp()
self.__heavy_init()
cfg.CONF.set_default('connection', 'sqlite://', group='database')
db_api_v1.setup_db()
db_api_v2.setup_db()
self.ctx = auth_context.MistralContext(user_id='1-2-3-4',
project_id='5-6-7-8',
@ -184,7 +220,9 @@ class DbTestCase(BaseTest):
project_name='test-project',
is_admin=False)
auth_context.set_ctx(self.ctx)
self.addCleanup(auth_context.set_ctx, None)
self.addCleanup(self._clean_db)
def is_db_session_open(self):
return db_sa_base._get_thread_local_session() is not None

View File

@ -89,53 +89,36 @@ DB_TASK_ADHOC = {
}
class ActionFactoryTest(base.BaseTest):
class ActionFactoryTest(base.DbTestCase):
def test_register_standard_actions(self):
namespaces = a_f.get_registered_namespaces()
action_list = a_f.get_registered_actions()
self.assertIn("nova", namespaces)
self.assertIn("glance", namespaces)
self.assertIn("keystone", namespaces)
self.assertIn("std", namespaces)
self._assert_single_item(action_list, name="std.echo")
self._assert_single_item(action_list, name="std.email")
self._assert_single_item(action_list, name="std.http")
self._assert_single_item(action_list, name="std.mistral_http")
self._assert_single_item(action_list, name="std.ssh")
std_ns = namespaces["std"]
nova_ns = namespaces["nova"]
keystone_ns = namespaces["keystone"]
glance_ns = namespaces["glance"]
self._assert_single_item(action_list, name="nova.servers_get")
self._assert_single_item(action_list, name="nova.volumes_delete")
self.assertEqual(5, len(std_ns))
self._assert_single_item(action_list, name="keystone.users_list")
self._assert_single_item(action_list, name="keystone.trusts_create")
self.assertTrue(nova_ns.contains_action_name("servers_get"))
self.assertTrue(nova_ns.contains_action_name("volumes_delete"))
self.assertTrue(keystone_ns.contains_action_name("users_list"))
self.assertTrue(keystone_ns.contains_action_name("trusts_create"))
self.assertTrue(glance_ns.contains_action_name("images_list"))
self.assertTrue(glance_ns.contains_action_name("images_delete"))
self.assertTrue(std_ns.contains_action_name("echo"))
self.assertTrue(std_ns.contains_action_name("http"))
self.assertTrue(std_ns.contains_action_name("mistral_http"))
self.assertTrue(std_ns.contains_action_name("email"))
self.assertEqual(std.EchoAction, std_ns.get_action_class("echo"))
self.assertEqual(std.HTTPAction, std_ns.get_action_class("http"))
self.assertEqual(std.MistralHTTPAction,
std_ns.get_action_class("mistral_http"))
self.assertEqual(std.SendEmailAction,
std_ns.get_action_class("email"))
self._assert_single_item(action_list, name="glance.images_list")
self._assert_single_item(action_list, name="glance.images_delete")
def test_get_action_class(self):
self.assertEqual(std.EchoAction, a_f.get_action_class("std.echo"))
self.assertEqual(std.HTTPAction, a_f.get_action_class("std.http"))
self.assertEqual(std.MistralHTTPAction,
a_f.get_action_class("std.mistral_http"))
self.assertEqual(std.SendEmailAction,
a_f.get_action_class("std.email"))
def test_get_action_class_failure(self):
exc = self.assertRaises(exceptions.ActionException,
a_f.get_action_class, 'echo')
self.assertIn('Invalid action name', exc.message)
self.assertRaises(exceptions.ActionException,
a_f.get_action_class, 'echo')
def test_create_http_action(self):
db_task = models.Task()

View File

@ -111,6 +111,8 @@ class TestExecutor(base.DbTestCase):
self.execution = db_api.execution_create(
SAMPLE_EXECUTION['workbook_name'], SAMPLE_EXECUTION)
self.addCleanup(db_api.execution_delete, SAMPLE_EXECUTION['id'])
# Create a new task.
SAMPLE_TASK['execution_id'] = self.execution['id']
self.task = db_api.task_create(

View File

@ -57,8 +57,11 @@ class DefaultEngineTest(base.DbTestCase):
def setUp(self):
super(DefaultEngineTest, self).setUp()
self.wb_name = self.getUniqueString("wb")
wb_service.create_workbook_v2({
'name': 'my_wb',
'name': self.wb_name,
'description': 'Simple workbook for testing engine.',
'definition': WORKBOOK,
'tags': ['test']
})
@ -75,7 +78,7 @@ class DefaultEngineTest(base.DbTestCase):
# Start workflow.
exec_db = self.engine.start_workflow(
'my_wb.wf1',
'%s.wf1' % self.wb_name,
wf_input,
task_name='task2'
)
@ -110,7 +113,7 @@ class DefaultEngineTest(base.DbTestCase):
# Start workflow.
exec_db = self.engine.start_workflow(
'my_wb.wf1',
'%s.wf1' % self.wb_name,
wf_input,
task_name='task2'
)

View File

@ -0,0 +1,34 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
def get_public_fields(obj):
"""Returns only public fields from object or class."""
public_attributes = [attr for attr in dir(obj)
if not attr.startswith("_")]
public_fields = {}
for attribute_str in public_attributes:
attr = getattr(obj, attribute_str)
is_field = not (inspect.isbuiltin(attr)
or inspect.isfunction(attr)
or inspect.ismethod(attr))
if is_field:
public_fields[attribute_str] = attr
return public_fields

37
tools/sync_db.py Normal file
View File

@ -0,0 +1,37 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from mistral.actions import action_factory
from mistral.db.v2 import api as db_api
from mistral import config
from mistral.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def main():
config.parse_args()
logging.setup('Mistral')
db_api.setup_db()
action_factory.sync_db()
if __name__ == '__main__':
main()

3
tools/sync_db.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
tox -evenv -- python tools/sync_db.py "$@"