Move datasource query_retry into baseclass.

Moves the query_retry method into the baseclass and makes the query
retry and timeout options part of the watcher_datasources config group.
This makes the query_retry behavior uniform across all datasources.

A new baseclass method named query_retry_reset is added so datasources
can define operations to perform when recovering from a query error.
Test cases are added to verify the behavior of query_retry.

The query_max_retries and query_timeout config parameters are
deprecated in the gnocchi_client group and will be removed in a future
release.

Change-Id: I33e9dc2d1f5ba8f83fcf1488ff583ca5be5529cc
This commit is contained in:
Dantali0n 2019-05-29 15:16:15 +02:00
parent 46a36d1ad7
commit 584eeefdc8
9 changed files with 141 additions and 43 deletions

View File

@ -0,0 +1,17 @@
---
features:
- |
All datasources can now be configured to retry retrieving a metric upon
encountering an error. Between each attempt will be a set amount of time
which can be adjusted from the configuration. These configuration
options can be found in the `[watcher_datasources]` group and are named
`query_max_retries` and `query_timeout`.
upgrade:
- |
If Gnocchi was configured to have a custom amount of retries and or a
custom timeout then the configuration needs to moved into the
`[watcher_datasources]` group instead of the `[gnocchi_client]` group.
deprecations:
- |
The configuration options for query retries in `[gnocchi_client]` are
deprecated and the option in `[watcher_datasources]` should now be used.

View File

@ -34,7 +34,19 @@ DATASOURCES_OPTS = [
" the default for all strategies unless a strategy has a" " the default for all strategies unless a strategy has a"
" specific override.", " specific override.",
item_type=cfg.types.String(choices=possible_datasources), item_type=cfg.types.String(choices=possible_datasources),
default=possible_datasources) default=possible_datasources),
cfg.IntOpt('query_max_retries',
min=1,
default=10,
mutable=True,
help='How many times Watcher is trying to query again',
deprecated_group="gnocchi_client"),
cfg.IntOpt('query_timeout',
min=0,
default=1,
mutable=True,
help='How many seconds Watcher should wait to do query again',
deprecated_group="gnocchi_client")
] ]

View File

@ -32,15 +32,8 @@ GNOCCHI_CLIENT_OPTS = [
'The default is public.'), 'The default is public.'),
cfg.StrOpt('region_name', cfg.StrOpt('region_name',
help='Region in Identity service catalog to use for ' help='Region in Identity service catalog to use for '
'communication with the OpenStack service.'), 'communication with the OpenStack service.')
cfg.IntOpt('query_max_retries', ]
default=10,
mutable=True,
help='How many times Watcher is trying to query again'),
cfg.IntOpt('query_timeout',
default=1,
mutable=True,
help='How many seconds Watcher should wait to do query again')]
def register_opts(conf): def register_opts(conf):

View File

@ -14,6 +14,15 @@
# limitations under the License. # limitations under the License.
import abc import abc
import time
from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class DataSourceBase(object): class DataSourceBase(object):
@ -30,6 +39,9 @@ class DataSourceBase(object):
"""Possible options for the parameters named resource_type""" """Possible options for the parameters named resource_type"""
RESOURCE_TYPES = ['compute_node', 'instance', 'bare_metal', 'storage'] RESOURCE_TYPES = ['compute_node', 'instance', 'bare_metal', 'storage']
"""Each datasource should have a uniquely identifying name"""
NAME = ''
"""Possible metrics a datasource can support and their internal name""" """Possible metrics a datasource can support and their internal name"""
METRIC_MAP = dict(host_cpu_usage=None, METRIC_MAP = dict(host_cpu_usage=None,
host_ram_usage=None, host_ram_usage=None,
@ -44,8 +56,7 @@ class DataSourceBase(object):
instance_root_disk_size=None, instance_root_disk_size=None,
) )
@abc.abstractmethod def query_retry(self, f, *args, **kwargs):
def query_retry(self, f, *args, **kargs):
"""Attempts to retrieve metrics from the external service """Attempts to retrieve metrics from the external service
Attempts to access data from the external service and handles Attempts to access data from the external service and handles
@ -53,9 +64,26 @@ class DataSourceBase(object):
to the value of query_max_retries to the value of query_max_retries
:param f: The method that performs the actual querying for metrics :param f: The method that performs the actual querying for metrics
:param args: Array of arguments supplied to the method :param args: Array of arguments supplied to the method
:param kargs: The amount of arguments supplied to the method :param kwargs: The amount of arguments supplied to the method
:return: The value as retrieved from the external service :return: The value as retrieved from the external service
""" """
num_retries = CONF.watcher_datasources.query_max_retries
timeout = CONF.watcher_datasources.query_timeout
for i in range(num_retries):
try:
return f(*args, **kwargs)
except Exception as e:
LOG.exception(e)
self.query_retry_reset(e)
LOG.warning("Retry {0} of {1} while retrieving metrics retry "
"in {2} seconds".format(i+1, num_retries, timeout))
time.sleep(timeout)
raise exception.DataSourceNotAvailable(datasource=self.NAME)
@abc.abstractmethod
def query_retry_reset(self, exception_instance):
"""Abstract method to perform reset operations upon request failure"""
pass pass
@abc.abstractmethod @abc.abstractmethod

View File

