Implement Influxdb 0.9.0
Implement paging.

Change-Id: I2a4bcf3f3af77423523bd42dc074160bffe6ebc2
@@ -174,4 +174,4 @@ class KafkaConnection(object):
                code = 500
                LOG.exception('Unknown error.')

            return code
        return code

@@ -14,4 +14,4 @@


class MessageQueueException(Exception):
    pass
    pass

@@ -21,4 +21,4 @@ class FakePublisher(publisher.Publisher):
        pass

    def send_message(self, message):
        pass
        pass

@@ -14,4 +14,4 @@


def transform(events, tenant_id, region):
    raise NotImplemented()
    raise NotImplemented()

@@ -14,4 +14,4 @@


def transform(metrics, tenant_id, region):
    raise NotImplemented()
    raise NotImplemented()

@@ -26,4 +26,4 @@ def create_events_transform():
    elif message_format == 'cadf':
        return cadf_events.transform
    else:
        return ident_events.transform
        return ident_events.transform

@@ -14,4 +14,4 @@


class TransformationException(Exception):
    pass
    pass

@@ -14,4 +14,4 @@


def transform(events, tenant_id, region):
    return events
    return events

@@ -14,4 +14,4 @@


def transform(metrics, tenant_id, region):
    return metrics
    return metrics

@@ -26,4 +26,4 @@ def create_metrics_transform():
    elif metrics_message_format == 'cadf':
        return cadf_metrics.transform
    else:
        return id_metrics.transform
        return id_metrics.transform

@@ -25,4 +25,4 @@ def transform(event, tenant_id, region):
        creation_time=timeutils.utcnow_ts()
    )

    return transformed_event
    return transformed_event

@@ -25,4 +25,4 @@ class Publisher(object):

        :param message: Message to send.
        """
        return
        return

@@ -21,4 +21,4 @@ class RabbitmqPublisher(publisher.Publisher):
        pass

    def send_message(self, message):
        raise NotImplemented()
        raise NotImplemented()

@@ -49,7 +49,8 @@ class AlarmDefinitionsRepository(object):
        pass

    @abc.abstractmethod
    def get_alarm_definitions(self, tenant_id, name, dimensions, offset):
    def get_alarm_definitions(self, tenant_id, name, dimensions, offset,
                              limit):
        pass

    @abc.abstractmethod
@@ -63,4 +64,4 @@ class AlarmDefinitionsRepository(object):
                                ok_actions,
                                undetermined_actions,
                                match_by, severity, patch):
        pass
        pass

@@ -40,5 +40,5 @@ class AlarmsRepository(object):
        pass

    @abc.abstractmethod
    def get_alarms(self, tenant_id, query_parms, offset):
    def get_alarms(self, tenant_id, query_parms, offset, limit):
        pass

@@ -1 +1 @@
PAGE_LIMIT = 50
PAGE_LIMIT = 10000
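
The page-size constant above now serves as a ceiling and default rather than the fixed page size: every repository in this change grows an explicit limit parameter and fetches one row beyond it so the caller can tell whether a next page exists. A minimal sketch of that shared pattern (the helper name and query callable are illustrative, not from this diff):

    def fetch_page(run_query, offset, limit):
        # Ask for one extra row past the requested page size.
        rows = run_query(offset=offset, limit=limit + 1)
        # If the extra row came back, the caller should emit a 'next'
        # link and drop the surplus row before returning the page.
        has_next = len(rows) > limit
        return rows[:limit], has_next
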
@@ -24,4 +24,4 @@ class EventsRepository(events_repository.EventsRepository):
        return

    def list_events(self, tenant_id, name, dimensions):
        return {}
        return {}

@@ -23,5 +23,5 @@ class MetricsRepository(metrics_repository.MetricsRepository):
    def __init__(self):
        return

    def list_metrics(self, tenant_id, name, dimensions):
        return {}
    def list_metrics(self, tenant_id, name, dimensions, offset, limit):
        return {}

@@ -13,14 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import re
import time
import urllib

from influxdb import client
from oslo.config import cfg

from monasca.common.repositories import constants
from monasca.common.repositories import exceptions
from monasca.common.repositories import metrics_repository
from monasca.openstack.common import log

@@ -30,57 +26,64 @@ LOG = log.getLogger(__name__)


class MetricsRepository(metrics_repository.MetricsRepository):

    MULTIPLE_METRICS_MESSAGE = ("Found multiple metrics matching metric name"
                                + " and dimensions. Please refine your search"
                                + " criteria using a unique"
                                + " metric name or additional dimensions."
                                + " Alternatively, you may specify"
                                + " 'merge_metrics=True' as a query"
                                + " parameter to combine all metrics"
                                + " matching search criteria into a single"
                                + " series.")

    def __init__(self):

        try:

            self.conf = cfg.CONF
            self.influxdb_client = client.InfluxDBClient(
                self.conf.influxdb.ip_address, self.conf.influxdb.port,
                self.conf.influxdb.user, self.conf.influxdb.password,
                self.conf.influxdb.database_name)

            # compile regex only once for efficiency
            self._serie_name_reqex = re.compile(
                '([^?&=]+)\?([^?&=]+)&([^?&=]+)(&[^?&=]+=[^?&=]+)*')
            self._serie_tenant_id_region_name_regex = re.compile(
                '[^?&=]+\?[^?&=]+&[^?&=]+')
            self._serie_name_dimension_regex = re.compile('&[^?&=]+=[^?&=]+')
            self._serie_name_dimension_parts_regex = re.compile(
                '&([^?&=]+)=([^?&=]+)')

        except Exception as ex:
            LOG.exception()
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    def _build_list_series_query(self, dimensions, name, tenant_id, region):
    def _build_show_series_query(self, dimensions, name, tenant_id, region):

        regex_clause = self._build_regex_clause(dimensions, name, tenant_id,
        where_clause = self._build_where_clause(dimensions, name, tenant_id,
                                                region)

        query = 'list series ' + regex_clause
        query = 'show series ' + where_clause

        return query
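
Against InfluxDB 0.8 the repository matched URL-encoded series names with a list series regex; against 0.9 it issues a show series statement filtered on tags. A rough before/after of the generated queries for one request (tenant, region, and dimension values are invented):

    # 0.8 style, from the old _build_list_series_query:
    #   list series /^tenant1\?useast&cpu.idle_perc(&|$)(.*&)*hostname=host1(&|$)/
    # 0.9 style, from the new _build_show_series_query:
    #   show series from "cpu.idle_perc" where _tenant_id = 'tenant1'
    #       and _region = 'useast' and hostname = 'host1'
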
    def _build_select_query(self, dimensions, name, tenant_id,
                            region, start_timestamp, end_timestamp, offset):
    def _build_select_measurement_query(self, dimensions, name, tenant_id,
                                        region, start_timestamp, end_timestamp,
                                        offset, limit):

        from_clause = self._build_from_clause(dimensions, name, tenant_id,
                                              region, start_timestamp,
                                              end_timestamp)

        offset_clause = self._build_offset_clause(offset)
        offset_clause = self._build_offset_clause(offset, limit)

        query = 'select * ' + from_clause + offset_clause
        query = 'select value, value_meta ' + from_clause + offset_clause

        return query
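
End to end, the new builder produces a 0.9-style select over the value columns only. For a paged request the statement comes out roughly as follows (timestamps and cursor invented; the offset clause itself is produced by _build_offset_clause, shown further down):

    # select value, value_meta from "cpu.idle_perc"
    #     where _tenant_id = 'tenant1' and _region = 'useast'
    #     and time > 1413230362s and time < 1414230362s
    #     and time > '2014-10-13T19:19:22Z' limit 51
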
    def _build_statistics_query(self, dimensions, name, tenant_id,
                                region, start_timestamp, end_timestamp,
                                statistics, period):
                                statistics, period, offset, limit):

        from_clause = self._build_from_clause(dimensions, name, tenant_id,
                                              region, start_timestamp,
                                              end_timestamp)
        if offset:
            offset_clause = (" and time > '{}'".format(offset))
            from_clause += offset_clause

        statistics = [statistic.replace('avg', 'mean') for statistic in
                      statistics]

@@ -95,66 +98,68 @@ class MetricsRepository(metrics_repository.MetricsRepository):

        query += " group by time(" + period + "s)"

        limit_clause = " limit {}".format(str(limit + 1))

        query += limit_clause

        return query

    def _build_regex_clause(self, dimensions, name, tenant_id, region,
    def _build_where_clause(self, dimensions, name, tenant_id, region,
                            start_timestamp=None, end_timestamp=None):

        regex_clause = '/^'

        # tenant id
        regex_clause += urllib.quote(tenant_id.encode('utf8'), safe='')

        # region
        regex_clause += '\?' + urllib.quote(region.encode('utf8'), safe='')
        where_clause = ''

        # name - optional
        if name:
            regex_clause += '&' + urllib.quote(name.encode('utf8'), safe='')
            regex_clause += '(&|$)'
            where_clause += ' from "{}" '.format(name.encode('utf8'))

        # tenant id
        where_clause += " where _tenant_id = '{}' ".format(tenant_id.encode(
            "utf8"))

        # region
        where_clause += " and _region = '{}' ".format(region.encode('utf8'))

        # dimensions - optional
        if dimensions:
            for dimension_name, dimension_value in iter(
                    sorted(dimensions.iteritems())):
                regex_clause += '(.*&)*'
                regex_clause += urllib.quote(dimension_name.encode('utf8'),
                                             safe='')
                regex_clause += '='
                regex_clause += urllib.quote(dimension_value.encode('utf8'),
                                             safe='')
                regex_clause += '(&|$)'
                where_clause += " and {} = '{}'".format(
                    dimension_name.encode('utf8'), dimension_value.encode(
                        'utf8'))

        regex_clause += '/'
        if start_timestamp:
            where_clause += " and time > " + str(int(start_timestamp)) + "s"

        if start_timestamp is not None:
            # subtract 1 from timestamp to get >= semantics
            regex_clause += " where time > " + str(start_timestamp - 1) + "s"
        if end_timestamp is not None:
            # add 1 to timestamp to get <= semantics
            regex_clause += " and time < " + str(end_timestamp + 1) + "s"
        if end_timestamp:
            where_clause += " and time < " + str(int(end_timestamp)) + "s"

        return regex_clause
        return where_clause
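
One behavioral nuance in this hunk: the old regex path widened the window by a second on each side to approximate >=/<= semantics, while the new where clause applies the raw integer timestamps with strict > and <. A quick sketch of the difference, assuming whole-second epoch inputs:

    start, end = 1413230362, 1414230362
    old = ' where time > %ds and time < %ds' % (start - 1, end + 1)  # inclusive bounds
    new = ' and time > %ds and time < %ds' % (int(start), int(end))  # exclusive bounds
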
    def _build_from_clause(self, dimensions, name, tenant_id, region,
                           start_timestamp=None, end_timestamp=None):
        from_clause = 'from '
        from_clause += self._build_regex_clause(dimensions, name, tenant_id,
                                                region, start_timestamp,
                                                end_timestamp)

        from_clause = self._build_where_clause(dimensions, name, tenant_id,
                                               region, start_timestamp,
                                               end_timestamp)
        return from_clause

    def list_metrics(self, tenant_id, region, name, dimensions, offset):
    def list_metrics(self, tenant_id, region, name, dimensions, offset,
                     limit):

        try:

            query = self._build_list_series_query(dimensions, name, tenant_id,
            query = self._build_show_series_query(dimensions, name, tenant_id,
                                                  region)

            result = self.influxdb_client.query(query, 's')
            query += " limit {}".format(limit + 1)

            json_metric_list = self._decode_influxdb_serie_name_list(result,
                                                                     offset)
            if offset:
                query += ' offset {}'.format(int(offset) + 1)

            result = self.influxdb_client.query(query)

            json_metric_list = self._build_serie_name_list(result)

            return json_metric_list
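
For series listings the cursor is a numeric row index rather than a timestamp: the limit and offset are appended directly to the show series statement. Sketch of two consecutive pages with limit=50 (the offset value is hypothetical):

    # page 1: show series ... limit 51
    # page 2, after the client echoes back offset=50 from the next link:
    #   show series ... limit 51 offset 51
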
@@ -162,199 +167,94 @@ class MetricsRepository(metrics_repository.MetricsRepository):
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    def _decode_influxdb_serie_name_list(self, series_names, offset):

        """Example series_names from InfluxDB.

        [
            {
                "points": [
                    [
                        0,
                        "tenant?useast&%E5%8D%83&dim1=%E5%8D%83&dim2=%E5%8D%83"
                    ]
                ],
                "name": "list_series_result",
                "columns": [
                    "time",
                    "name"
                ]
            }
        ]

        :param series_names:
        :return:
        """
    def _build_serie_name_list(self, series_names):

        json_metric_list = []
        serie_names_list_list = series_names[0]['points']
        for serie_name_list in serie_names_list_list:
            serie_name = serie_name_list[1]
            if offset is not None:
                if serie_name < urllib.unquote(offset):
                    continue
            metric = self._decode_influxdb_serie_name(serie_name)
            if metric is None:
                continue
            json_metric_list.append(metric)

            if offset is not None:
                if len(json_metric_list) >= constants.PAGE_LIMIT:
                    break
        if not series_names:
            return json_metric_list

        if 'series' in series_names.raw['results'][0]:

            id = 0

            for series in series_names.raw['results'][0]['series']:

                for tag_values in series[u'values']:

                    dimensions = {}
                    i = 0
                    for name in series[u'columns']:
                        if name not in [u'_id', u'_tenant_id', u'_region']:
                            if tag_values[i]:
                                dimensions[name] = tag_values[i]
                        i += 1

                    metric = {u'id': str(id),
                              u'name': series[u'name'],
                              u'dimensions': dimensions}
                    id += 1

                    json_metric_list.append(metric)

        return json_metric_list
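
The docstring above still shows the 0.8 list series payload; the replacement parser instead walks the raw result that the 0.9 influxdb client returns. A sketch of the shape it expects and of its output (values invented):

    series_names.raw = {'results': [{'series': [
        {'name': 'cpu.idle_perc',
         'columns': ['_id', '_tenant_id', '_region', 'hostname'],
         'values': [['1', 'tenant1', 'useast', 'host1']]}]}]}
    # _build_serie_name_list(series_names) would then yield:
    #   [{'id': '0', 'name': 'cpu.idle_perc',
    #     'dimensions': {'hostname': 'host1'}}]
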
    def _decode_influxdb_serie_name(self, serie_name):

        """Decodes a serie name from InfluxDB.

        The raw serie name is
        formed by url encoding the tenant id, region, name, and dimensions,
        and concatenating them into a quasi URL query string.

        urlencode(tenant)?urlencode(region)&urlencode(name)[&urlencode(
        dim_name)=urlencode(dim_value)]...

        :param serie_name:
        :return:
        """

        match = self._serie_name_reqex.match(serie_name)
        if match:

            # throw tenant_id (match.group(1) and region (match.group(2) away

            metric_name = (
                urllib.unquote_plus(match.group(3).encode(
                    'utf8')).decode('utf8'))

            metric = {u'name': metric_name,
                      u'id': urllib.quote(serie_name)}

            # only returns the last match. we need all dimensions.
            dimensions = match.group(4)
            if dimensions:
                # remove the name, tenant_id, and region; just
                # dimensions remain
                dimensions_part = self._serie_tenant_id_region_name_regex.sub(
                    '', serie_name)
                dimensions = {}
                dimension_list = self._serie_name_dimension_regex.findall(
                    dimensions_part)
                for dimension in dimension_list:
                    match = self._serie_name_dimension_parts_regex.match(
                        dimension)
                    dimension_name = urllib.unquote(
                        match.group(1).encode('utf8')).decode('utf8')
                    dimension_value = urllib.unquote(
                        match.group(2).encode('utf8')).decode('utf8')
                    dimensions[dimension_name] = dimension_value

                metric["dimensions"] = dimensions
        else:
            metric = None

        return metric

    def measurement_list(self, tenant_id, region, name, dimensions,
                         start_timestamp,
                         end_timestamp, offset):
        """Example result from InfluxDB.

        [
            {
                "points": [
                    [
                        1413230362,
                        5369370001,
                        99.99
                    ]
                ],
                "name": "tenant?useast&%E5%8D%83&dim1=%E5%8D%83&dim2
                =%E5%8D%83",
                "columns": [
                    "time",
                    "sequence_number",
                    "value"
                ]
            }
        ]

        After url decoding the result would look like this. In this example
        the name, dim1 value, and dim2 value were non-ascii chars.

        [
            {
                "points": [
                    [
                        1413230362,
                        5369370001,
                        99.99
                    ]
                ],
                "name": "tenant?useast&千&dim1=千&dim2=千",
                "columns": [
                    "time",
                    "sequence_number",
                    "value"
                ]
            }
        ]

        :param tenant_id:
        :param name:
        :param dimensions:
        :return:
        """
                         start_timestamp, end_timestamp, offset,
                         limit, merge_metrics_flag):

        json_measurement_list = []

        try:
            query = self._build_select_query(dimensions, name, tenant_id,
                                             region, start_timestamp,
                                             end_timestamp, offset)

            try:
                result = self.influxdb_client.query(query, 'ms')
            except client.InfluxDBClientError as ex:
                # check for non-existent serie name.
                msg = "Couldn't look up columns"
                if ex.code == 400 and ex.content == (msg):
            if not merge_metrics_flag:

                metrics_list = self.list_metrics(tenant_id, region, name,
                                                 dimensions, None, 2)

                if len(metrics_list) > 1:
                    raise (exceptions.RepositoryException(
                        MetricsRepository.MULTIPLE_METRICS_MESSAGE))

            query = self._build_select_measurement_query(dimensions, name,
                                                         tenant_id,
                                                         region,
                                                         start_timestamp,
                                                         end_timestamp,
                                                         offset, limit)

            if not merge_metrics_flag:
                query += " slimit 1"

            result = self.influxdb_client.query(query)

            if not result:
                return json_measurement_list

            if 'error' in result.raw['results'][0]:
                if (result.raw['results'][0]['error'].startswith(
                        "measurement not found")):
                    return json_measurement_list
                else:
                    raise ex

            for serie in result:
            for serie in result.raw['results'][0]['series']:

                metric = self._decode_influxdb_serie_name(serie['name'])
                if 'values' in serie:

                    if metric is None:
                        continue
                    measurements_list = []
                    for point in serie['values']:

                # Replace 'sequence_number' -> 'id' for column name
                columns = [column.replace('sequence_number', 'id') for column
                           in serie['columns']]
                # Replace 'time' -> 'timestamp' for column name
                columns = [column.replace('time', 'timestamp') for column in
                           columns]
                        measurements_list.append([point[0],
                                                  point[1],
                                                  json.loads(point[2])])

                # format the utc date in the points
                fmtd_pts = [['%s.%03dZ' % (time.strftime('%Y-%m-%dT%H:%M:%S',
                                                         time.gmtime(
                                                             point[0] / 1000)),
                                           point[0] % 1000), point[1],
                             point[2]] for point in serie['points']]
                    measurement = {u'name': serie['name'],
                                   u'id': measurements_list[-1][0],
                                   u'dimensions': dimensions,
                                   u'columns': [u'timestamp', u'value',
                                                u'value_meta'],
                                   u'measurements': measurements_list}

                # Set the last point's time as the id. Used for next link.
                measurement = {u"name": metric['name'],
                               u"id": serie['points'][-1][0],
                               u"dimensions": metric['dimensions'],
                               u"columns": columns,
                               u"measurements": fmtd_pts}

                    json_measurement_list.append(measurement)
            json_measurement_list.append(measurement)

            return json_measurement_list
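
Each returned element now records the timestamp of its last point as its id, which the pagination helpers later reuse as the offset cursor for the next link. One element might look like this (values invented):

    {u'name': u'cpu.idle_perc',
     u'dimensions': {u'hostname': u'host1'},
     u'columns': [u'timestamp', u'value', u'value_meta'],
     u'measurements': [[u'2014-10-13T19:19:22.123Z', 99.99, {}]],
     u'id': u'2014-10-13T19:19:22.123Z'}
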
@@ -364,52 +264,57 @@ class MetricsRepository(metrics_repository.MetricsRepository):

    def metrics_statistics(self, tenant_id, region, name, dimensions,
                           start_timestamp,
                           end_timestamp, statistics, period):
                           end_timestamp, statistics, period, offset, limit,
                           merge_metrics_flag):

        json_statistics_list = []

        try:

            if not merge_metrics_flag:
                metrics_list = self.list_metrics(tenant_id, region, name,
                                                 dimensions, None, 2)

                if len(metrics_list) > 1:
                    raise (exceptions.RepositoryException(
                        MetricsRepository.MULTIPLE_METRICS_MESSAGE))

            query = self._build_statistics_query(dimensions, name, tenant_id,
                                                 region,
                                                 start_timestamp,
                                                 end_timestamp, statistics,
                                                 period)
                                                 period, offset, limit)

            try:
                result = self.influxdb_client.query(query, 's')
            except client.InfluxDBClientError as ex:
                # check for non-existent serie name.
                msg = "Couldn't look up columns"
                if ex.code == 400 and ex.content == (msg):
            if not merge_metrics_flag:
                query += " slimit 1"

            result = self.influxdb_client.query(query)

            if not result:
                return json_statistics_list

            if 'error' in result.raw['results'][0]:
                if (result.raw['results'][0]['error'].startswith(
                        "measurement not found")):
                    return json_statistics_list
                else:
                    raise ex

            for serie in result:
            for serie in result.raw['results'][0]['series']:

                metric = self._decode_influxdb_serie_name(serie['name'])
                if 'values' in serie:

                    if metric is None:
                        continue
                    columns = ([column.replace('mean', 'avg') for column in
                                result.raw['results'][0]['series'][0][
                                    'columns']])

                # Replace 'avg' -> 'mean' for column name
                columns = [column.replace('mean', 'avg') for column in
                           serie['columns']]
                # Replace 'time' -> 'timestamp' for column name
                columns = [column.replace('time', 'timestamp') for column in
                           columns]
                    stats_list = [stats for stats in serie['values']]

                fmtd_pts_list_list = [[time.strftime("%Y-%m-%dT%H:%M:%SZ",
                                                     time.gmtime(pts_list[
                                                         0]))] + pts_list[1:]
                                      for pts_list in serie['points']]
                    statistic = {u'name': serie['name'],
                                 u'id': stats_list[-1][0],
                                 u'dimensions': dimensions,
                                 u'columns': columns,
                                 u'statistics': stats_list}

                measurement = {"name": metric['name'],
                               "dimensions": metric['dimensions'],
                               "columns": columns,
                               "measurements": fmtd_pts_list_list}

                json_statistics_list.append(measurement)
            json_statistics_list.append(statistic)

            return json_statistics_list

@@ -417,71 +322,21 @@ class MetricsRepository(metrics_repository.MetricsRepository):
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    def _build_offset_clause(self, offset):
    def _build_offset_clause(self, offset, limit):

        if offset is not None:

            # If offset is not empty.
            if offset:

                offset_clause = (
                    ' and time < {}s limit {}'.
                    format(offset, str(constants.PAGE_LIMIT)))
            else:

                offset_clause = ' limit {}'.format(str(
                    constants.PAGE_LIMIT))
        if offset:

            offset_clause = (
                " and time > '{}' limit {}".format(offset, str(limit + 1)))
        else:

            offset_clause = ''
            offset_clause = " limit {}".format(str(limit + 1))

        return offset_clause
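
The rewritten clause builder drops the fixed PAGE_LIMIT and the old time < cursor in favor of a time > cursor plus limit + 1. Its two output shapes, assuming limit=50 and a hypothetical cursor value:

    _build_offset_clause(None, 50)
    # -> " limit 51"
    _build_offset_clause('2014-10-13T19:19:22Z', 50)
    # -> " and time > '2014-10-13T19:19:22Z' limit 51"
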
    def alarm_history(self, tenant_id, alarm_id_list,
                      offset, start_timestamp=None,
                      offset, limit, start_timestamp=None,
                      end_timestamp=None):
        """Example result from Influxdb.

        [
            {
                "points": [
                    [
                        1415894490,
                        272140001,
                        "6ac10841-d02f-4f7d-a191-ae0a3d9a25f2",
                        "[{\"name\": \"cpu.system_perc\", \"dimensions\": {
                        \"hostname\": \"mini-mon\", \"component\":
                        \"monasca-agent\", \"service\": \"monitoring\"}},
                        {\"name\": \"load.avg_1_min\", \"dimensions\": {
                        \"hostname\": \"mini-mon\", \"component\":
                        \"monasca-agent\", \"service\": \"monitoring\"}}]",
                        "ALARM",
                        "OK",
                        "Thresholds were exceeded for the sub-alarms: [max(
                        load.avg_1_min{hostname=mini-mon}) > 0.0,
                        max(cpu.system_perc) > 0.0]",
                        "{}"
                    ],
                ],
                "name": "alarm_state_history",
                "columns": [
                    "time",
                    "sequence_number",
                    "alarm_id",
                    "metrics",
                    "new_state",
                    "old_state",
                    "reason",
                    "reason_data"
                ]
            }
        ]

        :param tenant_id:
        :param alarm_id:
        :return:
        """

        try:

@@ -497,8 +352,8 @@ class MetricsRepository(metrics_repository.MetricsRepository):
                    "semi-colon [;] characters[ {} ]".format(alarm_id))

            query = """
              select alarm_id, metrics, old_state, new_state,
                     reason, reason_data
              select alarm_id, metrics, new_state, old_state,
                     reason, reason_data, sub_alarms, tenant_id
              from alarm_state_history
              """

@@ -515,45 +370,43 @@ class MetricsRepository(metrics_repository.MetricsRepository):

            time_clause = ''
            if start_timestamp:
                # subtract 1 from timestamp to get >= semantics
                time_clause += " and time > " + str(start_timestamp - 1) + "s"
            if end_timestamp:
                # add 1 to timestamp to get <= semantics
                time_clause += " and time < " + str(end_timestamp + 1) + "s"
                time_clause += (" and time > " + str(int(start_timestamp)) +
                                "s ")

            offset_clause = self._build_offset_clause(offset)
            if end_timestamp:
                time_clause += " and time < " + str(int(end_timestamp)) + "s "

            offset_clause = self._build_offset_clause(offset, limit)

            query += where_clause + time_clause + offset_clause

            try:
                result = self.influxdb_client.query(query, 's')
            except client.InfluxDBClientError as ex:
                # check for non-existent serie name. only happens
                # if alarm_state_history serie does not exist.
                msg = "Couldn't look up columns"
                if ex.code == 400 and ex.content == (msg):
            result = self.influxdb_client.query(query)

            if 'error' in result.raw['results'][0]:
                if (result.raw['results'][0]['error'].startswith(
                        "measurement not found")):
                    return json_alarm_history_list
                else:
                    raise ex

            if not result:
                return json_alarm_history_list

            # There's only one serie, alarm_state_history.
            for point in result[0]['points']:
                alarm_point = {u'alarm_id': point[2],
                               u'metrics': json.loads(point[3]),
                               u'old_state': point[4], u'new_state': point[5],
                               u'reason': point[6], u'reason_data': point[7],
                               u'timestamp': time.strftime(
                                   "%Y-%m-%dT%H:%M:%SZ",
                                   time.gmtime(point[0])),
                               u'id': point[0]}
            if 'values' in result.raw['results'][0]['series'][0]:

                json_alarm_history_list.append(alarm_point)
                for point in result.raw['results'][0]['series'][0]['values']:
                    alarm_point = {u'timestamp': point[0],
                                   u'alarm_id': point[1],
                                   u'metrics': json.loads(point[2]),
                                   u'new_state': point[3],
                                   u'old_state': point[4],
                                   u'reason': point[5],
                                   u'reason_data': point[6],
                                   u'sub_alarms': json.loads(point[7]),
                                   u'tenant_id': point[8]}

                    json_alarm_history_list.append(alarm_point)

            return json_alarm_history_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)
            raise exceptions.RepositoryException(ex)

@@ -20,20 +20,22 @@ import six
@six.add_metaclass(abc.ABCMeta)
class MetricsRepository(object):
    @abc.abstractmethod
    def list_metrics(self, tenant_id, region, name, dimensions, offset):
    def list_metrics(self, tenant_id, region, name, dimensions, offset, limit):
        pass

    @abc.abstractmethod
    def measurement_list(self, tenant_id, region, name, dimensions,
                         start_timestamp, end_timestamp, offset):
                         start_timestamp, end_timestamp, offset, limit,
                         merge_metrics_flag):
        pass

    @abc.abstractmethod
    def metrics_statistics(self, tenant_id, region, name, dimensions,
                           start_timestamp, end_timestamp, statistics, period):
                           start_timestamp, end_timestamp, statistics,
                           period, offset, limit, merge_metrics_flag):
        pass

    @abc.abstractmethod
    def alarm_history(self, tenant_id, alarm_id_list,
                      offset, start_timestamp, end_timestamp):
        pass
                      offset, limit, start_timestamp, end_timestamp):
        pass

@@ -140,4 +140,4 @@ class SubAlarmDefinition(object):
                self.dimensions == other.dimensions and
                self.function == other.function and
                self.period == other.period and
                self.periods == other.periods)
                self.periods == other.periods)

@@ -14,7 +14,6 @@
import datetime

from monasca.common.repositories import alarm_definitions_repository as adr
from monasca.common.repositories import constants
from monasca.common.repositories import exceptions
from monasca.common.repositories.model import sub_alarm_definition
from monasca.common.repositories.mysql import mysql_repository

@@ -76,7 +75,8 @@ class AlarmDefinitionsRepository(mysql_repository.MySQLRepository,
            raise exceptions.DoesNotExistException

    @mysql_repository.mysql_try_catch_block
    def get_alarm_definitions(self, tenant_id, name, dimensions, offset):
    def get_alarm_definitions(self, tenant_id, name, dimensions, offset,
                              limit):

        parms = [tenant_id]

@@ -88,15 +88,14 @@ class AlarmDefinitionsRepository(mysql_repository.MySQLRepository,
            where_clause += " and ad.name = %s "
            parms.append(name.encode('utf8'))

        if offset is not None:
            order_by_clause = " order by ad.id, ad.created_at "
        order_by_clause = " order by ad.id, ad.created_at "

        if offset:
            where_clause += " and ad.id > %s "
            parms.append(offset.encode('utf8'))
            limit_clause = " limit %s "
            parms.append(constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by ad.created_at "
            limit_clause = ""

        limit_clause = " limit %s "
        parms.append(limit + 1)

        if dimensions:
            inner_join = """ inner join sub_alarm_definition as sad
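
The MySQL repositories follow the same keyset scheme: the cursor is the last primary key seen, and the ordering is pinned so the cursor stays stable across pages. Roughly how the pieces bind for a paged call (values hypothetical; the surrounding select clause is outside this hunk):

    parms = ['tenant1']                # tenant_id
    where_clause = ' where ad.tenant_id = %s '
    if offset:
        where_clause += ' and ad.id > %s '
        parms.append('42')             # cursor echoed from the previous next link
    order_by_clause = ' order by ad.id, ad.created_at '
    parms.append(50 + 1)               # limit + 1 flags whether a next page exists
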
@@ -13,7 +13,6 @@
# under the License.

from monasca.common.repositories import alarms_repository
from monasca.common.repositories import constants
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.openstack.common import log

@@ -195,7 +194,7 @@ class AlarmsRepository(mysql_repository.MySQLRepository,
        return rows

    @mysql_repository.mysql_try_catch_block
    def get_alarms(self, tenant_id, query_parms, offset):
    def get_alarms(self, tenant_id, query_parms, offset, limit):

        parms = [tenant_id]

@@ -205,11 +204,13 @@ class AlarmsRepository(mysql_repository.MySQLRepository,

        where_clause = " where ad.tenant_id = %s "

        if offset is not None:
        if offset:
            where_clause += " and ad.id > %s"
            parms.append(offset.encode('utf8'))

        if limit:
            limit_clause = " limit %s "
            parms.append(constants.PAGE_LIMIT)
            parms.append(limit + 1)
        else:
            limit_clause = ""
@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.

from monasca.common.repositories import constants
from monasca.common.repositories import events_repository as er
from monasca.common.repositories.mysql import mysql_repository
from monasca.openstack.common import log

@@ -68,8 +67,6 @@ class EventsRepository(mysql_repository.MySQLRepository,
        return rows

    def _find_event_ids(self, offset, limit):
        if not limit:
            limit = constants.PAGE_LIMIT

        parameters = []

@@ -28,4 +28,4 @@ db = peewee.MySQLDatabase(cfg.CONF.mysql.database_name,

class Model(peewee.Model):
    class Meta:
        database = db
        database = db
@@ -14,7 +14,6 @@

import datetime

from monasca.common.repositories import constants
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.common.repositories import notifications_repository as nr

@@ -78,7 +77,7 @@ class NotificationsRepository(mysql_repository.MySQLRepository,
        return notification_id

    @mysql_repository.mysql_try_catch_block
    def list_notifications(self, tenant_id, offset):
    def list_notifications(self, tenant_id, offset, limit):

        query = """
            select *

@@ -87,10 +86,12 @@ class NotificationsRepository(mysql_repository.MySQLRepository,

        parms = [tenant_id]

        if offset is not None:
            query += " and id > %s order by id limit %s"
        if offset:
            query += " and id > %s "
            parms.append(offset.encode('utf8'))
            parms.append(constants.PAGE_LIMIT)

        query += " order by id limit %s "
        parms.append(limit + 1)

        rows = self._execute_query(query, parms)
@@ -16,7 +16,6 @@ import uuid
import MySQLdb
from oslo_utils import timeutils

from monasca.common.repositories import constants
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.common.repositories import streams_repository as sdr

@@ -85,15 +84,14 @@ class StreamsRepository(mysql_repository.MySQLRepository,
            where_clause += " and sd.name = %s "
            parms.append(name.encode('utf8'))

        if offset is not None:
            order_by_clause = " order by sd.id, sd.created_at "
        order_by_clause = " order by sd.id, sd.created_at "

        if offset:
            where_clause += " and sd.id > %s "
            parms.append(offset.encode('utf8'))
            limit_clause = " limit %s "
            parms.append(constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by sd.created_at "
            limit_clause = ""

        limit_clause = " limit %s "
        parms.append(limit + 1)

        query = select_clause + where_clause + order_by_clause + limit_clause
@@ -26,7 +26,7 @@ class NotificationsRepository(object):
        return

    @abc.abstractmethod
    def list_notifications(self, tenant_id, offset):
    def list_notifications(self, tenant_id, offset, limit):
        return

    @abc.abstractmethod

@@ -144,4 +144,4 @@ class ResourceAPI(falcon.API):
                      (resource_name, driver_name))
            self.add_route(uri, resource_driver)
            LOG.debug('%s dispatcher driver has been added to the routes!' %
                      (resource_name))
                      (resource_name))
@@ -162,11 +162,11 @@ class AlarmDefinitions(alarm_definitions_api_v2.AlarmDefinitionsV2API,
            tenant_id = helpers.get_tenant_id(req)
            name = helpers.get_query_name(req)
            dimensions = helpers.get_query_dimensions(req)
            offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                      'offset'))
            offset = helpers.get_query_param(req, 'offset')
            limit = helpers.get_limit(req)

            result = self._alarm_definition_list(tenant_id, name, dimensions,
                                                 req.uri, offset)
                                                 req.uri, offset, limit)

            res.body = helpers.dumpit_utf8(result)
            res.status = falcon.HTTP_200

@@ -286,12 +286,12 @@ class AlarmDefinitions(alarm_definitions_api_v2.AlarmDefinitionsV2API,

    @resource.resource_try_catch_block
    def _alarm_definition_list(self, tenant_id, name, dimensions, req_uri,
                               offset):
                               offset, limit):

        alarm_definition_rows = (
            self._alarm_definitions_repo.get_alarm_definitions(tenant_id, name,
                                                               dimensions,
                                                               offset))
                                                               offset, limit))

        result = []
        for alarm_definition_row in alarm_definition_rows:

@@ -324,7 +324,7 @@ class AlarmDefinitions(alarm_definitions_api_v2.AlarmDefinitionsV2API,
            helpers.add_links_to_resource(ad, req_uri)
            result.append(ad)

        result = helpers.paginate(result, req_uri, offset)
        result = helpers.paginate(result, req_uri, limit)

        return result
@@ -98,10 +98,12 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):

        query_parms = falcon.uri.parse_query_string(req.query_string)

        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        offset = helpers.get_query_param(req, 'offset')

        result = self._alarm_list(req.uri, tenant_id, query_parms, offset)
        limit = helpers.get_limit(req)

        result = self._alarm_list(req.uri, tenant_id, query_parms, offset,
                                  limit)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

@@ -130,12 +132,12 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):
        start_timestamp = helpers.get_query_starttime_timestamp(req, False)
        end_timestamp = helpers.get_query_endtime_timestamp(req, False)
        query_parms = falcon.uri.parse_query_string(req.query_string)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)

        result = self._alarm_history_list(tenant_id, start_timestamp,
                                          end_timestamp, query_parms,
                                          req.uri, offset)
                                          req.uri, offset, limit)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

@@ -145,10 +147,10 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):

        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)

        result = self._alarm_history(tenant_id, [id], req.uri, offset)
        result = self._alarm_history(tenant_id, [id], req.uri, offset, limit)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

@@ -187,7 +189,8 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):

    @resource.resource_try_catch_block
    def _alarm_history_list(self, tenant_id, start_timestamp,
                            end_timestamp, query_parms, req_uri, offset):
                            end_timestamp, query_parms, req_uri, offset,
                            limit):

        # get_alarms expects 'metric_dimensions' for dimensions key.
        if 'dimensions' in query_parms:

@@ -195,22 +198,24 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):
        else:
            new_query_parms = {}

        alarm_rows = self._alarms_repo.get_alarms(tenant_id, new_query_parms)
        alarm_rows = self._alarms_repo.get_alarms(tenant_id, new_query_parms,
                                                  None, None)
        alarm_id_list = [alarm_row['alarm_id'] for alarm_row in alarm_rows]

        result = self._metrics_repo.alarm_history(tenant_id, alarm_id_list,
                                                  offset,
                                                  offset, limit,
                                                  start_timestamp,
                                                  end_timestamp)

        return helpers.paginate(result, req_uri, offset)
        return helpers.paginate(result, req_uri, limit)

    @resource.resource_try_catch_block
    def _alarm_history(self, tenant_id, alarm_id, req_uri, offset):
    def _alarm_history(self, tenant_id, alarm_id, req_uri, offset, limit):

        result = self._metrics_repo.alarm_history(tenant_id, alarm_id, offset)
        result = self._metrics_repo.alarm_history(tenant_id, alarm_id, offset,
                                                  limit)

        return helpers.paginate(result, req_uri, offset)
        return helpers.paginate(result, req_uri, limit)

    @resource.resource_try_catch_block
    def _alarm_delete(self, tenant_id, id):

@@ -266,10 +271,10 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):
        return alarm

    @resource.resource_try_catch_block
    def _alarm_list(self, req_uri, tenant_id, query_parms, offset):
    def _alarm_list(self, req_uri, tenant_id, query_parms, offset, limit):

        alarm_rows = self._alarms_repo.get_alarms(tenant_id, query_parms,
                                                  offset)
                                                  offset, limit)

        result = []

@@ -314,7 +319,7 @@ class Alarms(alarms_api_v2.AlarmsV2API, Alarming):

            result.append(alarm)

        return helpers.paginate(result, req_uri, offset)
        return helpers.paginate(result, req_uri, limit)

    def _get_alarm_state(self, req):
@@ -88,7 +88,7 @@ class Events(monasca_events_api_v2.EventsV2API):
    @resource.resource_try_catch_block
    def _list_events(self, tenant_id, uri, offset, limit):
        rows = self._events_repo.list_events(tenant_id, offset, limit)
        return helpers.paginate(self._build_events(rows), uri, offset)
        return helpers.paginate(self._build_events(rows), uri, limit)

    @resource.resource_try_catch_block
    def _list_event(self, tenant_id, event_id, uri):

@@ -144,9 +144,8 @@ class Events(monasca_events_api_v2.EventsV2API):
    def do_get_events(self, req, res):
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        limit = helpers.get_query_param(req, 'limit')
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)

        result = self._list_events(tenant_id, req.uri, offset, limit)
        res.body = helpers.dumpit_utf8(result)
@@ -14,6 +14,7 @@

import datetime
import json
import urllib
import urlparse

import falcon

@@ -131,11 +132,6 @@ def get_query_param(req, param_name, required=False, default_val=None):
        raise falcon.HTTPBadRequest('Bad request', ex.message)


def normalize_offset(offset):

    return u'' if offset == u'x' else offset


def get_query_name(req, name_required=False):
    """Returns the query param "name" if supplied.

@@ -273,101 +269,180 @@ def validate_query_dimensions(dimensions):
    raise falcon.HTTPBadRequest('Bad request', ex.message)


def paginate(resource, uri, offset):
def paginate(resource, uri, limit):

    if offset is not None:
    parsed_uri = urlparse.urlparse(uri)

    if resource:
    self_link = build_base_uri(parsed_uri)

        if len(resource) >= constants.PAGE_LIMIT:
    old_query_params = _get_old_query_params(parsed_uri)

            new_offset = resource[-1]['id']
    if old_query_params:
        self_link += '?' + '&'.join(old_query_params)

            parsed_uri = urlparse.urlparse(uri)
    if resource and len(resource) > limit:

            next_link = build_base_uri(parsed_uri)
        if 'timestamp' in resource[limit - 1]:
            new_offset = resource[limit - 1]['timestamp']

            new_query_params = [u'offset' + '=' + str(new_offset).decode(
                'utf8')]
        if 'id' in resource[limit - 1]:
            new_offset = resource[limit - 1]['id']

            for query_param in parsed_uri.query.split('&'):
                query_param_name, query_param_val = query_param.split('=')
                if query_param_name.lower() != 'offset':
                    new_query_params.append(query_param)
        next_link = build_base_uri(parsed_uri)

            next_link += '?' + '&'.join(new_query_params)
        new_query_params = [u'offset' + '=' + urllib.quote(
            new_offset.encode('utf8'), safe='')]

            resource = {u'links':
                        [{u'rel': u'self', u'href': uri.decode('utf8')},
                         {u'rel': u'next',
                          u'href': next_link.decode('utf8')}],
                        u'elements': resource}
        _get_old_query_params_except_offset(new_query_params, parsed_uri)

        else:
        if new_query_params:
            next_link += '?' + '&'.join(new_query_params)

            resource = {u'links':
                        [{u'rel': u'self', u'href': uri.decode('utf8')}],
                        u'elements': resource}
        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')},
                                {u'rel': u'next',
                                 u'href': next_link.decode('utf8')}]),
                    u'elements': resource[:limit]}

    else:

        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')}]),
                    u'elements': resource}

    return resource
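
The rewritten paginate truncates the element list to limit and only emits a next link when the extra probe row came back, carrying forward every original query parameter except offset. A trimmed response, assuming limit=2 and three rows fetched (the URL is hypothetical):

    {u'links': [
        {u'rel': u'self',
         u'href': u'http://host/v2.0/notification-methods?limit=2'},
        {u'rel': u'next',
         u'href': u'http://host/v2.0/notification-methods?offset=<id of row 2>&limit=2'}],
     u'elements': [row_1, row_2]}   # row 3 is dropped; row 2's id becomes the cursor
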
def paginate_measurement(measurement, uri, offset):
def paginate_measurement(measurement, uri, limit):

    if offset is not None:
    parsed_uri = urlparse.urlparse(uri)

    if measurement['measurements']:
    self_link = build_base_uri(parsed_uri)

        if len(measurement['measurements']) >= constants.PAGE_LIMIT:
    old_query_params = _get_old_query_params(parsed_uri)

            new_offset = measurement['id']
    if old_query_params:
        self_link += '?' + '&'.join(old_query_params)

            parsed_uri = urlparse.urlparse(uri)
    if (measurement
            and measurement[0]
            and measurement[0]['measurements']
            and len(measurement[0]['measurements']) > limit):

            next_link = build_base_uri(parsed_uri)
        new_offset = measurement[0]['measurements'][limit - 1][0]

            new_query_params = [u'offset' + '=' + str(new_offset).decode(
                'utf8')]
        next_link = build_base_uri(parsed_uri)

            # Add the query parms back to the URL without the original
            # offset and dimensions.
            for query_param in parsed_uri.query.split('&'):
                query_param_name, query_param_val = query_param.split('=')
                if (query_param_name.lower() != 'offset' and
                        query_param_name.lower() != 'dimensions'):
                    new_query_params.append(query_param)
        new_query_params = [u'offset' + '=' + urllib.quote(
            new_offset.encode('utf8'), safe='')]

            next_link += '?' + '&'.join(new_query_params)
        _get_old_query_params_except_offset(new_query_params, parsed_uri)

            # Add the dimensions for this particular measurement.
            if measurement['dimensions']:
                dims = []
                for k, v in measurement['dimensions'].iteritems():
                    dims.append(k + ":" + v)
        if new_query_params:
            next_link += '?' + '&'.join(new_query_params)

                if dims:
                    next_link += '&dimensions' + ','.join(dims)
        truncated_measurement = {u'dimensions': measurement[0]['dimensions'],
                                 u'measurements': (measurement[0]
                                                   ['measurements'][:limit]),
                                 u'name': measurement[0]['name'],
                                 u'columns': measurement[0]['columns'],
                                 u'id': new_offset}

            measurement = {u'links': [{u'rel': u'self',
                                       u'href': uri.decode('utf8')},
                                      {u'rel': u'next', u'href':
                                          next_link.decode('utf8')}],
                           u'elements': measurement}

        else:

            measurement = {
                u'links': [
                    {u'rel': u'self',
                     u'href': uri.decode('utf8')}],
                u'elements': measurement
            }

        return measurement
        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')},
                                {u'rel': u'next',
                                 u'href': next_link.decode('utf8')}]),
                    u'elements': truncated_measurement}

    else:

        return measurement
        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')}]),
                    u'elements': measurement}

    return resource
def _get_old_query_params(parsed_uri):

    old_query_params = []

    if parsed_uri.query:

        for query_param in parsed_uri.query.split('&'):
            query_param_name, query_param_val = query_param.split('=')

            old_query_params.append(urllib.quote(
                query_param_name.encode('utf8'), safe='')
                + "="
                + urllib.quote(query_param_val.encode('utf8'), safe=''))

    return old_query_params


def _get_old_query_params_except_offset(new_query_params, parsed_uri):

    if parsed_uri.query:

        for query_param in parsed_uri.query.split('&'):
            query_param_name, query_param_val = query_param.split('=')
            if query_param_name.lower() != 'offset':
                new_query_params.append(urllib.quote(
                    query_param_name.encode(
                        'utf8'), safe='') + "=" + urllib.quote(
                    query_param_val.encode(
                        'utf8'), safe=''))


def paginate_statistics(statistic, uri, limit):

    parsed_uri = urlparse.urlparse(uri)

    self_link = build_base_uri(parsed_uri)

    old_query_params = _get_old_query_params(parsed_uri)

    if old_query_params:
        self_link += '?' + '&'.join(old_query_params)

    if (statistic
            and statistic[0]
            and statistic[0]['statistics']
            and len(statistic[0]['statistics']) > limit):

        new_offset = (
            statistic[0]['statistics'][limit - 1][0])

        next_link = build_base_uri(parsed_uri)

        new_query_params = [u'offset' + '=' + urllib.quote(
            new_offset.encode('utf8'), safe='')]

        _get_old_query_params_except_offset(new_query_params, parsed_uri)

        if new_query_params:
            next_link += '?' + '&'.join(new_query_params)

        truncated_statistic = {u'dimensions': statistic[0]['dimensions'],
                               u'statistics': (statistic[0]['statistics'][
                                   :limit]),
                               u'name': statistic[0]['name'],
                               u'columns': statistic[0]['columns'],
                               u'id': new_offset}

        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')},
                                {u'rel': u'next',
                                 u'href': next_link.decode('utf8')}]),
                    u'elements': truncated_statistic}

    else:

        resource = {u'links': ([{u'rel': u'self',
                                 u'href': self_link.decode('utf8')}]),
                    u'elements': statistic}

    return resource


def build_base_uri(parsed_uri):

@@ -449,3 +524,28 @@ def raise_not_found_exception(resource_name, resource_id, tenant_id):
def dumpit_utf8(thingy):

    return json.dumps(thingy, ensure_ascii=False).encode('utf8')


def str_2_bool(s):
    return s.lower() in ("true")


def get_limit(req):

    limit = get_query_param(req, 'limit')

    if limit:
        if limit.isdigit():
            limit = int(limit)
            if limit > constants.PAGE_LIMIT:
                return constants.PAGE_LIMIT
            else:
                return limit
        else:
            raise falcon.HTTPBadRequest("Invalid limit",
                                        "Limit "
                                        "parameter must "
                                        "be "
                                        "an integer")
    else:
        return constants.PAGE_LIMIT
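
get_limit therefore caps any requested page size at PAGE_LIMIT, defaults to it when no limit is given, and rejects non-integer values. With PAGE_LIMIT = 10000, its behavior works out to:

    # no limit query param   -> 10000
    # ?limit=50              -> 50
    # ?limit=999999          -> 10000
    # ?limit=abc             -> falcon.HTTPBadRequest("Invalid limit", ...)
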
@@ -21,6 +21,7 @@ from monasca.common.messaging.message_formats import metrics_transform_factory
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca.v2.reference import helpers
from monasca.v2.reference import resource


LOG = log.getLogger(__name__)

@@ -56,11 +57,7 @@ class Metrics(monasca_api_v2.V2API):
                                        ex.message)

    def _validate_metrics(self, metrics):
        """Validates the metrics

        :param metrics: A metric object or array of metrics objects.
        :raises falcon.HTTPBadRequest
        """
        try:
            if isinstance(metrics, list):
                for metric in metrics:

@@ -84,11 +81,6 @@ class Metrics(monasca_api_v2.V2API):
                assert len(metric['dimensions'][d]) <= 255

    def _send_metrics(self, metrics):
        """Send the metrics using the message queue.

        :param metrics: A metric object or array of metrics objects.
        :raises: falcon.HTTPServiceUnavailable
        """

        try:
            self._message_queue.send_message_batch(metrics)

@@ -97,72 +89,51 @@ class Metrics(monasca_api_v2.V2API):
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                ex.message, 60)

    def _list_metrics(self, tenant_id, name, dimensions, req_uri, offset):
        """Query the metric repo for the metrics, format them and return them.
    @resource.resource_try_catch_block
    def _list_metrics(self, tenant_id, name, dimensions, req_uri, offset,
                      limit):

        :param tenant_id:
        :param name:
        :param dimensions:
        :raises falcon.HTTPServiceUnavailable
        """
        result = self._metrics_repo.list_metrics(tenant_id,
                                                 self._region,
                                                 name,
                                                 dimensions, offset, limit)

        try:
            result = self._metrics_repo.list_metrics(tenant_id,
        return helpers.paginate(result, req_uri, limit)

    @resource.resource_try_catch_block
    def _measurement_list(self, tenant_id, name, dimensions, start_timestamp,
                          end_timestamp, req_uri, offset,
                          limit, merge_metrics_flag):

        result = self._metrics_repo.measurement_list(tenant_id,
                                                     self._region,
                                                     name,
                                                     dimensions, offset)
                                                     dimensions,
                                                     start_timestamp,
                                                     end_timestamp,
                                                     offset,
                                                     limit,
                                                     merge_metrics_flag)

            return helpers.paginate(result, req_uri, offset)

        except Exception as ex:
            LOG.exception(ex)
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                ex.message, 60)

    def _measurement_list(self, tenant_id, name, dimensions, start_timestamp,
                          end_timestamp, req_uri, offset):
        try:
            result = self._metrics_repo.measurement_list(tenant_id,
                                                         self._region,
                                                         name,
                                                         dimensions,
                                                         start_timestamp,
                                                         end_timestamp,
                                                         offset)

            if offset is not None:

                paginated_result = []
                for measurement in result:
                    paginated_result.append(
                        helpers.paginate_measurement(measurement,
                                                     req_uri, offset))

                result = {u'links': [{u'rel': u'self',
                                      u'href': req_uri.decode('utf8')}],
                          u'elements': paginated_result}

            return result

        except Exception as ex:
            LOG.exception(ex)
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                ex.message, 60)
        return helpers.paginate_measurement(result, req_uri, limit)

    @resource.resource_try_catch_block
    def _metric_statistics(self, tenant_id, name, dimensions, start_timestamp,
                           end_timestamp, statistics, period):
        try:
            return self._metrics_repo.metrics_statistics(tenant_id,
                                                         self._region,
                                                         name,
                                                         dimensions,
                                                         start_timestamp,
                                                         end_timestamp,
                                                         statistics, period)
        except Exception as ex:
            LOG.exception(ex)
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                ex.message, 60)
                           end_timestamp, statistics, period, req_uri,
                           offset, limit, merge_metrics_flag):

        result = self._metrics_repo.metrics_statistics(tenant_id,
                                                       self._region,
                                                       name,
                                                       dimensions,
                                                       start_timestamp,
                                                       end_timestamp,
                                                       statistics, period,
                                                       offset,
                                                       limit,
                                                       merge_metrics_flag)

        return helpers.paginate_statistics(result, req_uri, limit)

    @resource_api.Restify('/v2.0/metrics/', method='post')
    def do_post_metrics(self, req, res):

@@ -187,10 +158,10 @@ class Metrics(monasca_api_v2.V2API):
        helpers.validate_query_name(name)
        dimensions = helpers.get_query_dimensions(req)
        helpers.validate_query_dimensions(dimensions)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)
        result = self._list_metrics(tenant_id, name, dimensions,
                                    req.uri, offset)
                                    req.uri, offset, limit)
        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

@@ -198,17 +169,24 @@ class Metrics(monasca_api_v2.V2API):
    def do_get_measurements(self, req, res):
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        name = helpers.get_query_name(req)
        name = helpers.get_query_name(req, True)
        helpers.validate_query_name(name)
        dimensions = helpers.get_query_dimensions(req)
        helpers.validate_query_dimensions(dimensions)
        start_timestamp = helpers.get_query_starttime_timestamp(req)
        end_timestamp = helpers.get_query_endtime_timestamp(req, False)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)
        merge_metrics_flag = helpers.get_query_param(req, 'merge_metrics',
                                                     False,
                                                     False)
        merge_metrics_flag = (
            self._get_boolean_merge_metrics_flag(merge_metrics_flag))

        result = self._measurement_list(tenant_id, name, dimensions,
                                        start_timestamp, end_timestamp,
                                        req.uri, offset)
                                        req.uri, offset,
                                        limit, merge_metrics_flag)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

@@ -217,7 +195,7 @@ class Metrics(monasca_api_v2.V2API):
    def do_get_statistics(self, req, res):
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        name = helpers.get_query_name(req)
        name = helpers.get_query_name(req, True)
        helpers.validate_query_name(name)
        dimensions = helpers.get_query_dimensions(req)
        helpers.validate_query_dimensions(dimensions)

@@ -225,8 +203,26 @@ class Metrics(monasca_api_v2.V2API):
        end_timestamp = helpers.get_query_endtime_timestamp(req, False)
        statistics = helpers.get_query_statistics(req)
        period = helpers.get_query_period(req)
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)
        merge_metrics_flag = helpers.get_query_param(req, 'merge_metrics',
                                                     False,
                                                     False)

        merge_metrics_flag = (
            self._get_boolean_merge_metrics_flag(merge_metrics_flag))

        result = self._metric_statistics(tenant_id, name, dimensions,
                                         start_timestamp, end_timestamp,
                                         statistics, period)
                                         statistics, period, req.uri,
                                         offset, limit, merge_metrics_flag)

        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200

    def _get_boolean_merge_metrics_flag(self, merge_metrics_flag_str):

        if merge_metrics_flag_str is not False:
            return helpers.str_2_bool(merge_metrics_flag_str)
        else:
            return False
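
When merge_metrics is false, the repository code above guards against silently mixing series: it first probes list_metrics with a limit of 2 and raises if more than one metric matches, and additionally appends slimit 1 so InfluxDB itself returns at most one series. A condensed sketch of that guard:

    metrics = self.list_metrics(tenant_id, region, name, dimensions, None, 2)
    if len(metrics) > 1:
        raise exceptions.RepositoryException(
            MetricsRepository.MULTIPLE_METRICS_MESSAGE)
    query += ' slimit 1'   # cap the query result to a single series as well
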
@@ -99,14 +99,15 @@ class Notifications(monasca_notifications_api_v2.NotificationsV2API):
        return helpers.add_links_to_resource(response, uri)

    @resource.resource_try_catch_block
    def _list_notifications(self, tenant_id, uri, offset):
    def _list_notifications(self, tenant_id, uri, offset, limit):

        rows = self._notifications_repo.list_notifications(tenant_id, offset)
        rows = self._notifications_repo.list_notifications(tenant_id, offset,
                                                           limit)

        result = [self._build_notification_result(row,
                                                  uri) for row in rows]

        return helpers.paginate(result, uri, offset)
        return helpers.paginate(result, uri, limit)

    @resource.resource_try_catch_block
    def _list_notification(self, tenant_id, notification_id, uri):

@@ -151,9 +152,9 @@ class Notifications(monasca_notifications_api_v2.NotificationsV2API):
    def do_get_notification_methods(self, req, res):
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        result = self._list_notifications(tenant_id, req.uri, offset)
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)
        result = self._list_notifications(tenant_id, req.uri, offset, limit)
        res.body = helpers.dumpit_utf8(result)
        res.status = falcon.HTTP_200
@@ -41,7 +41,9 @@ def resource_try_catch_block(fun):
        except exceptions.RepositoryException as ex:
            LOG.exception(ex)
            msg = " ".join(map(str, ex.message.args))
            raise falcon.HTTPInternalServerError('Service unavailable', msg)
            raise falcon.HTTPInternalServerError('The repository was unable '
                                                 'to process your request',
                                                 msg)
        except Exception as ex:
            LOG.exception(ex)
            raise falcon.HTTPInternalServerError('Service unavailable', ex)
@@ -108,9 +108,8 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
        helpers.validate_authorization(req, self._default_authorized_roles)
        tenant_id = helpers.get_tenant_id(req)
        name = helpers.get_query_name(req)
        offset = helpers.normalize_offset(helpers.get_query_param(req,
                                                                  'offset'))
        limit = helpers.get_query_param(req, 'limit')
        offset = helpers.get_query_param(req, 'offset')
        limit = helpers.get_limit(req)
        result = self._stream_definition_list(tenant_id, name,
                                              req.uri, offset, limit)

@@ -219,7 +218,7 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
            helpers.add_links_to_resource(sd, req_uri)
            result.append(sd)

        result = helpers.paginate(result, req_uri, offset)
        result = helpers.paginate(result, req_uri, limit)

        return result

@@ -37,5 +37,7 @@ peewee>=2.4.7
pyparsing>=2.0.3
voluptuous>=0.8.7
MySQL-python>=1.2.3
influxdb>=0.2.0
influxdb>=1.0.0
eventlet
kafka-python>=0.9.2,<0.9.3