Merge branch 'master' into oslo

This commit is contained in:
Joe Keen 2015-06-22 15:47:52 -06:00
commit fa1a08010f
11 changed files with 1367 additions and 46 deletions

View File

@ -17,11 +17,11 @@ import json
import time
import requests
import yaml
from monascaclient import ksclient
events_url = "http://127.0.0.1:8082"
events_url = "http://192.168.10.4:8082"
def token():
@ -146,10 +146,15 @@ def test_stream_definition_delete():
def test_transforms():
print("Test POST /transforms")
# Open example yaml file and post to DB
fh = open('transform_definitions.yaml', 'r')
specification_data = yaml.load(fh)
body = {
"name": 'func test',
"description": 'a really short description of this thing',
"specification": 'some sorta specification'
"description": 'an example definition',
"specification": str(specification_data)
}
response = requests.post(
url=events_url + "/v2.0/transforms",

View File

@ -0,0 +1,63 @@
---
- event_type: compute.instance.*
traits: &instance_traits
tenant_id:
fields: payload.tenant_id
user_id:
fields: payload.user_id
instance_id:
fields: payload.instance_id
host:
fields: publisher_id
plugin:
name: split
parameters:
segment: 1
max_split: 1
service:
fields: publisher_id
plugin: split
memory_mb:
type: int
fields: payload.memory_mb
disk_gb:
type: int
fields: payload.disk_gb
root_gb:
type: int
fields: payload.root_gb
ephemeral_gb:
type: int
fields: payload.ephemeral_gb
vcpus:
type: int
fields: payload.vcpus
instance_type_id:
type: int
fields: payload.instance_type_id
instance_type:
fields: payload.instance_type
state:
fields: payload.state
os_architecture:
fields: payload.image_meta.'org.openstack__1__architecture'
os_version:
fields: payload.image_meta.'org.openstack__1__os_version'
os_distro:
fields: payload.image_meta.'org.openstack__1__os_distro'
launched_at:
type: datetime
fields: payload.launched_at
deleted_at:
type: datetime
fields: payload.deleted_at
- event_type: compute.instance.exists
traits:
<<: *instance_traits
audit_period_beginning:
type: datetime
fields: payload.audit_period_beginning
audit_period_ending:
type: datetime
fields: payload.audit_period_ending

View File

@ -109,3 +109,17 @@ class KafkaPublisher(publisher.Publisher):
except Exception:
LOG.exception('Unknown error.')
raise exceptions.MessageQueueException()
def send_message_batch(self, messages):
try:
if not self._producer:
self._init_producer()
self._producer.send_messages(self.topic, *messages)
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError):
self._client = None
LOG.exception('Error occurred while posting data to Kafka.')
raise exceptions.MessageQueueException()
except Exception:
LOG.exception('Unknown error.')
raise exceptions.MessageQueueException()

View File

@ -12,17 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_utils import timeutils
def transform(event, tenant_id, region):
transformed_event = dict(
event=event,
meta=dict(
tenantId=tenant_id,
region=region
),
creation_time=timeutils.utcnow_ts()
)
def transform(events, tenant_id, region):
event_template = {'event': {},
'_tenant_id': tenant_id,
'meta': {'tenantId': tenant_id, 'region': region},
'creation_time': timeutils.utcnow_ts()}
return transformed_event
if isinstance(events, list):
transformed_events = []
for event in events:
event_template['event'] = event
transformed_events.append(json.dumps(event_template))
return transformed_events
else:
transformed_event = event_template['event']
transformed_event['event'] = events
return [json.dumps(transformed_event)]

View File

@ -1,21 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
class Test_first(unittest.TestCase):
def test_first(self):
assert 1 == 1

View File

