Add service for delayed calls

* Delayed call mechanism works over DB

Partially implements: blueprint mistral-delayed-messaging

Change-Id: Ic7a517640673633e8cd2963650df91953dc958aa
This commit is contained in:
Nikolay Mahotkin 2014-08-20 11:34:18 +04:00
parent 85193f625b
commit 026ac2f207
13 changed files with 346 additions and 78 deletions

View File

@ -22,6 +22,7 @@ from mistral.api.hooks import engine
from mistral import context as ctx from mistral import context as ctx
from mistral.db.v1 import api as db_api from mistral.db.v1 import api as db_api
from mistral.services import periodic from mistral.services import periodic
from mistral.services import scheduler
def get_pecan_config(): def get_pecan_config():
@ -50,6 +51,7 @@ def setup_app(config=None, transport=None):
# TODO(akuznetsov) move this to trigger scheduling to separate process # TODO(akuznetsov) move this to trigger scheduling to separate process
periodic.setup(transport) periodic.setup(transport)
scheduler.setup()
app = pecan.make_app( app = pecan.make_app(
app_conf.pop('root'), app_conf.pop('root'),

View File

@ -18,6 +18,7 @@ import pecan
from mistral.db.v1 import api as db_api from mistral.db.v1 import api as db_api
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral.services import triggers
from mistral.services import workbooks from mistral.services import workbooks
from mistral.utils import rest_utils from mistral.utils import rest_utils
@ -46,4 +47,6 @@ class WorkbookDefinitionController(pecan.rest.RestController):
wb = workbooks.update_workbook_v1(workbook_name, {'definition': text}) wb = workbooks.update_workbook_v1(workbook_name, {'definition': text})
triggers.create_associated_triggers(wb)
return wb.definition return wb.definition

View File

@ -152,3 +152,18 @@ def update_task(id, values):
def delete_task(id): def delete_task(id):
return IMPL.delete_task(id) return IMPL.delete_task(id)
# Delayed calls.
def create_delayed_call(values):
return IMPL.create_delayed_call(values)
def delete_delayed_call(id):
return IMPL.delete_delayed_call(id)
def get_delayed_calls_to_start(time):
return IMPL.get_delayed_calls_to_start(time)

View File

@ -370,3 +370,45 @@ def _get_tasks(**kwargs):
query = b.model_query(models.Task) query = b.model_query(models.Task)
return query.filter_by(**kwargs).all() return query.filter_by(**kwargs).all()
# Delayed calls.
@b.session_aware()
def create_delayed_call(values, session=None):
delayed_call = models.DelayedCall()
delayed_call.update(values.copy())
try:
delayed_call.save(session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for DelayedCall: %s"
% e.columns)
return delayed_call
@b.session_aware()
def delete_delayed_call(delayed_call_id, session=None):
delayed_call = _get_delayed_call(delayed_call_id)
if not delayed_call:
raise exc.NotFoundException("DelayedCall not found [delayed_call_id="
"%s]" % delayed_call_id)
session.delete(delayed_call)
@b.session_aware()
def get_delayed_calls_to_start(time, session=None):
query = b.model_query(models.DelayedCall)
query = query.filter(models.DelayedCall.execution_time < time)
query = query.order_by(models.DelayedCall.execution_time)
return query.all()
@b.session_aware()
def _get_delayed_call(delayed_call_id, session=None):
query = b.model_query(models.DelayedCall)
return query.filter_by(id=delayed_call_id).first()

View File

@ -103,3 +103,16 @@ class Task(mb.MistralModelBase):
# Relations. # Relations.
execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id')) execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id'))
execution = relationship('Execution', backref="tasks", lazy='joined') execution = relationship('Execution', backref="tasks", lazy='joined')
class DelayedCall(mb.MistralModelBase):
"""Contains info about delayed calls."""
__tablename__ = 'delayed_calls_v2'
id = mb._id_column()
factory_method_path = sa.Column(sa.String(200), nullable=True)
target_method_name = sa.Column(sa.String(80), nullable=False)
method_arguments = sa.Column(st.JsonDictType())
auth_context = sa.Column(st.JsonDictType())
execution_time = sa.Column(sa.DateTime, nullable=False)

View File

