diff --git a/mistral/services/legacy_scheduler.py b/mistral/services/legacy_scheduler.py index 777805264..47ae4dec4 100644 --- a/mistral/services/legacy_scheduler.py +++ b/mistral/services/legacy_scheduler.py @@ -112,7 +112,7 @@ class LegacyScheduler(sched_base.Scheduler): job.func_name, job.run_after, serializers=job.func_arg_serializers, - key=None, + key=job.key, **job.func_args ) diff --git a/mistral/tests/unit/services/test_legacy_scheduler.py b/mistral/tests/unit/services/test_legacy_scheduler.py index 1b871c981..69cd7fc4b 100644 --- a/mistral/tests/unit/services/test_legacy_scheduler.py +++ b/mistral/tests/unit/services/test_legacy_scheduler.py @@ -68,6 +68,7 @@ class LegacySchedulerTest(base.DbTestCase): def target_check_context_method(self, expected_project_id): actual_project_id = auth_context.ctx().project_id + self.queue.put(item=(expected_project_id == actual_project_id)) @mock.patch(TARGET_METHOD_PATH) @@ -87,22 +88,27 @@ class LegacySchedulerTest(base.DbTestCase): run_after=DELAY, target_factory_func_name=TARGET_METHOD_PATH, func_name=target_method_name, - func_args={'name': 'task', 'id': '123'} + func_args={'name': 'task', 'id': '123'}, + key='my_job_key' ) self.scheduler.schedule(job) calls = db_api.get_delayed_calls_to_start(get_time_delay()) + call = self._assert_single_item( calls, - target_method_name=target_method_name + target_method_name=target_method_name, + key='my_job_key' ) self.assertIn('name', call['method_arguments']) self.queue.get() + factory().run_something.assert_called_once_with(name='task', id='123') calls = db_api.get_delayed_calls_to_start(get_time_delay()) + self.assertEqual(0, len(calls)) @mock.patch(TARGET_METHOD_PATH) @@ -112,22 +118,27 @@ class LegacySchedulerTest(base.DbTestCase): job = sched_base.SchedulerJob( run_after=DELAY, func_name=TARGET_METHOD_PATH, - func_args={'name': 'task', 'id': '321'} + func_args={'name': 'task', 'id': '321'}, + key='my_job_key' ) self.scheduler.schedule(job) calls = db_api.get_delayed_calls_to_start(get_time_delay()) + call = self._assert_single_item( calls, - target_method_name=TARGET_METHOD_PATH + target_method_name=TARGET_METHOD_PATH, + key='my_job_key' ) self.assertIn('name', call['method_arguments']) self.queue.get() + method.assert_called_once_with(name='task', id='321') calls = db_api.get_delayed_calls_to_start(get_time_delay()) + self.assertEqual(0, len(calls)) @mock.patch(TARGET_METHOD_PATH) @@ -201,6 +212,7 @@ class LegacySchedulerTest(base.DbTestCase): self.scheduler.schedule(job) calls = db_api.get_delayed_calls_to_start(get_time_delay()) + call = self._assert_single_item( calls, target_method_name=target_method_name @@ -216,6 +228,7 @@ class LegacySchedulerTest(base.DbTestCase): self.assertEqual('error', result.error) calls = db_api.get_delayed_calls_to_start(get_time_delay()) + self.assertEqual(0, len(calls)) @mock.patch(TARGET_METHOD_PATH) @@ -236,12 +249,15 @@ class LegacySchedulerTest(base.DbTestCase): second_scheduler.schedule(job) calls = db_api.get_delayed_calls_to_start(get_time_delay()) + self._assert_single_item(calls, target_method_name=TARGET_METHOD_PATH) self.queue.get() + method.assert_called_once_with(name='task', id='321') calls = db_api.get_delayed_calls_to_start(get_time_delay()) + self.assertEqual(0, len(calls)) @mock.patch(TARGET_METHOD_PATH) @@ -287,6 +303,7 @@ class LegacySchedulerTest(base.DbTestCase): call = db_api.create_delayed_call(values) calls = db_api.get_delayed_calls_to_start(get_time_delay(10)) + self.assertEqual(0, len(calls)) db_api.delete_delayed_call(call.id) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index cbac91af5..cf3087adb 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -76,7 +76,7 @@ class DirectWorkflowController(base.WorkflowController): task_ex ) - # Checking if task_ex is empty here is a serious optimization here + # Checking if task_ex is empty is a serious optimization here # because 'self.wf_ex.task_executions' leads to initialization of # the entire collection which in case of highly-parallel workflows # may be very expensive. @@ -107,6 +107,10 @@ class DirectWorkflowController(base.WorkflowController): for t_s in self.wf_spec.find_start_tasks() ] + @profiler.trace( + 'direct-wf-controller-find-next-commands-for-task', + hide_args=True + ) def _find_next_commands_for_task(self, task_ex): """Finds next commands based on the state of the given task.