Merge "Operation Engine: Support Trigger Multi Node"
This commit is contained in:
commit
2e8afd53c4
|
@ -192,6 +192,25 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None,
|
|||
###################
|
||||
|
||||
|
||||
def trigger_execution_create(context, trigger_id, time):
    """Create a pending execution record for *trigger_id* at *time*.

    Thin dispatch wrapper: delegates to the active backend (IMPL).
    """
    return IMPL.trigger_execution_create(context, trigger_id, time)
|
||||
|
||||
|
||||
def trigger_execution_get_next(context):
    """Return the pending trigger execution with the earliest time.

    Thin dispatch wrapper: delegates to the active backend (IMPL).
    """
    return IMPL.trigger_execution_get_next(context)
|
||||
|
||||
|
||||
def trigger_execution_delete(context, id, trigger_id):
    """Delete trigger execution rows matching id and/or trigger_id.

    Thin dispatch wrapper: delegates to the active backend (IMPL).
    """
    return IMPL.trigger_execution_delete(context, id, trigger_id)
|
||||
|
||||
|
||||
def trigger_execution_update(context, id, current_time, new_time):
    """Reschedule an execution from current_time to new_time.

    Thin dispatch wrapper: delegates to the active backend (IMPL).
    """
    return IMPL.trigger_execution_update(context, id, current_time, new_time)
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
def scheduled_operation_get(context, id, columns_to_join=[]):
|
||||
"""Get a scheduled operation by its id.
|
||||
|
||||
|
|
|
@ -445,6 +445,109 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None, marker=None,
|
|||
###################
|
||||
|
||||
|
||||
def _trigger_execution_list_query(context, session, **kwargs):
    """Build the base query used to paginate TriggerExecution rows."""
    # Extra kwargs from the generic pagination machinery are ignored here.
    return model_query(context, models.TriggerExecution, session=session)
|
||||
|
||||
|
||||
def _trigger_execution_list_process_filters(query, filters):
    """Apply exact-match filters to a TriggerExecution pagination query.

    :param query: base query over models.TriggerExecution
    :param filters: dict of column-name -> value to match exactly
    :returns: the filtered query
    """
    exact_match_filter_names = ['id', 'trigger_id', 'execution_time']
    # Bug fix: the original passed models.Trigger here (copy/paste from
    # the trigger filter helper), filtering against the wrong model's
    # columns instead of trigger_executions.
    query = _list_common_process_exact_filter(models.TriggerExecution,
                                              query, filters,
                                              exact_match_filter_names)
    return query
|
||||
|
||||
|
||||
def _trigger_execution_get(context, id, session=None):
    """Return the TriggerExecution row with the given id.

    :raises: exception.TriggerNotFound when no row matches.
    NOTE(review): this reuses TriggerNotFound even though *id* is an
    execution id, not a trigger id — presumably because no dedicated
    execution-not-found exception exists; verify against the exception
    module.
    """
    query = model_query(context, models.TriggerExecution, session=session)
    execution = query.filter_by(id=id).first()

    if execution is None:
        raise exception.TriggerNotFound(id=id)

    return execution
|
||||
|
||||
|
||||
def trigger_execution_update(context, id, old_time, new_time):
    """Atomically reschedule an execution from old_time to new_time.

    This is a compare-and-swap: the row is only updated when its stored
    execution_time still equals *old_time*, so two nodes racing on the
    same execution cannot both succeed.

    :returns: True when exactly one row was updated, False otherwise
              (including on any database error).
    """
    session = get_session()
    try:
        with session.begin():
            updated_rows = model_query(
                context, models.TriggerExecution, session=session,
            ).filter_by(
                id=id, execution_time=old_time,
            ).update({"execution_time": new_time})
    except Exception as exc:
        LOG.warning("Unable to update trigger execution (%(execution)s): "
                    "%(exc)s",
                    {"execution": id, "exc": exc})
        return False

    LOG.debug("Updated trigger execution (%(execution)s) from %(old_time)s"
              " to %(new_time)s",
              {"execution": id, "old_time": old_time, "new_time": new_time}
              )
    return updated_rows == 1
|
||||
|
||||
|
||||
def trigger_execution_create(context, trigger_id, time):
    """Insert a new trigger execution row and return the model object."""
    execution = models.TriggerExecution()
    values = {
        'id': uuidutils.generate_uuid(),
        'trigger_id': trigger_id,
        'execution_time': time,
    }
    execution.update(values)
    execution.save(get_session())
    return execution
|
||||
|
||||
|
||||
def trigger_execution_delete(context, id, trigger_id):
    """Delete execution rows matching *id* and/or *trigger_id*.

    :param id: execution id to match, or a falsy value to skip
    :param trigger_id: trigger id to match, or a falsy value to skip
    :returns: True when exactly one row was deleted, False otherwise
              (including on any database error).
    """
    filters = {}
    if id:
        filters['id'] = id
    if trigger_id:
        filters['trigger_id'] = trigger_id

    # Robustness fix: with both arguments empty the original issued an
    # unfiltered DELETE, wiping every trigger execution row. Refuse
    # instead; callers already treat False as "nothing deleted".
    if not filters:
        LOG.warning("Refusing to delete trigger executions without any "
                    "filter (id and trigger_id are both empty)")
        return False

    session = get_session()
    try:
        with session.begin():
            deleted = model_query(
                context, models.TriggerExecution, session=session
            ).filter_by(**filters).delete()
    except Exception as e:
        LOG.warning("Unable to delete trigger (%(trigger)s) execution "
                    "(%(execution)s): %(exc)s",
                    {"trigger": trigger_id, "execution": id, "exc": e})
        return False
    else:
        LOG.debug("Deleted trigger (%(trigger)s) execution (%(execution)s)",
                  {"trigger": trigger_id, "execution": id})
        return deleted == 1
|
||||
|
||||
|
||||
def trigger_execution_get_next(context):
    """Return the soonest pending trigger execution, or None.

    Orders by execution_time ascending and takes the first row; returns
    None both when the table is empty and when the query fails.
    """
    session = get_session()
    try:
        with session.begin():
            query = _generate_paginate_query(
                context, session,
                marker=None,
                limit=1,
                sort_keys=('execution_time', ),
                sort_dirs=('asc', ),
                filters=None,
                paginate_type=models.TriggerExecution,
            )
            next_execution = query.first()
    except Exception as e:
        LOG.warning("Unable to get next trigger execution %s", e)
        return None
    return next_execution
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
def scheduled_operation_get(context, id, columns_to_join=[]):
    """Fetch a scheduled operation by id.

    NOTE(review): the mutable default ([]) is never mutated here so it
    is harmless, but a None default would be more idiomatic.
    """
    return _scheduled_operation_get(
        context, id, columns_to_join=columns_to_join)
|
||||
|
@ -1484,6 +1587,9 @@ PAGINATION_HELPERS = {
|
|||
_restore_get),
|
||||
models.Trigger: (_trigger_list_query, _trigger_list_process_filters,
|
||||
_trigger_get),
|
||||
models.TriggerExecution: (_trigger_execution_list_query,
|
||||
_trigger_execution_list_process_filters,
|
||||
_trigger_execution_get),
|
||||
models.ScheduledOperation: (_scheduled_operation_list_query,
|
||||
_scheduled_operation_list_process_filters,
|
||||
_scheduled_operation_get),
|
||||
|
|
|
@ -121,6 +121,19 @@ def define_tables(meta):
|
|||
mysql_engine='InnoDB'
|
||||
)
|
||||
|
||||
trigger_executions = Table(
|
||||
'trigger_executions', meta,
|
||||
Column('created_at', DateTime),
|
||||
Column('updated_at', DateTime),
|
||||
Column('deleted_at', DateTime),
|
||||
Column('deleted', Boolean, nullable=False),
|
||||
Column('id', String(length=36), primary_key=True, nullable=False),
|
||||
Column('trigger_id', String(length=36), unique=True, nullable=False,
|
||||
index=True),
|
||||
Column('execution_time', DateTime, nullable=False, index=True),
|
||||
mysql_engine='InnoDB'
|
||||
)
|
||||
|
||||
scheduled_operations = Table(
|
||||
'scheduled_operations', meta,
|
||||
Column('created_at', DateTime),
|
||||
|
@ -206,6 +219,7 @@ def define_tables(meta):
|
|||
restores,
|
||||
operation_logs,
|
||||
triggers,
|
||||
trigger_executions,
|
||||
scheduled_operations,
|
||||
scheduled_operation_states,
|
||||
scheduled_operation_logs,
|
||||
|
|
|
@ -74,6 +74,16 @@ class Trigger(BASE, KarborBase):
|
|||
properties = Column(Text, nullable=False)
|
||||
|
||||
|
||||
class TriggerExecution(BASE, KarborBase):
    """Represents a planned (future) execution of a trigger."""

    __tablename__ = 'trigger_executions'

    # trigger_id is unique: each trigger has at most one pending
    # execution row at a time.
    id = Column(String(36), primary_key=True, nullable=False)
    trigger_id = Column(String(36), unique=True, nullable=False, index=True)
    execution_time = Column(DateTime, nullable=False, index=True)
|
||||
|
||||
|
||||
class ScheduledOperation(BASE, KarborBase):
|
||||
"""Represents a scheduled operation."""
|
||||
|
||||
|
@ -235,6 +245,7 @@ def register_models():
|
|||
Plan,
|
||||
Resource,
|
||||
Trigger,
|
||||
TriggerExecution,
|
||||
ScheduledOperation,
|
||||
ScheduledOperationState,
|
||||
ScheduledOperationLog,
|
||||
|
|
|
@ -12,14 +12,15 @@
|
|||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import eventlet
|
||||
import functools
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from stevedore import driver as import_driver
|
||||
|
||||
from karbor import context as karbor_context
|
||||
from karbor import db
|
||||
from karbor import exception
|
||||
from karbor.i18n import _
|
||||
from karbor.services.operationengine.engine import triggers
|
||||
|
@ -41,7 +42,11 @@ time_trigger_opts = [
|
|||
cfg.StrOpt('time_format',
|
||||
default='calendar',
|
||||
choices=['crontab', 'calendar'],
|
||||
help='The type of time format which is used to compute time')
|
||||
help='The type of time format which is used to compute time'),
|
||||
cfg.IntOpt('trigger_poll_interval',
|
||||
default=15,
|
||||
help='Interval, in seconds, in which Karbor will poll for '
|
||||
'trigger events'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -49,67 +54,10 @@ CONF.register_opts(time_trigger_opts)
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerOperationGreenThread(object):
|
||||
def __init__(self, first_run_time, function):
|
||||
super(TriggerOperationGreenThread, self).__init__()
|
||||
self._is_sleeping = True
|
||||
self._pre_run_time = None
|
||||
self._running = False
|
||||
self._thread = None
|
||||
|
||||
self._function = function
|
||||
|
||||
self._start(first_run_time)
|
||||
|
||||
def kill(self):
|
||||
self._running = False
|
||||
if self._is_sleeping:
|
||||
self._thread.kill()
|
||||
|
||||
@property
|
||||
def running(self):
|
||||
return self._running
|
||||
|
||||
@property
|
||||
def pre_run_time(self):
|
||||
return self._pre_run_time
|
||||
|
||||
def _start(self, first_run_time):
|
||||
self._running = True
|
||||
|
||||
now = datetime.utcnow()
|
||||
initial_delay = 0 if first_run_time <= now else (
|
||||
int(timeutils.delta_seconds(now, first_run_time)))
|
||||
|
||||
self._thread = eventlet.spawn_after(
|
||||
initial_delay, self._run, first_run_time)
|
||||
self._thread.link(self._on_done)
|
||||
|
||||
def _on_done(self, gt, *args, **kwargs):
|
||||
self._is_sleeping = True
|
||||
self._pre_run_time = None
|
||||
self._running = False
|
||||
self._thread = None
|
||||
|
||||
def _run(self, expect_run_time):
|
||||
while self._running:
|
||||
self._is_sleeping = False
|
||||
self._pre_run_time = expect_run_time
|
||||
|
||||
expect_run_time = self._function(expect_run_time)
|
||||
if expect_run_time is None or not self._running:
|
||||
break
|
||||
|
||||
self._is_sleeping = True
|
||||
|
||||
now = datetime.utcnow()
|
||||
idle_time = 0 if expect_run_time <= now else int(
|
||||
timeutils.delta_seconds(now, expect_run_time))
|
||||
eventlet.sleep(idle_time)
|
||||
|
||||
|
||||
class TimeTrigger(triggers.BaseTrigger):
|
||||
TRIGGER_TYPE = "time"
|
||||
_loopingcall = None
|
||||
_triggers = {}
|
||||
|
||||
def __init__(self, trigger_id, trigger_property, executor):
|
||||
super(TimeTrigger, self).__init__(
|
||||
|
@ -118,30 +66,136 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
self._trigger_property = self.check_trigger_definition(
|
||||
trigger_property)
|
||||
|
||||
self._greenthread = None
|
||||
timer = self._get_timer(self._trigger_property)
|
||||
first_run_time = self._compute_next_run_time(
|
||||
datetime.utcnow(), self._trigger_property['end_time'], timer)
|
||||
LOG.debug("first_run_time: %s", first_run_time)
|
||||
|
||||
self._trigger_execution_new(self._id, first_run_time)
|
||||
|
||||
if not self.__class__._loopingcall:
|
||||
self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall(
|
||||
self._loop)
|
||||
self.__class__._loopingcall.start(
|
||||
interval=CONF.trigger_poll_interval,
|
||||
stop_on_exception=False,
|
||||
)
|
||||
|
||||
self._register()
|
||||
|
||||
def _register(self):
|
||||
self.__class__._triggers[self._id] = self
|
||||
|
||||
def _unregister(self):
|
||||
del self.__class__._triggers[self._id]
|
||||
|
||||
@classmethod
def _loop(cls):
    """Poll-loop body: drain all currently due trigger executions.

    Runs periodically on every node. The database compare-and-swap in
    _trigger_execution_update / _trigger_execution_delete decides which
    node actually claims each execution, so every execution is handled
    exactly once cluster-wide.
    """
    while True:
        now = datetime.utcnow()
        exec_to_handle = cls._trigger_execution_get_next()
        if not exec_to_handle:
            LOG.debug("No next trigger executions")
            break

        trigger_id = exec_to_handle.trigger_id
        execution_time = exec_to_handle.execution_time
        trigger = cls._triggers.get(trigger_id)
        if not trigger:
            # Stale row for a trigger this process no longer knows
            # about: best-effort cleanup, then look at the next one.
            # (The original stored the delete result in an unused
            # variable; dropped.)
            LOG.warning("Unable to find trigger %s", trigger_id)
            cls._trigger_execution_delete(
                execution_id=exec_to_handle.id)
            continue

        if now < execution_time:
            # Executions are fetched earliest-first, so nothing later
            # can be due either.
            LOG.debug("Time trigger not yet due")
            break

        trigger_property = trigger._trigger_property
        timer = cls._get_timer(trigger_property)
        window = trigger_property.get("window")
        end_time_to_run = execution_time + timedelta(
            seconds=window)

        if now > end_time_to_run:
            # Bug fix: the original format string had a %s placeholder
            # but no argument, which makes the logging call fail.
            LOG.debug("Time trigger (%s) out of window", trigger_id)
            execute = False
        else:
            LOG.debug("Time trigger (%s) is due", trigger_id)
            execute = True

        next_exec_time = cls._compute_next_run_time(
            now,
            trigger_property['end_time'],
            timer,
        )
        if not next_exec_time:
            LOG.debug("No more planned executions for trigger (%s)",
                      trigger_id)
            res = cls._trigger_execution_delete(
                execution_id=exec_to_handle.id)
        else:
            LOG.debug("Rescheduling (%s) from %s to %s",
                      trigger_id,
                      execution_time,
                      next_exec_time)
            res = cls._trigger_execution_update(
                exec_to_handle.id,
                execution_time,
                next_exec_time,
            )

        if not res:
            # Lost the compare-and-swap race: another node claimed
            # this execution; move on to the next row.
            LOG.info("Trigger probably handled by another node")
            continue

        if execute:
            cls._trigger_operations(trigger_id, execution_time, window)
|
||||
|
||||
@classmethod
def _trigger_execution_new(cls, trigger_id, time):
    """Best-effort insert of the pending execution row for a trigger.

    :returns: True on success, False when the insert fails (e.g. the
              row already exists because another node created it).
    """
    ctxt = karbor_context.get_admin_context()
    try:
        db.trigger_execution_create(ctxt, trigger_id, time)
    except Exception:
        return False
    return True
|
||||
|
||||
@classmethod
def _trigger_execution_update(cls, id, current_time, next_time):
    """Compare-and-swap the stored execution time under admin context."""
    admin_ctxt = karbor_context.get_admin_context()
    return db.trigger_execution_update(
        admin_ctxt, id, current_time, next_time)
|
||||
|
||||
@classmethod
def _trigger_execution_delete(cls, execution_id=None, trigger_id=None):
    """Delete execution rows by execution id and/or trigger id.

    :raises: exception.InvalidParameterValue when neither id is given.
    :returns: True when at least one row was removed.
    """
    if execution_id is None and trigger_id is None:
        raise exception.InvalidParameterValue('supply at least one id')

    admin_ctxt = karbor_context.get_admin_context()
    deleted = db.trigger_execution_delete(
        admin_ctxt, execution_id, trigger_id)
    return deleted > 0
|
||||
|
||||
@classmethod
def _trigger_execution_get_next(cls):
    """Fetch the soonest pending execution row under admin context."""
    admin_ctxt = karbor_context.get_admin_context()
    return db.trigger_execution_get_next(admin_ctxt)
|
||||
|
||||
def shutdown(self):
|
||||
self._kill_greenthread()
|
||||
self._unregister()
|
||||
|
||||
def register_operation(self, operation_id, **kwargs):
|
||||
if operation_id in self._operation_ids:
|
||||
msg = (_("The operation_id(%s) is exist") % operation_id)
|
||||
raise exception.ScheduledOperationExist(msg)
|
||||
|
||||
if self._greenthread and not self._greenthread.running:
|
||||
raise exception.TriggerIsInvalid(trigger_id=self._id)
|
||||
|
||||
self._operation_ids.add(operation_id)
|
||||
if self._greenthread is None:
|
||||
self._start_greenthread()
|
||||
|
||||
def unregister_operation(self, operation_id, **kwargs):
|
||||
if operation_id not in self._operation_ids:
|
||||
return
|
||||
|
||||
self._operation_ids.remove(operation_id)
|
||||
if 0 == len(self._operation_ids):
|
||||
self._kill_greenthread()
|
||||
self._operation_ids.discard(operation_id)
|
||||
|
||||
def update_trigger_property(self, trigger_property):
|
||||
valid_trigger_property = self.check_trigger_definition(
|
||||
|
@ -150,82 +204,38 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
if valid_trigger_property == self._trigger_property:
|
||||
return
|
||||
|
||||
timer, first_run_time = self._get_timer_and_first_run_time(
|
||||
valid_trigger_property)
|
||||
timer = self._get_timer(valid_trigger_property)
|
||||
first_run_time = self._compute_next_run_time(
|
||||
datetime.utcnow(), valid_trigger_property['end_time'], timer)
|
||||
|
||||
if not first_run_time:
|
||||
msg = (_("The new trigger property is invalid, "
|
||||
"Can not find the first run time"))
|
||||
raise exception.InvalidInput(msg)
|
||||
|
||||
if self._greenthread is not None:
|
||||
pre_run_time = self._greenthread.pre_run_time
|
||||
if pre_run_time:
|
||||
end_time = pre_run_time + timedelta(
|
||||
seconds=self._trigger_property['window'])
|
||||
if first_run_time <= end_time:
|
||||
msg = (_("The new trigger property is invalid, "
|
||||
"First run time%(t1)s must be after %(t2)s") %
|
||||
{'t1': first_run_time, 't2': end_time})
|
||||
raise exception.InvalidInput(msg)
|
||||
|
||||
self._trigger_property = valid_trigger_property
|
||||
self._trigger_execution_delete(trigger_id=self._id)
|
||||
self._trigger_execution_new(self._id, first_run_time)
|
||||
|
||||
if len(self._operation_ids) > 0:
|
||||
# Restart greenthread to take the change of trigger property
|
||||
# effect immediately
|
||||
self._kill_greenthread()
|
||||
self._create_green_thread(first_run_time, timer)
|
||||
@classmethod
|
||||
def _trigger_operations(cls, trigger_id, expect_run_time, window):
|
||||
"""Trigger operations once"""
|
||||
|
||||
def _kill_greenthread(self):
|
||||
if self._greenthread:
|
||||
self._greenthread.kill()
|
||||
self._greenthread = None
|
||||
|
||||
def _start_greenthread(self):
|
||||
# Find the first time.
|
||||
# We don't known when using this trigger first time.
|
||||
timer, first_run_time = self._get_timer_and_first_run_time(
|
||||
self._trigger_property)
|
||||
if not first_run_time:
|
||||
raise exception.TriggerIsInvalid(trigger_id=self._id)
|
||||
|
||||
self._create_green_thread(first_run_time, timer)
|
||||
|
||||
def _create_green_thread(self, first_run_time, timer):
|
||||
func = functools.partial(
|
||||
self._trigger_operations,
|
||||
trigger_property=self._trigger_property.copy(),
|
||||
timer=timer)
|
||||
|
||||
self._greenthread = TriggerOperationGreenThread(
|
||||
first_run_time, func)
|
||||
|
||||
def _trigger_operations(self, expect_run_time, trigger_property, timer):
|
||||
"""Trigger operations once
|
||||
|
||||
returns: wait time for next run
|
||||
"""
|
||||
|
||||
# Just for robustness, actually expect_run_time always <= now
|
||||
# but, if the scheduling of eventlet is not accurate, then we
|
||||
# can do some adjustments.
|
||||
entry_time = datetime.utcnow()
|
||||
if entry_time < expect_run_time and (
|
||||
int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0):
|
||||
return expect_run_time
|
||||
|
||||
# The self._executor.execute_operation may have I/O operation.
|
||||
# The executor execute_operation may have I/O operation.
|
||||
# If it is, this green thread will be switched out during looping
|
||||
# operation_ids. In order to avoid changing self._operation_ids
|
||||
# during the green thread is switched out, copy self._operation_ids
|
||||
# as the iterative object.
|
||||
operation_ids = self._operation_ids.copy()
|
||||
trigger = cls._triggers.get(trigger_id)
|
||||
if not trigger:
|
||||
LOG.warning("Can't find trigger: %s" % trigger_id)
|
||||
return
|
||||
operations_ids = trigger._operation_ids.copy()
|
||||
sent_ops = set()
|
||||
window = trigger_property.get("window")
|
||||
end_time = expect_run_time + timedelta(seconds=window)
|
||||
|
||||
for operation_id in operation_ids:
|
||||
if operation_id not in self._operation_ids:
|
||||
for operation_id in operations_ids:
|
||||
if operation_id not in trigger._operation_ids:
|
||||
# Maybe, when traversing this operation_id, it has been
|
||||
# removed by self.unregister_operation
|
||||
LOG.warning("Execute operation %s which is not exist, "
|
||||
|
@ -236,15 +246,13 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
if now >= end_time:
|
||||
LOG.error("Can not trigger operations to run. Because it is "
|
||||
"out of window time. now=%(now)s, "
|
||||
"end time=%(end_time)s, expect run time=%(expect)s,"
|
||||
" wating operations=%(ops)s",
|
||||
"end time=%(end_time)s, waiting operations=%(ops)s",
|
||||
{'now': now, 'end_time': end_time,
|
||||
'expect': expect_run_time,
|
||||
'ops': operation_ids - sent_ops})
|
||||
'ops': operations_ids - sent_ops})
|
||||
break
|
||||
|
||||
try:
|
||||
self._executor.execute_operation(
|
||||
trigger._executor.execute_operation(
|
||||
operation_id, now, expect_run_time, window)
|
||||
except Exception:
|
||||
LOG.exception("Submit operation to executor failed, operation"
|
||||
|
@ -252,18 +260,6 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
|
||||
sent_ops.add(operation_id)
|
||||
|
||||
next_time = self._compute_next_run_time(
|
||||
expect_run_time, trigger_property['end_time'], timer)
|
||||
now = datetime.utcnow()
|
||||
if next_time and next_time <= now:
|
||||
LOG.error("Next run time:%(next_time)s <= now:%(now)s. Maybe the "
|
||||
"entry time=%(entry)s is too late, even exceeds the end"
|
||||
" time of window=%(end)s, or it was blocked where "
|
||||
"sending the operation to executor.",
|
||||
{'next_time': next_time, 'now': now,
|
||||
'entry': entry_time, 'end': end_time})
|
||||
return next_time
|
||||
|
||||
@classmethod
|
||||
def check_trigger_definition(cls, trigger_definition):
|
||||
"""Check trigger definition
|
||||
|
@ -350,14 +346,11 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
CONF.time_format).driver
|
||||
|
||||
@classmethod
|
||||
def _get_timer_and_first_run_time(cls, trigger_property):
|
||||
def _get_timer(cls, trigger_property):
|
||||
tf_cls = cls._get_time_format_class()
|
||||
timer = tf_cls(trigger_property['start_time'],
|
||||
trigger_property['pattern'])
|
||||
first_run_time = cls._compute_next_run_time(
|
||||
datetime.utcnow(), trigger_property['end_time'], timer)
|
||||
|
||||
return timer, first_run_time
|
||||
return timer
|
||||
|
||||
@classmethod
|
||||
def check_configuration(cls):
|
||||
|
|
|
@ -221,15 +221,19 @@ class OperationEngineManager(manager.Manager):
|
|||
|
||||
@messaging.expected_exceptions(exception.InvalidInput)
def create_trigger(self, context, trigger):
    """RPC endpoint: register a new trigger with the trigger manager."""
    LOG.debug('Creating trigger (id: "%s" type: "%s")',
              trigger.id, trigger.type)
    self.trigger_manager.add_trigger(
        trigger.id, trigger.type, trigger.properties)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound,
                               exception.DeleteTriggerNotAllowed)
