From aa67c96b68451617fe1877755b7fdb038c4812ad Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 28 Jul 2015 15:38:35 +0200 Subject: [PATCH] gnocchi: retry with a new token on 401 The gnocchi dispatcher just bail out on any unexpected Gnocchi HTTP code. This change moves all Gnocchi API and HTTP stuffs in the gnocchi_client module. And improves the handling of 401 error code to retry requests when the authentication fail, usualy because the token have expired. Change-Id: I9d7e29337de2cb480c64af03032dcf9fa2a3829c --- ceilometer/dispatcher/gnocchi.py | 176 ++--------------- ceilometer/dispatcher/gnocchi_client.py | 197 ++++++++++++++++++++ ceilometer/tests/dispatcher/test_gnocchi.py | 39 +++- 3 files changed, 243 insertions(+), 169 deletions(-) create mode 100644 ceilometer/dispatcher/gnocchi_client.py diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index fa15070e..3b2c396e 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -17,7 +17,6 @@ # under the License. import fnmatch import itertools -import json import operator import os import threading @@ -25,11 +24,11 @@ import threading import jsonpath_rw from oslo_config import cfg from oslo_log import log -import requests import six import yaml from ceilometer import dispatcher +from ceilometer.dispatcher import gnocchi_client from ceilometer.i18n import _, _LE from ceilometer import keystone_client @@ -65,35 +64,11 @@ dispatcher_opts = [ cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi") -class UnexpectedWorkflowError(Exception): - pass - - -class NoSuchMetric(Exception): - pass - - -class MetricAlreadyExists(Exception): - pass - - -class NoSuchResource(Exception): - pass - - -class ResourceAlreadyExists(Exception): - pass - - def log_and_ignore_unexpected_workflow_error(func): def log_and_ignore(self, *args, **kwargs): try: func(self, *args, **kwargs) - except requests.ConnectionError as e: - with self._gnocchi_api_lock: - self._gnocchi_api = None - LOG.warn("Connection error, reconnecting...") - except UnexpectedWorkflowError as e: + except gnocchi_client.UnexpectedError as e: LOG.error(six.text_type(e)) return log_and_ignore @@ -197,20 +172,13 @@ class GnocchiDispatcher(dispatcher.Base): self.filter_service_activity = ( conf.dispatcher_gnocchi.filter_service_activity) self._ks_client = keystone_client.get_client() - self.gnocchi_url = conf.dispatcher_gnocchi.url self.gnocchi_archive_policy_data = self._load_archive_policy(conf) self.resources_definition = self._load_resources_definitions(conf) self._gnocchi_project_id = None self._gnocchi_project_id_lock = threading.Lock() - self._gnocchi_api = None - self._gnocchi_api_lock = threading.Lock() - def _get_headers(self, content_type="application/json"): - return { - 'Content-Type': content_type, - 'X-Auth-Token': self._ks_client.auth_token, - } + self._gnocchi = gnocchi_client.Client(conf.dispatcher_gnocchi.url) # TODO(sileht): Share yaml loading with # event converter and declarative notification @@ -268,23 +236,6 @@ class GnocchiDispatcher(dispatcher.Base): self.gnocchi_project_id) return self._gnocchi_project_id - @property - def gnocchi_api(self): - """return a working requests session object""" - if self._gnocchi_api is not None: - return self._gnocchi_api - - with self._gnocchi_api_lock: - if self._gnocchi_api is None: - self._gnocchi_api = requests.session() - # NOTE(sileht): wait when the pool is empty - # instead of raising errors. - adapter = requests.adapters.HTTPAdapter(pool_block=True) - self._gnocchi_api.mount("http://", adapter) - self._gnocchi_api.mount("https://", adapter) - - return self._gnocchi_api - def _is_swift_account_sample(self, sample): return bool([rd for rd in self.resources_definition if rd.cfg['resource_type'] == 'swift_account' @@ -328,11 +279,6 @@ class GnocchiDispatcher(dispatcher.Base): @log_and_ignore_unexpected_workflow_error def _process_resource(self, resource_id, metric_grouped_samples): - # TODO(sileht): Any HTTP 50X/401 error is just logged and this method - # stop, perhaps we can be smarter and retry later in case of 50X and - # directly in case of 401. A gnocchiclient would help a lot for the - # latest. - resource_extra = {} for metric_name, samples in metric_grouped_samples: samples = list(samples) @@ -359,9 +305,9 @@ class GnocchiDispatcher(dispatcher.Base): resource.update(resource_extra) try: - self._post_measure(resource_type, resource_id, metric_name, - measures) - except NoSuchMetric: + self._gnocchi.post_measure(resource_type, resource_id, + metric_name, measures) + except gnocchi_client.NoSuchMetric: # TODO(sileht): Make gnocchi smarter to be able to detect 404 # for 'resource doesn't exist' and for 'metric doesn't exist' # https://bugs.launchpad.net/gnocchi/+bug/1476186 @@ -369,122 +315,32 @@ class GnocchiDispatcher(dispatcher.Base): metric_name) try: - self._post_measure(resource_type, resource_id, - metric_name, measures) - except NoSuchMetric: + self._gnocchi.post_measure(resource_type, resource_id, + metric_name, measures) + except gnocchi_client.NoSuchMetric: LOG.error(_LE("Fail to post measures for " "%(resource_id)s/%(metric_name)s") % dict(resource_id=resource_id, metric_name=metric_name)) if resource_extra: - self._update_resource(resource_type, resource_id, resource_extra) + self._gnocchi.update_resource(resource_type, resource_id, + resource_extra) def _ensure_resource_and_metric(self, resource_type, resource, metric_name): try: - self._create_resource(resource_type, resource) - except ResourceAlreadyExists: + self._gnocchi.create_resource(resource_type, resource) + except gnocchi_client.ResourceAlreadyExists: try: archive_policy = resource['metrics'][metric_name] - self._create_metric(resource_type, resource['id'], - metric_name, archive_policy) - except MetricAlreadyExists: + self._gnocchi.create_metric(resource_type, resource['id'], + metric_name, archive_policy) + except gnocchi_client.MetricAlreadyExists: # NOTE(sileht): Just ignore the metric have been # created in the meantime. pass - def _post_measure(self, resource_type, resource_id, metric_name, - measure_attributes): - r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric/%s/measures" - % (self.gnocchi_url, resource_type, - resource_id, metric_name), - headers=self._get_headers(), - data=json.dumps(measure_attributes)) - if r.status_code == 404: - LOG.debug(_("The metric %(metric_name)s of " - "resource %(resource_id)s doesn't exists: " - "%(status_code)d"), - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code}) - raise NoSuchMetric - elif r.status_code // 100 != 2: - raise UnexpectedWorkflowError( - _("Fail to post measure on metric %(metric_name)s of " - "resource %(resource_id)s with status: " - "%(status_code)d: %(msg)s") % - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Measure posted on metric %s of resource %s", - metric_name, resource_id) - - def _create_resource(self, resource_type, resource): - r = self.gnocchi_api.post("%s/v1/resource/%s" - % (self.gnocchi_url, resource_type), - headers=self._get_headers(), - data=json.dumps(resource)) - if r.status_code == 409: - LOG.debug("Resource %s already exists", resource['id']) - raise ResourceAlreadyExists - - elif r.status_code // 100 != 2: - raise UnexpectedWorkflowError( - _("Resource %(resource_id)s creation failed with " - "status: %(status_code)d: %(msg)s") % - {'resource_id': resource['id'], - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Resource %s created", resource['id']) - - def _update_resource(self, resource_type, resource_id, - resource_extra): - r = self.gnocchi_api.patch( - "%s/v1/resource/%s/%s" - % (self.gnocchi_url, resource_type, resource_id), - headers=self._get_headers(), - data=json.dumps(resource_extra)) - - if r.status_code // 100 != 2: - raise UnexpectedWorkflowError( - _("Resource %(resource_id)s update failed with " - "status: %(status_code)d: %(msg)s") % - {'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Resource %s updated", resource_id) - - def _create_metric(self, resource_type, resource_id, metric_name, - archive_policy): - params = {metric_name: archive_policy} - r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric" - % (self.gnocchi_url, resource_type, - resource_id), - headers=self._get_headers(), - data=json.dumps(params)) - if r.status_code == 409: - LOG.debug("Metric %s of resource %s already exists", - metric_name, resource_id) - raise MetricAlreadyExists - - elif r.status_code // 100 != 2: - raise UnexpectedWorkflowError( - _("Fail to create metric %(metric_name)s of " - "resource %(resource_id)s with status: " - "%(status_code)d: %(msg)s") % - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Metric %s of resource %s created", - metric_name, resource_id) - @staticmethod def record_events(events): raise NotImplementedError diff --git a/ceilometer/dispatcher/gnocchi_client.py b/ceilometer/dispatcher/gnocchi_client.py new file mode 100644 index 00000000..bad7acd8 --- /dev/null +++ b/ceilometer/dispatcher/gnocchi_client.py @@ -0,0 +1,197 @@ +# +# Copyright 2015 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import json + +from oslo_log import log +import requests +import retrying + +from ceilometer.i18n import _ +from ceilometer import keystone_client + +LOG = log.getLogger(__name__) + + +class UnexpectedError(Exception): + pass + + +class AuthenticationError(Exception): + pass + + +class NoSuchMetric(Exception): + pass + + +class MetricAlreadyExists(Exception): + pass + + +class NoSuchResource(Exception): + pass + + +class ResourceAlreadyExists(Exception): + pass + + +def retry_if_authentication_error(exception): + return isinstance(exception, AuthenticationError) + + +def maybe_retry_if_authentication_error(): + return retrying.retry(retry_on_exception=retry_if_authentication_error, + wait_fixed=2000, + stop_max_delay=60000) + + +class GnocchiSession(object): + def __init__(self): + self._session = requests.session() + # NOTE(sileht): wait when the pool is empty + # instead of raising errors. + adapter = requests.adapters.HTTPAdapter( + pool_block=True) + self._session.mount("http://", adapter) + self._session.mount("https://", adapter) + + self.post = functools.partial(self._do_method, method='post') + self.patch = functools.partial(self._do_method, method='patch') + + def _do_method(self, *args, **kwargs): + method = kwargs.pop('method') + try: + response = getattr(self._session, method)(*args, **kwargs) + except requests.ConnectionError as e: + raise UnexpectedError("Connection error: %s " % e) + + if response.status_code == 401: + LOG.info("Authentication failure, retrying...") + raise AuthenticationError() + + return response + + +class Client(object): + def __init__(self, url): + self._gnocchi_url = url + self._ks_client = keystone_client.get_client() + self._session = GnocchiSession() + + def _get_headers(self, content_type="application/json"): + return { + 'Content-Type': content_type, + 'X-Auth-Token': self._ks_client.auth_token, + } + + @maybe_retry_if_authentication_error() + def post_measure(self, resource_type, resource_id, metric_name, + measure_attributes): + r = self._session.post("%s/v1/resource/%s/%s/metric/%s/measures" + % (self._gnocchi_url, resource_type, + resource_id, metric_name), + headers=self._get_headers(), + data=json.dumps(measure_attributes)) + + if r.status_code == 404: + LOG.debug(_("The metric %(metric_name)s of " + "resource %(resource_id)s doesn't exists: " + "%(status_code)d"), + {'metric_name': metric_name, + 'resource_id': resource_id, + 'status_code': r.status_code}) + raise NoSuchMetric + elif r.status_code // 100 != 2: + raise UnexpectedError( + _("Fail to post measure on metric %(metric_name)s of " + "resource %(resource_id)s with status: " + "%(status_code)d: %(msg)s") % + {'metric_name': metric_name, + 'resource_id': resource_id, + 'status_code': r.status_code, + 'msg': r.text}) + else: + LOG.debug("Measure posted on metric %s of resource %s", + metric_name, resource_id) + + @maybe_retry_if_authentication_error() + def create_resource(self, resource_type, resource): + r = self._session.post("%s/v1/resource/%s" + % (self._gnocchi_url, resource_type), + headers=self._get_headers(), + data=json.dumps(resource)) + + if r.status_code == 409: + LOG.debug("Resource %s already exists", resource['id']) + raise ResourceAlreadyExists + + elif r.status_code // 100 != 2: + raise UnexpectedError( + _("Resource %(resource_id)s creation failed with " + "status: %(status_code)d: %(msg)s") % + {'resource_id': resource['id'], + 'status_code': r.status_code, + 'msg': r.text}) + else: + LOG.debug("Resource %s created", resource['id']) + + @maybe_retry_if_authentication_error() + def update_resource(self, resource_type, resource_id, + resource_extra): + r = self._session.patch( + "%s/v1/resource/%s/%s" + % (self._gnocchi_url, resource_type, resource_id), + headers=self._get_headers(), + data=json.dumps(resource_extra)) + + if r.status_code // 100 != 2: + raise UnexpectedError( + _("Resource %(resource_id)s update failed with " + "status: %(status_code)d: %(msg)s") % + {'resource_id': resource_id, + 'status_code': r.status_code, + 'msg': r.text}) + else: + LOG.debug("Resource %s updated", resource_id) + + @maybe_retry_if_authentication_error() + def create_metric(self, resource_type, resource_id, metric_name, + archive_policy): + params = {metric_name: archive_policy} + r = self._session.post("%s/v1/resource/%s/%s/metric" + % (self._gnocchi_url, resource_type, + resource_id), + headers=self._get_headers(), + data=json.dumps(params)) + if r.status_code == 409: + LOG.debug("Metric %s of resource %s already exists", + metric_name, resource_id) + raise MetricAlreadyExists + + elif r.status_code // 100 != 2: + raise UnexpectedError( + _("Fail to create metric %(metric_name)s of " + "resource %(resource_id)s with status: " + "%(status_code)d: %(msg)s") % + {'metric_name': metric_name, + 'resource_id': resource_id, + 'status_code': r.status_code, + 'msg': r.text}) + else: + LOG.debug("Metric %s of resource %s created", + metric_name, resource_id) diff --git a/ceilometer/tests/dispatcher/test_gnocchi.py b/ceilometer/tests/dispatcher/test_gnocchi.py index f38adce0..1c48c222 100644 --- a/ceilometer/tests/dispatcher/test_gnocchi.py +++ b/ceilometer/tests/dispatcher/test_gnocchi.py @@ -306,6 +306,8 @@ class DispatcherWorkflowTest(base.BaseTestCase, measure_retry=500, patch_resource=None)), ('measure_fail', dict(measure=500, post_resource=None, metric=None, measure_retry=None, patch_resource=None)), + ('measure_auth', dict(measure=401, post_resource=None, metric=None, + measure_retry=None, patch_resource=204)), ] @classmethod @@ -326,6 +328,7 @@ class DispatcherWorkflowTest(base.BaseTestCase, self.useFixture(mockpatch.Patch( 'ceilometer.keystone_client.get_client', return_value=ks_client)) + self.ks_client = ks_client ceilometer_service.prepare_service([]) self.conf.config( @@ -334,12 +337,14 @@ class DispatcherWorkflowTest(base.BaseTestCase, group="dispatcher_gnocchi" ) - self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf) self.sample['resource_id'] = str(uuid.uuid4()) @mock.patch('ceilometer.dispatcher.gnocchi.LOG') - @mock.patch('ceilometer.dispatcher.gnocchi.requests') - def test_workflow(self, fake_requests, logger): + @mock.patch('ceilometer.dispatcher.gnocchi_client.LOG') + @mock.patch('ceilometer.dispatcher.gnocchi_client.requests') + def test_workflow(self, fake_requests, client_logger, logger): + self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf) + base_url = self.dispatcher.conf.dispatcher_gnocchi.url url_params = { 'url': urlparse.urljoin(base_url, '/v1/resource'), @@ -350,14 +355,13 @@ class DispatcherWorkflowTest(base.BaseTestCase, headers = {'Content-Type': 'application/json', 'X-Auth-Token': 'fake_token'} - expected_calls = [] patch_responses = [] post_responses = [] # This is needed to mock Exception in py3 fake_requests.ConnectionError = requests.ConnectionError - expected_calls.extend([ + expected_calls = [ mock.call.session(), mock.call.adapters.HTTPAdapter(pool_block=True), mock.call.session().mount('http://', mock.ANY), @@ -367,9 +371,26 @@ class DispatcherWorkflowTest(base.BaseTestCase, "metric/%(metric_name)s/measures" % url_params, headers=headers, data=json_matcher(self.measures_attributes)) - ]) + + ] post_responses.append(MockResponse(self.measure)) + if self.measure == 401: + type(self.ks_client).auth_token = mock.PropertyMock( + side_effect=['fake_token', 'new_token', 'new_token', + 'new_token', 'new_token']) + headers = {'Content-Type': 'application/json', + 'X-Auth-Token': 'new_token'} + + expected_calls.append( + mock.call.session().post( + "%(url)s/%(resource_type)s/%(resource_id)s/" + "metric/%(metric_name)s/measures" % url_params, + headers=headers, + data=json_matcher(self.measures_attributes))) + + post_responses.append(MockResponse(200)) + if self.post_resource: attributes = self.postable_attributes.copy() attributes.update(self.patchable_attributes) @@ -442,10 +463,10 @@ class DispatcherWorkflowTest(base.BaseTestCase, self.sample['resource_id'], 500)) elif self.patch_resource == 204 and self.patchable_attributes: - logger.debug.assert_called_with( + client_logger.debug.assert_called_with( 'Resource %s updated', self.sample['resource_id']) - else: - logger.debug.assert_called_with( + elif self.measure == 200: + client_logger.debug.assert_called_with( "Measure posted on metric %s of resource %s", self.sample['counter_name'], self.sample['resource_id'])