Removing unnecessary workflow specification parsing

* If a workflow is large then parsing a spec takes significant
  time and it's too expensive to do it often. For example,
  on a regular laptop parsing a workflow consisting from
  100 tasks takes about 2 seconds. That was the reason why
  such a workflow used to take so much time to start, ~200
  seconds. This patch eliminates unnecessary parsing by
  applying caching for workflow specifications.

Change-Id: Iabfdea2182a81cda07a8692f5b84c6bfb561bb83
This commit is contained in:
Renat Akhmerov 2016-07-27 17:48:06 +07:00
parent c7aa89e03d
commit 5d51dfcc5b
16 changed files with 172 additions and 47 deletions

View File

@ -61,9 +61,7 @@ def _build_action(action_ex):
if action_ex.workflow_name:
wf_name = action_ex.workflow_name
wf_spec = spec_parser.get_workflow_spec(
action_ex.task_execution.workflow_execution.spec
)
wf_spec = spec_parser.get_workflow_spec_by_id(action_ex.workflow_id)
wf_spec_name = wf_spec.get_name()
adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name')

View File

@ -452,7 +452,9 @@ class WorkflowAction(Action):
assert not self.action_ex
parent_wf_ex = self.task_ex.workflow_execution
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
parent_wf_spec = spec_parser.get_workflow_spec_by_id(
parent_wf_ex.workflow_id
)
task_spec = spec_parser.get_task_spec(self.task_ex.spec)
@ -464,7 +466,7 @@ class WorkflowAction(Action):
wf_spec_name
)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id)
wf_params = {
'task_execution_id': self.task_ex.id,

View File

@ -82,6 +82,7 @@ def on_action_complete(action_ex):
task = _create_task(
wf_ex,
spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id),
task_spec,
task_ex.in_context,
task_ex
@ -109,7 +110,10 @@ def on_action_complete(action_ex):
def fail_task(task_ex, msg):
task = _build_task_from_execution(task_ex)
task = _build_task_from_execution(
spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
task_ex
)
task.set_state(states.ERROR, msg)
@ -117,7 +121,10 @@ def fail_task(task_ex, msg):
def continue_task(task_ex):
task = _build_task_from_execution(task_ex)
task = _build_task_from_execution(
spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
task_ex
)
try:
task.run()
@ -142,7 +149,10 @@ def continue_task(task_ex):
def complete_task(task_ex, state, state_info):
task = _build_task_from_execution(task_ex)
task = _build_task_from_execution(
spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
task_ex
)
try:
task.complete(state, state_info)
@ -166,19 +176,22 @@ def complete_task(task_ex, state, state_info):
wf_handler.on_task_complete(task_ex)
def _build_task_from_execution(task_ex, task_spec=None):
def _build_task_from_execution(wf_spec, task_ex, task_spec=None):
return _create_task(
task_ex.workflow_execution,
wf_spec,
task_spec or spec_parser.get_task_spec(task_ex.spec),
task_ex.in_context,
task_ex
)
@profiler.trace('task-handler-build-task-from-command')
def _build_task_from_command(cmd):
if isinstance(cmd, wf_cmds.RunExistingTask):
task = _create_task(
cmd.wf_ex,
cmd.wf_spec,
spec_parser.get_task_spec(cmd.task_ex.spec),
cmd.ctx,
cmd.task_ex
@ -190,7 +203,7 @@ def _build_task_from_command(cmd):
return task
if isinstance(cmd, wf_cmds.RunTask):
task = _create_task(cmd.wf_ex, cmd.task_spec, cmd.ctx)
task = _create_task(cmd.wf_ex, cmd.wf_spec, cmd.task_spec, cmd.ctx)
if cmd.is_waiting():
task.defer()
@ -200,8 +213,8 @@ def _build_task_from_command(cmd):
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
def _create_task(wf_ex, task_spec, ctx, task_ex=None):
def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None):
if task_spec.get_with_items():
return tasks.WithItemsTask(wf_ex, task_spec, ctx, task_ex)
return tasks.WithItemsTask(wf_ex, wf_spec, task_spec, ctx, task_ex)
return tasks.RegularTask(wf_ex, task_spec, ctx, task_ex)
return tasks.RegularTask(wf_ex, wf_spec, task_spec, ctx, task_ex)

View File

@ -28,7 +28,6 @@ from mistral import exceptions as exc
from mistral import expressions as expr
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import data_flow
from mistral.workflow import states
@ -47,12 +46,13 @@ class Task(object):
Mistral engine or its components in order to manipulate with tasks.
"""
def __init__(self, wf_ex, task_spec, ctx, task_ex=None):
@profiler.trace('task-create')
def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None):
self.wf_ex = wf_ex
self.task_spec = task_spec
self.ctx = ctx
self.task_ex = task_ex
self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self.wf_spec = wf_spec
self.waiting = False
self.reset_flag = False

View File

@ -56,7 +56,7 @@ class Workflow(object):
def __init__(self, wf_def, wf_ex=None):
self.wf_def = wf_def
self.wf_ex = wf_ex
self.wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
self.wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id)
@profiler.trace('workflow-start')
def start(self, input_dict, desc='', params=None):

View File

@ -35,7 +35,7 @@ from mistral.services import security
from mistral.tests.unit import config as test_config
from mistral.utils import inspect_utils as i_utils
from mistral import version
from mistral.workbook import parser as spec_parser
RESOURCES_PATH = 'tests/resources/'
LOG = logging.getLogger(__name__)
@ -98,6 +98,11 @@ class FakeHTTPResponse(object):
class BaseTest(base.BaseTestCase):
def setUp(self):
super(BaseTest, self).setUp()
self.addCleanup(spec_parser.clear_caches)
def assertListEqual(self, l1, l2):
if tuple(sys.version_info)[0:2] < (2, 7):
# for python 2.6 compatibility

View File

@ -38,7 +38,7 @@ class InspectUtilsTest(base.BaseTest):
clazz = commands.RunTask
parameters_str = i_u.get_arg_list_as_str(clazz.__init__)
self.assertEqual("wf_ex, task_spec, ctx", parameters_str)
self.assertEqual('wf_ex, wf_spec, task_spec, ctx', parameters_str)
def test_get_parameters_str_with_function_parameter(self):
@ -46,4 +46,5 @@ class InspectUtilsTest(base.BaseTest):
pass
parameters_str = i_u.get_arg_list_as_str(test_func)
self.assertEqual("foo, bar=null", parameters_str)

View File

@ -0,0 +1,38 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
class SpecificationCachingTest(base.DbTestCase):
def test_workflow_spec_caching(self):
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="Echo"
"""
wfs = wf_service.create_workflows(wf_text)
self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
self.assertIsNotNone(wf_spec)
self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())

