diff --git a/mistral/api/app.py b/mistral/api/app.py index 1693b2b5..8f2c8cb9 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -15,6 +15,7 @@ import pecan from mistral.api import config as api_config +from mistral.services import periodic def get_pecan_config(): @@ -30,6 +31,9 @@ def setup_app(config=None): app_conf = dict(config.app) + ##TODO(akuznetsov) move this to event scheduling to separate process + periodic.setup() + return pecan.make_app( app_conf.pop('root'), logging=getattr(config, 'logging', {}), diff --git a/mistral/cmd/api.py b/mistral/cmd/api.py index 398d61de..ef375d74 100644 --- a/mistral/cmd/api.py +++ b/mistral/cmd/api.py @@ -14,6 +14,8 @@ """Script to start Mistral API service.""" +import eventlet + import os import sys from wsgiref import simple_server @@ -25,6 +27,9 @@ from mistral import config from mistral.openstack.common import log as logging +eventlet.monkey_patch( + os=True, select=True, socket=True, thread=True, time=True) + LOG = logging.getLogger('mistral.cmd.api') diff --git a/mistral/db/sqlalchemy/api.py b/mistral/db/sqlalchemy/api.py index 1cd2440f..731c0499 100644 --- a/mistral/db/sqlalchemy/api.py +++ b/mistral/db/sqlalchemy/api.py @@ -86,13 +86,40 @@ def event_create(values): try: event.save(session=session) except db_exc.DBDuplicateEntry as e: - raise Exception LOG.exception("Database registration exception: %s", e) + ##TODO(akuznetsov) create special exception for this case + raise Exception return event_get(event.id) +def event_update(event_id, values): + values = values.copy() + + session = get_session() + with session.begin(): + event = _event_get(event_id, session) + if event is None: + ##TODO(akuznetsov) create special exception for this case + raise Exception + event.update(values) + + return event + + +@to_dict +def get_next_events(time): + query = model_query(m.Event, get_session()) + query = query.filter(m.Event.next_execution_time < time) + query = query.order_by(m.Event.next_execution_time) + return query.all() + + +def _event_get(event_id, session): + query = model_query(m.Event, session) + return query.filter_by(id=event_id).first() + + @to_dict def event_get(event_id): - query = model_query(m.Event, get_session()) - return query.filter_by(id=event_id).first() + return _event_get(event_id, get_session()) diff --git a/mistral/db/sqlalchemy/models.py b/mistral/db/sqlalchemy/models.py index ae64bd9a..550535fe 100644 --- a/mistral/db/sqlalchemy/models.py +++ b/mistral/db/sqlalchemy/models.py @@ -43,3 +43,5 @@ class Event(mb.MistralBase): id = _id_column() name = sa.Column(sa.String(80), nullable=False) + pattern = sa.Column(sa.String(20), nullable=False) + next_execution_time = sa.Column(sa.DateTime, nullable=False) diff --git a/mistral/openstack/common/loopingcall.py b/mistral/openstack/common/loopingcall.py new file mode 100644 index 00000000..5f5f5e5d --- /dev/null +++ b/mistral/openstack/common/loopingcall.py @@ -0,0 +1,147 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from eventlet import event +from eventlet import greenthread + +from mistral.openstack.common.gettextutils import _ # noqa +from mistral.openstack.common import log as logging +from mistral.openstack.common import timeutils + +LOG = logging.getLogger(__name__) + + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCall. + + The poll-function passed to LoopingCall can raise this exception to + break out of the loop normally. This is somewhat analogous to + StopIteration. + + An optional return-value can be included as the argument to the exception; + this return-value will be returned by LoopingCall.wait() + + """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCall.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCallBase(object): + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + self.done = None + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() + + +class FixedIntervalLoopingCall(LoopingCallBase): + """A fixed interval looping call.""" + + def start(self, interval, initial_delay=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + start = timeutils.utcnow() + self.f(*self.args, **self.kw) + end = timeutils.utcnow() + if not self._running: + break + delay = interval - timeutils.delta_seconds(start, end) + if delay <= 0: + LOG.warn(_('task run outlasted interval by %s sec') % + -delay) + greenthread.sleep(delay if delay > 0 else 0) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_('in fixed duration looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn_n(_inner) + return self.done + + +# TODO(mikal): this class name is deprecated in Havana and should be removed +# in the I release +LoopingCall = FixedIntervalLoopingCall + + +class DynamicLoopingCall(LoopingCallBase): + """A looping call which sleeps until the next known event. + + The function called should return how long to sleep for before being + called again. + """ + + def start(self, initial_delay=None, periodic_interval_max=None): + self._running = True + done = event.Event() + + def _inner(): + + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + idle = self.f(*self.args, **self.kw) + if not self._running: + break + + if periodic_interval_max is not None: + idle = min(idle, periodic_interval_max) + LOG.debug(_('Dynamic looping call sleeping for %.02f ' + 'seconds'), idle) + greenthread.sleep(idle) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_('in dynamic looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + greenthread.spawn(_inner) + return self.done diff --git a/mistral/openstack/common/periodic_task.py b/mistral/openstack/common/periodic_task.py new file mode 100644 index 00000000..50c25950 --- /dev/null +++ b/mistral/openstack/common/periodic_task.py @@ -0,0 +1,189 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import time + +from oslo.config import cfg + +from mistral.openstack.common.gettextutils import _ # noqa +from mistral.openstack.common import log as logging +from mistral.openstack.common import timeutils + + +periodic_opts = [ + cfg.BoolOpt('run_external_periodic_tasks', + default=True, + help=('Some periodic tasks can be run in a separate process. ' + 'Should we run them here?')), +] + +CONF = cfg.CONF +CONF.register_opts(periodic_opts) + +LOG = logging.getLogger(__name__) + +DEFAULT_INTERVAL = 60.0 + + +class InvalidPeriodicTaskArg(Exception): + message = _("Unexpected argument for periodic task creation: %(arg)s.") + + +def periodic_task(*args, **kwargs): + """Decorator to indicate that a method is a periodic task. + + This decorator can be used in two ways: + + 1. Without arguments '@periodic_task', this will be run on every cycle + of the periodic scheduler. + + 2. With arguments: + @periodic_task(spacing=N [, run_immediately=[True|False]]) + this will be run on approximately every N seconds. If this number is + negative the periodic task will be disabled. If the run_immediately + argument is provided and has a value of 'True', the first run of the + task will be shortly after task scheduler starts. If + run_immediately is omitted or set to 'False', the first time the + task runs will be approximately N seconds after the task scheduler + starts. + """ + def decorator(f): + # Test for old style invocation + if 'ticks_between_runs' in kwargs: + raise InvalidPeriodicTaskArg(arg='ticks_between_runs') + + # Control if run at all + f._periodic_task = True + f._periodic_external_ok = kwargs.pop('external_process_ok', False) + if f._periodic_external_ok and not CONF.run_external_periodic_tasks: + f._periodic_enabled = False + else: + f._periodic_enabled = kwargs.pop('enabled', True) + + # Control frequency + f._periodic_spacing = kwargs.pop('spacing', 0) + f._periodic_immediate = kwargs.pop('run_immediately', False) + if f._periodic_immediate: + f._periodic_last_run = None + else: + f._periodic_last_run = timeutils.utcnow() + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parens. + # + # In the 'with-parens' case (with kwargs present), this function needs to + # return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parens' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class _PeriodicTasksMeta(type): + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead an initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._periodic_last_run = cls._periodic_last_run.copy() + except AttributeError: + cls._periodic_last_run = {} + + try: + cls._periodic_spacing = cls._periodic_spacing.copy() + except AttributeError: + cls._periodic_spacing = {} + + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + task = value + name = task.__name__ + + if task._periodic_spacing < 0: + LOG.info(_('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + continue + if not task._periodic_enabled: + LOG.info(_('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + continue + + # A periodic spacing of zero indicates that this task should + # be run every pass + if task._periodic_spacing == 0: + task._periodic_spacing = None + + cls._periodic_tasks.append((name, task)) + cls._periodic_spacing[name] = task._periodic_spacing + cls._periodic_last_run[name] = task._periodic_last_run + + +class PeriodicTasks(object): + __metaclass__ = _PeriodicTasksMeta + + def run_periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + idle_for = DEFAULT_INTERVAL + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + now = timeutils.utcnow() + spacing = self._periodic_spacing[task_name] + last_run = self._periodic_last_run[task_name] + + # If a periodic task is _nearly_ due, then we'll run it early + if spacing is not None and last_run is not None: + due = last_run + datetime.timedelta(seconds=spacing) + if not timeutils.is_soon(due, 0.2): + idle_for = min(idle_for, timeutils.delta_seconds(now, due)) + continue + + if spacing is not None: + idle_for = min(idle_for, spacing) + + LOG.debug(_("Running periodic task %(full_task_name)s"), + {"full_task_name": full_task_name}) + self._periodic_last_run[task_name] = timeutils.utcnow() + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + {"full_task_name": full_task_name, "e": e}) + time.sleep(0) + + return idle_for diff --git a/mistral/openstack/common/threadgroup.py b/mistral/openstack/common/threadgroup.py new file mode 100644 index 00000000..8a2211e5 --- /dev/null +++ b/mistral/openstack/common/threadgroup.py @@ -0,0 +1,121 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +from eventlet import greenpool +from eventlet import greenthread + +from mistral.openstack.common import log as logging +from mistral.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + """Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. + """ + def __init__(self, thread, group): + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + +class ThreadGroup(object): + """The point of the ThreadGroup classis to: + + * keep track of timers and greenthreads (making it easier to stop them + when need be). + * provide an easy API to add timers. + """ + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_dynamic_timer(self, callback, initial_delay=None, + periodic_interval_max=None, *args, **kwargs): + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + self.timers.append(timer) + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + + def thread_done(self, thread): + self.threads.remove(thread) + + def stop(self): + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def wait(self): + for x in self.timers: + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + continue + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) diff --git a/mistral/services/__init__.py b/mistral/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py new file mode 100644 index 00000000..7feeec8e --- /dev/null +++ b/mistral/services/periodic.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from mistral.openstack.common import log +from mistral.openstack.common import periodic_task +from mistral.openstack.common import threadgroup +from mistral.services import scheduler as s + +LOG = log.getLogger(__name__) + + +class MistralPeriodicTasks(periodic_task.PeriodicTasks): + @periodic_task.periodic_task(spacing=1, run_immediately=True) + def scheduler_events(self, ctx): + LOG.debug('Processing next Scheduler events.') + for event in s.get_next_events(): + #TODO(akuznetsov) send signal + s.set_next_execution_time(event) + + +def setup(): + tg = threadgroup.ThreadGroup() + pt = MistralPeriodicTasks() + tg.add_dynamic_timer( + pt.run_periodic_tasks, + initial_delay=None, + periodic_interval_max=1, + context=None) diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py new file mode 100644 index 00000000..be75f568 --- /dev/null +++ b/mistral/services/scheduler.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from croniter import croniter +from datetime import datetime +from datetime import timedelta +from mistral.db.sqlalchemy import api as db_api + + +def get_next_events(): + time = datetime.now() + timedelta(0, 2) + return db_api.get_next_events(time) + + +def set_next_execution_time(event): + base = event['next_execution_time'] + cron = croniter(event['pattern'], base) + return db_api.event_update(event['id'], { + 'next_execution_time': cron.get_next(datetime) + }) + + +def create_event(name, pattern, start_time=None): + if not start_time: + start_time = datetime.now() + cron = croniter(pattern, start_time) + next_execution_time = cron.get_next(datetime) + return db_api.event_create({ + "name": name, + "pattern": pattern, + "next_execution_time": next_execution_time + }) diff --git a/mistral/tests/unit/db/__init__.py b/mistral/tests/unit/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/tests/unit/test_events.py b/mistral/tests/unit/db/test_events.py similarity index 87% rename from mistral/tests/unit/test_events.py rename to mistral/tests/unit/db/test_events.py index f1f00f5d..cb9938af 100644 --- a/mistral/tests/unit/test_events.py +++ b/mistral/tests/unit/db/test_events.py @@ -14,13 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral.tests.unit import base as test_base +from mistral.openstack.common import timeutils + from mistral.db.sqlalchemy import api as db_api +from mistral.tests.unit import base as test_base SAMPLE_EVENT = { "id": "123", - "name": "test_event" + "name": "test_event", + "pattern": "* *", + "next_execution_time": timeutils.utcnow() } diff --git a/mistral/tests/unit/test_scheduler.py b/mistral/tests/unit/test_scheduler.py new file mode 100644 index 00000000..a909faa5 --- /dev/null +++ b/mistral/tests/unit/test_scheduler.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from datetime import timedelta + +from mistral.openstack.common import timeutils + +from mistral.services import scheduler as s +from mistral.tests.unit import base as test_base + + +SAMPLE_EVENT = { + "id": "123", + "name": "test_event", + "patter": "* *", + "next_execution_time": timeutils.utcnow() +} + + +class SchedulerTest(test_base.DbTestCase): + def test_event_create_and_update(self): + base = datetime(2010, 8, 25) + next_event = datetime(2010, 8, 25, 0, 5) + event = s.create_event("test", "*/5 * * * *", base) + self.assertEqual(event['next_execution_time'], next_event) + + event = s.set_next_execution_time(event) + next_event = datetime(2010, 8, 25, 0, 10) + self.assertEqual(event['next_execution_time'], next_event) + + def test_get_event_in_correct_orders(self): + base = datetime(2010, 8, 25) + s.create_event("test1", "*/5 * * * *", base) + base = datetime(2010, 8, 22) + s.create_event("test2", "*/5 * * * *", base) + base = datetime(2010, 9, 21) + s.create_event("test3", "*/5 * * * *", base) + base = datetime.now() + timedelta(0, 50) + s.create_event("test4", "*/5 * * * *", base) + eventsName = [e['name'] for e in s.get_next_events()] + + self.assertEqual(eventsName, ["test2", "test1", "test3"]) diff --git a/openstack-common.conf b/openstack-common.conf index 054b5d13..321680c6 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -10,6 +10,10 @@ module=db.sqlalchemy module=log module=test module=jsonutils +module=loopingcall +module=periodic_task +module=threadgroup +module=timeutils # The base module to hold the copy of openstack.common base=mistral diff --git a/requirements.txt b/requirements.txt index ba26b1d0..aa72446e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ pecan>=0.2.0 WSME>=0.5b6 amqplib>=0.6.1 argparse +croniter oslo.config>=1.2.0