@ -31,7 +31,8 @@ class DefaultExecutor(executor.Executor):
def _log_action_exception(self, message, task_id, action, params, ex): def _log_action_exception(self, message, task_id, action, params, ex):
LOG.exception("%s [task_id=%s, action='%s', params='%s']\n %s" % LOG.exception("%s [task_id=%s, action='%s', params='%s']\n %s" %
(message, task_id, action, params, ex)) (message, str(task_id), str(action),
str(params), str(ex)))
def handle_task(self, cntx, task_id, action_name, params={}): def handle_task(self, cntx, task_id, action_name, params={}):
"""Handle the execution of the workbook task. """Handle the execution of the workbook task.

View File

@ -20,7 +20,7 @@ from mistral import engine
from mistral.openstack.common import log from mistral.openstack.common import log
from mistral.openstack.common import periodic_task from mistral.openstack.common import periodic_task
from mistral.openstack.common import threadgroup from mistral.openstack.common import threadgroup
from mistral.services import scheduler as sched from mistral.services import triggers
from mistral.services import trusts from mistral.services import trusts
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
@ -40,7 +40,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
def scheduler_triggers(self, ctx): def scheduler_triggers(self, ctx):
LOG.debug('Processing next Scheduler triggers.') LOG.debug('Processing next Scheduler triggers.')
for trigger in sched.get_next_triggers(): for trigger in triggers.get_next_triggers():
# Setup admin context before schedule triggers. # Setup admin context before schedule triggers.
context.set_ctx(ctx) context.set_ctx(ctx)
@ -54,7 +54,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
self.engine.start_workflow_execution(wb['name'], task) self.engine.start_workflow_execution(wb['name'], task)
finally: finally:
sched.set_next_execution_time(trigger) triggers.set_next_execution_time(trigger)
context.set_ctx(None) context.set_ctx(None)

View File

@ -1,6 +1,4 @@
# -*- coding: utf-8 -*- # Copyright 2014 - Mirantis, Inc.
#
# Copyright 2013 - Mirantis, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -14,71 +12,87 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from croniter import croniter
import datetime import datetime
from mistral.db.v1 import api as db_api
from mistral.workbook import parser as spec_parser from mistral import context
from mistral.db.v2 import api as db_api
from mistral.openstack.common import importutils
from mistral.openstack.common import log
from mistral.openstack.common import periodic_task
from mistral.openstack.common import threadgroup
def get_next_triggers(): LOG = log.getLogger(__name__)
return db_api.get_next_triggers(datetime.datetime.now() +
datetime.timedelta(0, 2))
def set_next_execution_time(trigger): def schedule_call(factory_method_path, target_method_name,
base = trigger['next_execution_time'] run_after, **method_args):
cron = croniter(trigger['pattern'], base) """Add this call specification to DB, and then after run_after
seconds service CallScheduler invokes the target_method.
return db_api.trigger_update(trigger['id'], { :param factory_method_path: Full python-specific path to
'next_execution_time': cron.get_next(datetime.datetime) factory method for target object construction.
}) :param target_method_name: Name of target object method which
will be invoked.
:param run_after: Value in seconds.
:param method_args: Target method keyword arguments.
:return: None
"""
ctx = context.ctx().to_dict() if context.has_ctx() else {}
execution_time = (datetime.datetime.now()
+ datetime.timedelta(seconds=run_after))
values = {
'factory_method_path': factory_method_path,
'target_method_name': target_method_name,
'execution_time': execution_time,
'auth_context': ctx,
'method_arguments': method_args
}
db_api.create_delayed_call(values)
def _get_next_execution_time(pattern, start_time): class CallScheduler(periodic_task.PeriodicTasks):
return croniter(pattern, start_time).get_next(datetime.datetime) @periodic_task.periodic_task(spacing=1)
def run_delayed_calls(self, ctx=None):
LOG.debug('Processing next delayed calls.')
datetime_filter = (datetime.datetime.now()
+ datetime.timedelta(seconds=1))
delayed_calls = db_api.get_delayed_calls_to_start(datetime_filter)
for call in delayed_calls:
ctx = context.MistralContext(call.auth_context)
context.set_ctx(ctx)
if call.factory_method_path:
factory = importutils.import_class(call.factory_method_path)
target_object = factory()
target_method = getattr(target_object,
call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name)
try:
# Call the method.
target_method(**call.method_arguments)
except Exception as e:
LOG.debug("Exception was thrown during the "
"delayed call %s - %s", call, e)
finally:
# After call, delete this delayed call from DB.
db_api.delete_delayed_call(call.id)
def create_trigger(name, pattern, workbook_name, start_time=None): def setup():
if not start_time: tg = threadgroup.ThreadGroup()
start_time = datetime.datetime.now() pt = CallScheduler()
return db_api.trigger_create({ tg.add_dynamic_timer(
"name": name, pt.run_periodic_tasks,
"pattern": pattern, initial_delay=None,
"next_execution_time": _get_next_execution_time(pattern, start_time), periodic_interval_max=1,
"workbook_name": workbook_name context=None)
})
def create_associated_triggers(db_workbook):
if not db_workbook['definition']:
return
wb_spec = spec_parser.get_workbook_spec_from_yaml(
db_workbook['definition']
)
triggers = wb_spec.get_triggers()
# Prepare all triggers data in advance to make db transaction shorter.
db_triggers = []
for e in triggers:
pattern = e['parameters']['cron-pattern']
next_time = _get_next_execution_time(pattern, datetime.datetime.now())
db_triggers.append({
"name": e['name'],
"pattern": pattern,
"next_execution_time": next_time,
"workbook_name": db_workbook['name']
})
db_api.start_tx()
try:
for e in db_triggers:
db_api.trigger_create(e)
db_api.commit_tx()
finally:
db_api.end_tx()

