Added periodic events

Change-Id: Ieb15db84b6ece293a4aa2dbc0d1ff0a7d8afde94
(cherry picked from commit d4b92e3)
This commit is contained in:
Alexander Kuznetsov 2013-12-03 16:45:06 +04:00
parent 40d43a6dec
commit 56816d3910
15 changed files with 652 additions and 5 deletions

View File

@ -15,6 +15,7 @@
import pecan import pecan
from mistral.api import config as api_config from mistral.api import config as api_config
from mistral.services import periodic
def get_pecan_config(): def get_pecan_config():
@ -30,6 +31,9 @@ def setup_app(config=None):
app_conf = dict(config.app) app_conf = dict(config.app)
##TODO(akuznetsov) move this to event scheduling to separate process
periodic.setup()
return pecan.make_app( return pecan.make_app(
app_conf.pop('root'), app_conf.pop('root'),
logging=getattr(config, 'logging', {}), logging=getattr(config, 'logging', {}),

View File

@ -14,6 +14,8 @@
"""Script to start Mistral API service.""" """Script to start Mistral API service."""
import eventlet
import os import os
import sys import sys
from wsgiref import simple_server from wsgiref import simple_server
@ -25,6 +27,9 @@ from mistral import config
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
eventlet.monkey_patch(
os=True, select=True, socket=True, thread=True, time=True)
LOG = logging.getLogger('mistral.cmd.api') LOG = logging.getLogger('mistral.cmd.api')

View File

@ -86,13 +86,40 @@ def event_create(values):
try: try:
event.save(session=session) event.save(session=session)
except db_exc.DBDuplicateEntry as e: except db_exc.DBDuplicateEntry as e:
raise Exception
LOG.exception("Database registration exception: %s", e) LOG.exception("Database registration exception: %s", e)
##TODO(akuznetsov) create special exception for this case
raise Exception
return event_get(event.id) return event_get(event.id)
def event_update(event_id, values):
values = values.copy()
session = get_session()
with session.begin():
event = _event_get(event_id, session)
if event is None:
##TODO(akuznetsov) create special exception for this case
raise Exception
event.update(values)
return event
@to_dict
def get_next_events(time):
query = model_query(m.Event, get_session())
query = query.filter(m.Event.next_execution_time < time)
query = query.order_by(m.Event.next_execution_time)
return query.all()
def _event_get(event_id, session):
query = model_query(m.Event, session)
return query.filter_by(id=event_id).first()
@to_dict @to_dict
def event_get(event_id): def event_get(event_id):
query = model_query(m.Event, get_session()) return _event_get(event_id, get_session())
return query.filter_by(id=event_id).first()

View File

@ -43,3 +43,5 @@ class Event(mb.MistralBase):
id = _id_column() id = _id_column()
name = sa.Column(sa.String(80), nullable=False) name = sa.Column(sa.String(80), nullable=False)
pattern = sa.Column(sa.String(20), nullable=False)
next_execution_time = sa.Column(sa.DateTime, nullable=False)

View File

@ -0,0 +1,147 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from eventlet import event
from eventlet import greenthread
from mistral.openstack.common.gettextutils import _ # noqa
from mistral.openstack.common import log as logging
from mistral.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCall.
The poll-function passed to LoopingCall can raise this exception to
break out of the loop normally. This is somewhat analogous to
StopIteration.
An optional return-value can be included as the argument to the exception;
this return-value will be returned by LoopingCall.wait()
"""
def __init__(self, retvalue=True):
""":param retvalue: Value that LoopingCall.wait() should return."""
self.retvalue = retvalue
class LoopingCallBase(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A fixed interval looping call."""
def start(self, interval, initial_delay=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running:
break
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn_n(_inner)
return self.done
# TODO(mikal): this class name is deprecated in Havana and should be removed
# in the I release
LoopingCall = FixedIntervalLoopingCall
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done

View File

@ -0,0 +1,189 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import time
from oslo.config import cfg
from mistral.openstack.common.gettextutils import _ # noqa
from mistral.openstack.common import log as logging
from mistral.openstack.common import timeutils
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help=('Some periodic tasks can be run in a separate process. '
'Should we run them here?')),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
class InvalidPeriodicTaskArg(Exception):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every cycle
of the periodic scheduler.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_immediate = kwargs.pop('run_immediately', False)
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = timeutils.utcnow()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parens.
#
# In the 'with-parens' case (with kwargs present), this function needs to
# return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parens' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class _PeriodicTasksMeta(type):
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._periodic_last_run = cls._periodic_last_run.copy()
except AttributeError:
cls._periodic_last_run = {}
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
if task._periodic_spacing < 0:
LOG.info(_('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run every pass
if task._periodic_spacing == 0:
task._periodic_spacing = None
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
cls._periodic_last_run[name] = task._periodic_last_run
class PeriodicTasks(object):
__metaclass__ = _PeriodicTasksMeta
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
now = timeutils.utcnow()
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# If a periodic task is _nearly_ due, then we'll run it early
if spacing is not None and last_run is not None:
due = last_run + datetime.timedelta(seconds=spacing)
if not timeutils.is_soon(due, 0.2):
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
continue
if spacing is not None:
idle_for = min(idle_for, spacing)
LOG.debug(_("Running periodic task %(full_task_name)s"),
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = timeutils.utcnow()
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for

View File

@ -0,0 +1,121 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from mistral.openstack.common import log as logging
from mistral.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
"""Callback function to be passed to GreenThread.link() when we spawn()
Calls the :class:`ThreadGroup` to notify if.
"""
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
"""Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
def stop(self):
self.thread.kill()
def wait(self):
return self.thread.wait()
class ThreadGroup(object):
"""The point of the ThreadGroup classis to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
* provide an easy API to add timers.
"""
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
pulse.start(interval=interval,
initial_delay=initial_delay)
self.timers.append(pulse)
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):
self.threads.remove(thread)
def stop(self):
current = greenthread.getcurrent()
for x in self.threads:
if x is current:
# don't kill the current thread.
continue
try:
x.stop()
except Exception as ex:
LOG.exception(ex)
for x in self.timers:
try:
x.stop()
except Exception as ex:
LOG.exception(ex)
self.timers = []
def wait(self):
for x in self.timers:
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
current = greenthread.getcurrent()
for x in self.threads:
if x is current:
continue
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)

View File

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import log
from mistral.openstack.common import periodic_task
from mistral.openstack.common import threadgroup
from mistral.services import scheduler as s
LOG = log.getLogger(__name__)
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def scheduler_events(self, ctx):
LOG.debug('Processing next Scheduler events.')
for event in s.get_next_events():
#TODO(akuznetsov) send signal
s.set_next_execution_time(event)
def setup():
tg = threadgroup.ThreadGroup()
pt = MistralPeriodicTasks()
tg.add_dynamic_timer(
pt.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=None)

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from croniter import croniter
from datetime import datetime
from datetime import timedelta
from mistral.db.sqlalchemy import api as db_api
def get_next_events():
time = datetime.now() + timedelta(0, 2)
return db_api.get_next_events(time)
def set_next_execution_time(event):
base = event['next_execution_time']
cron = croniter(event['pattern'], base)
return db_api.event_update(event['id'], {
'next_execution_time': cron.get_next(datetime)
})
def create_event(name, pattern, start_time=None):
if not start_time:
start_time = datetime.now()
cron = croniter(pattern, start_time)
next_execution_time = cron.get_next(datetime)
return db_api.event_create({
"name": name,
"pattern": pattern,
"next_execution_time": next_execution_time
})

View File

View File

@ -14,13 +14,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from mistral.tests.unit import base as test_base from mistral.openstack.common import timeutils
from mistral.db.sqlalchemy import api as db_api from mistral.db.sqlalchemy import api as db_api
from mistral.tests.unit import base as test_base
SAMPLE_EVENT = { SAMPLE_EVENT = {
"id": "123", "id": "123",
"name": "test_event" "name": "test_event",
"pattern": "* *",
"next_execution_time": timeutils.utcnow()
} }

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta
from mistral.openstack.common import timeutils
from mistral.services import scheduler as s
from mistral.tests.unit import base as test_base
SAMPLE_EVENT = {
"id": "123",
"name": "test_event",
"patter": "* *",
"next_execution_time": timeutils.utcnow()
}
class SchedulerTest(test_base.DbTestCase):
def test_event_create_and_update(self):
base = datetime(2010, 8, 25)
next_event = datetime(2010, 8, 25, 0, 5)
event = s.create_event("test", "*/5 * * * *", base)
self.assertEqual(event['next_execution_time'], next_event)
event = s.set_next_execution_time(event)
next_event = datetime(2010, 8, 25, 0, 10)
self.assertEqual(event['next_execution_time'], next_event)
def test_get_event_in_correct_orders(self):
base = datetime(2010, 8, 25)
s.create_event("test1", "*/5 * * * *", base)
base = datetime(2010, 8, 22)
s.create_event("test2", "*/5 * * * *", base)
base = datetime(2010, 9, 21)
s.create_event("test3", "*/5 * * * *", base)
base = datetime.now() + timedelta(0, 50)
s.create_event("test4", "*/5 * * * *", base)
eventsName = [e['name'] for e in s.get_next_events()]
self.assertEqual(eventsName, ["test2", "test1", "test3"])

View File

@ -10,6 +10,10 @@ module=db.sqlalchemy
module=log module=log
module=test module=test
module=jsonutils module=jsonutils
module=loopingcall
module=periodic_task
module=threadgroup
module=timeutils
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=mistral base=mistral

View File

@ -5,4 +5,5 @@ pecan>=0.2.0
WSME>=0.5b6 WSME>=0.5b6
amqplib>=0.6.1 amqplib>=0.6.1
argparse argparse
croniter
oslo.config>=1.2.0 oslo.config>=1.2.0