def delete_trigger(self, context, trigger_id):
    """RPC endpoint: remove a trigger from the trigger manager."""
    LOG.debug('Deleting trigger (id: "%s")', trigger_id)
    self.trigger_manager.remove_trigger(trigger_id)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound,
                               exception.InvalidInput)
def update_trigger(self, context, trigger):
    """RPC endpoint: apply new properties to an existing trigger."""
    LOG.debug('Updating trigger (id: "%s")', trigger.id)
    self.trigger_manager.update_trigger(trigger.id, trigger.properties)
|
||||
|
|
|
@ -64,10 +64,13 @@ class OperationEngineAPI(object):
|
|||
trigger_id=trigger_id)
|
||||
|
||||
def create_trigger(self, ctxt, trigger):
|
||||
return self._client.call(ctxt, 'create_trigger', trigger=trigger)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'create_trigger',
|
||||
trigger=trigger)
|
||||
|
||||
def delete_trigger(self, ctxt, trigger_id):
|
||||
return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'delete_trigger',
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def update_trigger(self, ctxt, trigger):
|
||||
return self._client.call(ctxt, 'update_trigger', trigger=trigger)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'update_trigger',
|
||||
trigger=trigger)
|
||||
|
|
|
@ -109,12 +109,14 @@ class ScheduledOperationsTest(karbor_base.KarborBaseTest):
|
|||
|
||||
def test_scheduled_operations_create_and_scheduled(self):
|
||||
freq = 2
|
||||
eventlet_grace = 20
|
||||
pattern = "BEGIN:VEVENT\nRRULE:FREQ=MINUTELY;INTERVAL=5;\nEND:VEVENT"
|
||||
cur_property = {'pattern': pattern, 'format': 'calendar'}
|
||||
|
||||
operation = self.store(self._create_for_volume(cur_property))
|
||||
start_time = datetime.now().replace(microsecond=0)
|
||||
sleep_time = self._wait_timestamp(pattern, start_time, freq)
|
||||
sleep_time += eventlet_grace
|
||||
self.assertNotEqual(0, sleep_time)
|
||||
eventlet.sleep(sleep_time)
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
from karbor.tests.fullstack import karbor_base
|
||||
from karbor.tests.fullstack import karbor_objects as objects
|
||||
|
||||
|
@ -40,6 +41,29 @@ class TriggersTest(karbor_base.KarborBaseTest):
|
|||
trigger = self.karbor_client.triggers.get(trigger.id)
|
||||
self.assertEqual(trigger_name, trigger.name)
|
||||
|
||||
def test_triggers_update(self):
|
||||
trigger_name = "FullStack Trigger Test Update"
|
||||
pattern1 = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT"
|
||||
pattern2 = "BEGIN:VEVENT\nRRULE:FREQ=DAILY;INTERVAL=1;\nEND:VEVENT"
|
||||
trigger = self.store(objects.Trigger())
|
||||
trigger.create('time', {'pattern': pattern1, 'format': 'calendar'},
|
||||
name=trigger_name)
|
||||
properties = {
|
||||
'properties': {
|
||||
'pattern': pattern2,
|
||||
'format': 'calendar',
|
||||
'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
|
||||
}
|
||||
}
|
||||
|
||||
self.karbor_client.triggers.update(
|
||||
trigger.id,
|
||||
properties,
|
||||
)
|
||||
|
||||
trigger = self.karbor_client.triggers.get(trigger.id)
|
||||
self.assertEqual(trigger.properties['pattern'], pattern2)
|
||||
|
||||
def test_triggers_delete(self):
|
||||
pattern = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT"
|
||||
trigger = objects.Trigger()
|
||||
|
|
|
@ -43,3 +43,4 @@ def set_defaults(conf):
|
|||
conf.set_default('username', 'karbor', group='trustee')
|
||||
conf.set_default('password', 'password', group='trustee')
|
||||
conf.set_default('user_domain_id', 'default', group='trustee')
|
||||
conf.set_default('trigger_poll_interval', 1)
|
||||
|
|
|
@ -10,18 +10,27 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import eventlet
|
||||
import functools
|
||||
import heapq
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from karbor import context as karbor_context
|
||||
from karbor import exception
|
||||
from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \
|
||||
import TimeTrigger
|
||||
from karbor.services.operationengine.engine.triggers.timetrigger \
|
||||
import time_trigger as tt
|
||||
from karbor.tests import base
|
||||
|
||||
|
||||
TriggerExecution = namedtuple('TriggerExecution',
|
||||
['execution_time', 'id', 'trigger_id'])
|
||||
|
||||
|
||||
class FakeTimeFormat(object):
|
||||
def __init__(self, start_time, pattern):
|
||||
super(FakeTimeFormat, self).__init__()
|
||||
|
@ -49,30 +58,78 @@ class FakeExecutor(object):
|
|||
self._ops[operation_id] += 1
|
||||
eventlet.sleep(0.5)
|
||||
|
||||
def clear(self):
|
||||
self._ops.clear()
|
||||
|
||||
class FakeTimeTrigger(object):
    """Test double exposing the time-format factory hook."""

    @classmethod
    def get_time_format(cls, *args, **kwargs):
        # All arguments are ignored; every caller gets the fake class.
        return FakeTimeFormat
