# monasca-events-api/monasca_events_api/v2/stream_definitions.py
#
# (Repository-viewer metadata: 508 lines, 20 KiB, Python.)

# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import re
import falcon
from oslo_config import cfg
from oslo_log import log
import simport
from monasca_events_api.api import stream_definitions_api_v2
from monasca_events_api.common.messaging import exceptions \
as message_queue_exceptions
from monasca_events_api.common.repositories import exceptions
from monasca_events_api.v2.common import helpers
from monasca_events_api.v2.common import resource
from monasca_events_api.v2.common.schemas import \
(stream_definition_request_body_schema as schema_streams)
from monasca_events_api.v2.common.schemas import exceptions \
as schemas_exceptions
# Module-level logger, named after this module, used by the resource class
# and the helper functions below.
LOG = log.getLogger(__name__)
class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
    """Falcon resource handling create/read/patch/delete of stream definitions.

    Every mutating operation is persisted through the configured repository
    and then announced on the 'stream-definitions' message queue.  The
    reserved ``_tenant_id`` trait is injected into each ``select`` entry on
    write and stripped again when a definition is returned to the caller.
    """

    def __init__(self):
        """Read configuration and load the repository and messaging drivers.

        Raises:
            exceptions.RepositoryException: wraps any error raised while
                reading configuration or loading the plugins.
        """
        try:
            self._region = cfg.CONF.region
            self._default_authorized_roles = (
                cfg.CONF.security.default_authorized_roles)
            self._delegate_authorized_roles = (
                cfg.CONF.security.delegate_authorized_roles)
            self._post_authorized_roles = (
                cfg.CONF.security.default_authorized_roles +
                cfg.CONF.security.agent_authorized_roles)
            self._stream_definitions_repo = (
                simport.load(cfg.CONF.repositories.streams)())
            self.stream_definition_event_message_queue = (
                simport.load(cfg.CONF.messaging.driver)('stream-definitions'))
        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _exception_text(ex):
        """Return a human-readable message for ``ex``.

        ``BaseException.message`` only exists on Python 2 (or on exception
        classes that define it themselves), so fall back to ``str(ex)``.
        """
        return getattr(ex, 'message', None) or str(ex)

    @staticmethod
    def _add_tenant_id_trait(select, tenant_id):
        """Set the reserved ``_tenant_id`` trait on every entry of ``select``.

        Entries without a ``traits`` mapping get one created.  The entries
        are modified in place.
        """
        for entry in select:
            entry.setdefault('traits', {})['_tenant_id'] = tenant_id

    @staticmethod
    def _json_or_none(value):
        """Serialize ``value`` to JSON, passing ``None`` through untouched."""
        return None if value is None else json.dumps(value)

    # ------------------------------------------------------------------
    # HTTP handlers
    # ------------------------------------------------------------------
    def on_post(self, req, res):
        """Create a stream definition from the JSON request body.

        Responds with 201 and the created definition (including links).
        """
        helpers.validate_authorization(req, self._default_authorized_roles)
        stream_definition = helpers.read_json_msg_body(req)
        self._validate_stream_definition(stream_definition)
        tenant_id = helpers.get_tenant_id(req)

        name = get_query_stream_definition_name(stream_definition)
        description = get_query_stream_definition_description(
            stream_definition)
        select = stream_definition['select']
        self._add_tenant_id_trait(select, tenant_id)
        group_by = stream_definition['group_by']
        fire_criteria = stream_definition['fire_criteria']
        expiration = stream_definition['expiration']
        fire_actions = get_query_stream_definition_fire_actions(
            stream_definition)
        expire_actions = get_query_stream_definition_expire_actions(
            stream_definition)

        result = self._stream_definition_create(tenant_id, name, description,
                                                select, group_by,
                                                fire_criteria, expiration,
                                                fire_actions, expire_actions)

        helpers.add_links_to_resource(result, req.uri)
        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_201

    def on_get(self, req, res, stream_id=None):
        """Return one stream definition, or a paginated list of them.

        With ``stream_id`` the single definition is returned; without it the
        tenant's definitions are listed, filtered by the optional ``name``
        query parameter and paged with ``offset``/``limit``.
        """
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)

        if stream_id:
            result = self._stream_definition_show(tenant_id, stream_id)
            # Strip the id from the request URI before handing it to the
            # link helper.  The id is escaped so that regex metacharacters
            # in a client-supplied id are matched literally.
            base_uri = re.sub('/' + re.escape(stream_id), '', req.uri)
            helpers.add_links_to_resource(result, base_uri)
        else:
            name = helpers.get_query_name(req)
            offset = helpers.normalize_offset(
                helpers.get_query_param(req, 'offset'))
            limit = helpers.get_query_param(req, 'limit')
            result = self._stream_definition_list(tenant_id, name,
                                                  req.uri, offset, limit)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

    def on_patch(self, req, res, stream_id):
        """Partially update a stream definition.

        Fields absent from the request body are passed to the repository as
        ``None``.
        """
        helpers.validate_authorization(req, self._default_authorized_roles)
        stream_definition = helpers.read_json_msg_body(req)
        tenant_id = helpers.get_tenant_id(req)

        name = get_query_stream_definition_name(
            stream_definition, return_none=True)
        description = get_query_stream_definition_description(
            stream_definition, return_none=True)
        select = get_query_stream_definition_select(
            stream_definition, return_none=True)
        if select:
            self._add_tenant_id_trait(select, tenant_id)
        group_by = get_query_stream_definition_group_by(
            stream_definition, return_none=True)
        fire_criteria = get_query_stream_definition_fire_criteria(
            stream_definition, return_none=True)
        expiration = get_query_stream_definition_expiration(
            stream_definition, return_none=True)
        fire_actions = get_query_stream_definition_fire_actions(
            stream_definition, return_none=True)
        expire_actions = get_query_stream_definition_expire_actions(
            stream_definition, return_none=True)

        result = self._stream_definition_patch(tenant_id,
                                               stream_id,
                                               name,
                                               description,
                                               select,
                                               group_by,
                                               fire_criteria,
                                               expiration,
                                               fire_actions,
                                               expire_actions)

        # NOTE(review): unlike on_get, the id is not stripped from req.uri
        # here -- confirm the resulting links are what clients expect.
        helpers.add_links_to_resource(result, req.uri)
        res.body = helpers.dumpit_utf8(result)
        # NOTE(review): 201 is kept for backward compatibility; 200 would be
        # the conventional status for a successful PATCH.
        res.status = falcon.HTTP_201

    def on_delete(self, req, res, stream_id):
        """Delete a stream definition and respond with 204."""
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        self._stream_definition_delete(tenant_id, stream_id)
        res.status = falcon.HTTP_204

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------
    def _check_invalid_trait(self, stream_definition):
        """Reject definitions whose select entries use the ``_tenant_id`` trait.

        Raises:
            falcon.HTTPBadRequest: if any select entry names ``_tenant_id``.
        """
        for entry in stream_definition['select']:
            if 'traits' in entry and '_tenant_id' in entry['traits']:
                raise falcon.HTTPBadRequest(
                    'Bad request',
                    '_tenant_id is a reserved word and invalid trait.')

    def _validate_stream_definition(self, stream_definition):
        """Validate a request body against the schema and reserved traits.

        Raises:
            falcon.HTTPBadRequest: on schema violations or reserved traits.
        """
        try:
            schema_streams.validate(stream_definition)
        except schemas_exceptions.ValidationException as ex:
            LOG.debug(ex)
            raise falcon.HTTPBadRequest('Bad request',
                                        self._exception_text(ex))
        self._check_invalid_trait(stream_definition)

    # ------------------------------------------------------------------
    # Repository operations
    # ------------------------------------------------------------------
    @resource.resource_try_catch_block
    def _stream_definition_delete(self, tenant_id, stream_id):
        """Delete the definition and publish a 'deleted' event.

        The row is fetched first because its name is needed for the event.

        Raises:
            falcon.HTTPNotFound: if the repository reports nothing deleted.
        """
        stream_definition_row = (
            self._stream_definitions_repo.get_stream_definition(tenant_id,
                                                                stream_id))

        if not self._stream_definitions_repo.delete_stream_definition(
                tenant_id, stream_id):
            raise falcon.HTTPNotFound

        self._send_stream_definition_deleted_event(
            stream_id, tenant_id, stream_definition_row['name'])

    @resource.resource_try_catch_block
    def _stream_definition_create(self, tenant_id, name,
                                  description, select, group_by,
                                  fire_criteria, expiration,
                                  fire_actions, expire_actions):
        """Persist a new definition, publish a 'created' event, return it.

        ``select``, ``group_by`` and ``fire_criteria`` are stored as JSON
        strings; the returned dict carries the original Python values.
        """
        stream_definition_id = (
            self._stream_definitions_repo.
            create_stream_definition(tenant_id,
                                     name,
                                     description,
                                     json.dumps(select),
                                     json.dumps(group_by),
                                     json.dumps(fire_criteria),
                                     expiration,
                                     fire_actions,
                                     expire_actions))

        self._send_stream_definition_created_event(tenant_id,
                                                   stream_definition_id,
                                                   name,
                                                   select,
                                                   group_by,
                                                   fire_criteria,
                                                   expiration)
        # NOTE(review): 'actions_enabled' is the string u'true' here but a
        # boolean in _build_stream_definition_show_result; kept as-is so the
        # response body does not change.
        result = (
            {u'name': name,
             u'id': stream_definition_id,
             u'description': description,
             u'select': select,
             u'group_by': group_by,
             u'fire_criteria': fire_criteria,
             u'expiration': expiration,
             u'fire_actions': fire_actions,
             u'expire_actions': expire_actions,
             u'actions_enabled': u'true'}
        )

        return result

    @resource.resource_try_catch_block
    def _stream_definition_patch(self, tenant_id, stream_definition_id, name,
                                 description, select, group_by,
                                 fire_criteria, expiration,
                                 fire_actions, expire_actions):
        """Apply a partial update, publish an 'updated' event, return the row.

        ``None`` arguments are forwarded to the repository as ``None``; the
        structured fields are JSON-encoded only when present.
        """
        stream_definition_row = (
            self._stream_definitions_repo.patch_stream_definition(
                tenant_id,
                stream_definition_id,
                name,
                description,
                self._json_or_none(select),
                self._json_or_none(group_by),
                self._json_or_none(fire_criteria),
                expiration,
                fire_actions,
                expire_actions))

        self._send_stream_definition_updated_event(tenant_id,
                                                   stream_definition_id,
                                                   name,
                                                   select,
                                                   group_by,
                                                   fire_criteria,
                                                   expiration)

        return self._build_stream_definition_show_result(
            stream_definition_row)

    def send_event(self, message_queue, event_msg):
        """Serialize ``event_msg`` and publish it on ``message_queue``.

        Raises:
            falcon.HTTPInternalServerError: if the queue rejects the message.
        """
        try:
            message_queue.send_message(
                helpers.dumpit_utf8(event_msg))
        except message_queue_exceptions.MessageQueueException as ex:
            LOG.exception(ex)
            # Pass text, not encoded bytes, and do not rely on the
            # Python-2-only ``ex.message`` attribute.
            raise falcon.HTTPInternalServerError(
                'Message queue service unavailable',
                self._exception_text(ex))

    @resource.resource_try_catch_block
    def _stream_definition_show(self, tenant_id, stream_id):
        """Fetch one definition row and convert it to its API form."""
        stream_definition_row = (
            self._stream_definitions_repo.get_stream_definition(tenant_id,
                                                                stream_id))

        return self._build_stream_definition_show_result(stream_definition_row)

    @resource.resource_try_catch_block
    def _stream_definition_list(self, tenant_id, name, req_uri,
                                offset, limit):
        """Fetch the tenant's definitions and return them paginated."""
        stream_definition_rows = (
            self._stream_definitions_repo.get_stream_definitions(
                tenant_id, name, offset, limit))

        result = []
        for stream_definition_row in stream_definition_rows:
            sd = self._build_stream_definition_show_result(
                stream_definition_row)
            helpers.add_links_to_resource(sd, req_uri)
            result.append(sd)

        return helpers.paginate(result, req_uri)

    def _build_stream_definition_show_result(self, stream_definition_row):
        """Convert a repository row into the dict returned to API clients.

        JSON columns are decoded, the action columns are split into lists,
        and the internal ``_tenant_id`` trait is removed from every select
        entry (dropping ``traits`` altogether when nothing else remains).
        """
        fire_actions_list = get_comma_separated_str_as_list(
            stream_definition_row['fire_actions'])

        expire_actions_list = get_comma_separated_str_as_list(
            stream_definition_row['expire_actions'])

        selectlist = json.loads(stream_definition_row['select_by'])
        for entry in selectlist:
            traits = entry.get('traits')
            if traits is None:
                # Tolerate entries stored without traits instead of raising
                # KeyError while rendering the response.
                continue
            traits.pop('_tenant_id', None)
            if not traits:
                del entry['traits']

        result = (
            {u'name': stream_definition_row['name'],
             u'id': stream_definition_row['id'],
             u'description': stream_definition_row['description'],
             u'select': selectlist,
             u'group_by': json.loads(stream_definition_row['group_by']),
             u'fire_criteria': json.loads(
                 stream_definition_row['fire_criteria']),
             u'expiration': stream_definition_row['expiration'],
             u'fire_actions': fire_actions_list,
             u'expire_actions': expire_actions_list,
             u'actions_enabled': stream_definition_row['actions_enabled'] == 1,
             u'created_at': stream_definition_row['created_at'].isoformat(),
             u'updated_at': stream_definition_row['updated_at'].isoformat()}
        )

        return result

    # ------------------------------------------------------------------
    # Event publishing
    # ------------------------------------------------------------------
    def _send_stream_definition_deleted_event(self, stream_definition_id,
                                              tenant_id, stream_name):
        """Publish a 'stream-definition-deleted' event."""
        stream_definition_deleted_event_msg = {
            u"stream-definition-deleted": {u'tenant_id': tenant_id,
                                           u'stream_definition_id':
                                               stream_definition_id,
                                           u'name': stream_name}}

        self.send_event(self.stream_definition_event_message_queue,
                        stream_definition_deleted_event_msg)

    def _send_stream_definition_change_event(self, event_key, tenant_id,
                                             stream_definition_id, name,
                                             select, group_by,
                                             fire_criteria, expiration):
        """Publish a created/updated event whose payload is keyed by ``event_key``."""
        event_msg = {
            event_key: {u'tenant_id': tenant_id,
                        u'stream_definition_id': stream_definition_id,
                        u'name': name,
                        u'select': select,
                        u'group_by': group_by,
                        u'fire_criteria': fire_criteria,
                        u'expiration': expiration}
        }

        self.send_event(self.stream_definition_event_message_queue,
                        event_msg)

    def _send_stream_definition_created_event(self, tenant_id,
                                              stream_definition_id,
                                              name,
                                              select,
                                              group_by,
                                              fire_criteria,
                                              expiration):
        """Publish a 'stream-definition-created' event."""
        self._send_stream_definition_change_event(
            u'stream-definition-created', tenant_id, stream_definition_id,
            name, select, group_by, fire_criteria, expiration)

    def _send_stream_definition_updated_event(self, tenant_id,
                                              stream_definition_id,
                                              name,
                                              select,
                                              group_by,
                                              fire_criteria,
                                              expiration):
        """Publish a 'stream-definition-updated' event."""
        self._send_stream_definition_change_event(
            u'stream-definition-updated', tenant_id, stream_definition_id,
            name, select, group_by, fire_criteria, expiration)
