Merge "Fixing scheduler work"

This commit is contained in:
Jenkins 2015-03-12 08:22:26 +00:00 committed by Gerrit Code Review
commit d650ddc98e
2 changed files with 90 additions and 44 deletions

View File

@ -82,46 +82,60 @@ def schedule_call(factory_method_path, target_method_name,
class CallScheduler(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=1)
def run_delayed_calls(self, ctx=None):
datetime_filter = (datetime.datetime.now() +
datetime.timedelta(seconds=1))
delayed_calls = db_api.get_delayed_calls_to_start(datetime_filter)
# Wrap delayed calls processing in transaction to
# guarantee that calls will be processed just once.
# Do delete query to DB first to force hanging up all
# parallel transactions.
# It should work on isolation level 'READ-COMMITTED',
# 'REPEATABLE-READ' and above.
#
# 'REPEATABLE-READ' is by default in MySQL and
# 'READ-COMMITTED is by default in PostgreSQL.
with db_api.transaction():
delayed_calls = db_api.get_delayed_calls_to_start(datetime_filter)
for call in delayed_calls:
LOG.debug('Processing next delayed call: %s', call)
context.set_ctx(context.MistralContext(call.auth_context))
if call.factory_method_path:
factory = importutils.import_class(call.factory_method_path)
target_method = getattr(factory(), call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name
)
method_args = copy.copy(call.method_arguments)
if call.serializers:
# Deserialize arguments.
for arg_name, serializer_path in call.serializers.items():
serializer = importutils.import_class(serializer_path)()
deserialized = serializer.deserialize(
method_args[arg_name])
method_args[arg_name] = deserialized
try:
# Call the method.
target_method(**method_args)
except Exception as e:
LOG.debug(
"Delayed call failed [call=%s, exception=%s]", call, e
)
finally:
# After call, delete this delayed call from DB.
for call in delayed_calls:
# Delete this delayed call from DB before the making call in
# order to prevent calling from parallel transaction.
db_api.delete_delayed_call(call.id)
LOG.debug('Processing next delayed call: %s', call)
context.set_ctx(context.MistralContext(call.auth_context))
if call.factory_method_path:
factory = importutils.import_class(
call.factory_method_path
)
target_method = getattr(factory(), call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name
)
method_args = copy.copy(call.method_arguments)
if call.serializers:
# Deserialize arguments.
for arg_name, serializer_path in call.serializers.items():
serializer = importutils.import_class(
serializer_path
)()
deserialized = serializer.deserialize(
method_args[arg_name])
method_args[arg_name] = deserialized
try:
# Call the method.
target_method(**method_args)
except Exception as e:
LOG.debug(
"Delayed call failed [call=%s, exception=%s]", call, e
)
def setup():
tg = threadgroup.ThreadGroup()

View File

@ -47,7 +47,7 @@ class SchedulerServiceTest(base.DbTestCase):
def test_scheduler_with_factory(self, factory):
target_method = 'run_something'
method_args = {'name': 'task', 'id': '123'}
delay = 0.5
delay = 1.5
scheduler.schedule_call(
FACTORY_METHOD_NAME,
@ -57,7 +57,7 @@ class SchedulerServiceTest(base.DbTestCase):
)
calls = db_api.get_delayed_calls_to_start(
datetime.datetime.now() + datetime.timedelta(seconds=1)
datetime.datetime.now() + datetime.timedelta(seconds=2)
)
call = self._assert_single_item(
@ -67,7 +67,7 @@ class SchedulerServiceTest(base.DbTestCase):
self.assertIn('name', call['method_arguments'])
eventlet.sleep(delay * 2)
eventlet.sleep(delay)
factory().run_something.assert_called_once_with(name='task', id='123')
@ -79,7 +79,7 @@ class SchedulerServiceTest(base.DbTestCase):
@mock.patch(TARGET_METHOD_NAME)
def test_scheduler_without_factory(self, method):
method_args = {'name': 'task', 'id': '321'}
delay = 0.5
delay = 1.5
scheduler.schedule_call(
None,
@ -88,7 +88,7 @@ class SchedulerServiceTest(base.DbTestCase):
**method_args
)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=0.5)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=2)
calls = db_api.get_delayed_calls_to_start(time_filter)
call = self._assert_single_item(
@ -98,7 +98,7 @@ class SchedulerServiceTest(base.DbTestCase):
self.assertIn('name', call['method_arguments'])
eventlet.sleep(delay * 2)
eventlet.sleep(delay)
method.assert_called_once_with(name='task', id='321')
@ -123,7 +123,7 @@ class SchedulerServiceTest(base.DbTestCase):
'result': 'mistral.workflow.utils.TaskResultSerializer'
}
delay = 0.5
delay = 1.5
scheduler.schedule_call(
FACTORY_METHOD_NAME,
@ -134,7 +134,7 @@ class SchedulerServiceTest(base.DbTestCase):
)
calls = db_api.get_delayed_calls_to_start(
datetime.datetime.now() + datetime.timedelta(seconds=0.5)
datetime.datetime.now() + datetime.timedelta(seconds=2)
)
call = self._assert_single_item(
@ -144,7 +144,7 @@ class SchedulerServiceTest(base.DbTestCase):
self.assertIn('name', call['method_arguments'])
eventlet.sleep(delay * 2)
eventlet.sleep(delay)
result = factory().run_something.call_args[1].get('result')
@ -157,3 +157,35 @@ class SchedulerServiceTest(base.DbTestCase):
)
self.assertEqual(0, len(calls))
@mock.patch(TARGET_METHOD_NAME)
def test_scheduler_multi_instance(self, method):
def stop_thread_groups():
[tg.stop() for tg in self.tgs]
self.tgs = [scheduler.setup(), scheduler.setup()]
self.addCleanup(stop_thread_groups)
method_args = {'name': 'task', 'id': '321'}
delay = 1.5
scheduler.schedule_call(
None,
TARGET_METHOD_NAME,
delay,
**method_args
)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=2)
calls = db_api.get_delayed_calls_to_start(time_filter)
self._assert_single_item(calls, target_method_name=TARGET_METHOD_NAME)
eventlet.sleep(delay)
method.assert_called_once_with(name='task', id='321')
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(0, len(calls))