Create standard workflows and actions

* Standard workflows are creating during sync_db.sh
 * Standard workflows include:
     - std.create_instance
     - std.delete_instance
     (list will be extended in future)
 * Standard actions:
     - std.wait_ssh (needed for std.create_instance)
 * Make it possible to see these workflows from any project (global scope)
 * Small changes in sqlalchemy api and workflows_service

Partially implements blueprint mistral-multitenancy

Change-Id: I8a8ace40949b2b711a292aac94d7e6354d1dff9c
This commit is contained in:
Nikolay Mahotkin 2014-10-09 17:07:54 +04:00
parent 004745ea8b
commit bb5d09b0e3
9 changed files with 208 additions and 27 deletions

View File

@ -172,8 +172,12 @@ def _get_workbooks(**kwargs):
def _get_workbook(name):
query = b.model_query(models.Workbook)
return query.filter_by(name=name,
project_id=context.ctx().project_id).first()
project_id = context.ctx().project_id if context.has_ctx() else None
proj = query.filter_by(name=name, project_id=project_id)
public = query.filter_by(name=name, scope='public')
return proj.union(public).first()
@b.session_aware()
@ -206,7 +210,7 @@ def create_workflow(values, session=None):
wf = models.Workflow()
wf.update(values.copy())
wf['project_id'] = context.ctx().project_id
wf['project_id'] = context.ctx().project_id if context.has_ctx() else None
try:
wf.save(session=session)
@ -226,7 +230,7 @@ def update_workflow(name, values, session=None):
"Workflow not found [workflow_name=%s]" % name)
wf.update(values.copy())
wf['project_id'] = context.ctx().project_id
wf['project_id'] = context.ctx().project_id if context.has_ctx() else None
return wf
@ -273,8 +277,12 @@ def _get_workflows(**kwargs):
def _get_workflow(name):
query = b.model_query(models.Workflow)
return query.filter_by(name=name,
project_id=context.ctx().project_id).first()
project_id = context.ctx().project_id if context.has_ctx() else None
proj = query.filter_by(name=name, project_id=project_id)
public = query.filter_by(name=name, scope='public')
return proj.union(public).first()
# Executions.

View File

@ -22,14 +22,25 @@ from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.services import actions
from mistral import utils
from mistral.utils import inspect_utils as i_utils
LOG = logging.getLogger(__name__)
ACTIONS_PATH = '../resources/actions'
_ACTION_CTX_PARAM = 'action_context'
def register_standard_actions():
action_paths = utils.get_file_list(ACTIONS_PATH)
for action_path in action_paths:
action_definition = open(action_path).read()
actions.update_actions(action_definition, scope='public')
def get_registered_actions(**kwargs):
return db_api.get_actions(**kwargs)
@ -60,6 +71,7 @@ def _clear_system_action_db():
def sync_db():
_clear_system_action_db()
register_action_classes()
register_standard_actions()
def _register_dynamic_action_classes():

View File