def get_query_stream_definition_name(stream_definition, return_none=False):
    """Return the 'name' field of a stream-definition body.

    When the field is absent, return ``None`` if ``return_none`` is set,
    otherwise the empty string.
    """
    if 'name' not in stream_definition:
        return None if return_none else ''
    return stream_definition['name']
def get_query_stream_definition_description(stream_definition,
                                            return_none=False):
    """Return the 'description' field of a stream-definition body.

    A missing field yields ``None`` when ``return_none`` is true and the
    empty string otherwise.
    """
    missing_value = None if return_none else ''
    present = 'description' in stream_definition
    return stream_definition['description'] if present else missing_value
def get_query_stream_definition_select(stream_definition,
                                       return_none=False):
    """Return the 'select' field of a stream-definition body.

    If the body has no 'select' key the result is ``None`` (with
    ``return_none``) or the empty string (without).
    """
    if 'select' in stream_definition:
        return stream_definition['select']
    if return_none:
        return None
    return ''
def get_query_stream_definition_group_by(stream_definition,
                                         return_none=False):
    """Return the 'group_by' field of a stream-definition body.

    When the key is missing, return ``None`` if ``return_none`` is set and
    a new empty list otherwise.
    """
    if 'group_by' not in stream_definition:
        return None if return_none else []
    return stream_definition['group_by']
