Merge "Add event configuration for event trigger"

This commit is contained in:
Jenkins 2016-09-07 11:42:45 +00:00 committed by Gerrit Code Review
commit 7c75d37d8f
5 changed files with 209 additions and 46 deletions

View File

@ -0,0 +1,6 @@
- event_types:
- compute.instance.create.*
properties:
resource_id: <% $.payload.instance_id %>
project_id: <% $.context.project_id %>
user_id: <% $.context.user_id %>

View File

@ -191,6 +191,11 @@ event_engine_opts = [
default='mistral_event_engine',
help='The message topic that the event engine listens on.'
),
cfg.StrOpt(
'event_definitions_cfg_file',
default='/etc/mistral/event_definitions.yaml',
help='Configuration file for event definitions.'
),
]
execution_expiration_policy_opts = [

View File

@ -26,6 +26,7 @@ from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import listener
from oslo_messaging import target
from oslo_messaging import transport
from oslo_utils import timeutils
import six
LOG = logging.getLogger(__name__)
@ -52,12 +53,16 @@ def handle_event(self, ctxt, publisher_id, event_type, payload, metadata):
'payload: %s, metadata: %s.', publisher_id, event_type, payload,
metadata)
self.event_engine.process_notification_event(
ctxt,
event_type,
payload,
metadata
)
notification = {
'event_type': event_type,
'payload': payload,
'publisher': publisher_id,
'timestamp': metadata.get('timestamp',
ctxt.get('timestamp', timeutils.utcnow())),
'context': ctxt
}
self.event_engine.process_notification_event(notification)
return dispatcher.NotificationResult.HANDLED

View File