@ -21,19 +21,19 @@ from mistral.services import trusts
from mistral.workbook import parser as spec_parser
def create_actions(definition):
def create_actions(definition, scope='private'):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
db_actions = []
with db_api.transaction():
for action_spec in action_list_spec.get_actions():
db_actions.append(create_action(action_spec, definition))
db_actions.append(create_action(action_spec, definition, scope))
return db_actions
def update_actions(definition):
def update_actions(definition, scope='private'):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
db_actions = []
@ -41,16 +41,21 @@ def update_actions(definition):
with db_api.transaction():
for action_spec in action_list_spec.get_actions():
db_actions.append(create_or_update_action(action_spec,
definition))
definition,
scope))
return db_actions
def create_action(action_spec, definition):
return db_api.create_action(_get_action_values(action_spec, definition))
def create_action(action_spec, definition, scope):
return db_api.create_action(_get_action_values(
action_spec,
definition,
scope
))
def create_or_update_action(action_spec, definition):
def create_or_update_action(action_spec, definition, scope):
action = db_api.load_action(action_spec.get_name())
if action and action.is_system:
@ -59,12 +64,12 @@ def create_or_update_action(action_spec, definition):
action.name
)
values = _get_action_values(action_spec, definition)
values = _get_action_values(action_spec, definition, scope)
return db_api.create_or_update_action(values['name'], values)
def _get_action_values(action_spec, definition):
def _get_action_values(action_spec, definition, scope):
values = {
'name': action_spec.get_name(),
'description': action_spec.get_description(),
@ -72,7 +77,8 @@ def _get_action_values(action_spec, definition):
'definition': definition,
'spec': action_spec.to_dict(),
'is_system': False,
'input': ", ".join(action_spec.get_input())
'input': ", ".join(action_spec.get_input()),
'scope': scope
}
_add_security_info(values)
@ -81,7 +87,7 @@ def _get_action_values(action_spec, definition):
def _add_security_info(values):
if cfg.CONF.pecan.auth_enable:
if cfg.CONF.pecan.auth_enable and not values['name'].startswith('std.'):
values.update({
'trust_id': trusts.create_trust().id,
'project_id': context.ctx().project_id

View File

@ -17,39 +17,60 @@ from oslo.config import cfg
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.services import trusts
from mistral import utils
from mistral.workbook import parser as spec_parser
def create_workflows(definition):
WORKFLOWS_PATH = '../resources/workflows'
def register_standard_workflows():
workflow_paths = utils.get_file_list(WORKFLOWS_PATH)
for wf_path in workflow_paths:
workflow_definition = open(wf_path).read()
update_workflows(workflow_definition, scope='public')
def sync_db():
register_standard_workflows()
def create_workflows(definition, scope='private'):
wf_list_spec = spec_parser.get_workflow_list_spec_from_yaml(definition)
db_wfs = []
with db_api.transaction():
for wf_spec in wf_list_spec.get_workflows():
db_wfs.append(create_workflow(wf_spec, definition))
db_wfs.append(create_workflow(wf_spec, definition, scope))
return db_wfs
def update_workflows(definition):
def update_workflows(definition, scope='private'):
wf_list_spec = spec_parser.get_workflow_list_spec_from_yaml(definition)
db_wfs = []
with db_api.transaction():
for wf_spec in wf_list_spec.get_workflows():
db_wfs.append(create_or_update_workflow(wf_spec, definition))
db_wfs.append(create_or_update_workflow(
wf_spec,
definition,
scope
))
return db_wfs
def create_workflow(wf_spec, definition):
def create_workflow(wf_spec, definition, scope):
values = {
'name': wf_spec.get_name(),
'tags': wf_spec.get_tags(),
'definition': definition,
'spec': wf_spec.to_dict()
'spec': wf_spec.to_dict(),
'scope': scope
}
_add_security_info(values)
@ -57,12 +78,13 @@ def create_workflow(wf_spec, definition):
return db_api.create_workflow(values)
def create_or_update_workflow(wf_spec, definition):
def create_or_update_workflow(wf_spec, definition, scope):
values = {
'name': wf_spec.get_name(),
'tags': wf_spec.get_tags(),
'definition': definition,
'spec': wf_spec.to_dict()
'spec': wf_spec.to_dict(),
'scope': scope
}
_add_security_info(values)
@ -71,7 +93,7 @@ def create_or_update_workflow(wf_spec, definition):
def _add_security_info(values):
if cfg.CONF.pecan.auth_enable:
if cfg.CONF.pecan.auth_enable and not values['name'].startswith('std.'):
values.update({
'trust_id': trusts.create_trust().id,
'project_id': context.ctx().project_id

View File

@ -15,10 +15,14 @@
# limitations under the License.
import logging
import os
from os import path
import threading
from eventlet import corolocal
import pkg_resources as pkg
from mistral import version
# Thread local storage.
_th_loc_storage = threading.local()
@ -120,3 +124,13 @@ def merge_dicts(left, right):
merge_dicts(left_v, v)
return left
def get_file_list(directory):
base_path = pkg.resource_filename(
version.version_info.package,
directory
)
return [path.join(base_path, f) for f in os.listdir(base_path)
if path.isfile(path.join(base_path, f))]

View File

@ -0,0 +1,15 @@
---
version: 2.0
std.wait_ssh:
description: Simple SSH command.
base: std.ssh
base-input:
host: $.host
username: $.username
password: $.password
cmd: 'ls -l'
input:
- host
- username
- password

View File

@ -0,0 +1,76 @@
---
version: 2.0
std.create_instance:
type: direct
description: |
Creates VM and waits till VM OS is up and running.
input:
- name
- image_id
- flavor_id
- ssh_username
- ssh_password
task-defaults:
on-error:
- delete_vm
output:
ip: $.vm_ip
id: $.vm_id
name: $.name
status: $.status
tasks:
create_vm:
description: Initial request to create a VM.
action: nova.servers_create name={$.name} image={$.image_id} flavor={$.flavor_id}
publish:
vm_id: $.id
on-success:
- search_for_ip
search_for_ip:
description: Gets first free ip from Nova floating IPs.
action: nova.floating_ips_findall instance_id=null
publish:
vm_ip: $[0].ip
on-success:
- wait_vm_active
wait_vm_active:
description: Waits till VM is ACTIVE.
action: nova.servers_find id={$.vm_id} status="ACTIVE"
policies:
retry:
count: 10
delay: 10
publish:
status: $.status
on-success:
- associate_ip
associate_ip:
description: Associate server with one of floating IPs.
action: nova.servers_add_floating_ip server={$.vm_id} address={$.vm_ip}
policies:
wait-after: 5
on-success:
- wait_ssh
wait_ssh:
description: Wait till operating system on the VM is up (SSH command).
action: std.wait_ssh username={$.ssh_username} password={$.ssh_password} host={$.vm_ip}
policies:
retry:
count: 10
delay: 10
delete_vm:
description: Destroy VM.
workflow: std.delete_instance instance_id={$.vm_id}
on-complete:
- fail

View File

@ -0,0 +1,26 @@
---
version: "2.0"
std.delete_instance:
type: direct
input:
- instance_id
description: Deletes VM.
tasks:
delete_vm:
description: Destroy VM.
action: nova.servers_delete server={$.instance_id}
policies:
wait-after: 10
on-success:
- find_given_vm
find_given_vm:
description: Checks that VM is already deleted.
action: nova.servers_find id={$.instance_id}
on-error:
- succeed

View File

@ -18,6 +18,7 @@ from mistral.db.v2 import api as db_api
from mistral import config
from mistral.openstack.common import log as logging
from mistral.services import action_manager
from mistral.services import workflows
CONF = cfg.CONF
@ -36,6 +37,7 @@ def main():
db_api.setup_db()
action_manager.sync_db()
workflows.sync_db()
if __name__ == '__main__':