Initial commit for the new engine

* Skeleton for the new engine
* Main modules and interfaces (Engine, WorkflowHandler, policies)
* Sketches of main engine methods
* Reverse workflow handler
* Function to check valid state transitions
* Unit tests

Change-Id: Id6ae293154bca7cab69965725ea6960d5f8d8790
This commit is contained in:
Renat Akhmerov 2014-07-17 14:31:43 +07:00
parent 1dc074c293
commit 8f7a1a221a
20 changed files with 1006 additions and 1 deletions

View File

126
mistral/engine1/base.py Normal file
View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
class TaskResult(object):
"""Explicit data structure containing a result of task execution."""
def __init__(self, data=None, error=None):
self.data = data
self.error = error
def is_error(self):
return self.error is not None
def is_success(self):
return not self.is_error()
@six.add_metaclass(abc.ABCMeta)
class Engine(object):
"""Engine interface."""
@abc.abstractmethod
def start_workflow(self, workbook_name, workflow_name, task_name, input):
"""Starts the specified workflow.
:param workbook_name: Workbook name.
:param workflow_name: Workflow name.
:param task_name: Task name.
:param input: Workflow input data as a dictionary.
:return: Workflow execution object.
"""
raise NotImplemented
@abc.abstractmethod
def on_task_result(self, task_id, task_result):
"""Accepts workflow task raw result and continues the workflow.
Task result here is a raw task result which comes from a corresponding
action/workflow associated which the task is associated with.
:param task_id: Task id.
:param task_result: Task result object.
:return:
"""
raise NotImplemented
@abc.abstractmethod
def stop_workflow(self, execution_id):
"""Stops workflow execution.
:param execution_id: Execution id.
:return: Workflow execution object.
"""
raise NotImplemented
@abc.abstractmethod
def resume_workflow(self, execution_id):
"""Resumes workflow execution.
:param execution_id: Execution id.
:return: Workflow execution object.
"""
raise NotImplemented
@abc.abstractmethod
def rollback_workflow(self, execution_id):
"""Rolls back workflow execution.
:param execution_id: Execution id.
:return: Workflow execution object.
"""
raise NotImplemented
@six.add_metaclass(abc.ABCMeta)
class WorkflowPolicy(object):
"""Workflow policy.
Provides the interface to change the workflow state depending on certain
conditions.
"""
@abc.abstractmethod
def on_task_finish(self, exec_db, task_db):
"""Calculates workflow state after task completion.
:param task_db: Completed task.
:return: New workflow state.
"""
raise NotImplemented
@six.add_metaclass(abc.ABCMeta)
class TaskPolicy(object):
"""Task policy.
Provides the interface to perform any work after a has completed.
An example of task policy may be 'retry' policy that makes engine
to run a task repeatedly if it finishes with a failure.
"""
@abc.abstractmethod
def on_task_finish(self, task_db):
"""Calculates workflow state after task completion.
:param task_db: Completed task.
:return: New task state.
"""
raise NotImplemented

View File