|
||||
|
||||
|
||||
class FakeDb(object):
    """In-memory stand-in for the trigger-execution DB API.

    Executions are kept in a heap ordered by execution_time so that
    trigger_execution_get_next mirrors the real "earliest first" query.
    """

    def __init__(self):
        # Heap of TriggerExecution namedtuples, smallest time first.
        self._db = []

    def trigger_execution_get_next(self, context):
        """Return the earliest pending execution, or None when empty."""
        if not self._db:
            return None
        return self._db[0]

    def trigger_execution_create(self, context, trigger_id, time):
        """Add a new execution row with a generated unique id."""
        element = TriggerExecution(time, uuidutils.generate_uuid(),
                                   trigger_id)
        heapq.heappush(self._db, element)

    def trigger_execution_update(self, context, id, current_time, new_time):
        """Compare-and-swap like the real DB layer.

        Only updates when the stored time still equals current_time.
        Bug fix: the original fell through and returned True when no row
        matched *id*; the real DB reports 0 rows updated (False).
        """
        for idx, element in enumerate(self._db):
            if element.id == id:
                if element.execution_time != current_time:
                    return False
                self._db[idx] = TriggerExecution(new_time, element.id,
                                                 element.trigger_id)
                heapq.heapify(self._db)
                return True
        return False

    def trigger_execution_delete(self, context, id, trigger_id):
        """Delete rows matching id and/or trigger_id; return the count."""
        removed_indexes = [
            idx for idx, element in enumerate(self._db)
            if (id and element.id == id) or
               (trigger_id and element.trigger_id == trigger_id)
        ]
        # Pop from the back so earlier indexes stay valid.
        for idx in reversed(removed_indexes):
            self._db.pop(idx)
        heapq.heapify(self._db)
        return len(removed_indexes)