@ -0,0 +1,370 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import json
from monasca_events_api.common.messaging import exceptions
from monasca_events_api.common.repositories.mysql.events_repository import EventsRepository
from monasca_events_api.v2.events import Events
import mock
from monasca_events_api.common.repositories.exceptions import RepositoryException
from oslo_utils import timeutils
import unittest
class EventsSubClass(Events):
def __init__(self):
self._default_authorized_roles = ['user', 'domainuser',
'domainadmin', 'monasca-user']
self._post_events_authorized_roles = [
'user',
'domainuser',
'domainadmin',
'monasca-user',
'monasca-agent']
self._events_repo = None
self._message_queue = None
self._region = 'useast'
def _event_transform(self, event, tenant_id, _region):
return dict(
event=1,
meta=dict(
tenantId='0ab1ac0a-2867-402d',
region='useast'),
creation_time=timeutils.utcnow_ts())
class Test_first(unittest.TestCase):
@mock.patch('monascaclient.ksclient.KSClient')
def _generate_req(self, token):
"""Generate a mock HTTP request"""
req = mock.MagicMock()
req.get_param.return_value = None
req.headers = {
'X-Auth-User': 'mini-mon',
'X-Auth-Token': token,
'X-Auth-Key': 'password',
'X-TENANT-ID': '0ab1ac0a-2867-402d',
'X-ROLES': 'user, domainuser, domainadmin, monasca-user, monasca-agent',
'Accept': 'application/json',
'User-Agent': 'python-monascaclient',
'Content-Type': 'application/json'}
req.body = {}
req.content_type = 'application/json'
return req
@mock.patch('monasca_events_api.v2.events.Events._list_event')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_pass_singleevent(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo,
listev):
"""GET Method success Single Event"""
helpers_validate.validate_authorization.return_value = True
returnEvent = [{"region": "useast", "tenantId": "0ab1ac0a-2867-402d",
"creation_time": "1434331190", "event": "1"}]
listev.return_value = returnEvent
mysqlRepo.connect.return_value = True
helper_tenant_id.get_tenant_id.return_value = '0ab1ac0a-2867-402d'
event_id = "1"
eventsObj = EventsSubClass()
eventsObj._events_repo = EventsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
eventsObj.on_get(self._generate_req(), res, event_id)
self.assertEqual(returnEvent, json.loads(res.body))
@mock.patch('monasca_events_api.v2.events.Events._list_events')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_pass_events(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo,
listev):
"""GET Method success Multiple Events"""
helpers_validate.validate_authorization.return_value = True
returnEvent = [{"region": "useast", "tenantId": "0ab1ac0a-2867-402d",
"creation_time": "1434331190", "event": "1"},
{"region": "useast", "tenantId": "0ab1ac0a-2866-403d",
"creation_time": "1234567890", "event": "2"}]
listev.return_value = returnEvent
mysqlRepo.connect.return_value = True
helper_tenant_id.get_tenant_id.return_value = '0ab1ac0a-2867-402d'
eventsObj = EventsSubClass()
eventsObj._events_repo = EventsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
eventsObj.on_get(self._generate_req(), res)
self.assertEqual(returnEvent, json.loads(res.body))
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_with_eventid_dbdown(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo):
"""GET method when DB Down with event_ID"""
mysqlRepo.connect.side_effect = RepositoryException("Database\
Connection Error")
helpers_validate.validate_authorization.return_value = True
helper_tenant_id.get_tenant_id.return_value = '0ab1ac0a-2867-402d'
event_id = "0ab1ac0a-2867-402d-83c7-d7087262470c"
eventsObj = EventsSubClass()
eventsObj._events_repo = EventsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
eventsObj.on_get(self._generate_req(), res, event_id)
self.assertFalse(
1,
msg="Database Down, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_mysql_down(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo):
"""GET METHOD without event ID DB DOWN"""
mysqlRepo.connect.side_effect = RepositoryException("Database\
Connection Error")
helpers_validate.return_value = True
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
eventsObj = EventsSubClass()
eventsObj._events_repo = EventsRepository()
res = mock.MagicMock(spec='status')
res.body = {}
try:
eventsObj.on_get(self._generate_req(), res, None)
self.assertFalse(
1,
msg="Database Down, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.events.Events._validate_event')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_unauthorized(
self,
tenantid,
json,
http,
event,
validate,
kafka):
"""POST method unauthorized """
json.return_value = None
validate.side_effect = falcon.HTTPUnauthorized('Forbidden',
'Tenant ID is missing a'
'required role to '
'access this service')
http.return_value = self._generate_req()
tenantid.return_value = '0ab1ac0a-2867-402d'
event.return_value = True
eventsObj = EventsSubClass()
eventsObj._message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
eventsObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="Unauthorized Access, should fail but passed")
except Exception as e:
self.assertRaises(falcon.HTTPUnauthorized)
self.assertEqual(e.status, '401 Unauthorized')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.events.Events._validate_event')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_bad_request(
self,
tenantid,
json,
readHttpRes,
event,
validate,
kafka):
"""POST method with bad request body"""
json.return_value = None
validate.return_value = True
readHttpRes.side_effect = falcon.HTTPBadRequest('Bad request',
'Request body is'
'not valid JSON')
tenantid.return_value = '0ab1ac0a-2867-402d'
event.return_value = True
eventsObj = EventsSubClass()
eventsObj._message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
eventsObj.on_post(self._generate_req(), res)
assertFalse(
1,
msg="Get Method should fail but succeeded, bad request sent")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.events.Events._validate_event')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_kafka_down(
self,
tenantid,
json,
readHttpRes,
event,
validate,
kafka):
"""POST method with Kafka Down"""
kafka.send_message.side_effect = exceptions.MessageQueueException()
json.return_value = None
validate.return_value = True
readHttpRes.return_value = self._generate_req()
tenantid.return_value = '0ab1ac0a-2867-402d'
event.return_value = True
eventsObj = EventsSubClass()
eventsObj._message_queue = kafka
res = mock.MagicMock()
res.body = {}
try:
eventsObj.on_post(self._generate_req(), res)
assertFalse(
1,
msg="Kakfa Server Down, Post should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_pass_validate_event(self, tenantid, json, readHttpRes, validate, kafka):
"""POST method passed due to validate event """
jsonObj = {
'event_type': 'compute.instance.create.start',
'timestamp': '2015-06-17T21:57:03.493436',
'message_id': '1f4609b5-f01d-11e4-81ac-20c9d0b84f8b'
}
json.return_value = True
validate.return_value = True
readHttpRes.return_value = jsonObj
tenantid.return_value = '0ab1ac0a-2867-402d'
eventsObj = EventsSubClass()
eventsObj._message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
eventsObj.on_post(self._generate_req(), res)
self.assertEqual(falcon.HTTP_204, res.status)
self.assertEqual({}, res.body)
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_fail_on_validate_event(self, tenantid, json, readHttpRes, validate, kafka):
"""POST method failed due to validate event """
"""_tenant_id is a reserved word that cannot be used"""
jsonObj = {
'event_type': 'compute.instance.create.start',
'timestamp': '2015-06-17T21:57:03.493436',
'message_id': '1f4609b5-f01d-11e4-81ac-20c9d0b84f8b',
'_tenant_id': '0ab1ac0a-2867-402d'
}
json.return_value = True
validate.return_value = True
readHttpRes.return_value = jsonObj
tenantid.return_value = '0ab1ac0a-2867-402d'
eventsObj = EventsSubClass()
eventsObj._message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
eventsObj.on_post(self._generate_req(), res)
assertFalse(
1,
msg="Post Method should fail but succeeded, bad request sent")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')
self.assertEqual({}, res.body)

View File

@ -0,0 +1,455 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import json
from monasca_events_api.common.repositories.mysql.streams_repository import StreamsRepository
from monasca_events_api.v2.stream_definitions import StreamDefinitions
import mock
from monasca_events_api.common.repositories.exceptions import AlreadyExistsException
from monasca_events_api.common.repositories.exceptions import RepositoryException
import unittest
class StreamDefinitionsSubClass(StreamDefinitions):
def __init__(self):
self._default_authorized_roles = ['user', 'domainuser',
'domainadmin', 'monasca-user']
self._post_events_authorized_roles = [
'user',
'domainuser',
'domainadmin',
'monasca-user',
'monasca-agent']
self._stream_definitions_repo = None
self.stream_definition_event_message_queue = None
self._region = 'useast'
class Test_StreamDefinitions(unittest.TestCase):
@mock.patch('monascaclient.ksclient.KSClient')
def _generate_req(self, token):
"""Generate a mock HTTP request"""
req = mock.MagicMock()
req.get_param.return_value = None
req.headers = {
'X-Auth-User': 'mini-mon',
'X-Auth-Token': token,
'X-Auth-Key': 'password',
'X-TENANT-ID': '0ab1ac0a-2867-402d',
'X-ROLES': 'user, domainuser, domainadmin, monasca-user, monasca-agent',
'Accept': 'application/json',
'User-Agent': 'python-monascaclient',
'Content-Type': 'application/json'}
req.body = {}
req.uri = "/v2.0/stream-definitions/{stream_id}"
req.content_type = 'application/json'
return req
@mock.patch('monasca_events_api.v2.common.helpers.add_links_to_resource')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.normalize_offset')
@mock.patch('monasca_events_api.v2.common.helpers.get_query_name')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_stream_fail_db_down(
self,
tenant_id,
validate,
mysqlRepo,
qname,
normalize,
repo,
getlinks):
"""GET Method FAIL Single Stream"""
repo.connect.side_effect = RepositoryException(
"Database Connection Error")
validate.return_value = True
normalize.return_value = 5
qname.return_value = "Test"
tenant_id.return_value = '0ab1ac0a-2867-402d'
stream_id = "1"
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_get(self._generate_req(), res, stream_id)
self.assertFalse(
1,
msg="Database Down, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._stream_definition_show')
@mock.patch('monasca_events_api.v2.common.helpers.add_links_to_resource')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_get_streamid_pass(
self,
validate,
tenant_id,
getlinks,
definitionshow,
mysqlRepo):
"""GET Method SUCCESS Single Stream"""
validate.return_value = True
tenant_id.return_value = '0ab1ac0a-2867-402d'
returnStream = [{"region": "useast", "tenantId": "0ab1ac0a-2867-402d",
"creation_time": "1434331190", "stream": "1"}]
definitionshow.return_value = returnStream
getlinks.return_value = "/v2.0/stream-definitions/{stream_id}"
mysqlRepo.connect.return_value = True
stream_id = "1"
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
streamsObj.on_get(self._generate_req(), res, stream_id)
self.assertEqual(returnStream, json.loads(res.body))
self.assertEqual(res.status, '200 OK')
@mock.patch('monasca_events_api.v2.common.helpers.normalize_offset')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_streams_fail_db_down(
self,
tenant_id,
validate,
mysqlRepo,
normalize):
"""GET Method FAILS Multiple Streams"""
mysqlRepo.connect.side_effect = RepositoryException(
"Database Connection Error")
validate.return_value = True
tenant_id.return_value = '0ab1ac0a-2867-402d'
normalize.return_value = 5
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_get(self._generate_req(), res, None)
self.assertFalse(
1,
msg="Database Down, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch('monasca_events_api.v2.common.helpers.normalize_offset')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._stream_definition_list')
@mock.patch('monasca_events_api.v2.common.helpers.add_links_to_resource')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_get_streams_pass(
self,
validate,
tenant_id,
getlinks,
definitionlist,
mysqlRepo,
normalize):
"""GET Method SUCCESS Streams List"""
validate.return_value = True
tenant_id.return_value = '0ab1ac0a-2867-402d'
returnStreams = [{"region": "useast", "tenantId": "0ab1ac0a-2867-402d",
"creation_time": "1434331190", "stream": "1"},
{"region": "useast", "tenantId": "0ab1ac0a-2866-403d",
"creation_time": "1234567890", "stream": "2"}]
definitionlist.return_value = returnStreams
normalize.return_value = 5
getlinks.return_value = "/v2.0/stream-definitions/"
mysqlRepo.connect.return_value = True
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
streamsObj.on_get(self._generate_req(), res, None)
self.assertEqual(returnStreams, json.loads(res.body))
self.assertEqual(res.status, '200 OK')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_expire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_fire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_description')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_name')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._validate_stream_definition')
@mock.patch('monasca_events_api.v2.common.helpers.read_json_msg_body')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_post_integrity_error(
self,
validate,
tenantid,
json,
readjson,
streamvalid,
getname,
desc,
fireactions,
expire,
repo):
"""POST method failed due to integrity error"""
validate.return_value = True
repo.connect.side_effect = AlreadyExistsException()
fireactions.return_value = "fire_actions"
getname.return_value = "Test"
expire.return_value = "expire_actions"
desc.return_value = "Stream_Description"
readjson.return_value = {
u'fire_criteria': [
{
u'event_type': u'compute.instance.create.start'},
{
u'event_type': u'compute.instance.create.end'}],
u'description': u'provisioning duration',
u'group_by': [u'instance_id'],
u'expiration': 90000,
u'select': [
{
u'event_type': u'compute.instance.create.*'}],
u'name': u'buzz'}
tenantid.return_value = '0ab1ac0a-2867-402d'
streamvalid.return_value = True
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="DB Integrity Error, should fail but passed")
except Exception as e:
self.assertRaises(falcon.HTTPConflict)
self.assertEqual(e.status, '409 Conflict')
@mock.patch('monasca_events_api.v2.common.helpers.add_links_to_resource')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._stream_definition_create')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_expire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_fire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_description')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_name')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._validate_stream_definition')
@mock.patch('monasca_events_api.v2.common.helpers.read_json_msg_body')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_post_pass__validate_stream_definition(
self,
validate,
tenantid,
readjson,
streamvalid,
getname,
desc,
fireactions,
expire,
streamsrepo,
kafka,
addlink):
"""POST method successful"""
validate.return_value = True
fireactions.return_value = "fire_actions"
getname.return_value = "Test"
addlink.return_value = "/v2.0/stream-definitions/{stream_id}"
expire.return_value = "expire_actions"
desc.return_value = "Stream_Description"
responseObj = {u'fire_criteria': [{u'event_type': u'compute.instance.create.start'},
{u'event_type': u'compute.instance.create.end'}],
u'description': u'provisioning duration',
u'group_by': [u'instance_id'],
u'expiration': 90000,
u'select': [{u'event_type': u'compute.instance.create.*'}],
u'name': u'buzz'}
readjson.return_value = responseObj
streamsrepo.return_value = responseObj
tenantid.return_value = '0ab1ac0a-2867-402d'
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
streamsObj.stream_definition_event_message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
streamsObj.on_post(self._generate_req(), res)
self.assertEqual(falcon.HTTP_201, res.status)
self.assertEqual(responseObj, json.loads(res.body))
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
@mock.patch('monasca_events_api.v2.common.helpers.add_links_to_resource')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_expire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_fire_actions')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_description')
@mock.patch('monasca_events_api.v2.common.helpers.read_json_msg_body')
@mock.patch(
'monasca_events_api.v2.stream_definitions.get_query_stream_definition_name')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_post_fail__validate_stream_definition(
self,
tenantid,
getname,
readjson,
desc,
fireactions,
expire,
kafka,
addlink,
httpRes,
authorization):
"""POST method failed due to invalid body"""
fireactions.return_value = "fire_actions"
getname.return_value = "Test"
addlink.return_value = "/v2.0/stream-definitions/{stream_id}"
expire.return_value = "expire_actions"
desc.return_value = "Stream_Description"
"""name removed from body"""
responseObj = {u'fire_criteria': [{u'event_type': u'compute.instance.create.start'},
{u'event_type': u'compute.instance.create.end'}],
u'description': u'provisioning duration',
u'group_by': [u'instance_id'],
u'expiration': 90000,
u'name': u'buzz'}
tenantid.return_value = '0ab1ac0a-2867-402d'
readjson.return_value = responseObj
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
streamsObj.stream_definition_event_message_queue = kafka
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_post(self._generate_req(), res)
self.assertFalse(1, msg="Bad Request Sent, should fail but passed")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')
@mock.patch('monasca_events_api.v2.common.helpers.read_json_msg_body')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_post_badrequest(self, validate, readjson):
"""POST method Fail Due to bad request"""
validate.return_value = True
readjson.side_effect = falcon.HTTPBadRequest(
'Bad request',
'Request body is not valid JSON')
streamsObj = StreamDefinitionsSubClass()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_post(self._generate_req(), res)
self.assertFalse(1, msg="Bad Request Sent, should fail but passed")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')
@mock.patch(
'monasca_events_api.common.repositories.mysql.streams_repository.StreamsRepository.delete_stream_definition')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_delete_fail(self, validate, tenantid, deleteStream):
"""DELETEE method failed due to database down """
validate.return_value = True
tenantid.return_value = '0ab1ac0a-2867-402d'
deleteStream.side_effect = RepositoryException(
"Database Connection Error")
stream_id = "1"
streamsObj = StreamDefinitionsSubClass()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
streamsObj.on_delete(self._generate_req(), res, stream_id)
self.assertFalse(1, msg="Database Down, should fail but passed")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.v2.stream_definitions.StreamDefinitions._stream_definition_delete')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_delete_pass(self, validate, tenantid, mysql, deleteStream):
"""DELETE method successful """
validate.return_value = True
tenantid.return_value = '0ab1ac0a-2867-402d'
deleteStream.return_value = True
stream_id = "1"
streamsObj = StreamDefinitionsSubClass()
streamsObj._stream_definitions_repo = StreamsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
streamsObj.on_delete(self._generate_req(), res, stream_id)
self.assertEqual("204 No Content", res.status)