@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from mistral.db import api as db_api
from mistral.engine1 import base
from mistral.engine1 import states
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.workflow import selector as wf_selector
LOG = logging.getLogger(__name__)
WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
# TODO(rakhmerov): Add necessary logging including WF_TRACE.
# TODO(rakhmerov): All is written here assuming data models are not dicts.
def _select_workflow_handler(exec_db):
handler_cls = wf_selector.select_workflow_handler(exec_db['wf_spec'])
if not handler_cls:
msg = 'Failed to find a workflow handler [workflow=%s.%s]' % \
(exec_db['wb_spec'].name, exec_db['wf_name'])
raise exc.EngineException(msg)
return handler_cls(exec_db)
def _create_db_execution(workbook_name, workflow_name, task_name, input):
# TODO(rakhmerov): Change DB model attributes.
return db_api.execution_create(workbook_name, {
"workbook_name": workbook_name,
"workflow_name": workflow_name,
"task": task_name,
"state": states.RUNNING,
"input": input
})
def _create_db_tasks(exec_db, task_specs):
tasks_db = []
for task_spec in task_specs:
task_db = db_api.task_create(exec_db.id, {
'execution_id': exec_db.id,
'name': task_spec.name,
'state': states.IDLE,
'specification': task_spec.to_dict(),
'parameters': {}, # TODO(rakhmerov): Evaluate.
'in_context': {}, # TODO(rakhmerov): Evaluate.
'output': {}, # TODO(rakhmerov): Evaluate.
'runtime_context': None
})
tasks_db.append(task_db)
return tasks_db
def _apply_task_policies(task_db):
# TODO(rakhmerov): Implement.
pass
def _apply_workflow_policies(exec_db, task_db):
# TODO(rakhmerov): Implement.
pass
def _run_action(t):
# TODO(rakhmerov): Implement.
pass
def _run_workflow(t):
# TODO(rakhmerov): Implement.
pass
def _process(exec_db, task_specs):
LOG.debug('Processing workflow tasks: %s' % task_specs)
tasks_db = _create_db_tasks(exec_db, task_specs)
for t in tasks_db:
if t.action:
_run_action(t)
elif t.workflow:
_run_workflow(t)
else:
msg = "Neither 'action' nor 'workflow' is defined in task" \
" specification [task_spec=%s]" % t
raise exc.WorkflowException(msg)
class DefaultEngine(base.Engine):
def start_workflow(self, workbook_name, workflow_name, task_name, input):
db_api.start_tx()
try:
exec_db = _create_db_execution(
workbook_name,
workflow_name,
task_name,
input
)
wf_handler = _select_workflow_handler(exec_db)
task_specs = wf_handler.start_workflow(task_name=task_name)
if len(task_specs) > 0:
_process(exec_db, task_specs)
db_api.commit_tx()
finally:
db_api.end_tx()
return exec_db
def on_task_result(self, task_id, task_result):
db_api.start_tx()
try:
task_db = db_api.task_get(task_id)
exec_db = db_api.execution_get(task_db.execution_id)
wf_handler = _select_workflow_handler(exec_db)
task_specs = wf_handler.on_task_result(
task_db,
task_result
)
if len(task_specs) > 0:
_apply_task_policies(task_db)
_apply_workflow_policies(exec_db, task_db)
_process(exec_db, task_specs)
db_api.commit_tx()
finally:
db_api.end_tx()
return task_db
def stop_workflow(self, execution_id):
db_api.start_tx()
try:
exec_db = db_api.execution_get(execution_id)
wf_handler = _select_workflow_handler(exec_db)
wf_handler.stop_workflow()
db_api.commit_tx()
finally:
db_api.end_tx()
return exec_db
def resume_workflow(self, execution_id):
db_api.start_tx()
try:
exec_db = db_api.execution_get(execution_id)
wf_handler = _select_workflow_handler(exec_db)
task_specs = wf_handler.resume_workflow()
if len(task_specs) > 0:
_process(exec_db, task_specs)
db_api.commit_tx()
finally:
db_api.end_tx()
return exec_db
def rollback_workflow(self, execution_id):
# TODO(rakhmerov): Implement.
raise NotImplemented

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine1 import base
# TODO(rakhmerov): Add all needed policies.
class RetryTaskPolicy(base.TaskPolicy):
def on_task_finish(self, task_db):
# TODO(rakhmerov): Implement.
raise NotImplemented

156
mistral/engine1/rpc.py Normal file
View File

