From 75d973f4e82011f83514c6ad7af8d6b3117b0c1e Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Fri, 25 Sep 2015 14:55:09 +0200 Subject: [PATCH] gnocchi: use gnocchiclient instead of requests Depends-On: I63f8735e2841291f5b66aeeab5fbdf957a001f0c Change-Id: Ib18cc0bd62156d953fcec68467d7e99ee9cc04f0 --- ceilometer/dispatcher/gnocchi.py | 142 +++++++------ ceilometer/dispatcher/gnocchi_client.py | 200 ------------------ .../tests/unit/dispatcher/test_gnocchi.py | 189 ++++++----------- test-requirements.txt | 1 + 4 files changed, 142 insertions(+), 390 deletions(-) delete mode 100644 ceilometer/dispatcher/gnocchi_client.py diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index 13f3a8294e..3c4fea6342 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -18,14 +18,17 @@ import os import threading import uuid +from gnocchiclient import client +from gnocchiclient import exceptions as gnocchi_exc +from keystoneauth1 import session as ka_session from oslo_config import cfg from oslo_log import log +import requests import six from stevedore import extension from ceilometer import declarative from ceilometer import dispatcher -from ceilometer.dispatcher import gnocchi_client from ceilometer.i18n import _, _LE, _LW from ceilometer import keystone_client from ceilometer import utils @@ -43,8 +46,8 @@ dispatcher_opts = [ help='Gnocchi project used to filter out samples ' 'generated by Gnocchi service activity'), cfg.StrOpt('url', - default="http://localhost:8041", - help='URL to Gnocchi.'), + deprecated_for_removal=True, + help='URL to Gnocchi. default: autodetection'), cfg.StrOpt('archive_policy', default=None, help='The archive policy to use when the dispatcher ' @@ -69,8 +72,10 @@ def log_and_ignore_unexpected_workflow_error(func): def log_and_ignore(self, *args, **kwargs): try: func(self, *args, **kwargs) - except gnocchi_client.UnexpectedError as e: + except gnocchi_exc.ClientException as e: LOG.error(six.text_type(e)) + except Exception as e: + LOG.error(six.text_type(e), exc_info=True) return log_and_ignore @@ -131,6 +136,19 @@ class ResourcesDefinition(object): return attrs +def GnocchiClient(conf): + requests_session = requests.session() + for scheme in requests_session.adapters.keys(): + requests_session.mount(scheme, ka_session.TCPKeepAliveAdapter( + pool_block=True)) + + session = keystone_client.get_session(requests_session=requests_session) + return client.Client('1', session, + interface=conf.service_credentials.interface, + region_name=conf.service_credentials.region_name, + endpoint_override=conf.dispatcher_gnocchi.url) + + class GnocchiDispatcher(dispatcher.MeterDispatcherBase): """Dispatcher class for recording metering data into database. @@ -178,10 +196,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): self._gnocchi_project_id_lock = threading.Lock() self._gnocchi_resource_lock = threading.Lock() - self._gnocchi = gnocchi_client.Client(conf.dispatcher_gnocchi.url) - - # TODO(sileht): Share yaml loading with - # event converter and declarative notification + self._gnocchi = GnocchiClient(conf) @staticmethod def _get_config_file(conf, config_file): @@ -287,42 +302,64 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): resource.update(resource_extra) + retry = True try: - self._gnocchi.post_measure(resource_type, resource_id, - metric_name, measures) - except gnocchi_client.NoSuchMetric: - # TODO(sileht): Make gnocchi smarter to be able to detect 404 - # for 'resource doesn't exist' and for 'metric doesn't exist' - # https://bugs.launchpad.net/gnocchi/+bug/1476186 - self._ensure_resource_and_metric(resource_type, resource, - rd.metrics, metric_name) + self._gnocchi.metric.add_measures(metric_name, measures, + resource_id) + except gnocchi_exc.ResourceNotFound: + self._if_not_cached("create", resource_type, resource, + self._create_resource, rd.metrics) + except gnocchi_exc.MetricNotFound: + metric = {'resource_id': resource['id'], + 'name': metric_name} + metric.update(rd.metrics[metric_name]) try: - self._gnocchi.post_measure(resource_type, resource_id, - metric_name, measures) - except gnocchi_client.NoSuchMetric: - LOG.error(_LE("Fail to post measures for " - "%(resource_id)s/%(metric_name)s") % - dict(resource_id=resource_id, - metric_name=metric_name)) + self._gnocchi.metric.create(metric) + except gnocchi_exc.NamedMetricAreadyExists: + # NOTE(sileht): metric created in the meantime + pass + else: + retry = False + + if retry: + self._gnocchi.metric.add_measures(metric_name, measures, + resource_id) + LOG.debug("Measure posted on metric %s of resource %s", + metric_name, resource_id) if resource_extra: - if self.cache: - cache_key = resource['id'] - attribute_hash = self._check_resource_cache( - cache_key, resource) - if attribute_hash: - with self._gnocchi_resource_lock: - self._gnocchi.update_resource(resource_type, - resource_id, - resource_extra) - self.cache.set(cache_key, attribute_hash) - else: - LOG.debug('resource cache hit for update %s', - cache_key) + self._if_not_cached("update", resource_type, resource, + self._update_resource, resource_extra) + + def _create_resource(self, resource_type, resource, metrics): + try: + resource["metrics"] = metrics + self._gnocchi.resource.create(resource_type, resource) + LOG.debug('Resource %s created', resource["id"]) + except gnocchi_exc.ResourceAlreadyExists: + # NOTE(sileht): resource created in the meantime + pass + + def _update_resource(self, resource_type, resource, resource_extra): + self._gnocchi.resource.update(resource_type, + resource["id"], + resource_extra) + LOG.debug('Resource %s updated', resource["id"]) + + def _if_not_cached(self, operation, resource_type, resource, method, + *args, **kwargs): + if self.cache: + cache_key = resource['id'] + attribute_hash = self._check_resource_cache(cache_key, resource) + if attribute_hash: + with self._gnocchi_resource_lock: + method(resource_type, resource, *args, **kwargs) + self.cache.set(cache_key, attribute_hash) else: - self._gnocchi.update_resource(resource_type, resource_id, - resource_extra) + LOG.debug('Resource cache hit for %s %s', operation, cache_key) + else: + method(resource_type, resource, *args, **kwargs) def _check_resource_cache(self, key, resource_data): cached_hash = self.cache.get(key) @@ -331,32 +368,3 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): if cached_hash != attribute_hash: return attribute_hash return None - - def _ensure_resource_and_metric(self, resource_type, resource, metrics, - metric_name): - try: - if self.cache: - cache_key = resource['id'] - attribute_hash = self._check_resource_cache( - cache_key, resource) - if attribute_hash: - with self._gnocchi_resource_lock: - resource['metrics'] = metrics - self._gnocchi.create_resource(resource_type, - resource) - self.cache.set(cache_key, attribute_hash) - else: - LOG.debug('resource cache hit for create %s', - cache_key) - else: - resource['metrics'] = metrics - self._gnocchi.create_resource(resource_type, resource) - except gnocchi_client.ResourceAlreadyExists: - try: - archive_policy = resource['metrics'][metric_name] - self._gnocchi.create_metric(resource_type, resource['id'], - metric_name, archive_policy) - except gnocchi_client.MetricAlreadyExists: - # NOTE(sileht): Just ignore the metric have been - # created in the meantime. - pass diff --git a/ceilometer/dispatcher/gnocchi_client.py b/ceilometer/dispatcher/gnocchi_client.py deleted file mode 100644 index bb7c0cc483..0000000000 --- a/ceilometer/dispatcher/gnocchi_client.py +++ /dev/null @@ -1,200 +0,0 @@ -# -# Copyright 2015 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import json - -from oslo_log import log -import requests -import retrying -from six.moves.urllib import parse as urlparse - -from ceilometer.i18n import _ -from ceilometer import keystone_client - -LOG = log.getLogger(__name__) - - -class UnexpectedError(Exception): - pass - - -class AuthenticationError(Exception): - pass - - -class NoSuchMetric(Exception): - pass - - -class MetricAlreadyExists(Exception): - pass - - -class NoSuchResource(Exception): - pass - - -class ResourceAlreadyExists(Exception): - pass - - -def retry_if_authentication_error(exception): - return isinstance(exception, AuthenticationError) - - -def maybe_retry_if_authentication_error(): - return retrying.retry(retry_on_exception=retry_if_authentication_error, - wait_fixed=2000, - stop_max_delay=60000) - - -class GnocchiSession(object): - def __init__(self): - self._session = requests.session() - # NOTE(sileht): wait when the pool is empty - # instead of raising errors. - adapter = requests.adapters.HTTPAdapter( - pool_block=True) - self._session.mount("http://", adapter) - self._session.mount("https://", adapter) - - self.post = functools.partial(self._do_method, method='post') - self.patch = functools.partial(self._do_method, method='patch') - - def _do_method(self, *args, **kwargs): - method = kwargs.pop('method') - try: - response = getattr(self._session, method)(*args, **kwargs) - except requests.ConnectionError as e: - raise UnexpectedError("Connection error: %s " % e) - - if response.status_code == 401: - LOG.info("Authentication failure, retrying...") - raise AuthenticationError() - - return response - - -class Client(object): - def __init__(self, url): - self._gnocchi_url = url.rstrip("/") - self._ks_client = keystone_client.get_client() - self._session = GnocchiSession() - - def _get_headers(self, content_type="application/json"): - return { - 'Content-Type': content_type, - 'X-Auth-Token': keystone_client.get_auth_token(self._ks_client), - } - - @maybe_retry_if_authentication_error() - def post_measure(self, resource_type, resource_id, metric_name, - measure_attributes): - r = self._session.post("%s/v1/resource/%s/%s/metric/%s/measures" - % (self._gnocchi_url, resource_type, - urlparse.quote(resource_id, safe=""), - metric_name), - headers=self._get_headers(), - data=json.dumps(measure_attributes)) - - if r.status_code == 404: - LOG.debug("The metric %(metric_name)s of " - "resource %(resource_id)s doesn't exists: " - "%(status_code)d", - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code}) - raise NoSuchMetric - elif r.status_code // 100 != 2: - raise UnexpectedError( - _("Fail to post measure on metric %(metric_name)s of " - "resource %(resource_id)s with status: " - "%(status_code)d: %(msg)s") % - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Measure posted on metric %s of resource %s", - metric_name, resource_id) - - @maybe_retry_if_authentication_error() - def create_resource(self, resource_type, resource): - r = self._session.post("%s/v1/resource/%s" - % (self._gnocchi_url, resource_type), - headers=self._get_headers(), - data=json.dumps(resource)) - - if r.status_code == 409: - LOG.debug("Resource %s already exists", resource['id']) - raise ResourceAlreadyExists - - elif r.status_code // 100 != 2: - raise UnexpectedError( - _("Resource %(resource_id)s creation failed with " - "status: %(status_code)d: %(msg)s") % - {'resource_id': resource['id'], - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Resource %s created", resource['id']) - - @maybe_retry_if_authentication_error() - def update_resource(self, resource_type, resource_id, - resource_extra): - r = self._session.patch( - "%s/v1/resource/%s/%s" - % (self._gnocchi_url, resource_type, - urlparse.quote(resource_id, safe="")), - headers=self._get_headers(), - data=json.dumps(resource_extra)) - - if r.status_code // 100 != 2: - raise UnexpectedError( - _("Resource %(resource_id)s update failed with " - "status: %(status_code)d: %(msg)s") % - {'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Resource %s updated", resource_id) - - @maybe_retry_if_authentication_error() - def create_metric(self, resource_type, resource_id, metric_name, - archive_policy): - params = {metric_name: archive_policy} - r = self._session.post("%s/v1/resource/%s/%s/metric" - % (self._gnocchi_url, resource_type, - urlparse.quote(resource_id, safe="")), - headers=self._get_headers(), - data=json.dumps(params)) - if r.status_code == 409: - LOG.debug("Metric %s of resource %s already exists", - metric_name, resource_id) - raise MetricAlreadyExists - - elif r.status_code // 100 != 2: - raise UnexpectedError( - _("Fail to create metric %(metric_name)s of " - "resource %(resource_id)s with status: " - "%(status_code)d: %(msg)s") % - {'metric_name': metric_name, - 'resource_id': resource_id, - 'status_code': r.status_code, - 'msg': r.text}) - else: - LOG.debug("Metric %s of resource %s created", - metric_name, resource_id) diff --git a/ceilometer/tests/unit/dispatcher/test_gnocchi.py b/ceilometer/tests/unit/dispatcher/test_gnocchi.py index eb2c0609fa..e6c5153340 100644 --- a/ceilometer/tests/unit/dispatcher/test_gnocchi.py +++ b/ceilometer/tests/unit/dispatcher/test_gnocchi.py @@ -15,17 +15,16 @@ # License for the specific language governing permissions and limitations # under the License. -import json import os import uuid +from gnocchiclient import exceptions as gnocchi_exc import mock from oslo_config import fixture as config_fixture from oslo_utils import fileutils from oslotest import mockpatch import requests import six -import six.moves.urllib.parse as urlparse import testscenarios from ceilometer import declarative @@ -36,17 +35,6 @@ from ceilometer.tests import base load_tests = testscenarios.load_tests_apply_scenarios -class json_matcher(object): - def __init__(self, ref): - self.ref = ref - - def __eq__(self, obj): - return self.ref == json.loads(obj) - - def __repr__(self): - return "" % self.ref - - class DispatcherTest(base.BaseTestCase): def setUp(self): @@ -271,16 +259,14 @@ class DispatcherWorkflowTest(base.BaseTestCase, ('resource_update_fail', dict(measure=204, post_resource=None, metric=None, measure_retry=None, patch_resource=500)), - ('new_metric', dict(measure=404, post_resource=409, metric=204, + ('new_metric', dict(measure=404, post_resource=None, metric=204, measure_retry=204, patch_resource=204)), - ('new_metric_fail', dict(measure=404, post_resource=409, metric=500, + ('new_metric_fail', dict(measure=404, post_resource=None, metric=500, measure_retry=None, patch_resource=None)), - ('retry_fail', dict(measure=404, post_resource=409, metric=409, + ('retry_fail', dict(measure=404, post_resource=409, metric=None, measure_retry=500, patch_resource=None)), ('measure_fail', dict(measure=500, post_resource=None, metric=None, measure_retry=None, patch_resource=None)), - ('measure_auth', dict(measure=401, post_resource=None, metric=None, - measure_retry=None, patch_resource=204)), ] @classmethod @@ -296,8 +282,6 @@ class DispatcherWorkflowTest(base.BaseTestCase, self.conf.config(url='http://localhost:8041', group='dispatcher_gnocchi') ks_client = mock.Mock() - ks_client.session.auth.get_access.return_value.auth_token = ( - "fake_token") ks_client.projects.find.return_value = mock.Mock( name='gnocchi', id='a2d42c23-d518-46b6-96ab-3fba2e146859') self.useFixture(mockpatch.Patch( @@ -315,60 +299,33 @@ class DispatcherWorkflowTest(base.BaseTestCase, self.sample['resource_id'] = str(uuid.uuid4()) + "/foobar" @mock.patch('ceilometer.dispatcher.gnocchi.LOG') - @mock.patch('ceilometer.dispatcher.gnocchi_client.LOG') - @mock.patch('ceilometer.dispatcher.gnocchi_client.requests') - def test_workflow(self, fake_requests, client_logger, logger): + @mock.patch('gnocchiclient.v1.client.Client') + def test_workflow(self, fakeclient_cls, logger): self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf) - base_url = self.dispatcher.conf.dispatcher_gnocchi.url - url_params = { - 'url': urlparse.urljoin(base_url, '/v1/resource'), - # NOTE(sileht): we don't use urlparse.quote here - # to ensure / is converted in %2F - 'resource_id': self.sample['resource_id'].replace("/", "%2F"), - 'resource_type': self.resource_type, - 'metric_name': self.sample['counter_name'] - } - headers = {'Content-Type': 'application/json', - 'X-Auth-Token': 'fake_token'} + fakeclient = fakeclient_cls.return_value - patch_responses = [] - post_responses = [] + # FIXME(sileht): we don't use urlparse.quote here + # to ensure / is converted in %2F + # temporary disabled until we find a solution + # on gnocchi side. Current gnocchiclient doesn't + # encode the resource_id + resource_id = self.sample['resource_id'] # .replace("/", "%2F"), + metric_name = self.sample['counter_name'] - # This is needed to mock Exception in py3 - fake_requests.ConnectionError = requests.ConnectionError + expected_calls = [mock.call.metric.add_measures( + metric_name, self.measures_attributes, resource_id)] - expected_calls = [ - mock.call.session(), - mock.call.adapters.HTTPAdapter(pool_block=True), - mock.call.session().mount('http://', mock.ANY), - mock.call.session().mount('https://', mock.ANY), - mock.call.session().post( - "%(url)s/%(resource_type)s/%(resource_id)s/" - "metric/%(metric_name)s/measures" % url_params, - headers=headers, - data=json_matcher(self.measures_attributes)) + add_measures_side_effect = [] - ] - post_responses.append(MockResponse(self.measure)) - - if self.measure == 401: - - type(self.ks_client.session.auth.get_access.return_value - ).auth_token = (mock.PropertyMock( - side_effect=['fake_token', 'new_token', 'new_token', - 'new_token', 'new_token'])) - headers = {'Content-Type': 'application/json', - 'X-Auth-Token': 'new_token'} - - expected_calls.append( - mock.call.session().post( - "%(url)s/%(resource_type)s/%(resource_id)s/" - "metric/%(metric_name)s/measures" % url_params, - headers=headers, - data=json_matcher(self.measures_attributes))) - - post_responses.append(MockResponse(200)) + if self.measure == 404 and self.post_resource: + add_measures_side_effect += [ + gnocchi_exc.ResourceNotFound(404)] + elif self.measure == 404 and self.metric: + add_measures_side_effect += [ + gnocchi_exc.MetricNotFound(404)] + elif self.measure == 500: + add_measures_side_effect += [Exception('boom!')] if self.post_resource: attributes = self.postable_attributes.copy() @@ -376,79 +333,65 @@ class DispatcherWorkflowTest(base.BaseTestCase, attributes['id'] = self.sample['resource_id'] attributes['metrics'] = dict((metric_name, {}) for metric_name in self.metric_names) - expected_calls.append(mock.call.session().post( - "%(url)s/%(resource_type)s" % url_params, - headers=headers, - data=json_matcher(attributes)), - ) - post_responses.append(MockResponse(self.post_resource)) + expected_calls.append(mock.call.resource.create( + self.resource_type, attributes)) + if self.post_resource == 409: + fakeclient.resource.create.side_effect = [ + gnocchi_exc.ResourceAlreadyExists(409)] + elif self.post_resource == 500: + fakeclient.resource.create.side_effect = [Exception('boom!')] if self.metric: - expected_calls.append(mock.call.session().post( - "%(url)s/%(resource_type)s/%(resource_id)s/metric" - % url_params, - headers=headers, - data=json_matcher({self.sample['counter_name']: {}}) - )) - post_responses.append(MockResponse(self.metric)) + expected_calls.append(mock.call.metric.create({ + 'name': self.sample['counter_name'], + 'resource_id': resource_id})) + if self.metric == 409: + fakeclient.metric.create.side_effect = [ + gnocchi_exc.NamedMetricAreadyExists(409)] + elif self.metric == 500: + fakeclient.metric.create.side_effect = [Exception('boom!')] if self.measure_retry: - expected_calls.append(mock.call.session().post( - "%(url)s/%(resource_type)s/%(resource_id)s/" - "metric/%(metric_name)s/measures" % url_params, - headers=headers, - data=json_matcher(self.measures_attributes)) - ) - post_responses.append(MockResponse(self.measure_retry)) + expected_calls.append(mock.call.metric.add_measures( + metric_name, + self.measures_attributes, + resource_id)) + if self.measure_retry == 204: + add_measures_side_effect += [None] + elif self.measure_retry == 500: + add_measures_side_effect += [ + Exception('boom!')] + else: + add_measures_side_effect += [None] if self.patch_resource and self.patchable_attributes: - expected_calls.append(mock.call.session().patch( - "%(url)s/%(resource_type)s/%(resource_id)s" % url_params, - headers=headers, - data=json_matcher(self.patchable_attributes)), - ) - patch_responses.append(MockResponse(self.patch_resource)) + expected_calls.append(mock.call.resource.update( + self.resource_type, resource_id, + self.patchable_attributes)) + if self.patch_resource == 500: + fakeclient.resource.update.side_effect = [Exception('boom!')] - s = fake_requests.session.return_value - s.patch.side_effect = patch_responses - s.post.side_effect = post_responses + fakeclient.metric.add_measures.side_effect = add_measures_side_effect self.dispatcher.record_metering_data([self.sample]) # Check that the last log message is the expected one - if self.measure == 500 or self.measure_retry == 500: - logger.error.assert_called_with( - "Fail to post measure on metric %s of resource %s " - "with status: %d: Internal Server Error" % - (self.sample['counter_name'], - self.sample['resource_id'], - 500)) - - elif self.post_resource == 500 or (self.patch_resource == 500 and - self.patchable_attributes): - logger.error.assert_called_with( - "Resource %s %s failed with status: " - "%d: Internal Server Error" % - (self.sample['resource_id'], - 'update' if self.patch_resource else 'creation', - 500)) - elif self.metric == 500: - logger.error.assert_called_with( - "Fail to create metric %s of resource %s " - "with status: %d: Internal Server Error" % - (self.sample['counter_name'], - self.sample['resource_id'], - 500)) + if (self.measure == 500 or self.measure_retry == 500 or + self.metric == 500 or self.post_resource == 500 or + (self.patch_resource == 500 and self.patchable_attributes)): + logger.error.assert_called_with('boom!', exc_info=True) elif self.patch_resource == 204 and self.patchable_attributes: - client_logger.debug.assert_called_with( + logger.debug.assert_called_with( 'Resource %s updated', self.sample['resource_id']) + self.assertEqual(0, logger.error.call_count) elif self.measure == 200: - client_logger.debug.assert_called_with( + logger.debug.assert_called_with( "Measure posted on metric %s of resource %s", self.sample['counter_name'], self.sample['resource_id']) + self.assertEqual(0, logger.error.call_count) - self.assertEqual(expected_calls, fake_requests.mock_calls) + self.assertEqual(expected_calls, fakeclient.mock_calls) DispatcherWorkflowTest.generate_scenarios() diff --git a/test-requirements.txt b/test-requirements.txt index fc1fafa69f..e45abef6b6 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -22,6 +22,7 @@ oslo.vmware>=1.16.0 # Apache-2.0 psycopg2>=2.5 pylint==1.4.4 # GNU GPL v2 pymongo>=3.0.2 +gnocchiclient>=2.1.0 python-subunit>=0.0.18 sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 sphinxcontrib-httpdomain