@ -129,15 +129,10 @@ class CeilometerHelper(base.DataSourceBase):
"value": end_timestamp}) "value": end_timestamp})
return query return query
def query_retry(self, f, *args, **kargs): def query_retry_reset(self, exception_instance):
try: if isinstance(exception_instance, exc.HTTPUnauthorized):
return f(*args, **kargs)
except exc.HTTPUnauthorized:
self.osc.reset_clients() self.osc.reset_clients()
self.ceilometer = self.osc.ceilometer() self.ceilometer = self.osc.ceilometer()
return f(*args, **kargs)
except Exception:
raise
def list_metrics(self): def list_metrics(self):
"""List the user's meters.""" """List the user's meters."""

View File

@ -18,7 +18,6 @@
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
import time
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -52,16 +51,6 @@ class GnocchiHelper(base.DataSourceBase):
self.osc = osc if osc else clients.OpenStackClients() self.osc = osc if osc else clients.OpenStackClients()
self.gnocchi = self.osc.gnocchi() self.gnocchi = self.osc.gnocchi()
def query_retry(self, f, *args, **kwargs):
# TODO(Dantali0n) move gnocchi query_max_retries into general config
for i in range(CONF.gnocchi_client.query_max_retries):
try:
return f(*args, **kwargs)
except Exception as e:
LOG.exception(e)
time.sleep(CONF.gnocchi_client.query_timeout)
raise exception.DataSourceNotAvailable(datasource='gnocchi')
def check_availability(self): def check_availability(self):
try: try:
self.query_retry(self.gnocchi.status.get) self.query_retry(self.gnocchi.status.get)

View File

@ -46,16 +46,6 @@ class MonascaHelper(base.DataSourceBase):
self.osc = osc if osc else clients.OpenStackClients() self.osc = osc if osc else clients.OpenStackClients()
self.monasca = self.osc.monasca() self.monasca = self.osc.monasca()
def query_retry(self, f, *args, **kwargs):
try:
return f(*args, **kwargs)
except exc.Unauthorized:
self.osc.reset_clients()
self.monasca = self.osc.monasca()
return f(*args, **kwargs)
except Exception:
raise
def _format_time_params(self, start_time, end_time, period): def _format_time_params(self, start_time, end_time, period):
"""Format time-related params to the correct Monasca format """Format time-related params to the correct Monasca format
@ -77,6 +67,11 @@ class MonascaHelper(base.DataSourceBase):
return start_timestamp, end_timestamp, period return start_timestamp, end_timestamp, period
def query_retry_reset(self, exception_instance):
if isinstance(exception_instance, exc.Unauthorized):
self.osc.reset_clients()
self.monasca = self.osc.monasca()
def check_availability(self): def check_availability(self):
try: try:
self.query_retry(self.monasca.metrics.list) self.query_retry(self.monasca.metrics.list)

View File

@ -0,0 +1,69 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from watcher.common import exception
from watcher.datasources import base as datasource
from watcher.tests import base
CONF = cfg.CONF
class TestBaseDatasourceHelper(base.BaseTestCase):
def test_query_retry(self):
exc = Exception()
method = mock.Mock()
# first call will fail but second will succeed
method.side_effect = [exc, True]
# Max 2 attempts
CONF.set_override("query_max_retries", 2,
group='watcher_datasources')
# Reduce sleep time to 0
CONF.set_override("query_timeout", 0,
group='watcher_datasources')
helper = datasource.DataSourceBase()
helper.query_retry_reset = mock.Mock()
self.assertTrue(helper.query_retry(f=method))
helper.query_retry_reset.assert_called_once_with(exc)
def test_query_retry_exception(self):
exc = Exception()
method = mock.Mock()
# only third call will succeed
method.side_effect = [exc, exc, True]
# Max 2 attempts
CONF.set_override("query_max_retries", 2,
group='watcher_datasources')
# Reduce sleep time to 0
CONF.set_override("query_timeout", 0,
group='watcher_datasources')
helper = datasource.DataSourceBase()
helper.query_retry_reset = mock.Mock()
# Maximum number of retries exceeded query_retry should raise error
self.assertRaises(exception.DataSourceNotAvailable,
helper.query_retry, f=method)
# query_retry_reset should be called twice
helper.query_retry_reset.assert_has_calls(
[mock.call(exc), mock.call(exc)])

View File

@ -127,7 +127,7 @@ class TestGnocchiHelper(base.BaseTestCase):
def test_gnocchi_check_availability_with_failure(self, mock_gnocchi): def test_gnocchi_check_availability_with_failure(self, mock_gnocchi):
cfg.CONF.set_override("query_max_retries", 1, cfg.CONF.set_override("query_max_retries", 1,
group='gnocchi_client') group='watcher_datasources')
gnocchi = mock.MagicMock() gnocchi = mock.MagicMock()
gnocchi.status.get.side_effect = Exception() gnocchi.status.get.side_effect = Exception()
mock_gnocchi.return_value = gnocchi mock_gnocchi.return_value = gnocchi
@ -147,7 +147,7 @@ class TestGnocchiHelper(base.BaseTestCase):
def test_gnocchi_list_metrics_with_failure(self, mock_gnocchi): def test_gnocchi_list_metrics_with_failure(self, mock_gnocchi):
cfg.CONF.set_override("query_max_retries", 1, cfg.CONF.set_override("query_max_retries", 1,
group='gnocchi_client') group='watcher_datasources')
gnocchi = mock.MagicMock() gnocchi = mock.MagicMock()
gnocchi.metric.list.side_effect = Exception() gnocchi.metric.list.side_effect = Exception()
mock_gnocchi.return_value = gnocchi mock_gnocchi.return_value = gnocchi