View File

@ -17,6 +17,7 @@ import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import direct_workflow as d_wf
@ -25,14 +26,15 @@ from mistral.workflow import states
class DirectWorkflowControllerTest(base.DbTestCase):
def _prepare_test(self, wf_text):
wf_spec = spec_parser.get_workflow_list_spec_from_yaml(wf_text)[0]
wfs = wf_service.create_workflows(wf_text)
wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
wf_ex = models.WorkflowExecution()
wf_ex.update({
'id': '1-2-3-4',
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
wf_ex = models.WorkflowExecution(
id='1-2-3-4',
spec=wf_spec.to_dict(),
state=states.RUNNING,
workflow_id=wfs[0].id
)
self.wf_ex = wf_ex
self.wf_spec = wf_spec

View File

@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import reverse_workflow as reverse_wf
@ -41,17 +43,22 @@ workflows:
"""
class ReverseWorkflowControllerTest(base.BaseTest):
class ReverseWorkflowControllerTest(base.DbTestCase):
def setUp(self):
super(ReverseWorkflowControllerTest, self).setUp()
wb_service.create_workbook_v2(WB)
wf_def = db_api.get_workflow_definitions()[0]
wb_spec = spec_parser.get_workbook_spec_from_yaml(WB)
wf_ex = models.WorkflowExecution(
id='1-2-3-4',
spec=wb_spec.get_workflows().get('wf').to_dict(),
state=states.RUNNING,
params={}
params={},
workflow_id=wf_def.id
)
self.wf_ex = wf_ex

View File

@ -13,11 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from cachetools import cached
from cachetools import LRUCache
from threading import RLock
import yaml
from yaml import error
import six
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.workbook import base
from mistral.workbook.v2 import actions as actions_v2
@ -30,6 +34,9 @@ V2_0 = '2.0'
ALL_VERSIONS = [V2_0]
_WF_CACHE = LRUCache(maxsize=100)
def parse_yaml(text):
"""Loads a text in YAML format as dictionary object.
@ -61,7 +68,6 @@ def _get_spec_version(spec_dict):
# Factory methods to get specifications either from raw YAML formatted text or
# from dictionaries parsed from YAML formatted text.
def get_workbook_spec(spec_dict):
if _get_spec_version(spec_dict) == V2_0:
return base.instantiate_spec(wb_v2.WorkbookSpec, spec_dict)
@ -97,6 +103,15 @@ def get_action_list_spec_from_yaml(text):
def get_workflow_spec(spec_dict):
"""Get workflow specification object from dictionary.
NOTE: For large workflows this method can work very long (seconds).
For this reason, method 'get_workflow_spec_by_id' should be used
whenever possible because it caches specification objects by
workflow definition id.
:param spec_dict: Raw specification dictionary.
"""
if _get_spec_version(spec_dict) == V2_0:
return base.instantiate_spec(wf_v2.WorkflowSpec, spec_dict)
@ -129,7 +144,7 @@ def get_workflow_definition(wb_def, wf_name):
def get_action_definition(wb_def, action_name):
action_name = action_name + ":"
action_name += ":"
return _parse_def_from_wb(wb_def, "actions:", action_name)
@ -166,3 +181,25 @@ def _parse_def_from_wb(wb_def, section_name, item_name):
definition = ''.join(definition).rstrip() + '\n'
return definition
# Methods for obtaining specifications in a more efficient way using
# caching techniques.
@cached(_WF_CACHE, lock=RLock())
def get_workflow_spec_by_id(wf_def_id):
if not wf_def_id:
return None
wf_def = db_api.get_workflow_definition(wf_def_id)
return get_workflow_spec(wf_def.spec)
def get_workflow_spec_cache_size():
return len(_WF_CACHE)
def clear_caches():
"""Clears all specification caches."""
_WF_CACHE.clear()

View File

@ -43,7 +43,7 @@ def get_controller(wf_ex, wf_spec=None):
"""
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id)
wf_type = wf_spec.get_type()
@ -81,7 +81,7 @@ class WorkflowController(object):
self.wf_ex = wf_ex
if wf_spec is None:
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id)
self.wf_spec = wf_spec
@ -201,7 +201,10 @@ class WorkflowController(object):
for task_ex in idle_tasks:
self._update_task_ex_env(task_ex, env)
return [commands.RunExistingTask(t) for t in idle_tasks]
return [
commands.RunExistingTask(self.wf_ex, self.wf_spec, t)
for t in idle_tasks
]
def _get_rerun_commands(self, task_exs, reset=True, env=None):
"""Get commands to rerun existing task executions.
@ -218,7 +221,10 @@ class WorkflowController(object):
# state. Fix it, it should happen outside.
self._update_task_ex_env(task_ex, env)
cmds = [commands.RunExistingTask(t_e, reset) for t_e in task_exs]
cmds = [
commands.RunExistingTask(self.wf_ex, self.wf_spec, t_e, reset)
for t_e in task_exs
]
LOG.debug("Found commands: %s" % cmds)

View File

@ -27,8 +27,9 @@ class WorkflowCommand(object):
knows what to do next.
"""
def __init__(self, wf_ex, task_spec, ctx):
def __init__(self, wf_ex, wf_spec, task_spec, ctx):
self.wf_ex = wf_ex
self.wf_spec = wf_spec
self.task_spec = task_spec
self.ctx = ctx or {}
@ -43,8 +44,8 @@ class Noop(WorkflowCommand):
class RunTask(WorkflowCommand):
"""Instruction to run a workflow task."""
def __init__(self, wf_ex, task_spec, ctx):
super(RunTask, self).__init__(wf_ex, task_spec, ctx)
def __init__(self, wf_ex, wf_spec, task_spec, ctx):
super(RunTask, self).__init__(wf_ex, wf_spec, task_spec, ctx)
self.wait = False
@ -63,9 +64,10 @@ class RunTask(WorkflowCommand):
class RunExistingTask(WorkflowCommand):
"""Command for running already existent task."""
def __init__(self, task_ex, reset=True):
def __init__(self, wf_ex, wf_spec, task_ex, reset=True):
super(RunExistingTask, self).__init__(
task_ex.workflow_execution,
wf_ex,
wf_spec,
spec_parser.get_task_spec(task_ex.spec),
task_ex.in_context
)
@ -77,8 +79,8 @@ class RunExistingTask(WorkflowCommand):
class SetWorkflowState(WorkflowCommand):
"""Instruction to change a workflow state."""
def __init__(self, wf_ex, task_spec, ctx, new_state, msg):
super(SetWorkflowState, self).__init__(wf_ex, task_spec, ctx)
def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg):
super(SetWorkflowState, self).__init__(wf_ex, wf_spec, task_spec, ctx)
self.new_state = new_state
self.msg = msg
@ -87,9 +89,10 @@ class SetWorkflowState(WorkflowCommand):
class FailWorkflow(SetWorkflowState):
"""Instruction to fail a workflow."""
def __init__(self, wf_ex, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
super(FailWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.ERROR,
@ -103,9 +106,10 @@ class FailWorkflow(SetWorkflowState):
class SucceedWorkflow(SetWorkflowState):
"""Instruction to succeed a workflow."""
def __init__(self, wf_ex, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
super(SucceedWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.SUCCESS,
@ -119,9 +123,10 @@ class SucceedWorkflow(SetWorkflowState):
class PauseWorkflow(SetWorkflowState):
"""Instruction to pause a workflow."""
def __init__(self, wf_ex, task_spec, ctx, msg=None):
def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None):
super(PauseWorkflow, self).__init__(
wf_ex,
wf_spec,
task_spec,
ctx,
states.PAUSED,
@ -146,10 +151,17 @@ def get_command_class(cmd_name):
return RESERVED_CMDS[cmd_name] if cmd_name in RESERVED_CMDS else None
def create_command(cmd_name, wf_ex, task_spec, ctx, explicit_params=None):
def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx,
explicit_params=None):
cmd_cls = get_command_class(cmd_name) or RunTask
if issubclass(cmd_cls, SetWorkflowState):
return cmd_cls(wf_ex, task_spec, ctx, explicit_params.get('msg'))
return cmd_cls(
wf_ex,
wf_spec,
task_spec,
ctx,
explicit_params.get('msg')
)
else:
return cmd_cls(wf_ex, task_spec, ctx)
return cmd_cls(wf_ex, wf_spec, task_spec, ctx)

View File

@ -87,6 +87,7 @@ class DirectWorkflowController(base.WorkflowController):
return [
commands.RunTask(
self.wf_ex,
self.wf_spec,
t_s,
self._get_task_inbound_context(t_s)
)
@ -114,6 +115,7 @@ class DirectWorkflowController(base.WorkflowController):
cmd = commands.create_command(
t_n,
self.wf_ex,
self.wf_spec,
t_s,
self._get_task_inbound_context(t_s),
params

View File

@ -55,6 +55,7 @@ class ReverseWorkflowController(base.WorkflowController):
return cmds + [
commands.RunTask(
self.wf_ex,
self.wf_spec,
t_s,
self._get_task_inbound_context(t_s)
)

View File

@ -5,6 +5,7 @@
alembic>=0.8.4 # MIT
Babel>=2.3.4 # BSD
croniter>=0.3.4 # MIT License
cachetools>=1.0.0 # MIT License
eventlet!=0.18.3,>=0.18.2 # MIT
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
keystonemiddleware!=4.1.0,!=4.5.0,>=4.0.0 # Apache-2.0