@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo import messaging
from mistral import context as auth_ctx
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# TODO(rakhmerov): Add engine and executor servers so that we don't need to
# adopt them to work with rpc (taking care about transport, signatures etc.).
class EngineClient(object):
"""RPC client for the Engine."""
def __init__(self, transport):
"""Construct an RPC client for the Engine.
:param transport: Messaging transport.
:type transport: Transport.
"""
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer())
# TODO(rakhmerov): Clarify topic.
target = messaging.Target(
topic='mistral.engine1.default_engine:DefaultEngine'
)
self._client = messaging.RPCClient(
transport,
target,
serializer=serializer
)
def start_workflow(self, workbook_name, workflow_name, task_name, input):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param workbook_name: Workbook name.
:param task_name: Target task name.
:param input: Workflow input data.
:return: Workflow execution.
"""
kwargs = {
'workbook_name': workbook_name,
'workflow_name': workflow_name,
'task_name': task_name,
'input': input
}
return self._client.call(auth_ctx.ctx(), 'start_workflow', **kwargs)
def on_task_result(self, task_id, task_result):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param task_id: Task id.
:param task_result: Task result data.
:return: Task.
"""
kwargs = {
'task_id': task_id,
'task_result': task_result
}
return self._client.call(auth_ctx.ctx(), 'on_task_result', **kwargs)
def stop_workflow(self, execution_id):
"""Stops the workflow with the given execution id.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
kwargs = {'execution_id': execution_id}
return self._client.call(auth_ctx.ctx(), 'stop_workflow', **kwargs)
def resume_workflow(self, execution_id):
"""Resumes the workflow with the given execution id.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
kwargs = {'execution_id': execution_id}
return self._client.call(auth_ctx.ctx(), 'resume_workflow', **kwargs)
def rollback_workflow(self, execution_id):
"""Rolls back the workflow with the given execution id.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
kwargs = {'execution_id': execution_id}
return self._client.call(auth_ctx.ctx(), 'rollback_workflow', **kwargs)
class ExecutorClient(object):
"""RPC client for Executor."""
def __init__(self, transport):
"""Construct an RPC client for the Executor.
:param transport: Messaging transport.
:type transport: Transport.
"""
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer())
# TODO(rakhmerov): Clarify topic.
target = messaging.Target(
topic='mistral.engine1.default_engine:DefaultExecutor'
)
self._client = messaging.RPCClient(
transport,
target,
serializer=serializer
)
# TODO(rakhmerov): Most likely it will be a different method.
def handle_task(self, cntx, **kwargs):
"""Send the task request to Executor for execution.
:param cntx: a request context dict
:type cntx: MistralContext
:param kwargs: a dict of method arguments
:type kwargs: dict
"""
return self._client.cast(cntx, 'handle_task', **kwargs)

61
mistral/engine1/states.py Normal file
View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Valid task and workflow states."""
IDLE = 'IDLE'
RUNNING = 'RUNNING'
STOPPED = 'STOPPED'
DELAYED = 'DELAYED'
SUCCESS = 'SUCCESS'
ERROR = 'ERROR'
_ALL = [IDLE, RUNNING, SUCCESS, ERROR, STOPPED, DELAYED]
_VALID_TRANSITIONS = {
IDLE: [RUNNING, ERROR],
RUNNING: [STOPPED, DELAYED, SUCCESS, ERROR],
STOPPED: [RUNNING, ERROR],
DELAYED: [RUNNING, ERROR],
SUCCESS: [],
ERROR: []
}
def is_valid(state):
return state in _ALL
def is_invalid(state):
return not is_valid(state)
def is_finished(state):
return state in [SUCCESS, ERROR]
def is_stopped_or_finished(state):
return state == STOPPED or is_finished(state)
def is_valid_transition(from_state, to_state):
if is_invalid(from_state) or is_invalid(to_state):
return False
if from_state == to_state:
return True
return to_state in _VALID_TRANSITIONS[from_state]

View File

@ -74,7 +74,11 @@ class ActionRegistrationException(MistralException):
class EngineException(MistralException):
pass
http_code = 500
class WorkflowException(MistralException):
http_code = 400
class ApplicationContextNotFoundException(MistralException):

View File

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import log as logging
from mistral.tests import base
LOG = logging.getLogger(__name__)
class DefaultEngineTest(base.BaseTest):
def test_start_workflow(self):
# TODO(rakhmerov): Implement.
pass