@ -13,27 +13,115 @@
# under the License.
from collections import defaultdict
import os
import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import threadgroup
from oslo_utils import fnmatch
import six
import yaml
from mistral import context as auth_ctx
from mistral import coordination
from mistral.db.v2 import api as db_api
from mistral import exceptions
from mistral import expressions
from mistral import messaging as mistral_messaging
from mistral.services import security
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# Event queue event constants.
EVENT_CONTEXT = 'context'
EVENT_TYPE = 'type'
EVENT_PAYLOAD = 'payload'
EVENT_METADATA = 'metadata'
DEFAULT_PROPERTIES = {
'service': '<% $.publisher %>',
'project_id': '<% $.context.project_id %>',
'user_id': '<% $.context.user_id %>',
'timestamp': '<% $.timestamp %>'
}
class EventDefinition(object):
def __init__(self, definition_cfg):
self.cfg = definition_cfg
try:
self.event_types = self.cfg['event_types']
self.properties = self.cfg['properties']
except KeyError as err:
raise exceptions.MistralException(
"Required field %s not specified" % err.args[0]
)
if isinstance(self.event_types, six.string_types):
self.event_types = [self.event_types]
def match_type(self, event_type):
for t in self.event_types:
if fnmatch.fnmatch(event_type, t):
return True
return False
def convert(self, event):
return expressions.evaluate_recursively(self.properties, event)
class NotificationsConverter(object):
def __init__(self):
config_file = CONF.event_engine.event_definitions_cfg_file
definition_cfg = []
if os.path.exists(config_file):
with open(config_file) as cf:
config = cf.read()
try:
definition_cfg = yaml.safe_load(config)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = (
"Invalid YAML syntax in Definitions file "
"%(file)s at line: %(line)s, column: %(column)s."
% dict(file=config_file,
line=mark.line + 1,
column=mark.column + 1)
)
else:
errmsg = (
"YAML error reading Definitions file %s" %
CONF.event_engine.event_definitions_cfg_file
)
LOG.error(errmsg)
raise exceptions.MistralError(
'Invalid event definition configuration file. %s' %
config_file
)
self.definitions = [EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
def get_event_definition(self, event_type):
for d in self.definitions:
if d.match_type(event_type):
return d
return None
def convert(self, event_type, event):
edef = self.get_event_definition(event_type)
if edef is None:
LOG.debug('No event definition found for type: %s, use default '
'settings instead.', event_type)
return expressions.evaluate_recursively(DEFAULT_PROPERTIES, event)
return edef.convert(event)
class EventEngine(coordination.Service):
@ -55,6 +143,10 @@ class EventEngine(coordination.Service):
self.lock = threading.Lock()
LOG.debug('Loading notification definitions.')
self.notification_converter = NotificationsConverter()
self._start_handler()
self._start_listeners()
@ -131,15 +223,13 @@ class EventEngine(coordination.Service):
exchange, topic = ex_t
self._add_event_listener(exchange, topic, events)
def _start_workflow(self, triggers, payload, metadata):
def _start_workflow(self, triggers, event_params):
"""Start workflows defined in event triggers."""
for t in triggers:
LOG.info('Start to process event trigger: %s', t['id'])
workflow_params = t.get('workflow_params', {})
workflow_params.update(
{'event_payload': payload, 'event_metadata': metadata}
)
workflow_params.update({'event_params': event_params})
# Setup context before schedule triggers.
ctx = security.create_context(t['trust_id'], t['project_id'])
@ -167,10 +257,8 @@ class EventEngine(coordination.Service):
while True:
event = self.event_queue.get()
context = event.get(EVENT_CONTEXT)
event_type = event.get(EVENT_TYPE)
payload = event.get(EVENT_PAYLOAD)
metadata = event.get(EVENT_METADATA)
context = event.get('context')
event_type = event.get('event_type')
# NOTE(kong): Use lock here to protect event_triggers_map variable
# from being updated outside the thread.
@ -190,7 +278,12 @@ class EventEngine(coordination.Service):
LOG.debug('Start to handle event: %s, %d trigger(s) '
'registered.', event_type, len(triggers))
self._start_workflow(triggers, payload, metadata)
event_params = self.notification_converter.convert(
event_type,
event
)
self._start_workflow(triggers, event_params)
self.event_queue.task_done()
@ -200,22 +293,14 @@ class EventEngine(coordination.Service):
self.handler_tg.add_thread(self._process_event_queue)
def process_notification_event(self, context, event_type, payload,
metadata):
def process_notification_event(self, notification):
"""Callback funtion by event handler.
Just put notification into a queue.
"""
event = {
EVENT_CONTEXT: context,
EVENT_TYPE: event_type,
EVENT_PAYLOAD: payload,
EVENT_METADATA: metadata
}
LOG.debug("Putting notification event to event queue.")
LOG.debug("Adding notification event to event queue: %s", event)
self.event_queue.put(event)
self.event_queue.put(notification)
def create_event_trigger(self, trigger, events):
"""An endpoint method for creating event trigger.

View File

@ -35,7 +35,7 @@ my_wf:
"""
EXCHANGE_TOPIC = ('openstack', 'notification')
EVENT = 'compute.instance.create.start'
EVENT_TYPE = 'compute.instance.create.start'
EVENT_TRIGGER = {
'name': 'trigger1',
@ -44,7 +44,7 @@ EVENT_TRIGGER = {
'workflow_params': {},
'exchange': 'openstack',
'topic': 'notification',
'event': EVENT,
'event': EVENT_TYPE,
}
cfg.CONF.set_default('auth_enable', False, group='pecan')
@ -74,30 +74,31 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual(1, len(e_engine.exchange_topic_events_map))
self.assertEqual(
EVENT,
EVENT_TYPE,
list(e_engine.exchange_topic_events_map[EXCHANGE_TOPIC])[0]
)
self.assertEqual(1, len(e_engine.event_triggers_map))
self.assertEqual(1, len(e_engine.event_triggers_map[EVENT]))
self.assertEqual(1, len(e_engine.event_triggers_map[EVENT_TYPE]))
self._assert_dict_contains_subset(
trigger.to_dict(),
e_engine.event_triggers_map[EVENT][0]
e_engine.event_triggers_map[EVENT_TYPE][0]
)
self.assertEqual(1, len(e_engine.exchange_topic_listener_map))
@mock.patch('mistral.messaging.start_listener')
def test_process_event_queue(self, mock_start):
trigger = db_api.create_event_trigger(EVENT_TRIGGER)
db_api.create_event_trigger(EVENT_TRIGGER)
client = mock.MagicMock()
e_engine = event_engine.EventEngine(client)
self.addCleanup(e_engine.handler_tg.stop)
event = {
event_engine.EVENT_CONTEXT: {},
event_engine.EVENT_TYPE: EVENT,
event_engine.EVENT_PAYLOAD: {},
event_engine.EVENT_METADATA: {}
'event_type': EVENT_TYPE,
'payload': {},
'publisher': 'fake_publisher',
'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'},
}
with mock.patch.object(e_engine, 'engine_client') as client_mock:
@ -111,10 +112,71 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args)
self.assertDictEqual(
{
'description': 'Workflow execution created by event '
'trigger %s.' % trigger.id,
'event_payload': {},
'event_metadata': {}
'service': 'fake_publisher',
'project_id': 'fake_project',
'user_id': 'fake_user',
'timestamp': ''
},
kwargs
kwargs['event_params']
)
class NotificationsConverterTest(base.BaseTest):
def test_convert(self):
definition_cfg = [
{
'event_types': EVENT_TYPE,
'properties': {'resource_id': '<% $.payload.instance_id %>'}
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {
'event_type': EVENT_TYPE,
'payload': {'instance_id': '12345'},
'publisher': 'fake_publisher',
'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
}
event = converter.convert(EVENT_TYPE, notification)
self.assertDictEqual(
{'resource_id': '12345'},
event
)
def test_convert_event_type_not_defined(self):
definition_cfg = [
{
'event_types': EVENT_TYPE,
'properties': {'resource_id': '<% $.payload.instance_id %>'}
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {
'event_type': 'fake_event',
'payload': {'instance_id': '12345'},
'publisher': 'fake_publisher',
'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
}
event = converter.convert('fake_event', notification)
self.assertDictEqual(
{
'service': 'fake_publisher',
'project_id': 'fake_project',
'user_id': 'fake_user',
'timestamp': ''
},
event
)