|
||||
|
||||
|
||||
def time_trigger_test(func):
    """Decorator wiring TimeTrigger tests to the in-memory fakes.

    Patches the db module, the admin-context factory and the
    time-format class so trigger code runs without a real database.
    The mock.patch stacking order is significant and preserved.
    """
    @functools.wraps(func)
    @mock.patch.object(tt, 'db', FakeDb())
    @mock.patch.object(karbor_context, 'get_admin_context', lambda: None)
    @mock.patch.object(tt.TimeTrigger, '_get_time_format_class',
                       FakeTimeTrigger.get_time_format)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper
|
||||
|
||||
|
||||
class TimeTriggerTestCase(base.TestCase):
|
||||
_tid = 0
|
||||
_default_executor = FakeExecutor()
|
||||
|
||||
def setUp(self):
|
||||
super(TimeTriggerTestCase, self).setUp()
|
||||
|
||||
self._set_configuration()
|
||||
|
||||
mock_obj = mock.Mock()
|
||||
mock_obj.return_value = FakeTimeFormat
|
||||
TimeTrigger._get_time_format_class = mock_obj
|
||||
|
||||
self._default_executor = FakeExecutor()
|
||||
|
||||
def test_check_configuration(self):
|
||||
self._set_configuration(10, 20, 30)
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"Configurations of time trigger are invalid",
|
||||
TimeTrigger.check_configuration)
|
||||
tt.TimeTrigger.check_configuration)
|
||||
self._set_configuration()
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_start_time(self):
|
||||
trigger_property = {
|
||||
"pattern": "",
|
||||
|
@ -81,22 +138,23 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger\'s start time is unknown",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['start_time'] = 'abc'
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The format of trigger .* is not correct",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['start_time'] = 123
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger .* is not an instance of string",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@mock.patch.object(FakeTimeFormat, 'get_min_interval')
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_interval(self, get_min_interval):
|
||||
get_min_interval.return_value = 0
|
||||
|
||||
|
@ -106,9 +164,10 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The interval of two adjacent time points .*",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_window(self):
|
||||
trigger_property = {
|
||||
"window": "abc",
|
||||
|
@ -117,15 +176,16 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger window.* is not integer",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['window'] = 1000
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger windows .* must be between .*",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_end_time(self):
|
||||
trigger_property = {
|
||||
"window": 15,
|
||||
|
@ -135,27 +195,24 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The format of trigger .* is not correct",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_register_operation(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
operation_id = "1"
|
||||
trigger.register_operation(operation_id)
|
||||
eventlet.sleep(0.3)
|
||||
eventlet.sleep(2)
|
||||
|
||||
self.assertGreaterEqual(trigger._executor._ops[operation_id], 1)
|
||||
self.assertGreaterEqual(self._default_executor._ops[operation_id], 1)
|
||||
self.assertRaisesRegex(exception.ScheduledOperationExist,
|
||||
"The operation_id.* is exist",
|
||||
trigger.register_operation,
|
||||
operation_id)
|
||||
|
||||
eventlet.sleep(0.3)
|
||||
self.assertRaises(exception.TriggerIsInvalid,
|
||||
trigger.register_operation,
|
||||
"2")
|
||||
|
||||
@time_trigger_test
|
||||
def test_unregister_operation(self):
|
||||
trigger = self._generate_trigger()
|
||||
operation_id = "2"
|
||||
|
@ -164,26 +221,9 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
self.assertIn(operation_id, trigger._operation_ids)
|
||||
|
||||
trigger.unregister_operation(operation_id)
|
||||
self.assertNotIn(operation_id, trigger._operation_ids)
|
||||
|
||||
def test_unregister_operation_when_scheduling(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
for op_id in ['1', '2', '3']:
|
||||
trigger.register_operation(op_id)
|
||||
self.assertIn(op_id, trigger._operation_ids)
|
||||
eventlet.sleep(0.5)
|
||||
|
||||
for op_id in ['2', '3']:
|
||||
trigger.unregister_operation(op_id)
|
||||
self.assertNotIn(op_id, trigger._operation_ids)
|
||||
eventlet.sleep(0.6)
|
||||
|
||||
self.assertGreaterEqual(trigger._executor._ops['1'], 1)
|
||||
|
||||
self.assertTrue(('2' not in trigger._executor._ops) or (
|
||||
'3' not in trigger._executor._ops))
|
||||
self.assertNotIn(trigger._id, trigger._operation_ids)
|
||||
|
||||
@time_trigger_test
|
||||
def test_update_trigger_property(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
|
@ -199,18 +239,10 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
trigger.update_trigger_property,
|
||||
trigger_property)
|
||||
|
||||
trigger.register_operation('1')
|
||||
eventlet.sleep(0.2)
|
||||
trigger_property['end_time'] = (
|
||||
datetime.utcnow() + timedelta(seconds=1))
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
".*First run time.* must be after.*",
|
||||
trigger.update_trigger_property,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_update_trigger_property_success(self):
|
||||
trigger = self._generate_trigger()
|
||||
trigger.register_operation('1')
|
||||
trigger.register_operation('7')
|
||||
eventlet.sleep(0.2)
|
||||
|
||||
trigger_property = {
|
||||
|
@ -221,12 +253,8 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
}
|
||||
with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c:
|
||||
c.return_value = datetime.utcnow() + timedelta(seconds=20)
|
||||
old_id = id(trigger._greenthread)
|
||||
|
||||
trigger.update_trigger_property(trigger_property)
|
||||
|
||||
self.assertNotEqual(old_id, id(trigger._greenthread))
|
||||
|
||||
def _generate_trigger(self, end_time=None):
|
||||
if not end_time:
|
||||
end_time = datetime.utcnow() + timedelta(seconds=1)
|
||||
|
@ -238,11 +266,15 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
"end_time": end_time
|
||||
}
|
||||
|
||||
self._default_executor.clear()
|
||||
return TimeTrigger("123", trigger_property, self._default_executor)
|
||||
return tt.TimeTrigger(
|
||||
uuidutils.generate_uuid(),
|
||||
trigger_property,
|
||||
self._default_executor,
|
||||
)
|
||||
|
||||
def _set_configuration(self, min_window=15,
|
||||
max_window=30, min_interval=60):
|
||||
max_window=30, min_interval=60, poll_interval=1):
|
||||
self.override_config('min_interval', min_interval)
|
||||
self.override_config('min_window_time', min_window)
|
||||
self.override_config('max_window_time', max_window)
|
||||
self.override_config('trigger_poll_interval', poll_interval)
|
||||
|
|
Loading…
Reference in New Issue