View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine1 import states as s
from mistral.openstack.common import log as logging
from mistral.tests import base
LOG = logging.getLogger(__name__)
class StatesModuleTest(base.BaseTest):
def test_is_valid_transition(self):
# From IDLE
self.assertTrue(s.is_valid_transition(s.IDLE, s.IDLE))
self.assertTrue(s.is_valid_transition(s.IDLE, s.RUNNING))
self.assertTrue(s.is_valid_transition(s.IDLE, s.ERROR))
self.assertFalse(s.is_valid_transition(s.IDLE, s.STOPPED))
self.assertFalse(s.is_valid_transition(s.IDLE, s.DELAYED))
self.assertFalse(s.is_valid_transition(s.IDLE, s.SUCCESS))
# From RUNNING
self.assertTrue(s.is_valid_transition(s.RUNNING, s.RUNNING))
self.assertTrue(s.is_valid_transition(s.RUNNING, s.ERROR))
self.assertTrue(s.is_valid_transition(s.RUNNING, s.STOPPED))
self.assertTrue(s.is_valid_transition(s.RUNNING, s.DELAYED))
self.assertTrue(s.is_valid_transition(s.RUNNING, s.SUCCESS))
self.assertFalse(s.is_valid_transition(s.RUNNING, s.IDLE))
# From STOPPED
self.assertTrue(s.is_valid_transition(s.STOPPED, s.STOPPED))
self.assertTrue(s.is_valid_transition(s.STOPPED, s.RUNNING))
self.assertTrue(s.is_valid_transition(s.STOPPED, s.ERROR))
self.assertFalse(s.is_valid_transition(s.STOPPED, s.DELAYED))
self.assertFalse(s.is_valid_transition(s.STOPPED, s.SUCCESS))
self.assertFalse(s.is_valid_transition(s.STOPPED, s.IDLE))
# From DELAYED
self.assertTrue(s.is_valid_transition(s.DELAYED, s.DELAYED))
self.assertTrue(s.is_valid_transition(s.DELAYED, s.RUNNING))
self.assertTrue(s.is_valid_transition(s.DELAYED, s.ERROR))
self.assertFalse(s.is_valid_transition(s.DELAYED, s.STOPPED))
self.assertFalse(s.is_valid_transition(s.DELAYED, s.SUCCESS))
self.assertFalse(s.is_valid_transition(s.DELAYED, s.IDLE))
# From SUCCESS
self.assertTrue(s.is_valid_transition(s.SUCCESS, s.SUCCESS))
self.assertFalse(s.is_valid_transition(s.SUCCESS, s.RUNNING))
self.assertFalse(s.is_valid_transition(s.SUCCESS, s.ERROR))
self.assertFalse(s.is_valid_transition(s.SUCCESS, s.STOPPED))
self.assertFalse(s.is_valid_transition(s.SUCCESS, s.DELAYED))
self.assertFalse(s.is_valid_transition(s.SUCCESS, s.IDLE))
# From ERROR
self.assertTrue(s.is_valid_transition(s.ERROR, s.ERROR))
self.assertFalse(s.is_valid_transition(s.ERROR, s.RUNNING))
self.assertFalse(s.is_valid_transition(s.ERROR, s.STOPPED))
self.assertFalse(s.is_valid_transition(s.ERROR, s.DELAYED))
self.assertFalse(s.is_valid_transition(s.ERROR, s.SUCCESS))
self.assertFalse(s.is_valid_transition(s.ERROR, s.IDLE))

View File

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import log as logging
from mistral.tests import base
LOG = logging.getLogger(__name__)
class DirectWorkflowHandlerTest(base.BaseTest):
def test_start_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_stop_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_resume_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_on_task_result_workflow(self):
# TODO(rakhmerov): Implement.
pass

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import log as logging
from mistral.tests import base
LOG = logging.getLogger(__name__)
class ReverseWorkflowHandlerTest(base.BaseTest):
def test_start_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_stop_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_resume_workflow(self):
# TODO(rakhmerov): Implement.
pass
def test_on_task_result_workflow(self):
# TODO(rakhmerov): Implement.
pass

View File

View File

View File

