Merge "Implements base method for time series metrics"

commit 45dca00dee
Zuul 2020-09-27 02:45:20 +00:00, committed by Gerrit Code Review
7 changed files with 191 additions and 13 deletions

watcher/decision_engine/datasources/base.py

@@ -19,6 +19,8 @@ import time
 from oslo_config import cfg
 from oslo_log import log
 
+from watcher.common import exception
+
 CONF = cfg.CONF
 LOG = log.getLogger(__name__)

@@ -54,6 +56,13 @@ class DataSourceBase(object):
         instance_root_disk_size=None,
     )
 
+    def _get_meter(self, meter_name):
+        """Retrieve the meter from the metric map or raise error"""
+        meter = self.METRIC_MAP.get(meter_name)
+        if meter is None:
+            raise exception.MetricNotAvailable(metric=meter_name)
+        return meter
+
     def query_retry(self, f, *args, **kwargs):
         """Attempts to retrieve metrics from the external service
 
@@ -122,6 +131,30 @@ class DataSourceBase(object):
         pass
 
+    @abc.abstractmethod
+    def statistic_series(self, resource=None, resource_type=None,
+                         meter_name=None, start_time=None, end_time=None,
+                         granularity=300):
+        """Retrieves metrics based on the specified parameters over a period
+
+        :param resource: Resource object as defined in watcher models such as
+                         ComputeNode and Instance
+        :param resource_type: Indicates which type of object is supplied
+                              to the resource parameter
+        :param meter_name: The desired metric to retrieve as key from
+                           METRIC_MAP
+        :param start_time: The datetime to start retrieving metrics for
+        :type start_time: datetime.datetime
+        :param end_time: The datetime to limit the retrieval of metrics to
+        :type end_time: datetime.datetime
+        :param granularity: Interval between samples in measurements in
+                            seconds
+
+        :return: Dictionary of key value pairs with timestamps and metric
+                 values
+        """
+        pass
+
     @abc.abstractmethod
     def get_host_cpu_usage(self, resource, period, aggregate,
                            granularity=None):
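
The two base-class additions above define the contract every datasource helper must now satisfy: _get_meter centralises the METRIC_MAP lookup, and statistic_series is the new abstract method returning a timestamp-to-value mapping. A minimal sketch of how a consumer might call it; the cpu_usage_series function and its arguments are illustrative, not part of this change:

from datetime import datetime, timedelta


def cpu_usage_series(helper, instance):
    # 'helper' is assumed to be any concrete DataSourceBase subclass
    # (e.g. GnocchiHelper) and 'instance' a watcher Instance model object.
    end = datetime.utcnow()
    start = end - timedelta(hours=1)
    # Returns {timestamp: value, ...} per the docstring above.
    return helper.statistic_series(
        resource=instance,
        resource_type='instance',
        meter_name='instance_cpu_usage',
        start_time=start,
        end_time=end,
        granularity=300,
    )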

watcher/decision_engine/datasources/ceilometer.py

@@ -161,9 +161,7 @@ class CeilometerHelper(base.DataSourceBase):
         end_time = datetime.datetime.utcnow()
         start_time = end_time - datetime.timedelta(seconds=int(period))
 
-        meter = self.METRIC_MAP.get(meter_name)
-        if meter is None:
-            raise exception.MetricNotAvailable(metric=meter_name)
+        meter = self._get_meter(meter_name)
 
         if aggregate == 'mean':
             aggregate = 'avg'

@@ -194,6 +192,12 @@ class CeilometerHelper(base.DataSourceBase):
             item_value *= 10
         return item_value
 
+    def statistic_series(self, resource=None, resource_type=None,
+                         meter_name=None, start_time=None, end_time=None,
+                         granularity=300):
+        raise NotImplementedError(
+            _('Ceilometer helper does not support statistic series method'))
+
     def get_host_cpu_usage(self, resource, period,
                            aggregate, granularity=None):
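
Not every backend can serve a raw series: CeilometerHelper here (and GrafanaHelper below) raises NotImplementedError instead. A caller that wants to degrade gracefully therefore has to guard the call; the fallback below is a sketch of one way to do that, not something this commit provides:

def try_statistic_series(helper, **kwargs):
    # Guard against helpers that do not implement the series method,
    # mirroring the NotImplementedError contract in this change.
    try:
        return helper.statistic_series(**kwargs)
    except NotImplementedError:
        return None  # caller decides how to proceed without a series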

watcher/decision_engine/datasources/gnocchi.py

@@ -23,7 +23,6 @@ from oslo_config import cfg
 from oslo_log import log
 
 from watcher.common import clients
-from watcher.common import exception
 from watcher.decision_engine.datasources import base
 
 CONF = cfg.CONF

@@ -72,9 +71,7 @@ class GnocchiHelper(base.DataSourceBase):
         stop_time = datetime.utcnow()
         start_time = stop_time - timedelta(seconds=(int(period)))
 
-        meter = self.METRIC_MAP.get(meter_name)
-        if meter is None:
-            raise exception.MetricNotAvailable(metric=meter_name)
+        meter = self._get_meter(meter_name)
 
         if aggregate == 'count':
             aggregate = 'mean'

@@ -123,6 +120,52 @@ class GnocchiHelper(base.DataSourceBase):
         return return_value
 
+    def statistic_series(self, resource=None, resource_type=None,
+                         meter_name=None, start_time=None, end_time=None,
+                         granularity=300):
+        meter = self._get_meter(meter_name)
+
+        resource_id = resource.uuid
+        if resource_type == 'compute_node':
+            resource_id = "%s_%s" % (resource.hostname, resource.hostname)
+            kwargs = dict(query={"=": {"original_resource_id": resource_id}},
+                          limit=1)
+            resources = self.query_retry(
+                f=self.gnocchi.resource.search, **kwargs)
+
+            if not resources:
+                LOG.warning("The {0} resource {1} could not be "
+                            "found".format(self.NAME, resource_id))
+                return
+
+            resource_id = resources[0]['id']
+
+        raw_kwargs = dict(
+            metric=meter,
+            start=start_time,
+            stop=end_time,
+            resource_id=resource_id,
+            granularity=granularity,
+        )
+
+        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}
+
+        statistics = self.query_retry(
+            f=self.gnocchi.metric.get_measures, **kwargs)
+
+        return_value = None
+        if statistics:
+            # measure has structure [time, granularity, value]
+            if meter_name == 'host_airflow':
+                # Airflow from hardware.ipmi.node.airflow is reported as
+                # 1/10 th of actual CFM
+                return_value = {s[0]: s[2]*10 for s in statistics}
+            else:
+                return_value = {s[0]: s[2] for s in statistics}
+
+        return return_value
+
     def get_host_cpu_usage(self, resource, period, aggregate,
                            granularity=300):
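
The core of the Gnocchi implementation is reshaping the client's [timestamp, granularity, value] measure triples into the {timestamp: value} mapping promised by the base class, scaling airflow samples along the way. That reshaping in isolation, with invented sample data:

# Invented sample measures; each is [timestamp, granularity, value].
measures = [
    ["2017-02-02T09:00:00.000000", 360, 5.5],
    ["2017-02-02T09:06:00.000000", 360, 5.8],
]


def to_series(measures, meter_name):
    # hardware.ipmi.node.airflow reports 1/10th of the actual CFM,
    # hence the scaling for host_airflow.
    scale = 10 if meter_name == 'host_airflow' else 1
    return {timestamp: value * scale
            for timestamp, _granularity, value in measures}


assert to_series(measures, 'instance_cpu_usage') == {
    "2017-02-02T09:00:00.000000": 5.5,
    "2017-02-02T09:06:00.000000": 5.8,
}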

watcher/decision_engine/datasources/grafana.py

@@ -21,6 +21,7 @@ from urllib import parse as urlparse
 from oslo_config import cfg
 from oslo_log import log
 
+from watcher._i18n import _
 from watcher.common import clients
 from watcher.common import exception
 from watcher.decision_engine.datasources import base

@@ -188,6 +189,12 @@ class GrafanaHelper(base.DataSourceBase):
         return result
 
+    def statistic_series(self, resource=None, resource_type=None,
+                         meter_name=None, start_time=None, end_time=None,
+                         granularity=300):
+        raise NotImplementedError(
+            _('Grafana helper does not support statistic series method'))
+
     def get_host_cpu_usage(self, resource, period=300,
                            aggregate="mean", granularity=None):
         return self.statistic_aggregation(

watcher/decision_engine/datasources/monasca.py

@@ -21,7 +21,6 @@ import datetime
 from monascaclient import exc
 
 from watcher.common import clients
-from watcher.common import exception
 from watcher.decision_engine.datasources import base

@@ -90,9 +89,7 @@ class MonascaHelper(base.DataSourceBase):
         stop_time = datetime.datetime.utcnow()
         start_time = stop_time - datetime.timedelta(seconds=(int(period)))
 
-        meter = self.METRIC_MAP.get(meter_name)
-        if meter is None:
-            raise exception.MetricNotAvailable(metric=meter_name)
+        meter = self._get_meter(meter_name)
 
         if aggregate == 'mean':
             aggregate = 'avg'

@@ -121,6 +118,34 @@ class MonascaHelper(base.DataSourceBase):
         return cpu_usage
 
+    def statistic_series(self, resource=None, resource_type=None,
+                         meter_name=None, start_time=None, end_time=None,
+                         granularity=300):
+        meter = self._get_meter(meter_name)
+
+        raw_kwargs = dict(
+            name=meter,
+            start_time=start_time.isoformat(),
+            end_time=end_time.isoformat(),
+            dimensions={'hostname': resource.uuid},
+            statistics='avg',
+            group_by='*',
+        )
+
+        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}
+
+        statistics = self.query_retry(
+            f=self.monasca.metrics.list_statistics, **kwargs)
+
+        result = {}
+        for stat in statistics:
+            v_index = stat['columns'].index('avg')
+            t_index = stat['columns'].index('timestamp')
+            result.update({r[t_index]: r[v_index]
+                           for r in stat['statistics']})
+
+        return result
+
     def get_host_cpu_usage(self, resource, period,
                            aggregate, granularity=None):
         return self.statistic_aggregation(
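
With group_by='*', Monasca may return one entry per dimension group, each with its own columns list and statistics rows, so the helper resolves the timestamp and value columns by name and merges every group into a single mapping. The merge in isolation, with an invented payload:

# Invented Monasca payload: one entry per dimension group.
statistics = [{
    'columns': ['timestamp', 'avg'],
    'statistics': [
        ['2016-07-29T12:45:00Z', 0.0],
        ['2016-07-29T12:50:00Z', 0.9],
    ],
}]

result = {}
for stat in statistics:
    # Column order is not guaranteed, so look the indices up by name.
    v_index = stat['columns'].index('avg')
    t_index = stat['columns'].index('timestamp')
    result.update({row[t_index]: row[v_index]
                   for row in stat['statistics']})

assert result == {'2016-07-29T12:45:00Z': 0.0, '2016-07-29T12:50:00Z': 0.9}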

watcher/tests/decision_engine/datasources/test_gnocchi_helper.py

@@ -13,7 +13,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from datetime import datetime
 from unittest import mock
 
 from oslo_config import cfg

@@ -59,6 +59,35 @@ class TestGnocchiHelper(base.BaseTestCase):
         )
         self.assertEqual(expected_result, result)
 
+    def test_gnocchi_statistic_series(self, mock_gnocchi):
+        gnocchi = mock.MagicMock()
+
+        expected_result = {
+            "2017-02-02T09:00:00.000000": 5.5,
+            "2017-02-02T09:03:60.000000": 5.8
+        }
+
+        expected_measures = [
+            ["2017-02-02T09:00:00.000000", 360, 5.5],
+            ["2017-02-02T09:03:60.000000", 360, 5.8]
+        ]
+
+        gnocchi.metric.get_measures.return_value = expected_measures
+        mock_gnocchi.return_value = gnocchi
+
+        start = datetime(year=2017, month=2, day=2, hour=9, minute=0)
+        end = datetime(year=2017, month=2, day=2, hour=9, minute=4)
+
+        helper = gnocchi_helper.GnocchiHelper()
+        result = helper.statistic_series(
+            resource=mock.Mock(id='16a86790-327a-45f9-bc82-45839f062fdc'),
+            resource_type='instance',
+            meter_name='instance_cpu_usage',
+            start_time=start,
+            end_time=end,
+            granularity=360,
+        )
+        self.assertEqual(expected_result, result)
+
     def test_statistic_aggregation_metric_unavailable(self, mock_gnocchi):
         helper = gnocchi_helper.GnocchiHelper()

watcher/tests/decision_engine/datasources/test_monasca_helper.py

@@ -13,7 +13,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from datetime import datetime
 from unittest import mock
 
 from oslo_config import cfg

@@ -67,6 +67,43 @@ class TestMonascaHelper(base.BaseTestCase):
         )
         self.assertEqual(0.6, result)
 
+    def test_monasca_statistic_series(self, mock_monasca):
+        monasca = mock.MagicMock()
+        expected_stat = [{
+            'columns': ['timestamp', 'avg'],
+            'dimensions': {
+                'hostname': 'rdev-indeedsrv001',
+                'service': 'monasca'},
+            'id': '0',
+            'name': 'cpu.percent',
+            'statistics': [
+                ['2016-07-29T12:45:00Z', 0.0],
+                ['2016-07-29T12:50:00Z', 0.9],
+                ['2016-07-29T12:55:00Z', 0.9]]}]
+
+        expected_result = {
+            '2016-07-29T12:45:00Z': 0.0,
+            '2016-07-29T12:50:00Z': 0.9,
+            '2016-07-29T12:55:00Z': 0.9,
+        }
+
+        monasca.metrics.list_statistics.return_value = expected_stat
+        mock_monasca.return_value = monasca
+
+        start = datetime(year=2016, month=7, day=29, hour=12, minute=45)
+        end = datetime(year=2016, month=7, day=29, hour=12, minute=55)
+
+        helper = monasca_helper.MonascaHelper()
+        result = helper.statistic_series(
+            resource=mock.Mock(id='NODE_UUID'),
+            resource_type='compute_node',
+            meter_name='host_cpu_usage',
+            start_time=start,
+            end_time=end,
+            granularity=300,
+        )
+        self.assertEqual(expected_result, result)
+
     def test_statistic_aggregation_metric_unavailable(self, mock_monasca):
         helper = monasca_helper.MonascaHelper()