diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 14a7a6d99..37a6cdbe7 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -36,6 +36,63 @@ from mistral import utils LOG = logging.getLogger(__name__) +def _get_hash_function_by(column_name): + def calc_hash(context): + val = context.current_parameters[column_name] or {} + + if isinstance(val, dict): + # If the value is a dictionary we need to make sure to have + # keys in the same order in a string representation. + hash_base = json.dumps(sorted(val.items())) + else: + hash_base = str(val) + + return hashlib.sha256(hash_base.encode('utf-8')).hexdigest() + + return calc_hash + + +def validate_long_type_length(cls, field_name, value): + """Makes sure the value does not exceed the maximum size.""" + if value: + # Get the configured limit. + size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb + + # If the size is unlimited. + if size_limit_kb < 0: + return + + size_kb = int(sys.getsizeof(str(value)) / 1024) + + if size_kb > size_limit_kb: + LOG.error( + "Size limit %dKB exceed for class [%s], " + "field %s of size %dKB.", + size_limit_kb, str(cls), field_name, size_kb + ) + + raise exc.SizeLimitExceededException( + field_name, + size_kb, + size_limit_kb + ) + + +def register_length_validator(attr_name): + """Register an event listener on the attribute. + + This event listener will validate the size every + time a 'set' occurs. + """ + for cls in utils.iter_subclasses(Execution): + if hasattr(cls, attr_name): + event.listen( + getattr(cls, attr_name), + 'set', + lambda t, v, o, i: validate_long_type_length(cls, attr_name, v) + ) + + class Definition(mb.MistralSecureModelBase): __abstract__ = True @@ -200,46 +257,6 @@ for cls in utils.iter_subclasses(Execution): ) -def validate_long_type_length(cls, field_name, value): - """Makes sure the value does not exceeds the maximum size.""" - if value: - # Get the configured limit. 
- size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb - - # If the size is unlimited. - if size_limit_kb < 0: - return - - size_kb = int(sys.getsizeof(str(value)) / 1024) - - if size_kb > size_limit_kb: - LOG.error( - "Size limit %dKB exceed for class [%s], " - "field %s of size %dKB.", - size_limit_kb, str(cls), field_name, size_kb - ) - - raise exc.SizeLimitExceededException( - field_name, - size_kb, - size_limit_kb - ) - - -def register_length_validator(attr_name): - """Register an event listener on the attribute. - - This event listener will validate the size every - time a 'set' occurs. - """ - for cls in utils.iter_subclasses(Execution): - if hasattr(cls, attr_name): - event.listen( - getattr(cls, attr_name), - 'set', - lambda t, v, o, i: validate_long_type_length(cls, attr_name, v) - ) - # Many-to-one for 'ActionExecution' and 'TaskExecution'. ActionExecution.task_execution_id = sa.Column( @@ -350,16 +367,6 @@ class Environment(mb.MistralSecureModelBase): variables = sa.Column(st.JsonDictType()) -def _get_hash_function_by(column_name): - def calc_hash(context): - d = context.current_parameters[column_name] or {} - - return hashlib.sha256(json.dumps(sorted(d.items())). 
- encode('utf-8')).hexdigest() - - return calc_hash - - class CronTrigger(mb.MistralSecureModelBase): """Contains info about cron triggers.""" diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index e329e1d70..e18a9b898 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -61,7 +61,9 @@ def _build_action(action_ex): if action_ex.workflow_name: wf_name = action_ex.workflow_name - wf_spec = spec_parser.get_workflow_spec_by_id(action_ex.workflow_id) + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + action_ex.task_execution.workflow_execution_id + ) wf_spec_name = wf_spec.get_name() adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name') diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index a72de9a3f..c130c36f0 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -451,8 +451,8 @@ class WorkflowAction(Action): assert not self.action_ex parent_wf_ex = self.task_ex.workflow_execution - parent_wf_spec = spec_parser.get_workflow_spec_by_id( - parent_wf_ex.workflow_id + parent_wf_spec = spec_parser.get_workflow_spec_by_execution_id( + parent_wf_ex.id ) task_spec = spec_parser.get_task_spec(self.task_ex.spec) @@ -465,7 +465,10 @@ class WorkflowAction(Action): wf_spec_name ) - wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf_def.id, + wf_def.updated_at + ) wf_params = { 'task_execution_id': self.task_ex.id, diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 695134d4e..1f1a424af 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -97,7 +97,7 @@ def _on_action_complete(action_ex): task = _create_task( wf_ex, - spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id), + spec_parser.get_workflow_spec_by_execution_id(wf_ex.id), task_spec, task_ex.in_context, task_ex @@ -125,22 +125,24 @@ def 
_on_action_complete(action_ex): def fail_task(task_ex, msg): - task = _build_task_from_execution( - spec_parser.get_workflow_spec_by_id(task_ex.workflow_id), - task_ex + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + task_ex.workflow_execution_id ) + task = _build_task_from_execution(wf_spec, task_ex) + task.set_state(states.ERROR, msg) wf_handler.fail_workflow(task_ex.workflow_execution, msg) def continue_task(task_ex): - task = _build_task_from_execution( - spec_parser.get_workflow_spec_by_id(task_ex.workflow_id), - task_ex + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + task_ex.workflow_execution_id ) + task = _build_task_from_execution(wf_spec, task_ex) + try: task.set_state(states.RUNNING, None) @@ -166,11 +168,12 @@ def continue_task(task_ex): def complete_task(task_ex, state, state_info): - task = _build_task_from_execution( - spec_parser.get_workflow_spec_by_id(task_ex.workflow_id), - task_ex + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + task_ex.workflow_execution_id ) + task = _build_task_from_execution(wf_spec, task_ex) + try: task.complete(state, state_info) except exc.MistralException as e: @@ -263,9 +266,13 @@ def _check_task_start_allowed(task_ex_id): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) + wf_spec = spec_parser.get_workflow_spec_by_execution_id( + task_ex.workflow_execution_id + ) + wf_ctrl = wf_base.get_controller( task_ex.workflow_execution, - spec_parser.get_workflow_spec_by_id(task_ex.workflow_id) + wf_spec ) if wf_ctrl.is_task_start_allowed(task_ex): diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index ab3447673..61f3746ef 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -56,7 +56,10 @@ class Workflow(object): def __init__(self, wf_def, wf_ex=None): self.wf_def = wf_def self.wf_ex = wf_ex - self.wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id) + self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( + 
wf_def.id, + wf_def.updated_at + ) @profiler.trace('workflow-start') def start(self, input_dict, desc='', params=None): diff --git a/mistral/services/workbooks.py b/mistral/services/workbooks.py index f8a981f4d..07f3939c5 100644 --- a/mistral/services/workbooks.py +++ b/mistral/services/workbooks.py @@ -46,10 +46,6 @@ def update_workbook_v2(definition, scope='private'): _, db_wfs = _on_workbook_update(wb_db, wb_spec) - # Once transaction has committed we need to update specification cache. - for db_wf, wf_spec in zip(db_wfs, wb_spec.get_workflows()): - spec_parser.update_workflow_cache(db_wf.id, wf_spec) - return wb_db diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index a70cef97d..649edc1c7 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -102,11 +102,6 @@ def update_workflows(definition, scope='private', identifier=None): identifier=identifier )) - # Once transaction has committed we need to update specification cache. - - for db_wf, wf_spec in zip(db_wfs, wf_list_spec.get_workflows()): - spec_parser.update_workflow_cache(db_wf.id, wf_spec) - return db_wfs diff --git a/mistral/tests/unit/workbook/test_spec_caching.py b/mistral/tests/unit/workbook/test_spec_caching.py index 5d863516f..0ffaae3b0 100644 --- a/mistral/tests/unit/workbook/test_spec_caching.py +++ b/mistral/tests/unit/workbook/test_spec_caching.py @@ -32,12 +32,17 @@ class SpecificationCachingTest(base.DbTestCase): wfs = wf_service.create_workflows(wf_text) - self.assertEqual(0, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size()) - wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wfs[0].id, + wfs[0].updated_at + ) self.assertIsNotNone(wf_spec) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, 
spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) def test_workflow_spec_cache_update_via_workflow_service(self): wf_text = """ @@ -51,12 +56,17 @@ class SpecificationCachingTest(base.DbTestCase): wfs = wf_service.create_workflows(wf_text) - self.assertEqual(0, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size()) - wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wfs[0].id, + wfs[0].updated_at + ) self.assertEqual(1, len(wf_spec.get_tasks())) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) # Now update workflow definition and check that cache is updated too. @@ -74,12 +84,16 @@ class SpecificationCachingTest(base.DbTestCase): wfs = wf_service.update_workflows(wf_text) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) - wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wfs[0].id, + wfs[0].updated_at + ) self.assertEqual(2, len(wf_spec.get_tasks())) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) def test_workflow_spec_cache_update_via_workbook_service(self): wb_text = """ @@ -96,14 +110,19 @@ class SpecificationCachingTest(base.DbTestCase): wb_service.create_workbook_v2(wb_text) - self.assertEqual(0, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(0, 
spec_parser.get_wf_definition_spec_cache_size()) wf = db_api.get_workflow_definition('wb.wf') - wf_spec = spec_parser.get_workflow_spec_by_id(wf.id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf.id, + wf.updated_at + ) self.assertEqual(1, len(wf_spec.get_tasks())) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) # Now update workflow definition and check that cache is updated too. @@ -124,9 +143,16 @@ class SpecificationCachingTest(base.DbTestCase): wb_service.update_workbook_v2(wb_text) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size()) - wf_spec = spec_parser.get_workflow_spec_by_id(wf.id) + wf = db_api.get_workflow_definition(wf.id) + + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf.id, + wf.updated_at + ) self.assertEqual(2, len(wf_spec.get_tasks())) - self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size()) + self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size()) diff --git a/mistral/tests/unit/workflow/test_direct_workflow.py b/mistral/tests/unit/workflow/test_direct_workflow.py index 16600f3ad..a97060e26 100644 --- a/mistral/tests/unit/workflow/test_direct_workflow.py +++ b/mistral/tests/unit/workflow/test_direct_workflow.py @@ -27,7 +27,10 @@ from mistral.workflow import states class DirectWorkflowControllerTest(base.DbTestCase): def _prepare_test(self, wf_text): wfs = wf_service.create_workflows(wf_text) - wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wfs[0].id, + wfs[0].updated_at + ) wf_ex = models.WorkflowExecution( id='1-2-3-4', @@ -38,7 +41,8 @@ 
class DirectWorkflowControllerTest(base.DbTestCase): self.wf_ex = wf_ex self.wf_spec = wf_spec - self.wf_ctrl = d_wf.DirectWorkflowController(wf_ex) + + return wf_ex def _create_task_execution(self, name, state): tasks_spec = self.wf_spec.get_tasks() @@ -54,8 +58,10 @@ class DirectWorkflowControllerTest(base.DbTestCase): return task_ex + @mock.patch.object(db_api, 'get_workflow_execution') @mock.patch.object(db_api, 'get_task_execution') - def test_continue_workflow(self, get_task_execution): + def test_continue_workflow(self, get_task_execution, + get_workflow_execution): wf_text = """--- version: '2.0' @@ -78,16 +84,20 @@ class DirectWorkflowControllerTest(base.DbTestCase): action: std.echo output="Hoy" """ - self._prepare_test(wf_text) + wf_ex = self._prepare_test(wf_text) + + get_workflow_execution.return_value = wf_ex + + wf_ctrl = d_wf.DirectWorkflowController(wf_ex) # Workflow execution is in initial step. No running tasks. - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() self.assertEqual(1, len(cmds)) cmd = cmds[0] - self.assertIs(self.wf_ctrl.wf_ex, cmd.wf_ex) + self.assertIs(wf_ctrl.wf_ex, cmd.wf_ex) self.assertIsNotNone(cmd.task_spec) self.assertEqual('task1', cmd.task_spec.get_name()) self.assertEqual(states.RUNNING, self.wf_ex.state) @@ -109,7 +119,7 @@ class DirectWorkflowControllerTest(base.DbTestCase): ) ) - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() task1_ex.processed = True @@ -131,7 +141,7 @@ class DirectWorkflowControllerTest(base.DbTestCase): ) ) - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() task2_ex.processed = True diff --git a/mistral/tests/unit/workflow/test_reverse_workflow.py b/mistral/tests/unit/workflow/test_reverse_workflow.py index 243b2a593..b6ad5fd3d 100644 --- a/mistral/tests/unit/workflow/test_reverse_workflow.py +++ b/mistral/tests/unit/workflow/test_reverse_workflow.py @@ -12,6 +12,8 @@ # See the License for the specific language 
governing permissions and # limitations under the License. +import mock + from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral import exceptions as exc @@ -63,7 +65,6 @@ class ReverseWorkflowControllerTest(base.DbTestCase): self.wf_ex = wf_ex self.wb_spec = wb_spec - self.wf_ctrl = reverse_wf.ReverseWorkflowController(wf_ex) def _create_task_execution(self, name, state): tasks_spec = self.wb_spec.get_workflows()['wf'].get_tasks() @@ -78,29 +79,46 @@ class ReverseWorkflowControllerTest(base.DbTestCase): return task_ex - def test_start_workflow_task2(self): + @mock.patch.object(db_api, 'get_workflow_execution') + def test_start_workflow_task2(self, get_workflow_execution): + get_workflow_execution.return_value = self.wf_ex + + wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex) + self.wf_ex.params = {'task_name': 'task2'} - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() self.assertEqual(1, len(cmds)) self.assertEqual('task1', cmds[0].task_spec.get_name()) - def test_start_workflow_task1(self): + @mock.patch.object(db_api, 'get_workflow_execution') + def test_start_workflow_task1(self, get_workflow_execution): + get_workflow_execution.return_value = self.wf_ex + + wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex) + self.wf_ex.params = {'task_name': 'task1'} - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() self.assertEqual(1, len(cmds)) self.assertEqual('task1', cmds[0].task_spec.get_name()) - def test_start_workflow_without_task(self): - self.assertRaises( - exc.WorkflowException, - self.wf_ctrl.continue_workflow - ) + @mock.patch.object(db_api, 'get_workflow_execution') + def test_start_workflow_without_task(self, get_workflow_execution): + get_workflow_execution.return_value = self.wf_ex + + wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex) + + self.assertRaises(exc.WorkflowException, wf_ctrl.continue_workflow) + + 
@mock.patch.object(db_api, 'get_workflow_execution') + def test_continue_workflow(self, get_workflow_execution): + get_workflow_execution.return_value = self.wf_ex + + wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex) - def test_continue_workflow(self): self.wf_ex.params = {'task_name': 'task2'} # Assume task1 completed. @@ -115,7 +133,7 @@ class ReverseWorkflowControllerTest(base.DbTestCase): ) ) - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() task1_ex.processed = True @@ -134,7 +152,7 @@ class ReverseWorkflowControllerTest(base.DbTestCase): ) ) - cmds = self.wf_ctrl.continue_workflow() + cmds = wf_ctrl.continue_workflow() task1_ex.processed = True diff --git a/mistral/workbook/parser.py b/mistral/workbook/parser.py index 86af531ac..8b224aead 100644 --- a/mistral/workbook/parser.py +++ b/mistral/workbook/parser.py @@ -14,7 +14,6 @@ # limitations under the License. from cachetools import cached -from cachetools import hashkey from cachetools import LRUCache from threading import RLock import yaml @@ -35,8 +34,11 @@ V2_0 = '2.0' ALL_VERSIONS = [V2_0] -_WF_CACHE = LRUCache(maxsize=100) -_WF_CACHE_LOCK = RLock() +_WF_EX_CACHE = LRUCache(maxsize=100) +_WF_EX_CACHE_LOCK = RLock() + +_WF_DEF_CACHE = LRUCache(maxsize=100) +_WF_DEF_CACHE_LOCK = RLock() def parse_yaml(text): @@ -108,9 +110,9 @@ def get_workflow_spec(spec_dict): """Get workflow specification object from dictionary. NOTE: For large workflows this method can work very long (seconds). - For this reason, method 'get_workflow_spec_by_id' should be used - whenever possible because it caches specification objects by - workflow definition id. + For this reason, method 'get_workflow_spec_by_definition_id' or + 'get_workflow_spec_by_execution_id' should be used whenever possible + because they cache specification objects. :param spec_dict: Raw specification dictionary. 
""" @@ -188,9 +190,43 @@ def _parse_def_from_wb(wb_def, section_name, item_name): # Methods for obtaining specifications in a more efficient way using # caching techniques. +@cached(_WF_EX_CACHE, lock=_WF_EX_CACHE_LOCK) +def get_workflow_spec_by_execution_id(wf_ex_id): + """Gets workflow specification by workflow execution id. -@cached(_WF_CACHE, lock=_WF_CACHE_LOCK) -def get_workflow_spec_by_id(wf_def_id): + The idea is that when a workflow execution is running we + must be getting the same workflow specification even if + + :param wf_ex_id: Workflow execution id. + :return: Workflow specification. + """ + if not wf_ex_id: + return None + + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + return get_workflow_spec(wf_ex.spec) + + +@cached(_WF_DEF_CACHE, lock=_WF_DEF_CACHE_LOCK) +def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at): + """Gets specification by workflow definition id and its 'updated_at'. + + The idea of this method is to return a cached specification for the + given workflow id and workflow definition 'updated_at'. As long as the + given workflow definition remains the same in DB users of this method + will be getting a cached value. Once the workflow definition has + changed clients will be providing a different 'updated_at' value and + hence this method will be called and spec is updated for this combination + of parameters. Old cached values will be kicked out by LRU algorithm + if the cache runs out of space. + + :param wf_def_id: Workflow definition id. + :param wf_def_updated_at: Workflow definition 'updated_at' value. It + serves only as part of cache key and is not explicitly used in the + method. + :return: Workflow specification. 
+ """ if not wf_def_id: return None @@ -199,16 +235,18 @@ def get_workflow_spec_by_id(wf_def_id): return get_workflow_spec(wf_def.spec) -def get_workflow_spec_cache_size(): - return len(_WF_CACHE) +def get_wf_execution_spec_cache_size(): + return len(_WF_EX_CACHE) + + +def get_wf_definition_spec_cache_size(): + return len(_WF_DEF_CACHE) def clear_caches(): """Clears all specification caches.""" - _WF_CACHE.clear() + with _WF_EX_CACHE_LOCK: + _WF_EX_CACHE.clear() - -def update_workflow_cache(wf_def_id, spec): - with _WF_CACHE_LOCK: - # We have to use hashkey function because @cached uses it implicitly. - _WF_CACHE[hashkey(wf_def_id)] = spec + with _WF_DEF_CACHE_LOCK: + _WF_DEF_CACHE.clear() diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 11ff01637..5410cb55f 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -43,7 +43,7 @@ def get_controller(wf_ex, wf_spec=None): """ if not wf_spec: - wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id) + wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id) wf_type = wf_spec.get_type() @@ -81,7 +81,7 @@ class WorkflowController(object): self.wf_ex = wf_ex if wf_spec is None: - wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id) + wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id) self.wf_spec = wf_spec