121
mistral/workflow/base.py Normal file
View File

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from mistral.engine1 import states
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class WorkflowHandler(object):
"""Workflow Handler base class.
Different workflow handler implement different workflow algorithms.
In practice it may actually mean that there may be multiple ways of
describing workflow models (and even languages) that will be supported
by Mistral.
"""
def __init__(self, exec_db):
"""Creates new workflow handler.
:param exec_db: Execution.
"""
self.exec_db = exec_db
def is_stopped_or_finished(self):
return states.is_stopped_or_finished(self.exec_db.state)
def stop_workflow(self):
"""Stops workflow this handler is associated with.
:return: Execution object.
"""
state = self.exec_db.state
if states.is_valid_transition(state, states.STOPPED):
self.exec_db.state = states.STOPPED
LOG.info('Stopped workflow [execution=%s]' % self.exec_db)
else:
LOG.info("Can't change workflow state [execution=%s,"
" state=%s, new state=%s]" %
(self.exec_db, state, states.STOPPED))
return self.exec_db
def resume_workflow(self):
"""Resumes workflow this handler is associated with.
:return: Tasks available to run.
"""
state = self.exec_db.state
if states.is_valid_transition(state, states.RUNNING):
self.exec_db.state = states.RUNNING
LOG.info('Resumed workflow [execution=%s]' % self.exec_db)
else:
LOG.info("Can't change workflow state [execution=%s,"
" state=%s, new state=%s]" %
(self.exec_db, state, states.RUNNING))
# TODO(rakhmerov): A concrete handler should also find tasks to run.
return []
@abc.abstractmethod
def start_workflow(self, **kwargs):
"""Starts workflow.
Given a workflow specification this method makes required analysis
according to this workflow type rules and identifies a list of
tasks that can be scheduled for execution.
:param kwargs: Additional parameters specific to workflow type.
:return: List of tasks that can be scheduled for execution.
"""
raise NotImplemented
@abc.abstractmethod
def on_task_result(self, task, task_result):
"""Handles event of arriving a task result.
Given task result performs analysis of the workflow execution and
identifies tasks that can be scheduled for execution.
:param task: Task that the result corresponds to.
:param task_result: Task result.
:return List of tasks that can be scheduled for execution.
"""
raise NotImplemented
class FlowControl(object):
"""Flow control structure.
Expresses a control structure that influences the way how workflow
execution goes at a certain point.
"""
def decide(self, upstream_tasks, downstream_tasks):
"""Makes a decision in a form of changed states of downstream tasks.
:param upstream_tasks: Upstream workflow tasks.
:param downstream_tasks: Downstream workflow tasks.
:return: Dictionary {task: state} for those tasks whose states
have changed. {task} is a subset of {downstream_tasks}.
"""
raise NotImplemented

View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.workflow import base
class DirectWorkflowHandler(base.WorkflowHandler):
@classmethod
def start_workflow(cls, workflow_spec, execution, **kwargs):
# TODO(rakhmerov): Implement.
raise NotImplemented
def on_task_result(cls, workflow_spec, execution, task, task_result):
# TODO(rakhmerov): Implement.
raise NotImplemented

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine1 import states
from mistral import exceptions as exc
from mistral.workflow import base
class ReverseWorkflowHandler(base.WorkflowHandler):
"""'Reverse workflow' handler.
This handler implements the workflow pattern which is based on
dependencies between tasks, i.e. each task in a workflow graph
may be dependent on other tasks. To run this type of workflow
user must specify a task name that serves a target node in the
graph that the algorithm should come to by resolving all
dependencies.
For example, if there's a workflow consisting of two tasks
'A' and 'B' where 'A' depends on 'B' and if we specify a target
task name 'A' then the handler first will run task 'B' and then,
when a dependency of 'A' is resolved, will run task 'A'.
"""
def start_workflow(self, **kwargs):
wf_spec = self.exec_db.wf_spec
task_name = kwargs.get('task_name')
if task_name not in wf_spec.tasks:
msg = 'Invalid task name [wf_spec=%s, task_name=%s]' % \
(wf_spec, task_name)
raise exc.WorkflowException(msg)
return self._find_tasks_with_no_dependencies(task_name)
def on_task_result(self, task, task_result):
task.state = states.ERROR if task_result.is_error() else states.SUCCESS
task.output = task_result.data
if task.state == states.ERROR:
# No need to check state transition since it's possible to switch
# to ERROR state from any other state.
self.exec_db.state = states.ERROR
return []
return self._find_resolved_tasks()
def _find_tasks_with_no_dependencies(self, target_task_name):
"""Given a target task name finds tasks with no dependencies.
:param target_task_name: Name of the target task in the workflow graph
that dependencies are unwound from.
:return: Tasks with no dependencies.
"""
# TODO(rakhmerov): Implement.
raise NotImplemented
def _find_resolved_tasks(self):
"""Finds all tasks with resolved dependencies.
:return: Tasks with resolved dependencies.
"""
# TODO(rakhmerov): Implement.
raise NotImplemented

View File

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.workflow import direct_workflow
from mistral.workflow import reverse_workflow
# TODO(rakhmerov): Take DSL versions into account.
def select_workflow_handler(workflow_spec):
# TODO(rakhmerov): This algorithm is actually for DSL v2.
wf_type = workflow_spec.type or 'direct'
if wf_type == 'reverse':
return reverse_workflow.ReverseWorkflowHandler
if wf_type == 'direct':
return direct_workflow.DirectWorkflowHandler
return None