View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from croniter import croniter
import datetime
from mistral.db.v1 import api as db_api
from mistral.workbook import parser as spec_parser
def get_next_triggers():
return db_api.get_next_triggers(datetime.datetime.now() +
datetime.timedelta(0, 2))
def set_next_execution_time(trigger):
base = trigger['next_execution_time']
cron = croniter(trigger['pattern'], base)
return db_api.trigger_update(trigger['id'], {
'next_execution_time': cron.get_next(datetime.datetime)
})
def _get_next_execution_time(pattern, start_time):
return croniter(pattern, start_time).get_next(datetime.datetime)
def create_trigger(name, pattern, workbook_name, start_time=None):
if not start_time:
start_time = datetime.datetime.now()
return db_api.trigger_create({
"name": name,
"pattern": pattern,
"next_execution_time": _get_next_execution_time(pattern, start_time),
"workbook_name": workbook_name
})
def create_associated_triggers(db_workbook):
if not db_workbook.definition:
return
wb_spec = spec_parser.get_workbook_spec_from_yaml(
db_workbook.definition
)
triggers = wb_spec.get_triggers()
# Prepare all triggers data in advance to make db transaction shorter.
db_triggers = []
for e in triggers:
pattern = e['parameters']['cron-pattern']
next_time = _get_next_execution_time(pattern, datetime.datetime.now())
db_triggers.append({
"name": e['name'],
"pattern": pattern,
"next_execution_time": next_time,
"workbook_name": db_workbook.name
})
db_api.start_tx()
try:
for e in db_triggers:
db_api.trigger_create(e)
db_api.commit_tx()
finally:
db_api.end_tx()

View File

@ -20,7 +20,7 @@ from mistral import context
from mistral.db.v1 import api as db_api_v1 from mistral.db.v1 import api as db_api_v1
from mistral.db.v2 import api as db_api_v2 from mistral.db.v2 import api as db_api_v2
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.services import scheduler from mistral.services import triggers
from mistral.services import trusts from mistral.services import trusts
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
@ -35,7 +35,7 @@ def update_workbook_v1(workbook_name, values):
wb_db = db_api_v1.workbook_update(workbook_name, values) wb_db = db_api_v1.workbook_update(workbook_name, values)
if 'definition' in values: if 'definition' in values:
scheduler.create_associated_triggers(wb_db) triggers.create_associated_triggers(wb_db)
return wb_db return wb_db

View File

@ -0,0 +1,92 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import eventlet
import mock
from mistral.db.v2 import api as db_api
from mistral.services import scheduler
from mistral.tests import base
def factory_method():
pass
class SchedulerServiceTest(base.DbTestCase):
def setUp(self):
super(SchedulerServiceTest, self).setUp()
scheduler.setup()
@mock.patch('mistral.tests.unit.services.test_scheduler.factory_method')
def test_scheduler_with_factory(self, factory):
factory_method = ('mistral.tests.unit.services.'
'test_scheduler.factory_method')
target_method = 'run_something'
method_args = {'name': 'task', 'id': '123'}
delay = 0.5
scheduler.schedule_call(factory_method,
target_method,
delay,
**method_args)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(1, len(calls))
call = self._assert_single_item(calls,
target_method_name=target_method)
self.assertIn('name', call['method_arguments'])
eventlet.sleep(delay * 2)
factory().run_something.called_once_with(name='task', id='123')
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(0, len(calls))
@mock.patch('mistral.tests.unit.services.test_scheduler.factory_method')
def test_scheduler_without_factory(self, method):
target_method = ('mistral.tests.unit.services.'
'test_scheduler.factory_method')
method_args = {'name': 'task', 'id': '321'}
delay = 0.5
scheduler.schedule_call(None,
target_method,
delay,
**method_args)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(1, len(calls))
call = self._assert_single_item(calls,
target_method_name=target_method)
self.assertIn('name', call['method_arguments'])
eventlet.sleep(delay * 2)
method().called_once_with(name='task', id='321')
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(0, len(calls))