View File

@ -0,0 +1,424 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import json
from monasca_events_api.common.repositories.mysql.transforms_repository import TransformsRepository
from monasca_events_api.v2.transforms import Transforms
import mock
from monasca_events_api.common.repositories import exceptions as repository_exceptions
import unittest
class TransformsSubClass(Transforms):
def __init__(self):
self._default_authorized_roles = ['user', 'domainuser',
'domainadmin', 'monasca-user']
self._transforms_repo = None
self._region = 'useast'
self._message_queue = None
class Test_Transforms(unittest.TestCase):
@mock.patch('monascaclient.ksclient.KSClient')
def _generate_req(self, token):
"""Generate a mock HTTP request"""
req = mock.MagicMock()
req.get_param.return_value = None
req.headers = {
'X-Auth-User': 'mini-mon',
'X-Auth-Token': token,
'X-Auth-Key': 'password',
'X-TENANT-ID': '0ab1ac0a-2867-402d',
'X-ROLES': 'user, domainuser, domainadmin, monasca-user, monasca-agent',
'Accept': 'application/json',
'User-Agent': 'python-monascaclient',
'Content-Type': 'application/json'}
req.body = {}
req.content_type = 'application/json'
return req
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_fail_db_down(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo):
"""GET Method fail due to db down"""
mysqlRepo.connect.side_effect = repository_exceptions.RepositoryException(
"Database Connection Error")
helpers_validate.return_value = True
mysqlRepo.connect.return_value = True
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_get(self._generate_req(), res)
self.assertFalse(
1,
msg="Database Down, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_get_fail_validate_authorization(self, _validate_authorization):
"""GET Method fail due to validate authorization"""
_validate_authorization.side_effect = falcon.HTTPUnauthorized(
'Forbidden',
'Tenant does not have any roles')
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_get(self._generate_req(), res)
self.assertFalse(
1,
msg="Validate Authorization failed, GET should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPUnauthorized)
self.assertEqual(e.status, '401 Unauthorized')
@mock.patch('monasca_events_api.v2.transforms.Transforms._list_transforms')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_get_pass(
self,
helper_tenant_id,
helpers_validate,
list_transforms):
"""GET Method success Single Event"""
helpers_validate.return_value = True
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
returnTransform = [{"id": "1",
"name": "Trans1",
"description": "Desc1",
"specification": "AutoSpec1",
"enabled": "True"},
{"id": "2",
"name": "Trans2",
"description": "Desc2",
"specification": "AutoSpec2",
"enabled": "False"}]
list_transforms.return_value = returnTransform
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
transObj.on_get(self._generate_req(), res)
self.assertEqual(res.status, '200 OK')
self.assertEqual(returnTransform, json.loads(json.dumps(res.body)))
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_delete_fail(
self,
helper_tenant_id,
helpers_validate,
mysqlRepo):
"""DELETE Method fail due to db down"""
mysqlRepo.connect.side_effect = repository_exceptions.RepositoryException(
"Database Connection Error")
helpers_validate.return_value = True
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
transform_id = "0ab1ac0a"
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_delete(self._generate_req(), res, transform_id)
self.assertFalse(
1,
msg="Database Down, delete should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_delete_fail_validate_authorization(
self,
_validate_authorization):
"""Post Method fail due to validate authorization"""
_validate_authorization.side_effect = falcon.HTTPUnauthorized(
'Forbidden',
'Tenant does not have any roles')
transform_id = "0ab1ac0a"
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_delete(self._generate_req(), res, transform_id)
self.assertFalse(
1,
msg="Validate Authorization failed, delete should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPUnauthorized)
self.assertEqual(e.status, '401 Unauthorized')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._delete_transform')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
def test_on_delete_pass(
self,
helper_tenant_id,
helpers_validate,
deleteTransform,
kafka):
"""DELETE Method pass"""
helpers_validate.return_value = True
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
transform_id = "0ab1ac0a"
deleteTransform.return_value = True
transObj = TransformsSubClass()
transObj._message_queue = kafka
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
transObj.on_delete(self._generate_req(), res, transform_id)
self.assertEqual(res.status, '204 No Content')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.openstack.common.uuidutils.generate_uuid')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._validate_transform')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._delete_transform')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
def test_on_post_fail_db_down(
self,
readhttp,
helper_tenant_id,
helpers_validate,
deleteTransform,
validjson,
validateTransform,
generateUUID,
mysqlRepo):
"""Post Method fail due to db down"""
mysqlRepo.connect.side_effect = repository_exceptions.RepositoryException(
"Database Connection Error")
helpers_validate.return_value = True
validjson.return_value = True
validateTransform.return_value = True
readhttp.return_value = self._generate_req()
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
generateUUID.return_value = "067e6162-3b6f-4ae2-a171-2470b63dff00"
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="Database Down, POST should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPInternalServerError)
self.assertEqual(e.status, '500 Internal Server Error')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._validate_transform')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
def test_on_post_fail_validate_transform(
self,
readhttp,
helpers_validate,
validjson,
_validate_transform):
"""Post Method fail due to validate transform"""
helpers_validate.return_value = True
validjson.return_value = True
_validate_transform.side_effect = falcon.HTTPBadRequest(
'Bad request',
'Error')
readhttp.return_value = self._generate_req()
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="Validate Trasnform failed, POST should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
def test_on_post_fail_validate_authorization(
self,
_validate_authorization):
"""Post Method fail due to validate authorization"""
_validate_authorization.side_effect = falcon.HTTPUnauthorized(
'Forbidden',
'Tenant does not have any roles')
transObj = TransformsSubClass()
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="Validate Authorization failed, POST should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPUnauthorized)
self.assertEqual(e.status, '401 Unauthorized')
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._create_transform_response')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.openstack.common.uuidutils.generate_uuid')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._validate_transform')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._delete_transform')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
def test_on_post_pass_valid_request(
self,
readhttp,
helper_tenant_id,
helpers_validate,
deleteTransform,
validjson,
validateTransform,
generateUUID,
mysqlRepo,
createRes,
kafka):
"""Post Method pass due to valid request"""
helpers_validate.return_value = True
validjson.return_value = True
returnTransform = {'name': 'Trans1',
'description': 'Desc1',
'specification': 'AutoSpec1'
}
createRes.return_value = returnTransform
readhttp.return_value = returnTransform
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
generateUUID.return_value = "067e6162-3b6f-4ae2-a171-2470b63dff00"
transObj = TransformsSubClass()
transObj._message_queue = kafka
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
transObj.on_post(self._generate_req(), res)
self.assertEqual(falcon.HTTP_200, "200 OK")
self.assertEqual(returnTransform, json.loads(json.dumps(res.body)))
@mock.patch(
'monasca_events_api.common.messaging.kafka_publisher.KafkaPublisher')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._create_transform_response')
@mock.patch(
'monasca_events_api.common.repositories.mysql.mysql_repository.mdb')
@mock.patch('monasca_events_api.openstack.common.uuidutils.generate_uuid')
@mock.patch(
'monasca_events_api.v2.common.helpers.validate_json_content_type')
@mock.patch(
'monasca_events_api.v2.transforms.Transforms._delete_transform')
@mock.patch('monasca_events_api.v2.common.helpers.validate_authorization')
@mock.patch('monasca_events_api.v2.common.helpers.get_tenant_id')
@mock.patch('monasca_events_api.v2.common.helpers.read_http_resource')
def test_on_post_pass_fail_invalid_request(
self,
readhttp,
helper_tenant_id,
helpers_validate,
deleteTransform,
validjson,
generateUUID,
mysqlRepo,
createRes,
kafka):
"""Post Method fails due to invalid request"""
helpers_validate.return_value = True
validjson.return_value = True
returnTransform = {
'description': 'Desc1',
'specification': 'AutoSpec1'
}
createRes.return_value = returnTransform
readhttp.return_value = returnTransform
helper_tenant_id.return_value = '0ab1ac0a-2867-402d'
generateUUID.return_value = "067e6162-3b6f-4ae2-a171-2470b63dff00"
transObj = TransformsSubClass()
transObj._message_queue = kafka
transObj._transforms_repo = TransformsRepository()
res = mock.MagicMock()
res.body = {}
res.status = 0
try:
transObj.on_post(self._generate_req(), res)
self.assertFalse(
1,
msg="Validate transform failed, POST should fail but succeeded")
except Exception as e:
self.assertRaises(falcon.HTTPBadRequest)
self.assertEqual(e.status, '400 Bad Request')