def get_query_stream_definition_fire_criteria(stream_definition,
                                              return_none=False):
    """Return the 'fire_criteria' field of a stream-definition body.

    Absent key: ``None`` when ``return_none`` is true, else the empty
    string.
    """
    has_criteria = 'fire_criteria' in stream_definition
    if has_criteria:
        return stream_definition['fire_criteria']
    return None if return_none else ''
def get_query_stream_definition_expiration(stream_definition,
                                           return_none=False):
    """Return the 'expiration' field of a stream-definition body.

    Falls back to ``None`` (``return_none`` true) or the empty string when
    the key is not present.
    """
    if 'expiration' not in stream_definition:
        fallback = None if return_none else ''
        return fallback
    return stream_definition['expiration']
def get_query_stream_definition_fire_actions(stream_definition,
                                             return_none=False):
    """Return the 'fire_actions' field of a stream-definition body.

    A body without the key produces ``None`` when ``return_none`` is set,
    otherwise a new empty list.
    """
    if 'fire_actions' in stream_definition:
        return stream_definition['fire_actions']
    return None if return_none else []
def get_query_stream_definition_expire_actions(stream_definition,
                                               return_none=False):
    """Return the 'expire_actions' field of a stream-definition body.

    Missing key: ``None`` if ``return_none`` is true, a new empty list if
    not.
    """
    if 'expire_actions' not in stream_definition:
        if return_none:
            return None
        return []
    return stream_definition['expire_actions']