View File

@ -17,7 +17,7 @@
import datetime import datetime
from mistral.openstack.common import timeutils from mistral.openstack.common import timeutils
from mistral.services import scheduler as s from mistral.services import triggers as t
from mistral.tests import base from mistral.tests import base
@ -30,30 +30,30 @@ SAMPLE_TRIGGER = {
} }
class SchedulerTest(base.DbTestCase): class TriggerServiceTest(base.DbTestCase):
def setUp(self): def setUp(self):
super(SchedulerTest, self).setUp() super(TriggerServiceTest, self).setUp()
self.wb_name = "My workbook" self.wb_name = "My workbook"
def test_trigger_create_and_update(self): def test_trigger_create_and_update(self):
base = datetime.datetime(2010, 8, 25) base = datetime.datetime(2010, 8, 25)
next_trigger = datetime.datetime(2010, 8, 25, 0, 5) next_trigger = datetime.datetime(2010, 8, 25, 0, 5)
trigger = s.create_trigger("test", "*/5 * * * *", self.wb_name, base) trigger = t.create_trigger("test", "*/5 * * * *", self.wb_name, base)
self.assertEqual(trigger['next_execution_time'], next_trigger) self.assertEqual(trigger['next_execution_time'], next_trigger)
trigger = s.set_next_execution_time(trigger) trigger = t.set_next_execution_time(trigger)
next_trigger = datetime.datetime(2010, 8, 25, 0, 10) next_trigger = datetime.datetime(2010, 8, 25, 0, 10)
self.assertEqual(trigger['next_execution_time'], next_trigger) self.assertEqual(trigger['next_execution_time'], next_trigger)
def test_get_trigger_in_correct_orders(self): def test_get_trigger_in_correct_orders(self):
base = datetime.datetime(2010, 8, 25) base = datetime.datetime(2010, 8, 25)
s.create_trigger("test1", "*/5 * * * *", self.wb_name, base) t.create_trigger("test1", "*/5 * * * *", self.wb_name, base)
base = datetime.datetime(2010, 8, 22) base = datetime.datetime(2010, 8, 22)
s.create_trigger("test2", "*/5 * * * *", self.wb_name, base) t.create_trigger("test2", "*/5 * * * *", self.wb_name, base)
base = datetime.datetime(2010, 9, 21) base = datetime.datetime(2010, 9, 21)
s.create_trigger("test3", "*/5 * * * *", self.wb_name, base) t.create_trigger("test3", "*/5 * * * *", self.wb_name, base)
base = datetime.datetime.now() + datetime.timedelta(0, 50) base = datetime.datetime.now() + datetime.timedelta(0, 50)
s.create_trigger("test4", "*/5 * * * *", self.wb_name, base) t.create_trigger("test4", "*/5 * * * *", self.wb_name, base)
triggersName = [e['name'] for e in s.get_next_triggers()] triggersName = [e['name'] for e in t.get_next_triggers()]
self.assertEqual(triggersName, ["test2", "test1", "test3"]) self.assertEqual(triggersName, ["test2", "test1", "test3"])

View File

@ -17,7 +17,7 @@
import pkg_resources as pkg import pkg_resources as pkg
from mistral.db.v1 import api as db_api from mistral.db.v1 import api as db_api
from mistral.services import scheduler from mistral.services import triggers as trigger_srv
from mistral.tests import base from mistral.tests import base
from mistral import version from mistral import version
@ -36,7 +36,9 @@ class TriggersTest(base.DbTestCase):
'definition': self.doc 'definition': self.doc
} }
scheduler.create_associated_triggers(workbook) wb_db = db_api.workbook_create(workbook)
trigger_srv.create_associated_triggers(wb_db)
triggers = db_api.triggers_get(workbook_name='my_workbook') triggers = db_api.triggers_get(workbook_name='my_workbook')