From 026ac2f207f0dc667aec55b63618908307c598f6 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Wed, 20 Aug 2014 11:34:18 +0400 Subject: [PATCH] Add service for delayed calls * Delayed call mechanism works over DB Partially implements: blueprint mistral-delayed-messaging Change-Id: Ic7a517640673633e8cd2963650df91953dc958aa --- mistral/api/app.py | 2 + .../api/controllers/v1/workbook_definition.py | 3 + mistral/db/v2/api.py | 15 ++ mistral/db/v2/sqlalchemy/api.py | 42 ++++++ mistral/db/v2/sqlalchemy/models.py | 13 ++ mistral/engine/drivers/default/executor.py | 3 +- mistral/services/periodic.py | 6 +- mistral/services/scheduler.py | 134 ++++++++++-------- mistral/services/triggers.py | 84 +++++++++++ mistral/services/workbooks.py | 4 +- mistral/tests/unit/services/test_scheduler.py | 92 ++++++++++++ ...t_scheduler.py => test_trigger_service.py} | 20 +-- mistral/tests/unit/triggers/test_triggers.py | 6 +- 13 files changed, 346 insertions(+), 78 deletions(-) create mode 100644 mistral/services/triggers.py create mode 100644 mistral/tests/unit/services/test_scheduler.py rename mistral/tests/unit/{test_scheduler.py => test_trigger_service.py} (74%) diff --git a/mistral/api/app.py b/mistral/api/app.py index 70f5008e4..995a3bdda 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -22,6 +22,7 @@ from mistral.api.hooks import engine from mistral import context as ctx from mistral.db.v1 import api as db_api from mistral.services import periodic +from mistral.services import scheduler def get_pecan_config(): @@ -50,6 +51,7 @@ def setup_app(config=None, transport=None): # TODO(akuznetsov) move this to trigger scheduling to separate process periodic.setup(transport) + scheduler.setup() app = pecan.make_app( app_conf.pop('root'), diff --git a/mistral/api/controllers/v1/workbook_definition.py b/mistral/api/controllers/v1/workbook_definition.py index 8998625d5..6d9caad74 100644 --- a/mistral/api/controllers/v1/workbook_definition.py +++ b/mistral/api/controllers/v1/workbook_definition.py @@ -18,6 +18,7 @@ import pecan from mistral.db.v1 import api as db_api from mistral.openstack.common import log as logging +from mistral.services import triggers from mistral.services import workbooks from mistral.utils import rest_utils @@ -46,4 +47,6 @@ class WorkbookDefinitionController(pecan.rest.RestController): wb = workbooks.update_workbook_v1(workbook_name, {'definition': text}) + triggers.create_associated_triggers(wb) + return wb.definition diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 248916835..280e35432 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -152,3 +152,18 @@ def update_task(id, values): def delete_task(id): return IMPL.delete_task(id) + + +# Delayed calls. + + +def create_delayed_call(values): + return IMPL.create_delayed_call(values) + + +def delete_delayed_call(id): + return IMPL.delete_delayed_call(id) + + +def get_delayed_calls_to_start(time): + return IMPL.get_delayed_calls_to_start(time) diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 5dd70d996..bf74cb950 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -370,3 +370,45 @@ def _get_tasks(**kwargs): query = b.model_query(models.Task) return query.filter_by(**kwargs).all() + + +# Delayed calls. + +@b.session_aware() +def create_delayed_call(values, session=None): + delayed_call = models.DelayedCall() + delayed_call.update(values.copy()) + + try: + delayed_call.save(session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntry("Duplicate entry for DelayedCall: %s" + % e.columns) + + return delayed_call + + +@b.session_aware() +def delete_delayed_call(delayed_call_id, session=None): + delayed_call = _get_delayed_call(delayed_call_id) + if not delayed_call: + raise exc.NotFoundException("DelayedCall not found [delayed_call_id=" + "%s]" % delayed_call_id) + + session.delete(delayed_call) + + +@b.session_aware() +def get_delayed_calls_to_start(time, session=None): + query = b.model_query(models.DelayedCall) + query = query.filter(models.DelayedCall.execution_time < time) + query = query.order_by(models.DelayedCall.execution_time) + + return query.all() + + +@b.session_aware() +def _get_delayed_call(delayed_call_id, session=None): + query = b.model_query(models.DelayedCall) + + return query.filter_by(id=delayed_call_id).first() diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index f62ea5fdc..08bdbcd90 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -103,3 +103,16 @@ class Task(mb.MistralModelBase): # Relations. execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id')) execution = relationship('Execution', backref="tasks", lazy='joined') + + +class DelayedCall(mb.MistralModelBase): + """Contains info about delayed calls.""" + + __tablename__ = 'delayed_calls_v2' + + id = mb._id_column() + factory_method_path = sa.Column(sa.String(200), nullable=True) + target_method_name = sa.Column(sa.String(80), nullable=False) + method_arguments = sa.Column(st.JsonDictType()) + auth_context = sa.Column(st.JsonDictType()) + execution_time = sa.Column(sa.DateTime, nullable=False) diff --git a/mistral/engine/drivers/default/executor.py b/mistral/engine/drivers/default/executor.py index 07d1648d7..90d757270 100644 --- a/mistral/engine/drivers/default/executor.py +++ b/mistral/engine/drivers/default/executor.py @@ -31,7 +31,8 @@ class DefaultExecutor(executor.Executor): def _log_action_exception(self, message, task_id, action, params, ex): LOG.exception("%s [task_id=%s, action='%s', params='%s']\n %s" % - (message, task_id, action, params, ex)) + (message, str(task_id), str(action), + str(params), str(ex))) def handle_task(self, cntx, task_id, action_name, params={}): """Handle the execution of the workbook task. diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index c55106a0d..fc4efa7a3 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -20,7 +20,7 @@ from mistral import engine from mistral.openstack.common import log from mistral.openstack.common import periodic_task from mistral.openstack.common import threadgroup -from mistral.services import scheduler as sched +from mistral.services import triggers from mistral.services import trusts from mistral.workbook import parser as spec_parser @@ -40,7 +40,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): def scheduler_triggers(self, ctx): LOG.debug('Processing next Scheduler triggers.') - for trigger in sched.get_next_triggers(): + for trigger in triggers.get_next_triggers(): # Setup admin context before schedule triggers. context.set_ctx(ctx) @@ -54,7 +54,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): self.engine.start_workflow_execution(wb['name'], task) finally: - sched.set_next_execution_time(trigger) + triggers.set_next_execution_time(trigger) context.set_ctx(None) diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index 863ea2b43..90b9c6301 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -1,6 +1,4 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. +# Copyright 2014 - Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,71 +12,87 @@ # See the License for the specific language governing permissions and # limitations under the License. -from croniter import croniter + import datetime -from mistral.db.v1 import api as db_api -from mistral.workbook import parser as spec_parser + +from mistral import context +from mistral.db.v2 import api as db_api +from mistral.openstack.common import importutils +from mistral.openstack.common import log +from mistral.openstack.common import periodic_task +from mistral.openstack.common import threadgroup -def get_next_triggers(): - return db_api.get_next_triggers(datetime.datetime.now() + - datetime.timedelta(0, 2)) +LOG = log.getLogger(__name__) -def set_next_execution_time(trigger): - base = trigger['next_execution_time'] - cron = croniter(trigger['pattern'], base) +def schedule_call(factory_method_path, target_method_name, + run_after, **method_args): + """Add this call specification to DB, and then after run_after + seconds service CallScheduler invokes the target_method. - return db_api.trigger_update(trigger['id'], { - 'next_execution_time': cron.get_next(datetime.datetime) - }) + :param factory_method_path: Full python-specific path to + factory method for target object construction. + :param target_method_name: Name of target object method which + will be invoked. + :param run_after: Value in seconds. + :param method_args: Target method keyword arguments. + :return: None + """ + ctx = context.ctx().to_dict() if context.has_ctx() else {} + + execution_time = (datetime.datetime.now() + + datetime.timedelta(seconds=run_after)) + + values = { + 'factory_method_path': factory_method_path, + 'target_method_name': target_method_name, + 'execution_time': execution_time, + 'auth_context': ctx, + 'method_arguments': method_args + } + + db_api.create_delayed_call(values) -def _get_next_execution_time(pattern, start_time): - return croniter(pattern, start_time).get_next(datetime.datetime) +class CallScheduler(periodic_task.PeriodicTasks): + @periodic_task.periodic_task(spacing=1) + def run_delayed_calls(self, ctx=None): + LOG.debug('Processing next delayed calls.') + + datetime_filter = (datetime.datetime.now() + + datetime.timedelta(seconds=1)) + delayed_calls = db_api.get_delayed_calls_to_start(datetime_filter) + + for call in delayed_calls: + ctx = context.MistralContext(call.auth_context) + context.set_ctx(ctx) + + if call.factory_method_path: + factory = importutils.import_class(call.factory_method_path) + target_object = factory() + target_method = getattr(target_object, + call.target_method_name) + else: + target_method = importutils.import_class( + call.target_method_name) + try: + # Call the method. + target_method(**call.method_arguments) + except Exception as e: + LOG.debug("Exception was thrown during the " + "delayed call %s - %s", call, e) + finally: + # After call, delete this delayed call from DB. + db_api.delete_delayed_call(call.id) -def create_trigger(name, pattern, workbook_name, start_time=None): - if not start_time: - start_time = datetime.datetime.now() +def setup(): + tg = threadgroup.ThreadGroup() + pt = CallScheduler() - return db_api.trigger_create({ - "name": name, - "pattern": pattern, - "next_execution_time": _get_next_execution_time(pattern, start_time), - "workbook_name": workbook_name - }) - - -def create_associated_triggers(db_workbook): - if not db_workbook['definition']: - return - - wb_spec = spec_parser.get_workbook_spec_from_yaml( - db_workbook['definition'] - ) - - triggers = wb_spec.get_triggers() - - # Prepare all triggers data in advance to make db transaction shorter. - db_triggers = [] - - for e in triggers: - pattern = e['parameters']['cron-pattern'] - next_time = _get_next_execution_time(pattern, datetime.datetime.now()) - db_triggers.append({ - "name": e['name'], - "pattern": pattern, - "next_execution_time": next_time, - "workbook_name": db_workbook['name'] - }) - - db_api.start_tx() - - try: - for e in db_triggers: - db_api.trigger_create(e) - - db_api.commit_tx() - finally: - db_api.end_tx() + tg.add_dynamic_timer( + pt.run_periodic_tasks, + initial_delay=None, + periodic_interval_max=1, + context=None) diff --git a/mistral/services/triggers.py b/mistral/services/triggers.py new file mode 100644 index 000000000..362cddfee --- /dev/null +++ b/mistral/services/triggers.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from croniter import croniter +import datetime +from mistral.db.v1 import api as db_api +from mistral.workbook import parser as spec_parser + + +def get_next_triggers(): + return db_api.get_next_triggers(datetime.datetime.now() + + datetime.timedelta(0, 2)) + + +def set_next_execution_time(trigger): + base = trigger['next_execution_time'] + cron = croniter(trigger['pattern'], base) + + return db_api.trigger_update(trigger['id'], { + 'next_execution_time': cron.get_next(datetime.datetime) + }) + + +def _get_next_execution_time(pattern, start_time): + return croniter(pattern, start_time).get_next(datetime.datetime) + + +def create_trigger(name, pattern, workbook_name, start_time=None): + if not start_time: + start_time = datetime.datetime.now() + + return db_api.trigger_create({ + "name": name, + "pattern": pattern, + "next_execution_time": _get_next_execution_time(pattern, start_time), + "workbook_name": workbook_name + }) + + +def create_associated_triggers(db_workbook): + if not db_workbook.definition: + return + + wb_spec = spec_parser.get_workbook_spec_from_yaml( + db_workbook.definition + ) + + triggers = wb_spec.get_triggers() + + # Prepare all triggers data in advance to make db transaction shorter. + db_triggers = [] + + for e in triggers: + pattern = e['parameters']['cron-pattern'] + next_time = _get_next_execution_time(pattern, datetime.datetime.now()) + db_triggers.append({ + "name": e['name'], + "pattern": pattern, + "next_execution_time": next_time, + "workbook_name": db_workbook.name + }) + + db_api.start_tx() + + try: + for e in db_triggers: + db_api.trigger_create(e) + + db_api.commit_tx() + finally: + db_api.end_tx() diff --git a/mistral/services/workbooks.py b/mistral/services/workbooks.py index 0fac68b73..0c2a44685 100644 --- a/mistral/services/workbooks.py +++ b/mistral/services/workbooks.py @@ -20,7 +20,7 @@ from mistral import context from mistral.db.v1 import api as db_api_v1 from mistral.db.v2 import api as db_api_v2 from mistral import exceptions as exc -from mistral.services import scheduler +from mistral.services import triggers from mistral.services import trusts from mistral.workbook import parser as spec_parser @@ -35,7 +35,7 @@ def update_workbook_v1(workbook_name, values): wb_db = db_api_v1.workbook_update(workbook_name, values) if 'definition' in values: - scheduler.create_associated_triggers(wb_db) + triggers.create_associated_triggers(wb_db) return wb_db diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py new file mode 100644 index 000000000..f839c093d --- /dev/null +++ b/mistral/tests/unit/services/test_scheduler.py @@ -0,0 +1,92 @@ +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import eventlet +import mock + +from mistral.db.v2 import api as db_api +from mistral.services import scheduler +from mistral.tests import base + + +def factory_method(): + pass + + +class SchedulerServiceTest(base.DbTestCase): + def setUp(self): + super(SchedulerServiceTest, self).setUp() + scheduler.setup() + + @mock.patch('mistral.tests.unit.services.test_scheduler.factory_method') + def test_scheduler_with_factory(self, factory): + factory_method = ('mistral.tests.unit.services.' + 'test_scheduler.factory_method') + target_method = 'run_something' + method_args = {'name': 'task', 'id': '123'} + delay = 0.5 + + scheduler.schedule_call(factory_method, + target_method, + delay, + **method_args) + + time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) + calls = db_api.get_delayed_calls_to_start(time_filter) + + self.assertEqual(1, len(calls)) + call = self._assert_single_item(calls, + target_method_name=target_method) + + self.assertIn('name', call['method_arguments']) + + eventlet.sleep(delay * 2) + + factory().run_something.called_once_with(name='task', id='123') + + time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) + calls = db_api.get_delayed_calls_to_start(time_filter) + + self.assertEqual(0, len(calls)) + + @mock.patch('mistral.tests.unit.services.test_scheduler.factory_method') + def test_scheduler_without_factory(self, method): + target_method = ('mistral.tests.unit.services.' + 'test_scheduler.factory_method') + method_args = {'name': 'task', 'id': '321'} + delay = 0.5 + + scheduler.schedule_call(None, + target_method, + delay, + **method_args) + + time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) + calls = db_api.get_delayed_calls_to_start(time_filter) + + self.assertEqual(1, len(calls)) + call = self._assert_single_item(calls, + target_method_name=target_method) + + self.assertIn('name', call['method_arguments']) + + eventlet.sleep(delay * 2) + + method().called_once_with(name='task', id='321') + + time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1) + calls = db_api.get_delayed_calls_to_start(time_filter) + + self.assertEqual(0, len(calls)) diff --git a/mistral/tests/unit/test_scheduler.py b/mistral/tests/unit/test_trigger_service.py similarity index 74% rename from mistral/tests/unit/test_scheduler.py rename to mistral/tests/unit/test_trigger_service.py index 2e6d77b35..5b5e61576 100644 --- a/mistral/tests/unit/test_scheduler.py +++ b/mistral/tests/unit/test_trigger_service.py @@ -17,7 +17,7 @@ import datetime from mistral.openstack.common import timeutils -from mistral.services import scheduler as s +from mistral.services import triggers as t from mistral.tests import base @@ -30,30 +30,30 @@ SAMPLE_TRIGGER = { } -class SchedulerTest(base.DbTestCase): +class TriggerServiceTest(base.DbTestCase): def setUp(self): - super(SchedulerTest, self).setUp() + super(TriggerServiceTest, self).setUp() self.wb_name = "My workbook" def test_trigger_create_and_update(self): base = datetime.datetime(2010, 8, 25) next_trigger = datetime.datetime(2010, 8, 25, 0, 5) - trigger = s.create_trigger("test", "*/5 * * * *", self.wb_name, base) + trigger = t.create_trigger("test", "*/5 * * * *", self.wb_name, base) self.assertEqual(trigger['next_execution_time'], next_trigger) - trigger = s.set_next_execution_time(trigger) + trigger = t.set_next_execution_time(trigger) next_trigger = datetime.datetime(2010, 8, 25, 0, 10) self.assertEqual(trigger['next_execution_time'], next_trigger) def test_get_trigger_in_correct_orders(self): base = datetime.datetime(2010, 8, 25) - s.create_trigger("test1", "*/5 * * * *", self.wb_name, base) + t.create_trigger("test1", "*/5 * * * *", self.wb_name, base) base = datetime.datetime(2010, 8, 22) - s.create_trigger("test2", "*/5 * * * *", self.wb_name, base) + t.create_trigger("test2", "*/5 * * * *", self.wb_name, base) base = datetime.datetime(2010, 9, 21) - s.create_trigger("test3", "*/5 * * * *", self.wb_name, base) + t.create_trigger("test3", "*/5 * * * *", self.wb_name, base) base = datetime.datetime.now() + datetime.timedelta(0, 50) - s.create_trigger("test4", "*/5 * * * *", self.wb_name, base) - triggersName = [e['name'] for e in s.get_next_triggers()] + t.create_trigger("test4", "*/5 * * * *", self.wb_name, base) + triggersName = [e['name'] for e in t.get_next_triggers()] self.assertEqual(triggersName, ["test2", "test1", "test3"]) diff --git a/mistral/tests/unit/triggers/test_triggers.py b/mistral/tests/unit/triggers/test_triggers.py index ad3f223fc..cefb7ab7f 100644 --- a/mistral/tests/unit/triggers/test_triggers.py +++ b/mistral/tests/unit/triggers/test_triggers.py @@ -17,7 +17,7 @@ import pkg_resources as pkg from mistral.db.v1 import api as db_api -from mistral.services import scheduler +from mistral.services import triggers as trigger_srv from mistral.tests import base from mistral import version @@ -36,7 +36,9 @@ class TriggersTest(base.DbTestCase): 'definition': self.doc } - scheduler.create_associated_triggers(workbook) + wb_db = db_api.workbook_create(workbook) + + trigger_srv.create_associated_triggers(wb_db) triggers = db_api.triggers_get(workbook_name='my_workbook')