From 4e1e358c8b595eba42ead91a118fd14b8b5546e7 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Mon, 20 Jun 2016 22:49:49 +1200 Subject: [PATCH] Add event trigger REST API This patch adds the Mistral changes to support the new event trigger REST API. Change-Id: I8190ce81d46cc8296db29f41442354cdfe1a5bbd Implements: blueprint event-notification-trigger Co-Authored-By: Lingxian Kong --- etc/policy.json | 8 +- mistral/api/controllers/v2/event_trigger.py | 143 ++++++++++++ mistral/api/controllers/v2/resources.py | 53 +++++ mistral/api/controllers/v2/root.py | 2 + mistral/engine/base.py | 13 ++ mistral/engine/rpc_backend/rpc.py | 73 +++++- mistral/exceptions.py | 4 + mistral/services/triggers.py | 73 ++++++ .../tests/unit/api/v2/test_event_trigger.py | 211 ++++++++++++++++++ 9 files changed, 577 insertions(+), 3 deletions(-) create mode 100644 mistral/api/controllers/v2/event_trigger.py create mode 100644 mistral/tests/unit/api/v2/test_event_trigger.py diff --git a/etc/policy.json b/etc/policy.json index c5df702a..01b97cd2 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -54,5 +54,11 @@ "workflows:delete": "rule:admin_or_owner", "workflows:get": "rule:admin_or_owner", "workflows:list": "rule:admin_or_owner", - "workflows:update": "rule:admin_or_owner" + "workflows:update": "rule:admin_or_owner", + + "event_triggers:create": "rule:admin_or_owner", + "event_triggers:delete": "rule:admin_or_owner", + "event_triggers:get": "rule:admin_or_owner", + "event_triggers:list": "rule:admin_or_owner", + "event_triggers:update": "rule:admin_or_owner", } diff --git a/mistral/api/controllers/v2/event_trigger.py b/mistral/api/controllers/v2/event_trigger.py new file mode 100644 index 00000000..86e5045d --- /dev/null +++ b/mistral/api/controllers/v2/event_trigger.py @@ -0,0 +1,143 @@ +# Copyright 2016 - IBM Corp. +# Copyright 2016 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api import access_control as acl +from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import types +from mistral import context as auth_ctx +from mistral.db.v2 import api as db_api +from mistral import exceptions as exc +from mistral.services import triggers +from mistral.utils import rest_utils + +LOG = logging.getLogger(__name__) + +UPDATE_NOT_ALLOWED = ['exchange', 'topic', 'event'] +CREATE_MANDATORY = set(['exchange', 'topic', 'event', 'workflow_id']) + + +class EventTriggersController(rest.RestController): + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.EventTrigger, types.uuid) + def get(self, id): + """Returns the specified event_trigger.""" + acl.enforce('event_trigger:get', auth_ctx.ctx()) + + LOG.info('Fetch event trigger [id=%s]', id) + + db_model = db_api.get_event_trigger(id) + + return resources.EventTrigger.from_dict(db_model.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.EventTrigger, body=resources.EventTrigger, + status_code=201) + def post(self, event_trigger): + """Creates a new event trigger.""" + acl.enforce('event_trigger:create', auth_ctx.ctx()) + + values = event_trigger.to_dict() + input_keys = [k for k in values if values[k]] + + if CREATE_MANDATORY - set(input_keys): + raise exc.EventTriggerException( + "Params %s must be provided for creating event trigger." % + CREATE_MANDATORY + ) + + LOG.info('Create event trigger: %s', values) + + db_model = triggers.create_event_trigger( + values.get('name', ''), + values.get('exchange'), + values.get('topic'), + values.get('event'), + values.get('workflow_id'), + workflow_input=values.get('workflow_input'), + workflow_params=values.get('workflow_params'), + ) + + return resources.EventTrigger.from_dict(db_model.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.EventTrigger, types.uuid, + body=resources.EventTrigger) + def put(self, id, event_trigger): + """Updates an existing event trigger. + + The exchange, topic and event can not be updated. The right way to + change them is to delete the event trigger first, then create a new + event trigger with new params. + """ + acl.enforce('event_trigger:update', auth_ctx.ctx()) + + values = event_trigger.to_dict() + + for field in UPDATE_NOT_ALLOWED: + if values.get(field, None): + raise exc.EventTriggerException( + "Can not update fields %s of event trigger." % + UPDATE_NOT_ALLOWED + ) + + db_api.ensure_event_trigger_exists(id) + + LOG.info('Update event trigger: [id=%s, values=%s]', id, values) + + db_model = triggers.update_event_trigger(id, values) + + return resources.EventTrigger.from_dict(db_model.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(None, types.uuid, status_code=204) + def delete(self, id): + """Delete event trigger.""" + acl.enforce('event_trigger:delete', auth_ctx.ctx()) + + LOG.info("Delete event trigger [id=%s]", id) + + event_trigger = db_api.get_event_trigger(id) + + triggers.delete_event_trigger(event_trigger.to_dict()) + + @wsme_pecan.wsexpose(resources.EventTriggers, types.uuid, int, + types.uniquelist, types.list, types.uniquelist, + types.jsontype) + def get_all(self, marker=None, limit=None, sort_keys='created_at', + sort_dirs='asc', fields='', **filters): + """Return all event triggers.""" + acl.enforce('event_trigger:list', auth_ctx.ctx()) + + LOG.info("Fetch event triggers. marker=%s, limit=%s, sort_keys=%s, " + "sort_dirs=%s, fields=%s, filters=%s", marker, limit, + sort_keys, sort_dirs, fields, filters) + + return rest_utils.get_all( + resources.EventTriggers, + resources.EventTrigger, + db_api.get_event_triggers, + db_api.get_event_trigger, + resource_function=None, + marker=marker, + limit=limit, + sort_keys=sort_keys, + sort_dirs=sort_dirs, + fields=fields, + **filters + ) diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index a26f322a..a30af4c5 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -540,3 +540,56 @@ class Services(resource.Resource): @classmethod def sample(cls): return cls(services=[Service.sample()]) + + +class EventTrigger(resource.Resource): + """EventTrigger resource.""" + + id = wsme.wsattr(wtypes.text, readonly=True) + created_at = wsme.wsattr(wtypes.text, readonly=True) + updated_at = wsme.wsattr(wtypes.text, readonly=True) + project_id = wsme.wsattr(wtypes.text, readonly=True) + name = wtypes.text + workflow_id = types.uuid + workflow_input = types.jsontype + workflow_params = types.jsontype + exchange = wtypes.text + topic = wtypes.text + event = wtypes.text + scope = SCOPE_TYPES + + @classmethod + def sample(cls): + return cls(id='123e4567-e89b-12d3-a456-426655441414', + created_at='1970-01-01T00:00:00.000000', + updated_at='1970-01-01T00:00:00.000000', + project_id='project', + name='expiration_event_trigger', + workflow_id='123e4567-e89b-12d3-a456-426655441414', + workflow_input={}, + workflow_params={}, + exchange='nova', + topic='notifications', + event='compute.instance.create.end') + + +class EventTriggers(resource.ResourceList): + """A collection of event triggers.""" + + event_triggers = [EventTrigger] + + def __init__(self, **kwargs): + self._type = 'event_triggers' + + super(EventTriggers, self).__init__(**kwargs) + + @classmethod + def sample(cls): + triggers_sample = cls() + triggers_sample.event_triggers = [EventTrigger.sample()] + triggers_sample.next = ("http://localhost:8989/v2/event_triggers?" + "sort_keys=id,name&" + "sort_dirs=asc,desc&limit=10&" + "marker=123e4567-e89b-12d3-a456-426655440000") + + return triggers_sample diff --git a/mistral/api/controllers/v2/root.py b/mistral/api/controllers/v2/root.py index 3881fef3..1c5b1f74 100644 --- a/mistral/api/controllers/v2/root.py +++ b/mistral/api/controllers/v2/root.py @@ -22,6 +22,7 @@ from mistral.api.controllers.v2 import action from mistral.api.controllers.v2 import action_execution from mistral.api.controllers.v2 import cron_trigger from mistral.api.controllers.v2 import environment +from mistral.api.controllers.v2 import event_trigger from mistral.api.controllers.v2 import execution from mistral.api.controllers.v2 import service from mistral.api.controllers.v2 import task @@ -54,6 +55,7 @@ class Controller(object): environments = environment.EnvironmentController() action_executions = action_execution.ActionExecutionsController() services = service.ServicesController() + event_triggers = event_trigger.EventTriggersController() @wsme_pecan.wsexpose(RootResource) def index(self): diff --git a/mistral/engine/base.py b/mistral/engine/base.py index 5fde13a0..d1d721d0 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -146,6 +146,19 @@ class Executor(object): raise NotImplementedError() +@six.add_metaclass(abc.ABCMeta) +class EventEngine(object): + """Action event trigger interface.""" + + @abc.abstractmethod + def create_event_trigger(self, trigger, events): + raise NotImplementedError() + + @abc.abstractmethod + def delete_event_trigger(self, trigger, events): + raise NotImplementedError() + + @six.add_metaclass(abc.ABCMeta) class TaskPolicy(object): """Task policy. diff --git a/mistral/engine/rpc_backend/rpc.py b/mistral/engine/rpc_backend/rpc.py index a88ec8e2..ded1911c 100644 --- a/mistral/engine/rpc_backend/rpc.py +++ b/mistral/engine/rpc_backend/rpc.py @@ -35,6 +35,7 @@ _TRANSPORT = None _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None +_EVENT_ENGINE_CLIENT = None def cleanup(): @@ -43,10 +44,12 @@ def cleanup(): global _TRANSPORT global _ENGINE_CLIENT global _EXECUTOR_CLIENT + global _EVENT_ENGINE_CLIENT _TRANSPORT = None _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None + _EVENT_ENGINE_CLIENT = None def get_transport(): @@ -80,6 +83,17 @@ def get_executor_client(): return _EXECUTOR_CLIENT +def get_event_engine_client(): + global _EVENT_ENGINE_CLIENT + + if not _EVENT_ENGINE_CLIENT: + _EVENT_ENGINE_CLIENT = EventEngineClient( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.event_engine) + ) + + return _EVENT_ENGINE_CLIENT + + def get_rpc_server_driver(): rpc_impl = cfg.CONF.rpc_implementation @@ -570,7 +584,62 @@ class ExecutorClient(base.Executor): class EventEngineServer(object): - """RPC Event Engine server.""" + """RPC EventEngine server.""" def __init__(self, event_engine): - self.event_engine = event_engine + self._event_engine = event_engine + + def create_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'create_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.create_event_trigger(trigger, events) + + def delete_event_trigger(self, rpc_ctx, trigger, events): + LOG.info( + "Received RPC request 'delete_event_trigger'[rpc_ctx=%s," + " trigger=%s, events=%s", rpc_ctx, trigger, events + ) + + return self._event_engine.delete_event_trigger(trigger, events) + + def update_event_trigger(self, rpc_ctx, trigger): + LOG.info( + "Received RPC request 'update_event_trigger'[rpc_ctx=%s," + " trigger=%s", rpc_ctx, trigger + ) + + return self._event_engine.update_event_trigger(trigger) + + +class EventEngineClient(base.EventEngine): + """RPC EventEngine client.""" + + def __init__(self, rpc_conf_dict): + """Constructs an RPC client for the EventEngine service.""" + self._client = get_rpc_client_driver()(rpc_conf_dict) + + def create_event_trigger(self, trigger, events): + return self._client.sync_call( + auth_ctx.ctx(), + 'create_event_trigger', + trigger=trigger, + events=events + ) + + def delete_event_trigger(self, trigger, events): + return self._client.sync_call( + auth_ctx.ctx(), + 'delete_event_trigger', + trigger=trigger, + events=events + ) + + def update_event_trigger(self, trigger): + return self._client.sync_call( + auth_ctx.ctx(), + 'update_event_trigger', + trigger=trigger, + ) diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 744ae78e..2e990773 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -153,6 +153,10 @@ class WorkflowException(MistralException): http_code = 400 +class EventTriggerException(MistralException): + http_code = 400 + + class InputException(MistralException): http_code = 400 diff --git a/mistral/services/triggers.py b/mistral/services/triggers.py index 8f2ee017..6605f471 100644 --- a/mistral/services/triggers.py +++ b/mistral/services/triggers.py @@ -17,6 +17,7 @@ import datetime import six from mistral.db.v2 import api as db_api +from mistral.engine.rpc_backend import rpc from mistral.engine import utils as eng_utils from mistral import exceptions as exc from mistral.services import security @@ -112,3 +113,75 @@ def create_cron_trigger(name, workflow_name, workflow_input, trig = db_api.create_cron_trigger(values) return trig + + +def create_event_trigger(name, exchange, topic, event, workflow_id, + workflow_input=None, workflow_params=None): + with db_api.transaction(): + wf_def = db_api.get_workflow_definition_by_id(workflow_id) + + eng_utils.validate_input( + wf_def, + workflow_input or {}, + parser.get_workflow_spec_by_definition_id( + wf_def.id, + wf_def.updated_at + ) + ) + + values = { + 'name': name, + 'workflow_id': workflow_id, + 'workflow_input': workflow_input or {}, + 'workflow_params': workflow_params or {}, + 'exchange': exchange, + 'topic': topic, + 'event': event, + } + + security.add_trust_id(values) + + trig = db_api.create_event_trigger(values) + + trigs = db_api.get_event_triggers(insecure=True, exchange=exchange, + topic=topic) + events = [t.event for t in trigs] + + # NOTE(kong): Send RPC message within the db transaction, rollback if + # any error occurs. + rpc.get_event_engine_client().create_event_trigger( + trig.to_dict(), + events + ) + + return trig + + +def delete_event_trigger(event_trigger): + with db_api.transaction(): + db_api.delete_event_trigger(event_trigger['id']) + + trigs = db_api.get_event_triggers( + insecure=True, + exchange=event_trigger['exchange'], + topic=event_trigger['topic'] + ) + events = set([t.event for t in trigs]) + + # NOTE(kong): Send RPC message within the db transaction, rollback if + # any error occurs. + rpc.get_event_engine_client().delete_event_trigger( + event_trigger, + list(events) + ) + + +def update_event_trigger(id, values): + with db_api.transaction(): + trig = db_api.update_event_trigger(id, values) + + # NOTE(kong): Send RPC message within the db transaction, rollback if + # any error occurs. + rpc.get_event_engine_client().update_event_trigger(trig.to_dict()) + + return trig diff --git a/mistral/tests/unit/api/v2/test_event_trigger.py b/mistral/tests/unit/api/v2/test_event_trigger.py new file mode 100644 index 00000000..52e3fdcf --- /dev/null +++ b/mistral/tests/unit/api/v2/test_event_trigger.py @@ -0,0 +1,211 @@ +# Copyright 2016 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import json +import mock + +from mistral.db.v2 import api as db_api +from mistral.db.v2.sqlalchemy import models +from mistral import exceptions as exc +from mistral.tests.unit.api import base + +WF = models.WorkflowDefinition( + spec={ + 'version': '2.0', + 'name': 'my_wf', + 'tasks': { + 'task1': { + 'action': 'std.noop' + } + } + } +) +WF.update({'id': '123e4567-e89b-12d3-a456-426655440000', 'name': 'my_wf'}) + +TRIGGER = { + 'id': '09cc56a9-d15e-4494-a6e2-c4ec8bdaacae', + 'name': 'my_event_trigger', + 'workflow_id': '123e4567-e89b-12d3-a456-426655440000', + 'workflow_input': '{}', + 'workflow_params': '{}', + 'scope': 'private', + 'exchange': 'openstack', + 'topic': 'notification', + 'event': 'compute.instance.create.start' +} + +trigger_values = copy.deepcopy(TRIGGER) +trigger_values['workflow_input'] = json.loads( + trigger_values['workflow_input']) +trigger_values['workflow_params'] = json.loads( + trigger_values['workflow_params']) + +TRIGGER_DB = models.EventTrigger() +TRIGGER_DB.update(trigger_values) + +MOCK_WF = mock.MagicMock(return_value=WF) +MOCK_TRIGGER = mock.MagicMock(return_value=TRIGGER_DB) +MOCK_TRIGGERS = mock.MagicMock(return_value=[TRIGGER_DB]) +MOCK_NONE = mock.MagicMock(return_value=None) +MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) + + +class TestEventTriggerController(base.APITest): + @mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER) + def test_get(self): + resp = self.app.get( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae' + ) + + self.assertEqual(200, resp.status_int) + self.assertDictEqual(TRIGGER, resp.json) + + @mock.patch.object(db_api, "get_event_trigger", MOCK_NOT_FOUND) + def test_get_not_found(self): + resp = self.app.get( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae', + expect_errors=True + ) + + self.assertEqual(404, resp.status_int) + + @mock.patch.object(db_api, "get_workflow_definition_by_id", MOCK_WF) + @mock.patch.object(db_api, "get_workflow_definition", MOCK_WF) + @mock.patch.object(db_api, "create_event_trigger", MOCK_TRIGGER) + @mock.patch.object(db_api, "get_event_triggers", MOCK_TRIGGERS) + @mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client') + def test_post(self, mock_rpc_client): + client = mock.Mock() + mock_rpc_client.return_value = client + + CREATE_TRIGGER = copy.deepcopy(TRIGGER) + CREATE_TRIGGER.pop('id') + + resp = self.app.post_json('/v2/event_triggers', CREATE_TRIGGER) + + self.assertEqual(201, resp.status_int) + self.assertEqual(1, client.create_event_trigger.call_count) + + self.assertDictEqual( + TRIGGER_DB.to_dict(), + client.create_event_trigger.call_args[0][0] + ) + self.assertListEqual( + ['compute.instance.create.start'], + client.create_event_trigger.call_args[0][1] + ) + + def test_post_no_workflow_id(self): + CREATE_TRIGGER = copy.deepcopy(TRIGGER) + CREATE_TRIGGER.pop('id') + CREATE_TRIGGER.pop('workflow_id') + + resp = self.app.post_json( + '/v2/event_triggers', + CREATE_TRIGGER, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + + @mock.patch.object(db_api, "get_workflow_definition_by_id", MOCK_NOT_FOUND) + def test_post_workflow_not_found(self): + CREATE_TRIGGER = copy.deepcopy(TRIGGER) + CREATE_TRIGGER.pop('id') + + resp = self.app.post_json( + '/v2/event_triggers', + CREATE_TRIGGER, + expect_errors=True + ) + + self.assertEqual(404, resp.status_int) + + @mock.patch.object(db_api, 'ensure_event_trigger_exists', MOCK_NONE) + @mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client') + @mock.patch('mistral.db.v2.api.update_event_trigger') + def test_put(self, mock_update, mock_rpc_client): + client = mock.Mock() + mock_rpc_client.return_value = client + + UPDATED_TRIGGER = models.EventTrigger() + UPDATED_TRIGGER.update(trigger_values) + UPDATED_TRIGGER.update({'name': 'new_name'}) + mock_update.return_value = UPDATED_TRIGGER + + resp = self.app.put_json( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae', + {'name': 'new_name'} + ) + + self.assertEqual(200, resp.status_int) + self.assertEqual(1, client.update_event_trigger.call_count) + + self.assertDictEqual( + UPDATED_TRIGGER.to_dict(), + client.update_event_trigger.call_args[0][0] + ) + + def test_put_field_not_allowed(self): + resp = self.app.put_json( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae', + {'exchange': 'new_exchange'}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + + @mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client') + @mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER) + @mock.patch.object(db_api, "get_event_triggers", + mock.MagicMock(return_value=[])) + @mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE) + def test_delete(self, mock_rpc_client): + client = mock.Mock() + mock_rpc_client.return_value = client + + resp = self.app.delete( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae' + ) + + self.assertEqual(204, resp.status_int) + self.assertEqual(1, client.delete_event_trigger.call_count) + + self.assertDictEqual( + TRIGGER_DB.to_dict(), + client.delete_event_trigger.call_args[0][0] + ) + self.assertListEqual( + [], + client.delete_event_trigger.call_args[0][1] + ) + + @mock.patch.object(db_api, "get_event_trigger", MOCK_NOT_FOUND) + def test_delete_not_found(self): + resp = self.app.delete( + '/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae', + expect_errors=True + ) + + self.assertEqual(404, resp.status_int) + + @mock.patch.object(db_api, "get_event_triggers", MOCK_TRIGGERS) + def test_get_all(self): + resp = self.app.get('/v2/event_triggers') + + self.assertEqual(200, resp.status_int) + + self.assertEqual(1, len(resp.json['event_triggers'])) + self.assertDictEqual(TRIGGER, resp.json['event_triggers'][0])