View File

@ -34,8 +34,11 @@ event_schema = {
voluptuous.Length(max=50)),
voluptuous.Required('timestamp'): DateValidator()}
request_body_schema = voluptuous.Schema(event_schema,
required=True, extra=True)
event_schema = voluptuous.Schema(event_schema,
required=True, extra=True)
request_body_schema = voluptuous.Schema(
voluptuous.Any(event_schema, [event_schema]))
def validate(body):

View File

@ -13,7 +13,6 @@
# under the License.
import collections
import json
import falcon
from oslo_config import cfg
@ -31,7 +30,6 @@ from monasca_events_api.v2.common.schemas import (
events_request_body_schema as schemas_event)
from monasca_events_api.v2.common.schemas import (
exceptions as schemas_exceptions)
from monasca_events_api.v2.common import utils
LOG = log.getLogger(__name__)
@ -80,9 +78,9 @@ class Events(events_api_v2.EventsV2API):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._post_events_authorized_roles)
event = helpers.read_http_resource(req)
self._validate_event(event)
tenant_id = helpers.get_tenant_id(req)
event['_tenant_id'] = tenant_id
transformed_event = self._event_transform(event, tenant_id,
self._region)
self._send_event(transformed_event)
@ -103,16 +101,14 @@ class Events(events_api_v2.EventsV2API):
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _send_event(self, event):
def _send_event(self, events):
"""Send the event using the message queue.
:param metrics: An event object.
:param metrics: A series of event objects.
:raises: falcon.HTTPServiceUnavailable
"""
try:
str_msg = json.dumps(event, default=utils.date_handler,
ensure_ascii=False).encode('utf8')
self._message_queue.send_message(str_msg)
self._message_queue.send_message_batch(events)
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPInternalServerError('Service unavailable',

View File

@ -12,9 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import ast
import datetime
import json
from time import mktime
import yaml
import falcon
from oslo_config import cfg
@ -119,7 +121,7 @@ class Transforms(transforms_api_v2.TransformsV2API):
try:
name = transform['name']
description = transform['description']
specification = transform['specification']
specification = str(yaml.load(transform['specification']))
if 'enabled' in transform:
enabled = transform['enabled']
else:
@ -147,6 +149,9 @@ class Transforms(transforms_api_v2.TransformsV2API):
def _list_transforms(self, tenant_id):
try:
transforms = self._transforms_repo.list_transforms(tenant_id)
for transform in transforms:
transform['specification'] = yaml.safe_dump(
ast.literal_eval(transform['specification']))
return json.dumps(transforms, cls=MyEncoder)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)