From 11bb60599e3fc615cd7ad9d5e53995529cc13d18 Mon Sep 17 00:00:00 2001 From: SamKirsch10 Date: Mon, 15 Jun 2015 14:49:25 -0600 Subject: [PATCH] added transforms to talk with kafka --- README.md | 3 +- .../message_formats/reference/transforms.py | 21 ++++++++++++ .../mysql/transforms_repository.py | 6 ++-- monasca_events_api/v2/transforms.py | 32 ++++++++++++++++--- 4 files changed, 53 insertions(+), 9 deletions(-) create mode 100644 monasca_events_api/common/messaging/message_formats/reference/transforms.py diff --git a/README.md b/README.md index f71f65a..e1eb683 100644 --- a/README.md +++ b/README.md @@ -44,8 +44,7 @@ up the server by following the following instructions. To start the server, run the following command: Running the server in foreground mode - gunicorn -k eventlet --worker-connections=2000 --backlog=1000 - --paste /etc/monasca/events_api.ini + gunicorn -k eventlet --worker-connections=2000 --backlog=1000 --paste /etc/monasca/events_api.ini Running the server as daemons gunicorn -k eventlet --worker-connections=2000 --backlog=1000 diff --git a/monasca_events_api/common/messaging/message_formats/reference/transforms.py b/monasca_events_api/common/messaging/message_formats/reference/transforms.py new file mode 100644 index 0000000..227cb34 --- /dev/null +++ b/monasca_events_api/common/messaging/message_formats/reference/transforms.py @@ -0,0 +1,21 @@ +# Copyright 2015 Hewlett-Packard +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +def transform(transform_id, tenant_id, event): + transformed_event = dict( + transform_definition=event, + tenant_id=tenant_id, + transform_id=transform_id + ) + return transformed_event diff --git a/monasca_events_api/common/repositories/mysql/transforms_repository.py b/monasca_events_api/common/repositories/mysql/transforms_repository.py index 6acc527..bb5325d 100644 --- a/monasca_events_api/common/repositories/mysql/transforms_repository.py +++ b/monasca_events_api/common/repositories/mysql/transforms_repository.py @@ -33,7 +33,7 @@ class TransformsRepository(mysql_repository.MySQLRepository, with cnxn: now = timeutils.utcnow() try: - cursor.execute("""insert into event_transform_specification( + cursor.execute("""insert into event_transform( id, tenant_id, name, @@ -57,12 +57,12 @@ class TransformsRepository(mysql_repository.MySQLRepository, def list_transforms(self, tenant_id): cnxn, cursor = self._get_cnxn_cursor_tuple() with cnxn: - cursor.execute("""select * from event_transform_specification + cursor.execute("""select * from event_transform where tenant_id = %s and deleted_at IS NULL""", [tenant_id]) return cursor.fetchall() def delete_transform(self, tenant_id, transform_id): cnxn, cursor = self._get_cnxn_cursor_tuple() with cnxn: - cursor.execute("""delete from event_transform_specification + cursor.execute("""delete from event_transform where id = %s and tenant_id = %s""", (transform_id, tenant_id)) diff --git a/monasca_events_api/v2/transforms.py b/monasca_events_api/v2/transforms.py index 6ea11bb..5eba125 100644 --- a/monasca_events_api/v2/transforms.py +++ b/monasca_events_api/v2/transforms.py @@ -1,4 +1,4 @@ -# Copyright 2014 Hewlett-Packard +# Copyright 2015 Hewlett-Packard # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -22,6 +22,7 @@ from oslo.config import cfg import simport from monasca_events_api.api import transforms_api_v2 +from monasca_events_api.common.messaging import exceptions as message_queue_exceptions from monasca_events_api.common.repositories import exceptions as repository_exceptions from monasca_events_api.openstack.common import log from monasca_events_api.openstack.common import uuidutils @@ -29,6 +30,8 @@ from monasca_events_api.v2.common import helpers from monasca_events_api.v2.common.schemas import (exceptions as schemas_exceptions) from monasca_events_api.v2.common.schemas import ( transforms_request_body_schema as schemas_transforms) +from monasca_events_api.common.messaging.message_formats.reference.transforms import ( + transform as transform_event) LOG = log.getLogger(__name__) @@ -48,6 +51,8 @@ class Transforms(transforms_api_v2.TransformsV2API): self._region = cfg.CONF.region self._default_authorized_roles = ( cfg.CONF.security.default_authorized_roles) + self._message_queue = ( + simport.load(cfg.CONF.messaging.driver)("transform-definitions")) self._transforms_repo = ( simport.load(cfg.CONF.repositories.transforms)()) @@ -59,6 +64,8 @@ class Transforms(transforms_api_v2.TransformsV2API): transform_id = uuidutils.generate_uuid() tenant_id = helpers.get_tenant_id(req) self._create_transform(transform_id, tenant_id, transform) + transformed_event = transform_event(transform_id, tenant_id, transform) + self._send_event(transformed_event) res.body = self._create_transform_response(transform_id, transform) res.status = falcon.HTTP_200 @@ -72,8 +79,25 @@ class Transforms(transforms_api_v2.TransformsV2API): helpers.validate_authorization(req, self._default_authorized_roles) tenant_id = helpers.get_tenant_id(req) self._delete_transform(tenant_id, transform_id) + transformed_event = transform_event(transform_id, tenant_id, []) + self._send_event(transformed_event) res.status = falcon.HTTP_204 + def _send_event(self, event): + """Send the event using the message queue. + + :param metrics: An event object. + :raises: falcon.HTTPServiceUnavailable + """ + try: + str_msg = json.dumps(event, cls=MyEncoder, + ensure_ascii=False).encode('utf8') + self._message_queue.send_message(str_msg) + except message_queue_exceptions.MessageQueueException as ex: + LOG.exception(ex) + raise falcon.HTTPInternalServerError('Service unavailable', + ex.message) + def _validate_transform(self, transform): """Validates the transform @@ -100,9 +124,9 @@ class Transforms(transforms_api_v2.TransformsV2API): enabled = transform['enabled'] else: enabled = False - self._transforms_repo.create_transforms(transform_id, tenant_id, name, - description, specification, - enabled) + self._transforms_repo.create_transforms(transform_id, tenant_id, + name, description, + specification, enabled) except repository_exceptions.RepositoryException as ex: LOG.error(ex) raise falcon.HTTPInternalServerError('Service unavailable',