Merge "Refactor engine to use plugins"
This commit is contained in:
commit
c37061ca5e
@ -35,8 +35,8 @@ host = 0.0.0.0
|
||||
port = 8989
|
||||
|
||||
[engine]
|
||||
# Mistral engine class (string value)
|
||||
#engine=mistral.engine.scalable.engine
|
||||
# Mistral engine plugin (string value)
|
||||
#engine=default
|
||||
|
||||
# Name of the engine node. This can be an opaque identifier.
|
||||
# It is not necessarily a hostname, FQDN, or IP address. (string value)
|
||||
|
@ -15,7 +15,6 @@
|
||||
from pecan.hooks import PecanHook
|
||||
|
||||
from mistral import engine
|
||||
from mistral.engine import client
|
||||
from mistral.openstack.common import log as logging
|
||||
|
||||
|
||||
@ -26,7 +25,7 @@ class EngineHook(PecanHook):
|
||||
|
||||
def __init__(self, transport=None):
|
||||
self.transport = engine.get_transport(transport)
|
||||
self.engine = client.EngineClient(self.transport)
|
||||
self.engine = engine.EngineClient(self.transport)
|
||||
|
||||
def before(self, state):
|
||||
state.request.context['engine'] = self.engine
|
||||
|
@ -38,7 +38,7 @@ from oslo.config import cfg
|
||||
|
||||
from mistral import config
|
||||
from mistral import engine
|
||||
from mistral.engine.scalable.executor import server
|
||||
from mistral.engine import executor
|
||||
from mistral.api import app
|
||||
from wsgiref import simple_server
|
||||
from mistral.openstack.common import log as logging
|
||||
@ -50,7 +50,9 @@ LOG = logging.getLogger(__name__)
|
||||
def launch_executor(transport):
|
||||
target = messaging.Target(topic=cfg.CONF.executor.topic,
|
||||
server=cfg.CONF.executor.host)
|
||||
endpoints = [server.Executor(transport)]
|
||||
# Since engine and executor are tightly coupled, use the engine
|
||||
# configuration to decide which executor to get.
|
||||
endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)]
|
||||
ex_server = messaging.get_rpc_server(
|
||||
transport, target, endpoints, executor='eventlet')
|
||||
ex_server.start()
|
||||
@ -60,7 +62,7 @@ def launch_executor(transport):
|
||||
def launch_engine(transport):
|
||||
target = messaging.Target(topic=cfg.CONF.engine.topic,
|
||||
server=cfg.CONF.engine.host)
|
||||
endpoints = [engine.Engine(transport)]
|
||||
endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)]
|
||||
en_server = messaging.get_rpc_server(
|
||||
transport, target, endpoints, executor='eventlet')
|
||||
en_server.start()
|
||||
|
@ -30,8 +30,8 @@ api_opts = [
|
||||
]
|
||||
|
||||
engine_opts = [
|
||||
cfg.StrOpt('engine', default='mistral.engine.scalable.engine',
|
||||
help='Mistral engine class'),
|
||||
cfg.StrOpt('engine', default='default',
|
||||
help='Mistral engine plugin'),
|
||||
cfg.StrOpt('host', default='0.0.0.0',
|
||||
help='Name of the engine node. This can be an opaque '
|
||||
'identifier. It is not necessarily a hostname, '
|
||||
|
@ -12,28 +12,62 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import eventlet
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
# If mistral.config is not imported here, nosetests will fail on import
|
||||
# because workflow_trace_log_name is not registered. The use of importutils
|
||||
# to import mistral.config instead of simply "from mistral import config" is
|
||||
# to avoid pep8 error on module referenced but not used.
|
||||
# TODO(m4dcoder): Refactor and clean up configuration registration.
|
||||
from mistral.openstack.common import importutils
|
||||
importutils.import_module("mistral.config")
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.db import api as db_api
|
||||
from mistral import dsl_parser as parser
|
||||
from mistral import exceptions as exc
|
||||
from mistral.engine import states
|
||||
from mistral.engine import workflow
|
||||
from mistral.engine import data_flow
|
||||
from mistral.engine import retry
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
|
||||
|
||||
def get_transport(transport=None):
|
||||
return (transport if transport else messaging.get_transport(cfg.CONF))
|
||||
|
||||
|
||||
def get_engine(name, transport):
|
||||
mgr = driver.DriverManager(
|
||||
namespace='mistral.engine.drivers',
|
||||
name=name,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds={'transport': transport})
|
||||
return mgr.driver
|
||||
|
||||
|
||||
class Engine(object):
|
||||
"""Abstract engine for workflow execution."""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
transport = None
|
||||
|
||||
def __init__(self, transport=None):
|
||||
module_name = cfg.CONF.engine.engine
|
||||
module = importutils.import_module(module_name)
|
||||
self.transport = get_transport(transport)
|
||||
self.backend = module.get_engine()
|
||||
self.backend.transport = self.transport
|
||||
|
||||
@abc.abstractmethod
|
||||
def _run_tasks(cls, tasks):
|
||||
raise NotImplementedError()
|
||||
|
||||
def start_workflow_execution(self, cntx, **kwargs):
|
||||
"""Starts a workflow execution based on the specified workbook name
|
||||
@ -48,8 +82,47 @@ class Engine(object):
|
||||
workbook_name = kwargs.get('workbook_name')
|
||||
task_name = kwargs.get('task_name')
|
||||
context = kwargs.get('context', None)
|
||||
return self.backend.start_workflow_execution(
|
||||
workbook_name, task_name, context)
|
||||
|
||||
context = copy.copy(context) if context else {}
|
||||
|
||||
db_api.start_tx()
|
||||
|
||||
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
|
||||
"task_name = '%s']" % (workbook_name, task_name))
|
||||
|
||||
# Persist execution and tasks in DB.
|
||||
try:
|
||||
workbook = self._get_workbook(workbook_name)
|
||||
execution = self._create_execution(workbook_name,
|
||||
task_name,
|
||||
context)
|
||||
|
||||
tasks = self._create_tasks(
|
||||
workflow.find_workflow_tasks(workbook, task_name),
|
||||
workbook,
|
||||
workbook_name, execution['id']
|
||||
)
|
||||
|
||||
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
|
||||
|
||||
self._add_variables_to_data_flow_context(context, execution)
|
||||
|
||||
data_flow.prepare_tasks(tasks_to_start, context)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create necessary DB objects.")
|
||||
raise exc.EngineException("Failed to create necessary DB objects:"
|
||||
" %s" % e)
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
for task in delayed_tasks:
|
||||
self._schedule_run(workbook, task, context)
|
||||
|
||||
self._run_tasks(tasks_to_start)
|
||||
|
||||
return execution
|
||||
|
||||
def stop_workflow_execution(self, cntx, **kwargs):
|
||||
"""Stops the workflow execution with the given id.
|
||||
@ -62,8 +135,9 @@ class Engine(object):
|
||||
"""
|
||||
workbook_name = kwargs.get('workbook_name')
|
||||
execution_id = kwargs.get('execution_id')
|
||||
return self.backend.stop_workflow_execution(
|
||||
workbook_name, execution_id)
|
||||
|
||||
return db_api.execution_update(workbook_name, execution_id,
|
||||
{"state": states.STOPPED})
|
||||
|
||||
def convey_task_result(self, cntx, **kwargs):
|
||||
"""Conveys task result to Mistral Engine.
|
||||
@ -88,8 +162,74 @@ class Engine(object):
|
||||
task_id = kwargs.get('task_id')
|
||||
state = kwargs.get('state')
|
||||
result = kwargs.get('result')
|
||||
return self.backend.convey_task_result(
|
||||
workbook_name, execution_id, task_id, state, result)
|
||||
|
||||
db_api.start_tx()
|
||||
|
||||
try:
|
||||
workbook = self._get_workbook(workbook_name)
|
||||
#TODO(rakhmerov): validate state transition
|
||||
task = db_api.task_get(workbook_name, execution_id, task_id)
|
||||
|
||||
wf_trace_msg = "Task '%s' [%s -> %s" % \
|
||||
(task['name'], task['state'], state)
|
||||
|
||||
wf_trace_msg += ']' if state == states.ERROR \
|
||||
else ", result = %s]" % result
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
|
||||
task_output = data_flow.get_task_output(task, result)
|
||||
|
||||
# Update task state.
|
||||
task, outbound_context = self._update_task(workbook, task, state,
|
||||
task_output)
|
||||
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
self._create_next_tasks(task, workbook)
|
||||
|
||||
# Determine what tasks need to be started.
|
||||
tasks = db_api.tasks_get(workbook_name, execution_id)
|
||||
|
||||
new_exec_state = self._determine_execution_state(execution, tasks)
|
||||
|
||||
if execution['state'] != new_exec_state:
|
||||
wf_trace_msg = \
|
||||
"Execution '%s' [%s -> %s]" % \
|
||||
(execution_id, execution['state'], new_exec_state)
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
|
||||
execution = \
|
||||
db_api.execution_update(workbook_name, execution_id, {
|
||||
"state": new_exec_state
|
||||
})
|
||||
|
||||
LOG.info("Changed execution state: %s" % execution)
|
||||
|
||||
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
|
||||
|
||||
self._add_variables_to_data_flow_context(outbound_context,
|
||||
execution)
|
||||
|
||||
data_flow.prepare_tasks(tasks_to_start, outbound_context)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create necessary DB objects.")
|
||||
raise exc.EngineException("Failed to create necessary DB objects:"
|
||||
" %s" % e)
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
if states.is_stopped_or_finished(execution["state"]):
|
||||
return task
|
||||
|
||||
for task in delayed_tasks:
|
||||
self._schedule_run(workbook, task, outbound_context)
|
||||
|
||||
if tasks_to_start:
|
||||
self._run_tasks(tasks_to_start)
|
||||
|
||||
return task
|
||||
|
||||
def get_workflow_execution_state(self, cntx, **kwargs):
|
||||
"""Gets the workflow execution state.
|
||||
@ -102,8 +242,15 @@ class Engine(object):
|
||||
"""
|
||||
workbook_name = kwargs.get('workbook_name')
|
||||
execution_id = kwargs.get('execution_id')
|
||||
return self.backend.get_workflow_execution_state(
|
||||
workbook_name, execution_id)
|
||||
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
if not execution:
|
||||
raise exc.EngineException("Workflow execution not found "
|
||||
"[workbook_name=%s, execution_id=%s]"
|
||||
% (workbook_name, execution_id))
|
||||
|
||||
return execution["state"]
|
||||
|
||||
def get_task_state(self, cntx, **kwargs):
|
||||
"""Gets task state.
|
||||
@ -117,5 +264,252 @@ class Engine(object):
|
||||
workbook_name = kwargs.get('workbook_name')
|
||||
execution_id = kwargs.get('execution_id')
|
||||
task_id = kwargs.get('task_id')
|
||||
return self.backend.get_task_state(
|
||||
workbook_name, execution_id, task_id)
|
||||
|
||||
task = db_api.task_get(workbook_name, execution_id, task_id)
|
||||
|
||||
if not task:
|
||||
raise exc.EngineException("Task not found.")
|
||||
|
||||
return task["state"]
|
||||
|
||||
@classmethod
|
||||
def _create_execution(cls, workbook_name, task_name, context):
|
||||
return db_api.execution_create(workbook_name, {
|
||||
"workbook_name": workbook_name,
|
||||
"task": task_name,
|
||||
"state": states.RUNNING,
|
||||
"context": context
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def _add_variables_to_data_flow_context(cls, context, execution):
|
||||
db_workbook = db_api.workbook_get(execution['workbook_name'])
|
||||
|
||||
data_flow.add_token_to_context(context, db_workbook)
|
||||
data_flow.add_execution_to_context(context, execution)
|
||||
|
||||
@classmethod
|
||||
def _create_next_tasks(cls, task, workbook):
|
||||
tasks = workflow.find_tasks_after_completion(task, workbook)
|
||||
|
||||
db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'],
|
||||
task['execution_id'])
|
||||
return workflow.find_resolved_tasks(db_tasks)
|
||||
|
||||
@classmethod
|
||||
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
|
||||
tasks = []
|
||||
|
||||
for task in task_list:
|
||||
state, task_runtime_context = retry.get_task_runtime(task)
|
||||
action_ns = workbook.namespaces.get(task.get_action_namespace())
|
||||
|
||||
action_spec = None
|
||||
if action_ns:
|
||||
action_spec = \
|
||||
action_ns.actions.get(task.get_action_name())
|
||||
|
||||
db_task = db_api.task_create(workbook_name, execution_id, {
|
||||
"name": task.name,
|
||||
"requires": task.requires,
|
||||
"task_spec": task.to_dict(),
|
||||
"action_spec": {} if not action_spec
|
||||
else action_spec.to_dict(),
|
||||
"state": state,
|
||||
"tags": task.get_property("tags", None),
|
||||
"task_runtime_context": task_runtime_context
|
||||
})
|
||||
|
||||
tasks.append(db_task)
|
||||
|
||||
return tasks
|
||||
|
||||
@classmethod
|
||||
def _get_workbook(cls, workbook_name):
|
||||
wb = db_api.workbook_get(workbook_name)
|
||||
return parser.get_workbook(wb["definition"])
|
||||
|
||||
@classmethod
|
||||
def _determine_execution_state(cls, execution, tasks):
|
||||
if workflow.is_error(tasks):
|
||||
return states.ERROR
|
||||
|
||||
if workflow.is_success(tasks) or workflow.is_finished(tasks):
|
||||
return states.SUCCESS
|
||||
|
||||
return execution['state']
|
||||
|
||||
@classmethod
|
||||
def _update_task(cls, workbook, task, state, task_output):
|
||||
"""
|
||||
Update the task with the runtime information. The outbound_context
|
||||
for this task is also calculated.
|
||||
:return: task, outbound_context. task is the updated task and
|
||||
computed outbound context.
|
||||
"""
|
||||
workbook_name = task['workbook_name']
|
||||
execution_id = task['execution_id']
|
||||
task_spec = workbook.tasks.get(task["name"])
|
||||
task_runtime_context = task["task_runtime_context"]
|
||||
|
||||
# Compute the outbound_context, state and exec_flow_context.
|
||||
outbound_context = data_flow.get_outbound_context(task, task_output)
|
||||
state, task_runtime_context = retry.get_task_runtime(
|
||||
task_spec, state, outbound_context, task_runtime_context)
|
||||
|
||||
# Update the task.
|
||||
update_values = {"state": state,
|
||||
"output": task_output,
|
||||
"task_runtime_context": task_runtime_context}
|
||||
task = db_api.task_update(workbook_name, execution_id, task["id"],
|
||||
update_values)
|
||||
|
||||
return task, outbound_context
|
||||
|
||||
def _schedule_run(cls, workbook, task, outbound_context):
|
||||
"""
|
||||
Schedules task to run after the delay defined in the task
|
||||
specification. If no delay is specified this method is a no-op.
|
||||
"""
|
||||
|
||||
def run_delayed_task():
|
||||
"""
|
||||
Runs the delayed task. Performs all the steps required to setup
|
||||
a task to run which are not already done. This is mostly code
|
||||
copied over from convey_task_result.
|
||||
"""
|
||||
db_api.start_tx()
|
||||
try:
|
||||
workbook_name = task['workbook_name']
|
||||
execution_id = task['execution_id']
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
# Change state from DELAYED to IDLE to unblock processing.
|
||||
|
||||
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
|
||||
% (task['name'],
|
||||
task['state'], states.IDLE))
|
||||
|
||||
db_task = db_api.task_update(workbook_name,
|
||||
execution_id,
|
||||
task['id'],
|
||||
{"state": states.IDLE})
|
||||
task_to_start = [db_task]
|
||||
data_flow.prepare_tasks(task_to_start, outbound_context)
|
||||
db_api.commit_tx()
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
if not states.is_stopped_or_finished(execution["state"]):
|
||||
cls._run_tasks(task_to_start)
|
||||
|
||||
task_spec = workbook.tasks.get(task['name'])
|
||||
retries, break_on, delay_sec = task_spec.get_retry_parameters()
|
||||
if delay_sec > 0:
|
||||
# Run the task after the specified delay.
|
||||
eventlet.spawn_after(delay_sec, run_delayed_task)
|
||||
else:
|
||||
LOG.warn("No delay specified for task(id=%s) name=%s. Not "
|
||||
"scheduling for execution." % (task['id'], task['name']))
|
||||
|
||||
|
||||
class EngineClient(object):
|
||||
"""
|
||||
RPC client for the Engine.
|
||||
"""
|
||||
|
||||
def __init__(self, transport):
|
||||
"""Construct an RPC client for the Engine.
|
||||
|
||||
:param transport: a messaging transport handle
|
||||
:type transport: Transport
|
||||
"""
|
||||
target = messaging.Target(topic=cfg.CONF.engine.topic)
|
||||
self._client = messaging.RPCClient(transport, target)
|
||||
|
||||
def start_workflow_execution(self, workbook_name, task_name, context=None):
|
||||
"""Starts a workflow execution based on the specified workbook name
|
||||
and target task.
|
||||
|
||||
:param workbook_name: Workbook name
|
||||
:param task_name: Target task name
|
||||
:param context: Execution context which defines a workflow input
|
||||
:return: Workflow execution.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'task_name': task_name,
|
||||
'context': context}
|
||||
return self._client.call(cntx, 'start_workflow_execution', **kwargs)
|
||||
|
||||
def stop_workflow_execution(self, workbook_name, execution_id):
|
||||
"""Stops the workflow execution with the given id.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:return: Workflow execution.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id}
|
||||
return self._client.call(cntx, 'stop_workflow_execution', **kwargs)
|
||||
|
||||
def convey_task_result(self, workbook_name, execution_id,
|
||||
task_id, state, result):
|
||||
"""Conveys task result to Mistral Engine.
|
||||
|
||||
This method should be used by clients of Mistral Engine to update
|
||||
state of a task once task action has been performed. One of the
|
||||
clients of this method is Mistral REST API server that receives
|
||||
task result from the outside action handlers.
|
||||
|
||||
Note: calling this method serves an event notifying Mistral that
|
||||
it possibly needs to move the workflow on, i.e. run other workflow
|
||||
tasks for which all dependencies are satisfied.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:param task_id: Task id.
|
||||
:param state: New task state.
|
||||
:param result: Task result data.
|
||||
:return: Task.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id,
|
||||
'task_id': task_id,
|
||||
'state': state,
|
||||
'result': result}
|
||||
return self._client.call(cntx, 'convey_task_result', **kwargs)
|
||||
|
||||
def get_workflow_execution_state(self, workbook_name, execution_id):
|
||||
"""Gets the workflow execution state.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:return: Current workflow state.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id}
|
||||
return self._client.call(
|
||||
cntx, 'get_workflow_execution_state', **kwargs)
|
||||
|
||||
def get_task_state(self, workbook_name, execution_id, task_id):
|
||||
"""Gets task state.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:param task_id: Task id.
|
||||
:return: Current task state.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'executioin_id': execution_id,
|
||||
'task_id': task_id}
|
||||
return self._client.call(cntx, 'get_task_state', **kwargs)
|
||||
|
@ -1,323 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import eventlet
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.db import api as db_api
|
||||
from mistral import dsl_parser as parser
|
||||
from mistral import exceptions as exc
|
||||
from mistral.engine import states
|
||||
from mistral.engine import workflow
|
||||
from mistral.engine import data_flow
|
||||
from mistral.engine import retry
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
|
||||
|
||||
class AbstractEngine(object):
|
||||
transport = None
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def _run_tasks(cls, tasks):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def start_workflow_execution(cls, workbook_name, task_name, context):
|
||||
context = copy.copy(context) if context else {}
|
||||
|
||||
db_api.start_tx()
|
||||
|
||||
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
|
||||
"task_name = '%s']" % (workbook_name, task_name))
|
||||
|
||||
# Persist execution and tasks in DB.
|
||||
try:
|
||||
workbook = cls._get_workbook(workbook_name)
|
||||
execution = cls._create_execution(workbook_name,
|
||||
task_name,
|
||||
context)
|
||||
|
||||
tasks = cls._create_tasks(
|
||||
workflow.find_workflow_tasks(workbook, task_name),
|
||||
workbook,
|
||||
workbook_name, execution['id']
|
||||
)
|
||||
|
||||
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
|
||||
|
||||
cls._add_variables_to_data_flow_context(context, execution)
|
||||
|
||||
data_flow.prepare_tasks(tasks_to_start, context)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create necessary DB objects.")
|
||||
raise exc.EngineException("Failed to create necessary DB objects:"
|
||||
" %s" % e)
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
for task in delayed_tasks:
|
||||
cls._schedule_run(workbook, task, context)
|
||||
|
||||
cls._run_tasks(tasks_to_start)
|
||||
|
||||
return execution
|
||||
|
||||
@classmethod
|
||||
def convey_task_result(cls, workbook_name, execution_id,
|
||||
task_id, state, result):
|
||||
db_api.start_tx()
|
||||
|
||||
try:
|
||||
workbook = cls._get_workbook(workbook_name)
|
||||
#TODO(rakhmerov): validate state transition
|
||||
task = db_api.task_get(workbook_name, execution_id, task_id)
|
||||
|
||||
wf_trace_msg = "Task '%s' [%s -> %s" % \
|
||||
(task['name'], task['state'], state)
|
||||
|
||||
wf_trace_msg += ']' if state == states.ERROR \
|
||||
else ", result = %s]" % result
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
|
||||
task_output = data_flow.get_task_output(task, result)
|
||||
|
||||
# Update task state.
|
||||
task, outbound_context = cls._update_task(workbook, task, state,
|
||||
task_output)
|
||||
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
cls._create_next_tasks(task, workbook)
|
||||
|
||||
# Determine what tasks need to be started.
|
||||
tasks = db_api.tasks_get(workbook_name, execution_id)
|
||||
|
||||
new_exec_state = cls._determine_execution_state(execution, tasks)
|
||||
|
||||
if execution['state'] != new_exec_state:
|
||||
wf_trace_msg = \
|
||||
"Execution '%s' [%s -> %s]" % \
|
||||
(execution_id, execution['state'], new_exec_state)
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
|
||||
execution = \
|
||||
db_api.execution_update(workbook_name, execution_id, {
|
||||
"state": new_exec_state
|
||||
})
|
||||
|
||||
LOG.info("Changed execution state: %s" % execution)
|
||||
|
||||
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
|
||||
|
||||
cls._add_variables_to_data_flow_context(outbound_context,
|
||||
execution)
|
||||
|
||||
data_flow.prepare_tasks(tasks_to_start, outbound_context)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create necessary DB objects.")
|
||||
raise exc.EngineException("Failed to create necessary DB objects:"
|
||||
" %s" % e)
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
if states.is_stopped_or_finished(execution["state"]):
|
||||
return task
|
||||
|
||||
for task in delayed_tasks:
|
||||
cls._schedule_run(workbook, task, outbound_context)
|
||||
|
||||
if tasks_to_start:
|
||||
cls._run_tasks(tasks_to_start)
|
||||
|
||||
return task
|
||||
|
||||
@classmethod
|
||||
def stop_workflow_execution(cls, workbook_name, execution_id):
|
||||
return db_api.execution_update(workbook_name, execution_id,
|
||||
{"state": states.STOPPED})
|
||||
|
||||
@classmethod
|
||||
def get_workflow_execution_state(cls, workbook_name, execution_id):
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
if not execution:
|
||||
raise exc.EngineException("Workflow execution not found "
|
||||
"[workbook_name=%s, execution_id=%s]"
|
||||
% (workbook_name, execution_id))
|
||||
|
||||
return execution["state"]
|
||||
|
||||
@classmethod
|
||||
def get_task_state(cls, workbook_name, execution_id, task_id):
|
||||
task = db_api.task_get(workbook_name, execution_id, task_id)
|
||||
|
||||
if not task:
|
||||
raise exc.EngineException("Task not found.")
|
||||
|
||||
return task["state"]
|
||||
|
||||
@classmethod
|
||||
def _create_execution(cls, workbook_name, task_name, context):
|
||||
return db_api.execution_create(workbook_name, {
|
||||
"workbook_name": workbook_name,
|
||||
"task": task_name,
|
||||
"state": states.RUNNING,
|
||||
"context": context
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def _add_variables_to_data_flow_context(cls, context, execution):
|
||||
db_workbook = db_api.workbook_get(execution['workbook_name'])
|
||||
|
||||
data_flow.add_token_to_context(context, db_workbook)
|
||||
data_flow.add_execution_to_context(context, execution)
|
||||
|
||||
@classmethod
|
||||
def _create_next_tasks(cls, task, workbook):
|
||||
tasks = workflow.find_tasks_after_completion(task, workbook)
|
||||
|
||||
db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'],
|
||||
task['execution_id'])
|
||||
return workflow.find_resolved_tasks(db_tasks)
|
||||
|
||||
@classmethod
|
||||
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
|
||||
tasks = []
|
||||
|
||||
for task in task_list:
|
||||
state, task_runtime_context = retry.get_task_runtime(task)
|
||||
action_ns = workbook.namespaces.get(task.get_action_namespace())
|
||||
|
||||
action_spec = None
|
||||
if action_ns:
|
||||
action_spec = \
|
||||
action_ns.actions.get(task.get_action_name())
|
||||
|
||||
db_task = db_api.task_create(workbook_name, execution_id, {
|
||||
"name": task.name,
|
||||
"requires": task.requires,
|
||||
"task_spec": task.to_dict(),
|
||||
"action_spec": {} if not action_spec
|
||||
else action_spec.to_dict(),
|
||||
"state": state,
|
||||
"tags": task.get_property("tags", None),
|
||||
"task_runtime_context": task_runtime_context
|
||||
})
|
||||
|
||||
tasks.append(db_task)
|
||||
|
||||
return tasks
|
||||
|
||||
@classmethod
|
||||
def _get_workbook(cls, workbook_name):
|
||||
wb = db_api.workbook_get(workbook_name)
|
||||
return parser.get_workbook(wb["definition"])
|
||||
|
||||
@classmethod
|
||||
def _determine_execution_state(cls, execution, tasks):
|
||||
if workflow.is_error(tasks):
|
||||
return states.ERROR
|
||||
|
||||
if workflow.is_success(tasks) or workflow.is_finished(tasks):
|
||||
return states.SUCCESS
|
||||
|
||||
return execution['state']
|
||||
|
||||
@classmethod
|
||||
def _update_task(cls, workbook, task, state, task_output):
|
||||
"""
|
||||
Update the task with the runtime information. The outbound_context
|
||||
for this task is also calculated.
|
||||
:return: task, outbound_context. task is the updated task and
|
||||
computed outbound context.
|
||||
"""
|
||||
workbook_name = task['workbook_name']
|
||||
execution_id = task['execution_id']
|
||||
task_spec = workbook.tasks.get(task["name"])
|
||||
task_runtime_context = task["task_runtime_context"]
|
||||
|
||||
# Compute the outbound_context, state and exec_flow_context.
|
||||
outbound_context = data_flow.get_outbound_context(task, task_output)
|
||||
state, task_runtime_context = retry.get_task_runtime(
|
||||
task_spec, state, outbound_context, task_runtime_context)
|
||||
|
||||
# Update the task.
|
||||
update_values = {"state": state,
|
||||
"output": task_output,
|
||||
"task_runtime_context": task_runtime_context}
|
||||
task = db_api.task_update(workbook_name, execution_id, task["id"],
|
||||
update_values)
|
||||
|
||||
return task, outbound_context
|
||||
|
||||
@classmethod
|
||||
def _schedule_run(cls, workbook, task, outbound_context):
|
||||
"""
|
||||
Schedules task to run after the delay defined in the task
|
||||
specification. If no delay is specified this method is a no-op.
|
||||
"""
|
||||
|
||||
def run_delayed_task():
|
||||
"""
|
||||
Runs the delayed task. Performs all the steps required to setup
|
||||
a task to run which are not already done. This is mostly code
|
||||
copied over from convey_task_result.
|
||||
"""
|
||||
db_api.start_tx()
|
||||
try:
|
||||
workbook_name = task['workbook_name']
|
||||
execution_id = task['execution_id']
|
||||
execution = db_api.execution_get(workbook_name, execution_id)
|
||||
|
||||
# Change state from DELAYED to IDLE to unblock processing.
|
||||
|
||||
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
|
||||
% (task['name'],
|
||||
task['state'], states.IDLE))
|
||||
|
||||
db_task = db_api.task_update(workbook_name,
|
||||
execution_id,
|
||||
task['id'],
|
||||
{"state": states.IDLE})
|
||||
task_to_start = [db_task]
|
||||
data_flow.prepare_tasks(task_to_start, outbound_context)
|
||||
db_api.commit_tx()
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
if not states.is_stopped_or_finished(execution["state"]):
|
||||
cls._run_tasks(task_to_start)
|
||||
|
||||
task_spec = workbook.tasks.get(task['name'])
|
||||
retries, break_on, delay_sec = task_spec.get_retry_parameters()
|
||||
if delay_sec > 0:
|
||||
# Run the task after the specified delay.
|
||||
eventlet.spawn_after(delay_sec, run_delayed_task)
|
||||
else:
|
||||
LOG.warn("No delay specified for task(id=%s) name=%s. Not "
|
||||
"scheduling for execution." % (task['id'], task['name']))
|
@ -1,123 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EngineClient(object):
|
||||
"""
|
||||
RPC client for the Engine.
|
||||
"""
|
||||
|
||||
def __init__(self, transport):
|
||||
"""Construct an RPC client for the Engine.
|
||||
|
||||
:param transport: a messaging transport handle
|
||||
:type transport: Transport
|
||||
"""
|
||||
target = messaging.Target(topic=cfg.CONF.engine.topic)
|
||||
self._client = messaging.RPCClient(transport, target)
|
||||
|
||||
def start_workflow_execution(self, workbook_name, task_name, context=None):
|
||||
"""Starts a workflow execution based on the specified workbook name
|
||||
and target task.
|
||||
|
||||
:param workbook_name: Workbook name
|
||||
:param task_name: Target task name
|
||||
:param context: Execution context which defines a workflow input
|
||||
:return: Workflow execution.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'task_name': task_name,
|
||||
'context': context}
|
||||
return self._client.call(cntx, 'start_workflow_execution', **kwargs)
|
||||
|
||||
def stop_workflow_execution(self, workbook_name, execution_id):
|
||||
"""Stops the workflow execution with the given id.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:return: Workflow execution.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id}
|
||||
return self._client.call(cntx, 'stop_workflow_execution', **kwargs)
|
||||
|
||||
def convey_task_result(self, workbook_name, execution_id,
|
||||
task_id, state, result):
|
||||
"""Conveys task result to Mistral Engine.
|
||||
|
||||
This method should be used by clients of Mistral Engine to update
|
||||
state of a task once task action has been performed. One of the
|
||||
clients of this method is Mistral REST API server that receives
|
||||
task result from the outside action handlers.
|
||||
|
||||
Note: calling this method serves an event notifying Mistral that
|
||||
it possibly needs to move the workflow on, i.e. run other workflow
|
||||
tasks for which all dependencies are satisfied.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:param task_id: Task id.
|
||||
:param state: New task state.
|
||||
:param result: Task result data.
|
||||
:return: Task.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id,
|
||||
'task_id': task_id,
|
||||
'state': state,
|
||||
'result': result}
|
||||
return self._client.call(cntx, 'convey_task_result', **kwargs)
|
||||
|
||||
def get_workflow_execution_state(self, workbook_name, execution_id):
|
||||
"""Gets the workflow execution state.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:return: Current workflow state.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id}
|
||||
return self._client.call(
|
||||
cntx, 'get_workflow_execution_state', **kwargs)
|
||||
|
||||
def get_task_state(self, workbook_name, execution_id, task_id):
|
||||
"""Gets task state.
|
||||
|
||||
:param workbook_name: Workbook name.
|
||||
:param execution_id: Workflow execution id.
|
||||
:param task_id: Task id.
|
||||
:return: Current task state.
|
||||
"""
|
||||
# TODO(m4dcoder): refactor auth context
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'executioin_id': execution_id,
|
||||
'task_id': task_id}
|
||||
return self._client.call(cntx, 'get_task_state', **kwargs)
|
@ -1,7 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
@ -16,29 +14,28 @@
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.engine.scalable.executor import client
|
||||
from mistral.engine import abstract_engine as abs_eng
|
||||
from mistral import engine
|
||||
from mistral.engine import executor
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScalableEngine(abs_eng.AbstractEngine):
|
||||
@classmethod
|
||||
def _notify_task_executors(cls, tasks):
|
||||
class DefaultEngine(engine.Engine):
|
||||
def _notify_task_executors(self, tasks):
|
||||
# TODO(m4dcoder): Use a pool for transport and client
|
||||
if not cls.transport:
|
||||
cls.transport = messaging.get_transport(cfg.CONF)
|
||||
ex_client = client.ExecutorClient(cls.transport)
|
||||
if not self.transport:
|
||||
self.transport = messaging.get_transport(cfg.CONF)
|
||||
exctr = executor.ExecutorClient(self.transport)
|
||||
for task in tasks:
|
||||
# TODO(m4dcoder): Fill request context argument with auth info
|
||||
context = {}
|
||||
ex_client.handle_task(context, task=task)
|
||||
exctr.handle_task(context, task=task)
|
||||
LOG.info("Submitted task for execution: '%s'" % task)
|
||||
|
||||
@classmethod
|
||||
def _run_tasks(cls, tasks):
|
||||
def _run_tasks(self, tasks):
|
||||
# TODO(rakhmerov):
|
||||
# This call outside of DB transaction creates a window
|
||||
# when the engine may crash and DB will not be consistent with
|
||||
@ -47,8 +44,4 @@ class ScalableEngine(abs_eng.AbstractEngine):
|
||||
# However, making this call in DB transaction is really bad
|
||||
# since it makes transaction much longer in time and under load
|
||||
# may overload DB with open transactions.
|
||||
cls._notify_task_executors(tasks)
|
||||
|
||||
|
||||
def get_engine():
|
||||
return ScalableEngine
|
||||
self._notify_task_executors(tasks)
|
@ -19,9 +19,8 @@ from oslo.config import cfg
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.db import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral import engine
|
||||
from mistral.engine import client
|
||||
from mistral.engine import states
|
||||
from mistral.engine import executor
|
||||
from mistral.actions import action_factory as a_f
|
||||
|
||||
|
||||
@ -29,10 +28,7 @@ LOG = logging.getLogger(__name__)
|
||||
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
|
||||
|
||||
class Executor(object):
|
||||
def __init__(self, transport=None):
|
||||
self.transport = engine.get_transport(transport)
|
||||
self.engine = client.EngineClient(self.transport)
|
||||
class DefaultExecutor(executor.Executor):
|
||||
|
||||
def _do_task_action(self, task):
|
||||
"""Executes the action defined by the task and return result.
|
@ -1,7 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
@ -14,15 +12,42 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral import engine
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_executor(name, transport):
|
||||
mgr = driver.DriverManager(
|
||||
namespace='mistral.executor.drivers',
|
||||
name=name,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds={'transport': transport})
|
||||
return mgr.driver
|
||||
|
||||
|
||||
class Executor(object):
|
||||
"""Abstract class for task execution."""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, transport=None):
|
||||
self.transport = engine.get_transport(transport)
|
||||
self.engine = engine.EngineClient(self.transport)
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_task(self, cntx, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class ExecutorClient(object):
|
||||
"""
|
||||
RPC client for the Executor.
|
@ -16,7 +16,6 @@
|
||||
|
||||
from mistral.db import api as db_api
|
||||
from mistral import engine
|
||||
from mistral.engine import client
|
||||
from mistral.openstack.common import log
|
||||
from mistral.openstack.common import periodic_task
|
||||
from mistral.openstack.common import threadgroup
|
||||
@ -34,7 +33,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
def __init__(self, transport=None):
|
||||
super(MistralPeriodicTasks, self).__init__()
|
||||
self.transport = engine.get_transport(transport)
|
||||
self.engine = client.EngineClient(self.transport)
|
||||
self.engine = engine.EngineClient(self.transport)
|
||||
|
||||
@periodic_task.periodic_task(spacing=1, run_immediately=True)
|
||||
def scheduler_triggers(self, ctx):
|
||||
|
@ -21,7 +21,7 @@ from mistral import exceptions as ex
|
||||
from webtest.app import AppError
|
||||
from mistral.tests.api import base
|
||||
from mistral.db import api as db_api
|
||||
from mistral.engine import client
|
||||
from mistral import engine
|
||||
|
||||
# TODO: later we need additional tests verifying all the errors etc.
|
||||
|
||||
@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertDictEqual(UPDATED_EXEC, canonize(resp.json))
|
||||
|
||||
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
|
||||
@mock.patch.object(engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(return_value=EXECS[0]))
|
||||
def test_post(self):
|
||||
new_exec = EXECS[0].copy()
|
||||
@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest):
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertDictEqual(EXECS[0], canonize(resp.json))
|
||||
|
||||
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
|
||||
@mock.patch.object(engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(side_effect=ex.MistralException))
|
||||
def test_post_throws_exception(self):
|
||||
with self.assertRaises(AppError) as context:
|
||||
|
@ -18,7 +18,7 @@ import mock
|
||||
|
||||
from mistral.tests.api import base
|
||||
from mistral.db import api as db_api
|
||||
from mistral.engine import client
|
||||
from mistral import engine
|
||||
|
||||
# TODO: later we need additional tests verifying all the errors etc.
|
||||
|
||||
@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertDictEqual(TASKS[0], resp.json)
|
||||
|
||||
@mock.patch.object(client.EngineClient, "convey_task_result",
|
||||
@mock.patch.object(engine.EngineClient, "convey_task_result",
|
||||
mock.MagicMock(return_value=UPDATED_TASK))
|
||||
def test_put(self):
|
||||
resp = self.app.put_json(
|
||||
|
@ -34,9 +34,8 @@ from mistral.db.sqlalchemy import api as db_api
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.openstack.common.db.sqlalchemy import session
|
||||
from mistral import version
|
||||
from mistral.engine import client
|
||||
from mistral.engine.scalable import engine
|
||||
from mistral.engine.scalable.executor import server
|
||||
from mistral import engine
|
||||
from mistral.engine import executor
|
||||
|
||||
|
||||
RESOURCES_PATH = 'tests/resources/'
|
||||
@ -121,10 +120,11 @@ class DbTestCase(BaseTest):
|
||||
|
||||
class EngineTestCase(DbTestCase):
|
||||
transport = get_fake_transport()
|
||||
backend = engine.get_engine(cfg.CONF.engine.engine, transport)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(EngineTestCase, self).__init__(*args, **kwargs)
|
||||
self.engine = client.EngineClient(self.transport)
|
||||
self.engine = engine.EngineClient(self.transport)
|
||||
|
||||
@classmethod
|
||||
def mock_task_result(cls, workbook_name, execution_id,
|
||||
@ -133,8 +133,13 @@ class EngineTestCase(DbTestCase):
|
||||
Mock the engine convey_task_results to send request directly
|
||||
to the engine instead of going through the oslo.messaging transport.
|
||||
"""
|
||||
return engine.ScalableEngine.convey_task_result(
|
||||
workbook_name, execution_id, task_id, state, result)
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id,
|
||||
'task_id': task_id,
|
||||
'state': state,
|
||||
'result': result}
|
||||
return cls.backend.convey_task_result(cntx, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def mock_start_workflow(cls, workbook_name, task_name, context=None):
|
||||
@ -142,8 +147,11 @@ class EngineTestCase(DbTestCase):
|
||||
Mock the engine start_workflow_execution to send request directly
|
||||
to the engine instead of going through the oslo.messaging transport.
|
||||
"""
|
||||
return engine.ScalableEngine.start_workflow_execution(
|
||||
workbook_name, task_name, context)
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'task_name': task_name,
|
||||
'context': context}
|
||||
return cls.backend.start_workflow_execution(cntx, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def mock_get_workflow_state(cls, workbook_name, execution_id):
|
||||
@ -151,8 +159,10 @@ class EngineTestCase(DbTestCase):
|
||||
Mock the engine get_workflow_execution_state to send request directly
|
||||
to the engine instead of going through the oslo.messaging transport.
|
||||
"""
|
||||
return engine.ScalableEngine.get_workflow_execution_state(
|
||||
workbook_name, execution_id)
|
||||
cntx = {}
|
||||
kwargs = {'workbook_name': workbook_name,
|
||||
'execution_id': execution_id}
|
||||
return cls.backend.get_workflow_execution_state(cntx, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def mock_run_tasks(cls, tasks):
|
||||
@ -160,9 +170,9 @@ class EngineTestCase(DbTestCase):
|
||||
Mock the engine _run_tasks to send requests directly to the task
|
||||
executor instead of going through the oslo.messaging transport.
|
||||
"""
|
||||
executor = server.Executor(transport=cls.transport)
|
||||
exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport)
|
||||
for task in tasks:
|
||||
executor.handle_task({}, task=task)
|
||||
exctr.handle_task({}, task=task)
|
||||
|
||||
@classmethod
|
||||
def mock_handle_task(cls, cntx, **kwargs):
|
||||
@ -170,5 +180,5 @@ class EngineTestCase(DbTestCase):
|
||||
Mock the executor handle_task to send requests directory to the task
|
||||
executor instead of going through the oslo.messaging transport.
|
||||
"""
|
||||
executor = server.Executor(transport=cls.transport)
|
||||
return executor.handle_task(cntx, **kwargs)
|
||||
exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport)
|
||||
return exctr.handle_task(cntx, **kwargs)
|
||||
|
@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging
|
||||
from mistral.db import api as db_api
|
||||
from mistral.actions import std_actions
|
||||
from mistral import expressions
|
||||
from mistral.engine.scalable import engine
|
||||
from mistral import engine
|
||||
from mistral.engine import states
|
||||
from mistral.engine import client
|
||||
from mistral.engine.drivers.default import engine as concrete_engine
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -40,10 +40,10 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'start_workflow_execution',
|
||||
engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'convey_task_result',
|
||||
engine.EngineClient, 'convey_task_result',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
|
||||
@mock.patch.object(
|
||||
db_api, 'workbook_get',
|
||||
@ -54,7 +54,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
mock.MagicMock(return_value={'state': states.SUCCESS}))
|
||||
class TestScalableEngine(base.EngineTestCase):
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, "_notify_task_executors",
|
||||
concrete_engine.DefaultEngine, "_notify_task_executors",
|
||||
mock.MagicMock(return_value=""))
|
||||
def test_engine_one_task(self):
|
||||
execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
|
||||
@ -72,11 +72,11 @@ class TestScalableEngine(base.EngineTestCase):
|
||||
self.assertEqual(task['state'], states.SUCCESS)
|
||||
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'get_workflow_execution_state',
|
||||
engine.EngineClient, 'get_workflow_execution_state',
|
||||
mock.MagicMock(
|
||||
side_effect=base.EngineTestCase.mock_get_workflow_state))
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, "_notify_task_executors",
|
||||
concrete_engine.DefaultEngine, "_notify_task_executors",
|
||||
mock.MagicMock(return_value=""))
|
||||
def test_engine_multiple_tasks(self):
|
||||
execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
|
||||
@ -116,7 +116,7 @@ class TestScalableEngine(base.EngineTestCase):
|
||||
WB_NAME, execution['id']))
|
||||
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, '_run_tasks',
|
||||
concrete_engine.DefaultEngine, '_run_tasks',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
|
||||
@mock.patch.object(
|
||||
states, "get_state_by_http_status_code",
|
||||
@ -135,7 +135,7 @@ class TestScalableEngine(base.EngineTestCase):
|
||||
self.assertEqual(task['state'], states.SUCCESS)
|
||||
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, '_run_tasks',
|
||||
concrete_engine.DefaultEngine, '_run_tasks',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
|
||||
@mock.patch.object(
|
||||
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
|
||||
@ -201,7 +201,7 @@ class TestScalableEngine(base.EngineTestCase):
|
||||
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
|
||||
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, '_run_tasks',
|
||||
concrete_engine.DefaultEngine, '_run_tasks',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
|
||||
@mock.patch.object(
|
||||
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
|
@ -28,8 +28,8 @@ from mistral.openstack.common import importutils
|
||||
from mistral.engine import states
|
||||
from mistral.db import api as db_api
|
||||
from mistral.actions import std_actions
|
||||
from mistral.engine import client as engine
|
||||
from mistral.engine.scalable.executor import client as executor
|
||||
from mistral import engine
|
||||
from mistral.engine import executor
|
||||
|
||||
|
||||
# We need to make sure that all configuration properties are registered.
|
@ -22,10 +22,10 @@ from oslo.config import cfg
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.tests import base
|
||||
from mistral.db import api as db_api
|
||||
from mistral.engine.scalable import engine
|
||||
from mistral.actions import std_actions
|
||||
from mistral import engine
|
||||
from mistral.engine import states
|
||||
from mistral.engine import client
|
||||
from mistral.engine.drivers.default import engine as concrete_engine
|
||||
from mistral.utils.openstack import keystone
|
||||
|
||||
|
||||
@ -62,13 +62,13 @@ def create_workbook(definition_path):
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'start_workflow_execution',
|
||||
engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'convey_task_result',
|
||||
engine.EngineClient, 'convey_task_result',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, '_run_tasks',
|
||||
concrete_engine.DefaultEngine, '_run_tasks',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
|
||||
class DataFlowTest(base.EngineTestCase):
|
||||
def _check_in_context_execution(self, task):
|
||||
|
@ -24,10 +24,10 @@ from mistral import exceptions as exc
|
||||
from mistral.openstack.common import log as logging
|
||||
from mistral.tests import base
|
||||
from mistral.db import api as db_api
|
||||
from mistral.engine import client
|
||||
from mistral.engine.scalable import engine
|
||||
from mistral.actions import std_actions
|
||||
from mistral import engine
|
||||
from mistral.engine import states
|
||||
from mistral.engine.drivers.default import engine as concrete_engine
|
||||
from mistral import dsl_parser as parser
|
||||
|
||||
|
||||
@ -59,13 +59,13 @@ class FailBeforeSuccessMocker(object):
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'start_workflow_execution',
|
||||
engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
|
||||
@mock.patch.object(
|
||||
client.EngineClient, 'convey_task_result',
|
||||
engine.EngineClient, 'convey_task_result',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
|
||||
@mock.patch.object(
|
||||
engine.ScalableEngine, '_run_tasks',
|
||||
concrete_engine.DefaultEngine, '_run_tasks',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
|
||||
@mock.patch.object(
|
||||
db_api, 'workbook_get',
|
||||
|
Loading…
Reference in New Issue
Block a user