def get_query_stream_definition_actions_enabled(stream_definition,
                                                required=False,
                                                return_none=False):
    """Return the 'actions_enabled' field of a stream-definition body.

    When the key is absent the outcome depends on the flags, checked in
    this order: ``return_none`` yields ``None``; ``required`` raises;
    otherwise the empty string is returned.

    Raises:
        falcon.HTTPBadRequest: if the field is required but missing, or if
            any other error occurs while inspecting ``stream_definition``.
    """
    try:
        if 'actions_enabled' in stream_definition:
            return stream_definition['actions_enabled']
        if return_none:
            return None
        if required:
            raise ValueError("Missing actions-enabled")
        return ''
    except Exception as ex:
        LOG.debug(ex)
        # ``ex.message`` exists only on Python 2; fall back to str(ex) so
        # the handler itself cannot fail with AttributeError.
        raise falcon.HTTPBadRequest(
            'Bad request', getattr(ex, 'message', None) or str(ex))
def get_comma_separated_str_as_list(comma_separated_str):
    """Split a comma-separated value into a list of text items.

    Args:
        comma_separated_str: text or UTF-8 encoded bytes, or a falsy value
            (``None``, ``''``, ``b''``).

    Returns:
        An empty list for falsy input, otherwise the items between commas.
    """
    if not comma_separated_str:
        return []
    # Only byte strings need decoding; calling .decode() on text fails on
    # Python 3 and on non-ASCII unicode under Python 2.
    if isinstance(comma_separated_str, bytes):
        comma_separated_str = comma_separated_str.decode('utf8')
    return comma_separated_str.split(',')