359 lines
13 KiB
Python
359 lines
13 KiB
Python
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
|
|
import json
|
|
import uuid
|
|
|
|
import MySQLdb
|
|
from oslo_log import log
|
|
from oslo_utils import timeutils
|
|
|
|
from monasca_events_api.common.repositories import constants
|
|
from monasca_events_api.common.repositories import exceptions
|
|
from monasca_events_api.common.repositories.mysql import mysql_repository
|
|
from monasca_events_api.common.repositories import streams_repository as sdr
|
|
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class StreamsRepository(mysql_repository.MySQLRepository,
                        sdr.StreamsRepository):
    """MySQL-backed repository for stream definitions and their actions.

    Stream definitions live in the ``stream_definition`` table; the
    notification actions attached to them live in ``stream_actions`` and are
    distinguished by an ``action_type`` of ``FIRE`` or ``EXPIRE``.
    """

    # Selects every stream_definition column plus two aggregated columns,
    # fire_actions and expire_actions, each a group_concat() of the action
    # ids of the matching action_type. Callers append their own where /
    # order by / limit clauses to this text.
    base_query = """
      select sd.id, sd.tenant_id, sd.name, sd.description,
             sd.select_by, sd.group_by, sd.fire_criteria, sd.expiration,
             sd.actions_enabled, sd.created_at,
             sd.updated_at, sd.deleted_at,
             saf.fire_actions, sae.expire_actions
      from stream_definition as sd
      left join (select stream_definition_id,
                        group_concat(action_id) as fire_actions
                 from stream_actions
                 where action_type = 'FIRE'
                 group by stream_definition_id) as saf
        on saf.stream_definition_id = sd.id
      left join (select stream_definition_id,
                        group_concat(action_id) as expire_actions
                 from stream_actions
                 where action_type = 'EXPIRE'
                 group by stream_definition_id) as sae
        on sae.stream_definition_id = sd.id
      """

    def __init__(self):
        """Initialise the repository by delegating to the base classes."""
        super(StreamsRepository, self).__init__()

    @mysql_repository.mysql_try_catch_block
    def get_stream_definition(self, tenant_id, stream_definition_id):
        """Return one non-deleted stream definition owned by a tenant.

        :param tenant_id: id of the owning tenant
        :param stream_definition_id: id of the stream definition
        :returns: the first row produced by ``base_query`` for that id
        :raises DoesNotExistException: if no matching row is found
        """
        parms = [tenant_id, stream_definition_id]

        where_clause = """ where sd.tenant_id = %s
                           and sd.id = %s
                           and deleted_at is NULL """

        query = StreamsRepository.base_query + where_clause

        rows = self._execute_query(query, parms)
        if not rows:
            raise exceptions.DoesNotExistException

        return rows[0]

    @mysql_repository.mysql_try_catch_block
    def get_stream_definitions(self, tenant_id, name, offset=None, limit=None):
        """List a tenant's non-deleted stream definitions.

        :param tenant_id: id of the owning tenant
        :param name: optional exact name to filter on; ignored when falsy
        :param offset: optional stream definition id; only rows whose id
                       sorts after it are returned
        :param limit: optional maximum number of rows. When ``offset`` is
                      given and ``limit`` is falsy, ``constants.PAGE_LIMIT``
                      is used; without ``offset`` a falsy ``limit`` means
                      no limit clause is added.
        :returns: whatever ``_execute_query`` returns for the built query
        """
        parms = [tenant_id]

        where_clause = " where sd.tenant_id = %s and deleted_at is NULL "

        if name:
            where_clause += " and sd.name = %s "
            parms.append(name.encode('utf8'))

        if offset is not None:
            # Paged request: order by id so "id > offset" is a stable cursor.
            order_by_clause = " order by sd.id, sd.created_at "
            where_clause += " and sd.id > %s "
            parms.append(offset.encode('utf8'))
            limit_clause = " limit %s "
            # Exactly one parameter per placeholder: the caller's limit when
            # supplied, else the default page size. int() keeps the driver
            # from quoting a string value into invalid "limit '10'" SQL.
            parms.append(int(limit) if limit else constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by sd.created_at "
            if limit:
                limit_clause = " limit %s "
                parms.append(int(limit))
            else:
                limit_clause = ""

        query = (StreamsRepository.base_query + where_clause +
                 order_by_clause + limit_clause)

        return self._execute_query(query, parms)

    @mysql_repository.mysql_try_catch_block
    def get_all_stream_definitions(self, offset=None, limit=None):
        """List non-deleted stream definitions across all tenants.

        :param offset: optional stream definition id; only rows whose id
                       sorts after it are returned
        :param limit: optional maximum number of rows. When ``offset`` is
                      given and ``limit`` is None, ``constants.PAGE_LIMIT``
                      is used; without ``offset`` a ``limit`` of None means
                      no limit clause is added.
        :returns: whatever ``_execute_query`` returns for the built query
        """
        parms = []

        where_clause = " where deleted_at is NULL "

        if offset is not None:
            # Paged request: order by id so "id > offset" is a stable cursor.
            order_by_clause = " order by sd.id, sd.created_at "
            where_clause += " and sd.id > %s "
            parms.append(offset.encode('utf8'))
            limit_clause = " limit %s "
            # int() keeps the driver from quoting a string limit.
            parms.append(int(limit) if limit is not None
                         else constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by sd.created_at "
            if limit is not None:
                # Honour an explicit limit on un-paged requests too, as
                # get_stream_definitions does.
                limit_clause = " limit %s "
                parms.append(int(limit))
            else:
                limit_clause = ""

        query = (StreamsRepository.base_query + where_clause +
                 order_by_clause + limit_clause)

        return self._execute_query(query, parms)

    @mysql_repository.mysql_try_catch_block
    def delete_stream_definition(self, tenant_id, stream_definition_id):
        """Delete the stream definition.

        The row is removed from ``stream_definition`` outright (this is a
        SQL ``delete``, not an update of ``deleted_at``).

        :param tenant_id: id of the owning tenant
        :param stream_definition_id: id of the stream definition to delete
        :returns True: -- if the stream definition existed and was deleted.
        :returns False: -- if the stream definition does not exist.
        :raises RepositoryException: presumably raised by the
            ``mysql_try_catch_block`` decorator on database errors --
            confirm against its definition.
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:
            cursor.execute("""delete from stream_definition
                           where tenant_id = %s and id = %s""",
                           [tenant_id, stream_definition_id])

            # rowcount is the number of rows the delete affected.
            return cursor.rowcount >= 1

    @mysql_repository.mysql_try_catch_block
    def create_stream_definition(self,
                                 tenant_id,
                                 name,
                                 description,
                                 select,
                                 group_by,
                                 fire_criteria,
                                 expiration,
                                 fire_actions,
                                 expire_actions):
        """Insert a new stream definition together with its actions.

        :param tenant_id: id of the owning tenant
        :param name: stream definition name (text)
        :param description: free-form description (text)
        :param select: value stored in the ``select_by`` column (text)
        :param group_by: value stored in the ``group_by`` column (text)
        :param fire_criteria: value stored in ``fire_criteria`` (text)
        :param expiration: value stored in the ``expiration`` column
        :param fire_actions: iterable of notification method ids to attach
                             as FIRE actions, or None
        :param expire_actions: iterable of notification method ids to
                               attach as EXPIRE actions, or None
        :returns: the id (a uuid1 string) generated for the new definition
        :raises AlreadyExistsException: if the insert fails with MySQL
            error 1062 (duplicate entry)
        :raises InvalidUpdateException: if an action id is unknown or
            refers to a PAGERDUTY notification method
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:

            now = timeutils.utcnow()
            stream_definition_id = str(uuid.uuid1())
            try:
                cursor.execute("""insert into stream_definition(
                                   id,
                                   tenant_id,
                                   name,
                                   description,
                                   select_by,
                                   group_by,
                                   fire_criteria,
                                   expiration,
                                   created_at,
                                   updated_at)
                                   values (%s, %s, %s, %s, %s, %s, %s, %s, %s,
                                   %s)""", (
                    stream_definition_id, tenant_id, name.encode('utf8'),
                    description.encode('utf8'), select.encode('utf8'),
                    group_by.encode('utf8'), fire_criteria.encode('utf8'),
                    expiration, now, now))
            except MySQLdb.IntegrityError as e:
                # Exceptions are not iterable on Python 3, so read the error
                # code from args instead of tuple-unpacking the exception.
                code = e.args[0] if e.args else None
                if code == 1062:
                    raise exceptions.AlreadyExistsException(
                        'Stream Definition already '
                        'exists for tenant_id: {0} name: {1}'.format(
                            tenant_id, name.encode('utf8')))
                # Bare raise keeps the original traceback.
                raise

            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             fire_actions, u"FIRE")
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             expire_actions,
                                             u"EXPIRE")

            return stream_definition_id

    @mysql_repository.mysql_try_catch_block
    def patch_stream_definition(self, tenant_id, stream_definition_id, name,
                                description, select, group_by, fire_criteria,
                                expiration, fire_actions, expire_actions):
        """Partially update a stream definition and return the new row.

        Any argument passed as None keeps the value currently stored.
        ``select`` and ``group_by`` may be supplied but must serialise to
        the stored value; they cannot be changed.

        :param tenant_id: id of the owning tenant
        :param stream_definition_id: id of the definition to patch
        :param name: new name (text) or None
        :param description: new description (text) or None
        :param select: JSON-serialisable select value or None
        :param group_by: JSON-serialisable group_by value or None
        :param fire_criteria: JSON-serialisable fire criteria or None
        :param expiration: new expiration or None
        :param fire_actions: iterable of notification method ids replacing
                             the FIRE actions, or None to leave them alone
        :param expire_actions: iterable of notification method ids
                               replacing the EXPIRE actions, or None
        :returns: the updated row as produced by ``base_query``
        :raises DoesNotExistException: if the definition is not found
        :raises InvalidUpdateException: if ``select``/``group_by`` differ
            from the stored values, or an action id is invalid
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:
            # Get the original stream definition from the DB
            parms = [tenant_id, stream_definition_id]

            where_clause = """ where sd.tenant_id = %s
                               and sd.id = %s"""
            query = StreamsRepository.base_query + where_clause

            cursor.execute(query, parms)

            if cursor.rowcount < 1:
                raise exceptions.DoesNotExistException

            original_definition = cursor.fetchall()[0]

            # Update that stream definition in the database

            patch_query = """
                update stream_definition
                set name = %s,
                    description = %s,
                    select_by = %s,
                    group_by = %s,
                    fire_criteria = %s,
                    expiration = %s,
                    updated_at = %s
                where tenant_id = %s and id = %s"""

            if name is None:
                new_name = original_definition['name']
            else:
                new_name = name.encode('utf8')

            if description is None:
                new_description = original_definition['description']
            else:
                new_description = description.encode('utf8')

            # json.dumps() output is ASCII-only text by default, so it is
            # kept as text: encoding it would yield bytes on Python 3,
            # which never compare equal to a text column value.
            # NOTE(review): create_stream_definition stores `select` as
            # given (no json.dumps) -- confirm callers serialise
            # consistently so this comparison can succeed.
            if select is None:
                new_select = original_definition['select_by']
            else:
                new_select = json.dumps(select)

            if new_select != original_definition['select_by']:
                raise exceptions.InvalidUpdateException(
                    "select_by must not change")

            if group_by is None:
                new_group_by = original_definition['group_by']
            else:
                new_group_by = json.dumps(group_by)

            if new_group_by != original_definition['group_by']:
                raise exceptions.InvalidUpdateException(
                    "group_by must not change")

            if fire_criteria is None:
                new_fire_criteria = original_definition['fire_criteria']
            else:
                new_fire_criteria = json.dumps(fire_criteria)

            if expiration is None:
                new_expiration = original_definition['expiration']
            else:
                new_expiration = expiration

            now = timeutils.utcnow()

            update_parms = [
                new_name,
                new_description,
                new_select,
                new_group_by,
                new_fire_criteria,
                new_expiration,
                now,
                tenant_id,
                stream_definition_id]

            cursor.execute(patch_query, update_parms)

            # Update the fire and expire actions in the database if defined:
            # existing actions of a type are dropped only when a
            # replacement list for that type was supplied.

            if fire_actions is not None:
                self._delete_stream_actions(cursor, stream_definition_id,
                                            u'FIRE')
            if expire_actions is not None:
                self._delete_stream_actions(cursor, stream_definition_id,
                                            u'EXPIRE')

            # _insert_into_stream_actions is a no-op for None.
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             fire_actions,
                                             u"FIRE")
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             expire_actions,
                                             u"EXPIRE")

            # Get updated entry from mysql
            cursor.execute(query, parms)

            return cursor.fetchall()[0]

    def _delete_stream_actions(self, cursor, stream_definition_id, action_type):
        """Delete every action of one type attached to a stream definition.

        :param cursor: open database cursor to execute on
        :param stream_definition_id: id of the stream definition
        :param action_type: action type text, e.g. u'FIRE' or u'EXPIRE'
        """
        query = """
            delete
            from stream_actions
            where stream_definition_id = %s and action_type = %s
            """
        parms = [stream_definition_id, action_type.encode('utf8')]
        cursor.execute(query, parms)

    def _insert_into_stream_actions(self, cursor, stream_definition_id,
                                    actions, action_type):
        """Validate and attach notification actions to a stream definition.

        Each action id must exist in ``notification_method`` and must not be
        of type PAGERDUTY; valid ids are inserted into ``stream_actions``.

        :param cursor: open database cursor to execute on
        :param stream_definition_id: id of the stream definition
        :param actions: iterable of notification method ids (text), or None
                        to do nothing
        :param action_type: action type text, e.g. u'FIRE' or u'EXPIRE'
        :raises InvalidUpdateException: if an id is unknown or refers to a
            PAGERDUTY notification method
        """
        if actions is None:
            return

        for action in actions:
            cursor.execute(
                "select id,type from notification_method where id = %s",
                (action.encode('utf8'),))
            row = cursor.fetchone()

            if not row:
                raise exceptions.InvalidUpdateException(
                    "Non-existent notification id {} submitted for {} "
                    "notification action".format(action.encode('utf8'),
                                                 action_type.encode('utf8')))

            if row['type'] == 'PAGERDUTY':
                raise exceptions.InvalidUpdateException(
                    "PAGERDUTY action not supported for "
                    "notification id {} submitted for {} "
                    "notification action".format(
                        action.encode('utf8'),
                        action_type.encode('utf8')))

            cursor.execute("""insert into stream_actions(
                               stream_definition_id,
                               action_type,
                               action_id)
                               values(%s,%s,%s)""", (
                stream_definition_id, action_type.encode('utf8'),
                action.encode('utf8')))