diff --git a/AUTHORS b/AUTHORS index 761b75a2..aa7870ef 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,13 +1,16 @@ Alexander Kuznetsov +Anastasia Kuznetsova Angus Salkeld +Christian Berendt Dmitri Zimine +Jeremy Stanley Kirill Izotov Manas Kelshikar Nikolay Mahotkin +Ray Chen Renat Akhmerov Sergey Kolekonov Sergey Murashov Timur Nurlygayanov Winson Chan - - +Bryan Havenstein diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index eb32f7ab..97f7a8bd 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -1,6 +1,6 @@ # Copyright 2014 - Mirantis, Inc. # -# Licensed under the Apache License, Version 2.0 (the "License"); +# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # @@ -44,12 +44,11 @@ def schedule_call(factory_method_path, target_method_name, Serializer for the object type must implement serializer interface in mistral/utils/serializer.py :param method_args: Target method keyword arguments. - :return: None """ ctx = context.ctx().to_dict() if context.has_ctx() else {} - execution_time = (datetime.datetime.now() - + datetime.timedelta(seconds=run_after)) + execution_time = (datetime.datetime.now() + + datetime.timedelta(seconds=run_after)) if serializers: for arg_name, serializer_path in serializers.items(): @@ -63,8 +62,9 @@ def schedule_call(factory_method_path, target_method_name, raise ImportError("Cannot import class %s: %s" % (serializer_path, e)) - serialized = serializer.serialize(method_args[arg_name]) - method_args[arg_name] = serialized + method_args[arg_name] = serializer.serialize( + method_args[arg_name] + ) values = { 'factory_method_path': factory_method_path, @@ -83,22 +83,21 @@ class CallScheduler(periodic_task.PeriodicTasks): def run_delayed_calls(self, ctx=None): LOG.debug('Processing next delayed calls.') - datetime_filter = (datetime.datetime.now() - + datetime.timedelta(seconds=1)) + datetime_filter = (datetime.datetime.now() + + datetime.timedelta(seconds=1)) delayed_calls = db_api.get_delayed_calls_to_start(datetime_filter) for call in delayed_calls: - ctx = context.MistralContext(call.auth_context) - context.set_ctx(ctx) + context.set_ctx(context.MistralContext(call.auth_context)) if call.factory_method_path: factory = importutils.import_class(call.factory_method_path) - target_object = factory() - target_method = getattr(target_object, - call.target_method_name) + + target_method = getattr(factory(), call.target_method_name) else: target_method = importutils.import_class( - call.target_method_name) + call.target_method_name + ) method_args = copy.copy(call.method_arguments) @@ -115,8 +114,9 @@ class CallScheduler(periodic_task.PeriodicTasks): # Call the method. target_method(**method_args) except Exception as e: - LOG.debug("Exception was thrown during the " - "delayed call %s - %s", call, e) + LOG.debug( + "Delayed call failed [call=%s, exception=%s]", call, e + ) finally: # After call, delete this delayed call from DB. db_api.delete_delayed_call(call.id) @@ -124,12 +124,12 @@ class CallScheduler(periodic_task.PeriodicTasks): def setup(): tg = threadgroup.ThreadGroup() - pt = CallScheduler() tg.add_dynamic_timer( - pt.run_periodic_tasks, + CallScheduler().run_periodic_tasks, initial_delay=None, periodic_interval_max=1, - context=None) + context=None + ) return tg diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index 647acda6..3c56b5ec 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -21,6 +21,13 @@ from mistral.services import scheduler from mistral.tests import base from mistral.workflow import utils as wf_utils +# TODO(rakhmerov): This test is fragile because it's fully based on delays. +# TODO(rakhmerov): Think how to redesign it. + +FACTORY_METHOD_NAME = ('mistral.tests.unit.services.test_scheduler.' + 'factory_method') +TARGET_METHOD_NAME = FACTORY_METHOD_NAME + def factory_method(): pass @@ -30,27 +37,31 @@ class SchedulerServiceTest(base.DbTestCase): def setUp(self): super(SchedulerServiceTest, self).setUp() - thread_group = scheduler.setup() - self.addCleanup(thread_group.stop) + self.thread_group = scheduler.setup() - @mock.patch('mistral.tests.unit.services.test_scheduler.factory_method') + self.addCleanup(self.thread_group.stop) + + @mock.patch(FACTORY_METHOD_NAME) def test_scheduler_with_factory(self, factory): - factory_method = ('mistral.tests.unit.services.' - 'test_scheduler.factory_method') target_method = 'run_something' method_args = {'name': 'task', 'id': '123'} delay = 0.5 - scheduler.schedule_call(factory_method, - target_method, - delay, - **method_args) + scheduler.schedule_call( + FACTORY_METHOD_NAME, + target_method, + delay, + **method_args + ) - time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) - calls = db_api.get_delayed_calls_to_start(time_filter) + calls = db_api.get_delayed_calls_to_start( + datetime.datetime.now() + datetime.timedelta(seconds=1) + ) - call = self._assert_single_item(calls, - target_method_name=target_method) + call = self._assert_single_item( + calls, + target_method_name=target_method + ) self.assertIn('name', call['method_arguments']) @@ -63,23 +74,25 @@ class SchedulerServiceTest(base.DbTestCase): self.assertEqual(0, len(calls)) - @mock.patch('mistral.tests.unit.services.test_scheduler.factory_method') + @mock.patch(TARGET_METHOD_NAME) def test_scheduler_without_factory(self, method): - target_method = ('mistral.tests.unit.services.' - 'test_scheduler.factory_method') method_args = {'name': 'task', 'id': '321'} delay = 0.5 - scheduler.schedule_call(None, - target_method, - delay, - **method_args) + scheduler.schedule_call( + None, + TARGET_METHOD_NAME, + delay, + **method_args + ) time_filter = datetime.datetime.now() + datetime.timedelta(seconds=0.5) calls = db_api.get_delayed_calls_to_start(time_filter) - call = self._assert_single_item(calls, - target_method_name=target_method) + call = self._assert_single_item( + calls, + target_method_name=TARGET_METHOD_NAME + ) self.assertIn('name', call['method_arguments']) @@ -92,10 +105,8 @@ class SchedulerServiceTest(base.DbTestCase): self.assertEqual(0, len(calls)) - @mock.patch('mistral.tests.unit.services.test_scheduler.factory_method') + @mock.patch(FACTORY_METHOD_NAME) def test_scheduler_with_serializer(self, factory): - factory_method = ('mistral.tests.unit.services.' - 'test_scheduler.factory_method') target_method = 'run_something' task_result = wf_utils.TaskResult('data', 'error') @@ -103,26 +114,33 @@ class SchedulerServiceTest(base.DbTestCase): method_args = { 'name': 'task', 'id': '123', - 'result': task_result} + 'result': task_result + } - serializers_map = { + serializers = { 'result': 'mistral.workflow.utils.TaskResultSerializer' } delay = 0.5 - scheduler.schedule_call(factory_method, - target_method, - delay, - serializers=serializers_map, - **method_args) + scheduler.schedule_call( + FACTORY_METHOD_NAME, + target_method, + delay, + serializers=serializers, + **method_args + ) - time_filter = datetime.datetime.now() + datetime.timedelta(seconds=0.5) - calls = db_api.get_delayed_calls_to_start(time_filter) + calls = db_api.get_delayed_calls_to_start( + datetime.datetime.now() + datetime.timedelta(seconds=0.5) + ) self.assertEqual(1, len(calls)) - call = self._assert_single_item(calls, - target_method_name=target_method) + + call = self._assert_single_item( + calls, + target_method_name=target_method + ) self.assertIn('name', call['method_arguments']) @@ -134,7 +152,8 @@ class SchedulerServiceTest(base.DbTestCase): self.assertEqual('data', result.data) self.assertEqual('error', result.error) - time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) - calls = db_api.get_delayed_calls_to_start(time_filter) + calls = db_api.get_delayed_calls_to_start( + datetime.datetime.now() + datetime.timedelta(seconds=1) + ) self.assertEqual(0, len(calls))