added transforms to talk with kafka

This commit is contained in:
SamKirsch10 2015-06-15 14:49:25 -06:00
parent 59ba0f8adc
commit 11bb60599e
4 changed files with 53 additions and 9 deletions

View File

@ -44,8 +44,7 @@ up the server by following the following instructions.
To start the server, run the following command:
Running the server in foreground mode
gunicorn -k eventlet --worker-connections=2000 --backlog=1000
--paste /etc/monasca/events_api.ini
gunicorn -k eventlet --worker-connections=2000 --backlog=1000 --paste /etc/monasca/events_api.ini
Running the server as daemons
gunicorn -k eventlet --worker-connections=2000 --backlog=1000

View File

@ -0,0 +1,21 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(transform_id, tenant_id, event):
transformed_event = dict(
transform_definition=event,
tenant_id=tenant_id,
transform_id=transform_id
)
return transformed_event

View File

@ -33,7 +33,7 @@ class TransformsRepository(mysql_repository.MySQLRepository,
with cnxn:
now = timeutils.utcnow()
try:
cursor.execute("""insert into event_transform_specification(
cursor.execute("""insert into event_transform(
id,
tenant_id,
name,
@ -57,12 +57,12 @@ class TransformsRepository(mysql_repository.MySQLRepository,
def list_transforms(self, tenant_id):
cnxn, cursor = self._get_cnxn_cursor_tuple()
with cnxn:
cursor.execute("""select * from event_transform_specification
cursor.execute("""select * from event_transform
where tenant_id = %s and deleted_at IS NULL""", [tenant_id])
return cursor.fetchall()
def delete_transform(self, tenant_id, transform_id):
cnxn, cursor = self._get_cnxn_cursor_tuple()
with cnxn:
cursor.execute("""delete from event_transform_specification
cursor.execute("""delete from event_transform
where id = %s and tenant_id = %s""", (transform_id, tenant_id))

View File

@ -1,4 +1,4 @@
# Copyright 2014 Hewlett-Packard
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -22,6 +22,7 @@ from oslo.config import cfg
import simport
from monasca_events_api.api import transforms_api_v2
from monasca_events_api.common.messaging import exceptions as message_queue_exceptions
from monasca_events_api.common.repositories import exceptions as repository_exceptions
from monasca_events_api.openstack.common import log
from monasca_events_api.openstack.common import uuidutils
@ -29,6 +30,8 @@ from monasca_events_api.v2.common import helpers
from monasca_events_api.v2.common.schemas import (exceptions as schemas_exceptions)
from monasca_events_api.v2.common.schemas import (
transforms_request_body_schema as schemas_transforms)
from monasca_events_api.common.messaging.message_formats.reference.transforms import (
transform as transform_event)
LOG = log.getLogger(__name__)
@ -48,6 +51,8 @@ class Transforms(transforms_api_v2.TransformsV2API):
self._region = cfg.CONF.region
self._default_authorized_roles = (
cfg.CONF.security.default_authorized_roles)
self._message_queue = (
simport.load(cfg.CONF.messaging.driver)("transform-definitions"))
self._transforms_repo = (
simport.load(cfg.CONF.repositories.transforms)())
@ -59,6 +64,8 @@ class Transforms(transforms_api_v2.TransformsV2API):
transform_id = uuidutils.generate_uuid()
tenant_id = helpers.get_tenant_id(req)
self._create_transform(transform_id, tenant_id, transform)
transformed_event = transform_event(transform_id, tenant_id, transform)
self._send_event(transformed_event)
res.body = self._create_transform_response(transform_id, transform)
res.status = falcon.HTTP_200
@ -72,8 +79,25 @@ class Transforms(transforms_api_v2.TransformsV2API):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
self._delete_transform(tenant_id, transform_id)
transformed_event = transform_event(transform_id, tenant_id, [])
self._send_event(transformed_event)
res.status = falcon.HTTP_204
def _send_event(self, event):
"""Send the event using the message queue.
:param metrics: An event object.
:raises: falcon.HTTPServiceUnavailable
"""
try:
str_msg = json.dumps(event, cls=MyEncoder,
ensure_ascii=False).encode('utf8')
self._message_queue.send_message(str_msg)
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPInternalServerError('Service unavailable',
ex.message)
def _validate_transform(self, transform):
"""Validates the transform
@ -100,9 +124,9 @@ class Transforms(transforms_api_v2.TransformsV2API):
enabled = transform['enabled']
else:
enabled = False
self._transforms_repo.create_transforms(transform_id, tenant_id, name,
description, specification,
enabled)
self._transforms_repo.create_transforms(transform_id, tenant_id,
name, description,
specification, enabled)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable',