Merge "gnocchi: use gnocchiclient instead of requests"
This commit is contained in:
commit
7fd1d93b14
@ -18,14 +18,17 @@ import os
|
|||||||
import threading
|
import threading
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from gnocchiclient import client
|
||||||
|
from gnocchiclient import exceptions as gnocchi_exc
|
||||||
|
from keystoneauth1 import session as ka_session
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
import requests
|
||||||
import six
|
import six
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
|
|
||||||
from ceilometer import declarative
|
from ceilometer import declarative
|
||||||
from ceilometer import dispatcher
|
from ceilometer import dispatcher
|
||||||
from ceilometer.dispatcher import gnocchi_client
|
|
||||||
from ceilometer.i18n import _, _LE, _LW
|
from ceilometer.i18n import _, _LE, _LW
|
||||||
from ceilometer import keystone_client
|
from ceilometer import keystone_client
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
@ -43,8 +46,8 @@ dispatcher_opts = [
|
|||||||
help='Gnocchi project used to filter out samples '
|
help='Gnocchi project used to filter out samples '
|
||||||
'generated by Gnocchi service activity'),
|
'generated by Gnocchi service activity'),
|
||||||
cfg.StrOpt('url',
|
cfg.StrOpt('url',
|
||||||
default="http://localhost:8041",
|
deprecated_for_removal=True,
|
||||||
help='URL to Gnocchi.'),
|
help='URL to Gnocchi. default: autodetection'),
|
||||||
cfg.StrOpt('archive_policy',
|
cfg.StrOpt('archive_policy',
|
||||||
default=None,
|
default=None,
|
||||||
help='The archive policy to use when the dispatcher '
|
help='The archive policy to use when the dispatcher '
|
||||||
@ -69,8 +72,10 @@ def log_and_ignore_unexpected_workflow_error(func):
|
|||||||
def log_and_ignore(self, *args, **kwargs):
|
def log_and_ignore(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
func(self, *args, **kwargs)
|
func(self, *args, **kwargs)
|
||||||
except gnocchi_client.UnexpectedError as e:
|
except gnocchi_exc.ClientException as e:
|
||||||
LOG.error(six.text_type(e))
|
LOG.error(six.text_type(e))
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(six.text_type(e), exc_info=True)
|
||||||
return log_and_ignore
|
return log_and_ignore
|
||||||
|
|
||||||
|
|
||||||
@ -131,6 +136,19 @@ class ResourcesDefinition(object):
|
|||||||
return attrs
|
return attrs
|
||||||
|
|
||||||
|
|
||||||
|
def GnocchiClient(conf):
|
||||||
|
requests_session = requests.session()
|
||||||
|
for scheme in requests_session.adapters.keys():
|
||||||
|
requests_session.mount(scheme, ka_session.TCPKeepAliveAdapter(
|
||||||
|
pool_block=True))
|
||||||
|
|
||||||
|
session = keystone_client.get_session(requests_session=requests_session)
|
||||||
|
return client.Client('1', session,
|
||||||
|
interface=conf.service_credentials.interface,
|
||||||
|
region_name=conf.service_credentials.region_name,
|
||||||
|
endpoint_override=conf.dispatcher_gnocchi.url)
|
||||||
|
|
||||||
|
|
||||||
class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
||||||
"""Dispatcher class for recording metering data into database.
|
"""Dispatcher class for recording metering data into database.
|
||||||
|
|
||||||
@ -178,10 +196,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
|||||||
self._gnocchi_project_id_lock = threading.Lock()
|
self._gnocchi_project_id_lock = threading.Lock()
|
||||||
self._gnocchi_resource_lock = threading.Lock()
|
self._gnocchi_resource_lock = threading.Lock()
|
||||||
|
|
||||||
self._gnocchi = gnocchi_client.Client(conf.dispatcher_gnocchi.url)
|
self._gnocchi = GnocchiClient(conf)
|
||||||
|
|
||||||
# TODO(sileht): Share yaml loading with
|
|
||||||
# event converter and declarative notification
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_config_file(conf, config_file):
|
def _get_config_file(conf, config_file):
|
||||||
@ -287,42 +302,64 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
|||||||
|
|
||||||
resource.update(resource_extra)
|
resource.update(resource_extra)
|
||||||
|
|
||||||
|
retry = True
|
||||||
try:
|
try:
|
||||||
self._gnocchi.post_measure(resource_type, resource_id,
|
self._gnocchi.metric.add_measures(metric_name, measures,
|
||||||
metric_name, measures)
|
resource_id)
|
||||||
except gnocchi_client.NoSuchMetric:
|
except gnocchi_exc.ResourceNotFound:
|
||||||
# TODO(sileht): Make gnocchi smarter to be able to detect 404
|
self._if_not_cached("create", resource_type, resource,
|
||||||
# for 'resource doesn't exist' and for 'metric doesn't exist'
|
self._create_resource, rd.metrics)
|
||||||
# https://bugs.launchpad.net/gnocchi/+bug/1476186
|
|
||||||
self._ensure_resource_and_metric(resource_type, resource,
|
|
||||||
rd.metrics, metric_name)
|
|
||||||
|
|
||||||
|
except gnocchi_exc.MetricNotFound:
|
||||||
|
metric = {'resource_id': resource['id'],
|
||||||
|
'name': metric_name}
|
||||||
|
metric.update(rd.metrics[metric_name])
|
||||||
try:
|
try:
|
||||||
self._gnocchi.post_measure(resource_type, resource_id,
|
self._gnocchi.metric.create(metric)
|
||||||
metric_name, measures)
|
except gnocchi_exc.NamedMetricAreadyExists:
|
||||||
except gnocchi_client.NoSuchMetric:
|
# NOTE(sileht): metric created in the meantime
|
||||||
LOG.error(_LE("Fail to post measures for "
|
pass
|
||||||
"%(resource_id)s/%(metric_name)s") %
|
else:
|
||||||
dict(resource_id=resource_id,
|
retry = False
|
||||||
metric_name=metric_name))
|
|
||||||
|
if retry:
|
||||||
|
self._gnocchi.metric.add_measures(metric_name, measures,
|
||||||
|
resource_id)
|
||||||
|
LOG.debug("Measure posted on metric %s of resource %s",
|
||||||
|
metric_name, resource_id)
|
||||||
|
|
||||||
if resource_extra:
|
if resource_extra:
|
||||||
if self.cache:
|
self._if_not_cached("update", resource_type, resource,
|
||||||
cache_key = resource['id']
|
self._update_resource, resource_extra)
|
||||||
attribute_hash = self._check_resource_cache(
|
|
||||||
cache_key, resource)
|
def _create_resource(self, resource_type, resource, metrics):
|
||||||
if attribute_hash:
|
try:
|
||||||
with self._gnocchi_resource_lock:
|
resource["metrics"] = metrics
|
||||||
self._gnocchi.update_resource(resource_type,
|
self._gnocchi.resource.create(resource_type, resource)
|
||||||
resource_id,
|
LOG.debug('Resource %s created', resource["id"])
|
||||||
resource_extra)
|
except gnocchi_exc.ResourceAlreadyExists:
|
||||||
self.cache.set(cache_key, attribute_hash)
|
# NOTE(sileht): resource created in the meantime
|
||||||
else:
|
pass
|
||||||
LOG.debug('resource cache hit for update %s',
|
|
||||||
cache_key)
|
def _update_resource(self, resource_type, resource, resource_extra):
|
||||||
|
self._gnocchi.resource.update(resource_type,
|
||||||
|
resource["id"],
|
||||||
|
resource_extra)
|
||||||
|
LOG.debug('Resource %s updated', resource["id"])
|
||||||
|
|
||||||
|
def _if_not_cached(self, operation, resource_type, resource, method,
|
||||||
|
*args, **kwargs):
|
||||||
|
if self.cache:
|
||||||
|
cache_key = resource['id']
|
||||||
|
attribute_hash = self._check_resource_cache(cache_key, resource)
|
||||||
|
if attribute_hash:
|
||||||
|
with self._gnocchi_resource_lock:
|
||||||
|
method(resource_type, resource, *args, **kwargs)
|
||||||
|
self.cache.set(cache_key, attribute_hash)
|
||||||
else:
|
else:
|
||||||
self._gnocchi.update_resource(resource_type, resource_id,
|
LOG.debug('Resource cache hit for %s %s', operation, cache_key)
|
||||||
resource_extra)
|
else:
|
||||||
|
method(resource_type, resource, *args, **kwargs)
|
||||||
|
|
||||||
def _check_resource_cache(self, key, resource_data):
|
def _check_resource_cache(self, key, resource_data):
|
||||||
cached_hash = self.cache.get(key)
|
cached_hash = self.cache.get(key)
|
||||||
@ -331,32 +368,3 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
|||||||
if cached_hash != attribute_hash:
|
if cached_hash != attribute_hash:
|
||||||
return attribute_hash
|
return attribute_hash
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _ensure_resource_and_metric(self, resource_type, resource, metrics,
|
|
||||||
metric_name):
|
|
||||||
try:
|
|
||||||
if self.cache:
|
|
||||||
cache_key = resource['id']
|
|
||||||
attribute_hash = self._check_resource_cache(
|
|
||||||
cache_key, resource)
|
|
||||||
if attribute_hash:
|
|
||||||
with self._gnocchi_resource_lock:
|
|
||||||
resource['metrics'] = metrics
|
|
||||||
self._gnocchi.create_resource(resource_type,
|
|
||||||
resource)
|
|
||||||
self.cache.set(cache_key, attribute_hash)
|
|
||||||
else:
|
|
||||||
LOG.debug('resource cache hit for create %s',
|
|
||||||
cache_key)
|
|
||||||
else:
|
|
||||||
resource['metrics'] = metrics
|
|
||||||
self._gnocchi.create_resource(resource_type, resource)
|
|
||||||
except gnocchi_client.ResourceAlreadyExists:
|
|
||||||
try:
|
|
||||||
archive_policy = resource['metrics'][metric_name]
|
|
||||||
self._gnocchi.create_metric(resource_type, resource['id'],
|
|
||||||
metric_name, archive_policy)
|
|
||||||
except gnocchi_client.MetricAlreadyExists:
|
|
||||||
# NOTE(sileht): Just ignore the metric have been
|
|
||||||
# created in the meantime.
|
|
||||||
pass
|
|
||||||
|
@ -1,200 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright 2015 Red Hat
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import functools
|
|
||||||
import json
|
|
||||||
|
|
||||||
from oslo_log import log
|
|
||||||
import requests
|
|
||||||
import retrying
|
|
||||||
from six.moves.urllib import parse as urlparse
|
|
||||||
|
|
||||||
from ceilometer.i18n import _
|
|
||||||
from ceilometer import keystone_client
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class UnexpectedError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class AuthenticationError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NoSuchMetric(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class MetricAlreadyExists(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NoSuchResource(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ResourceAlreadyExists(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def retry_if_authentication_error(exception):
|
|
||||||
return isinstance(exception, AuthenticationError)
|
|
||||||
|
|
||||||
|
|
||||||
def maybe_retry_if_authentication_error():
|
|
||||||
return retrying.retry(retry_on_exception=retry_if_authentication_error,
|
|
||||||
wait_fixed=2000,
|
|
||||||
stop_max_delay=60000)
|
|
||||||
|
|
||||||
|
|
||||||
class GnocchiSession(object):
|
|
||||||
def __init__(self):
|
|
||||||
self._session = requests.session()
|
|
||||||
# NOTE(sileht): wait when the pool is empty
|
|
||||||
# instead of raising errors.
|
|
||||||
adapter = requests.adapters.HTTPAdapter(
|
|
||||||
pool_block=True)
|
|
||||||
self._session.mount("http://", adapter)
|
|
||||||
self._session.mount("https://", adapter)
|
|
||||||
|
|
||||||
self.post = functools.partial(self._do_method, method='post')
|
|
||||||
self.patch = functools.partial(self._do_method, method='patch')
|
|
||||||
|
|
||||||
def _do_method(self, *args, **kwargs):
|
|
||||||
method = kwargs.pop('method')
|
|
||||||
try:
|
|
||||||
response = getattr(self._session, method)(*args, **kwargs)
|
|
||||||
except requests.ConnectionError as e:
|
|
||||||
raise UnexpectedError("Connection error: %s " % e)
|
|
||||||
|
|
||||||
if response.status_code == 401:
|
|
||||||
LOG.info("Authentication failure, retrying...")
|
|
||||||
raise AuthenticationError()
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
class Client(object):
|
|
||||||
def __init__(self, url):
|
|
||||||
self._gnocchi_url = url.rstrip("/")
|
|
||||||
self._ks_client = keystone_client.get_client()
|
|
||||||
self._session = GnocchiSession()
|
|
||||||
|
|
||||||
def _get_headers(self, content_type="application/json"):
|
|
||||||
return {
|
|
||||||
'Content-Type': content_type,
|
|
||||||
'X-Auth-Token': keystone_client.get_auth_token(self._ks_client),
|
|
||||||
}
|
|
||||||
|
|
||||||
@maybe_retry_if_authentication_error()
|
|
||||||
def post_measure(self, resource_type, resource_id, metric_name,
|
|
||||||
measure_attributes):
|
|
||||||
r = self._session.post("%s/v1/resource/%s/%s/metric/%s/measures"
|
|
||||||
% (self._gnocchi_url, resource_type,
|
|
||||||
urlparse.quote(resource_id, safe=""),
|
|
||||||
metric_name),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(measure_attributes))
|
|
||||||
|
|
||||||
if r.status_code == 404:
|
|
||||||
LOG.debug("The metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s doesn't exists: "
|
|
||||||
"%(status_code)d",
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code})
|
|
||||||
raise NoSuchMetric
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedError(
|
|
||||||
_("Fail to post measure on metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s with status: "
|
|
||||||
"%(status_code)d: %(msg)s") %
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Measure posted on metric %s of resource %s",
|
|
||||||
metric_name, resource_id)
|
|
||||||
|
|
||||||
@maybe_retry_if_authentication_error()
|
|
||||||
def create_resource(self, resource_type, resource):
|
|
||||||
r = self._session.post("%s/v1/resource/%s"
|
|
||||||
% (self._gnocchi_url, resource_type),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(resource))
|
|
||||||
|
|
||||||
if r.status_code == 409:
|
|
||||||
LOG.debug("Resource %s already exists", resource['id'])
|
|
||||||
raise ResourceAlreadyExists
|
|
||||||
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedError(
|
|
||||||
_("Resource %(resource_id)s creation failed with "
|
|
||||||
"status: %(status_code)d: %(msg)s") %
|
|
||||||
{'resource_id': resource['id'],
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Resource %s created", resource['id'])
|
|
||||||
|
|
||||||
@maybe_retry_if_authentication_error()
|
|
||||||
def update_resource(self, resource_type, resource_id,
|
|
||||||
resource_extra):
|
|
||||||
r = self._session.patch(
|
|
||||||
"%s/v1/resource/%s/%s"
|
|
||||||
% (self._gnocchi_url, resource_type,
|
|
||||||
urlparse.quote(resource_id, safe="")),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(resource_extra))
|
|
||||||
|
|
||||||
if r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedError(
|
|
||||||
_("Resource %(resource_id)s update failed with "
|
|
||||||
"status: %(status_code)d: %(msg)s") %
|
|
||||||
{'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Resource %s updated", resource_id)
|
|
||||||
|
|
||||||
@maybe_retry_if_authentication_error()
|
|
||||||
def create_metric(self, resource_type, resource_id, metric_name,
|
|
||||||
archive_policy):
|
|
||||||
params = {metric_name: archive_policy}
|
|
||||||
r = self._session.post("%s/v1/resource/%s/%s/metric"
|
|
||||||
% (self._gnocchi_url, resource_type,
|
|
||||||
urlparse.quote(resource_id, safe="")),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(params))
|
|
||||||
if r.status_code == 409:
|
|
||||||
LOG.debug("Metric %s of resource %s already exists",
|
|
||||||
metric_name, resource_id)
|
|
||||||
raise MetricAlreadyExists
|
|
||||||
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedError(
|
|
||||||
_("Fail to create metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s with status: "
|
|
||||||
"%(status_code)d: %(msg)s") %
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Metric %s of resource %s created",
|
|
||||||
metric_name, resource_id)
|
|
@ -15,17 +15,16 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from gnocchiclient import exceptions as gnocchi_exc
|
||||||
import mock
|
import mock
|
||||||
from oslo_config import fixture as config_fixture
|
from oslo_config import fixture as config_fixture
|
||||||
from oslo_utils import fileutils
|
from oslo_utils import fileutils
|
||||||
from oslotest import mockpatch
|
from oslotest import mockpatch
|
||||||
import requests
|
import requests
|
||||||
import six
|
import six
|
||||||
import six.moves.urllib.parse as urlparse
|
|
||||||
import testscenarios
|
import testscenarios
|
||||||
|
|
||||||
from ceilometer import declarative
|
from ceilometer import declarative
|
||||||
@ -36,17 +35,6 @@ from ceilometer.tests import base
|
|||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
|
|
||||||
|
|
||||||
class json_matcher(object):
|
|
||||||
def __init__(self, ref):
|
|
||||||
self.ref = ref
|
|
||||||
|
|
||||||
def __eq__(self, obj):
|
|
||||||
return self.ref == json.loads(obj)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<json_matcher \"%s\">" % self.ref
|
|
||||||
|
|
||||||
|
|
||||||
class DispatcherTest(base.BaseTestCase):
|
class DispatcherTest(base.BaseTestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -271,16 +259,14 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
('resource_update_fail', dict(measure=204, post_resource=None,
|
('resource_update_fail', dict(measure=204, post_resource=None,
|
||||||
metric=None, measure_retry=None,
|
metric=None, measure_retry=None,
|
||||||
patch_resource=500)),
|
patch_resource=500)),
|
||||||
('new_metric', dict(measure=404, post_resource=409, metric=204,
|
('new_metric', dict(measure=404, post_resource=None, metric=204,
|
||||||
measure_retry=204, patch_resource=204)),
|
measure_retry=204, patch_resource=204)),
|
||||||
('new_metric_fail', dict(measure=404, post_resource=409, metric=500,
|
('new_metric_fail', dict(measure=404, post_resource=None, metric=500,
|
||||||
measure_retry=None, patch_resource=None)),
|
measure_retry=None, patch_resource=None)),
|
||||||
('retry_fail', dict(measure=404, post_resource=409, metric=409,
|
('retry_fail', dict(measure=404, post_resource=409, metric=None,
|
||||||
measure_retry=500, patch_resource=None)),
|
measure_retry=500, patch_resource=None)),
|
||||||
('measure_fail', dict(measure=500, post_resource=None, metric=None,
|
('measure_fail', dict(measure=500, post_resource=None, metric=None,
|
||||||
measure_retry=None, patch_resource=None)),
|
measure_retry=None, patch_resource=None)),
|
||||||
('measure_auth', dict(measure=401, post_resource=None, metric=None,
|
|
||||||
measure_retry=None, patch_resource=204)),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -296,8 +282,6 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
self.conf.config(url='http://localhost:8041',
|
self.conf.config(url='http://localhost:8041',
|
||||||
group='dispatcher_gnocchi')
|
group='dispatcher_gnocchi')
|
||||||
ks_client = mock.Mock()
|
ks_client = mock.Mock()
|
||||||
ks_client.session.auth.get_access.return_value.auth_token = (
|
|
||||||
"fake_token")
|
|
||||||
ks_client.projects.find.return_value = mock.Mock(
|
ks_client.projects.find.return_value = mock.Mock(
|
||||||
name='gnocchi', id='a2d42c23-d518-46b6-96ab-3fba2e146859')
|
name='gnocchi', id='a2d42c23-d518-46b6-96ab-3fba2e146859')
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
@ -315,60 +299,33 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
self.sample['resource_id'] = str(uuid.uuid4()) + "/foobar"
|
self.sample['resource_id'] = str(uuid.uuid4()) + "/foobar"
|
||||||
|
|
||||||
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
|
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
|
||||||
@mock.patch('ceilometer.dispatcher.gnocchi_client.LOG')
|
@mock.patch('gnocchiclient.v1.client.Client')
|
||||||
@mock.patch('ceilometer.dispatcher.gnocchi_client.requests')
|
def test_workflow(self, fakeclient_cls, logger):
|
||||||
def test_workflow(self, fake_requests, client_logger, logger):
|
|
||||||
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
|
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
|
||||||
|
|
||||||
base_url = self.dispatcher.conf.dispatcher_gnocchi.url
|
fakeclient = fakeclient_cls.return_value
|
||||||
url_params = {
|
|
||||||
'url': urlparse.urljoin(base_url, '/v1/resource'),
|
|
||||||
# NOTE(sileht): we don't use urlparse.quote here
|
|
||||||
# to ensure / is converted in %2F
|
|
||||||
'resource_id': self.sample['resource_id'].replace("/", "%2F"),
|
|
||||||
'resource_type': self.resource_type,
|
|
||||||
'metric_name': self.sample['counter_name']
|
|
||||||
}
|
|
||||||
headers = {'Content-Type': 'application/json',
|
|
||||||
'X-Auth-Token': 'fake_token'}
|
|
||||||
|
|
||||||
patch_responses = []
|
# FIXME(sileht): we don't use urlparse.quote here
|
||||||
post_responses = []
|
# to ensure / is converted in %2F
|
||||||
|
# temporary disabled until we find a solution
|
||||||
|
# on gnocchi side. Current gnocchiclient doesn't
|
||||||
|
# encode the resource_id
|
||||||
|
resource_id = self.sample['resource_id'] # .replace("/", "%2F"),
|
||||||
|
metric_name = self.sample['counter_name']
|
||||||
|
|
||||||
# This is needed to mock Exception in py3
|
expected_calls = [mock.call.metric.add_measures(
|
||||||
fake_requests.ConnectionError = requests.ConnectionError
|
metric_name, self.measures_attributes, resource_id)]
|
||||||
|
|
||||||
expected_calls = [
|
add_measures_side_effect = []
|
||||||
mock.call.session(),
|
|
||||||
mock.call.adapters.HTTPAdapter(pool_block=True),
|
|
||||||
mock.call.session().mount('http://', mock.ANY),
|
|
||||||
mock.call.session().mount('https://', mock.ANY),
|
|
||||||
mock.call.session().post(
|
|
||||||
"%(url)s/%(resource_type)s/%(resource_id)s/"
|
|
||||||
"metric/%(metric_name)s/measures" % url_params,
|
|
||||||
headers=headers,
|
|
||||||
data=json_matcher(self.measures_attributes))
|
|
||||||
|
|
||||||
]
|
if self.measure == 404 and self.post_resource:
|
||||||
post_responses.append(MockResponse(self.measure))
|
add_measures_side_effect += [
|
||||||
|
gnocchi_exc.ResourceNotFound(404)]
|
||||||
if self.measure == 401:
|
elif self.measure == 404 and self.metric:
|
||||||
|
add_measures_side_effect += [
|
||||||
type(self.ks_client.session.auth.get_access.return_value
|
gnocchi_exc.MetricNotFound(404)]
|
||||||
).auth_token = (mock.PropertyMock(
|
elif self.measure == 500:
|
||||||
side_effect=['fake_token', 'new_token', 'new_token',
|
add_measures_side_effect += [Exception('boom!')]
|
||||||
'new_token', 'new_token']))
|
|
||||||
headers = {'Content-Type': 'application/json',
|
|
||||||
'X-Auth-Token': 'new_token'}
|
|
||||||
|
|
||||||
expected_calls.append(
|
|
||||||
mock.call.session().post(
|
|
||||||
"%(url)s/%(resource_type)s/%(resource_id)s/"
|
|
||||||
"metric/%(metric_name)s/measures" % url_params,
|
|
||||||
headers=headers,
|
|
||||||
data=json_matcher(self.measures_attributes)))
|
|
||||||
|
|
||||||
post_responses.append(MockResponse(200))
|
|
||||||
|
|
||||||
if self.post_resource:
|
if self.post_resource:
|
||||||
attributes = self.postable_attributes.copy()
|
attributes = self.postable_attributes.copy()
|
||||||
@ -376,79 +333,65 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
attributes['id'] = self.sample['resource_id']
|
attributes['id'] = self.sample['resource_id']
|
||||||
attributes['metrics'] = dict((metric_name, {})
|
attributes['metrics'] = dict((metric_name, {})
|
||||||
for metric_name in self.metric_names)
|
for metric_name in self.metric_names)
|
||||||
expected_calls.append(mock.call.session().post(
|
expected_calls.append(mock.call.resource.create(
|
||||||
"%(url)s/%(resource_type)s" % url_params,
|
self.resource_type, attributes))
|
||||||
headers=headers,
|
if self.post_resource == 409:
|
||||||
data=json_matcher(attributes)),
|
fakeclient.resource.create.side_effect = [
|
||||||
)
|
gnocchi_exc.ResourceAlreadyExists(409)]
|
||||||
post_responses.append(MockResponse(self.post_resource))
|
elif self.post_resource == 500:
|
||||||
|
fakeclient.resource.create.side_effect = [Exception('boom!')]
|
||||||
|
|
||||||
if self.metric:
|
if self.metric:
|
||||||
expected_calls.append(mock.call.session().post(
|
expected_calls.append(mock.call.metric.create({
|
||||||
"%(url)s/%(resource_type)s/%(resource_id)s/metric"
|
'name': self.sample['counter_name'],
|
||||||
% url_params,
|
'resource_id': resource_id}))
|
||||||
headers=headers,
|
if self.metric == 409:
|
||||||
data=json_matcher({self.sample['counter_name']: {}})
|
fakeclient.metric.create.side_effect = [
|
||||||
))
|
gnocchi_exc.NamedMetricAreadyExists(409)]
|
||||||
post_responses.append(MockResponse(self.metric))
|
elif self.metric == 500:
|
||||||
|
fakeclient.metric.create.side_effect = [Exception('boom!')]
|
||||||
|
|
||||||
if self.measure_retry:
|
if self.measure_retry:
|
||||||
expected_calls.append(mock.call.session().post(
|
expected_calls.append(mock.call.metric.add_measures(
|
||||||
"%(url)s/%(resource_type)s/%(resource_id)s/"
|
metric_name,
|
||||||
"metric/%(metric_name)s/measures" % url_params,
|
self.measures_attributes,
|
||||||
headers=headers,
|
resource_id))
|
||||||
data=json_matcher(self.measures_attributes))
|
if self.measure_retry == 204:
|
||||||
)
|
add_measures_side_effect += [None]
|
||||||
post_responses.append(MockResponse(self.measure_retry))
|
elif self.measure_retry == 500:
|
||||||
|
add_measures_side_effect += [
|
||||||
|
Exception('boom!')]
|
||||||
|
else:
|
||||||
|
add_measures_side_effect += [None]
|
||||||
|
|
||||||
if self.patch_resource and self.patchable_attributes:
|
if self.patch_resource and self.patchable_attributes:
|
||||||
expected_calls.append(mock.call.session().patch(
|
expected_calls.append(mock.call.resource.update(
|
||||||
"%(url)s/%(resource_type)s/%(resource_id)s" % url_params,
|
self.resource_type, resource_id,
|
||||||
headers=headers,
|
self.patchable_attributes))
|
||||||
data=json_matcher(self.patchable_attributes)),
|
if self.patch_resource == 500:
|
||||||
)
|
fakeclient.resource.update.side_effect = [Exception('boom!')]
|
||||||
patch_responses.append(MockResponse(self.patch_resource))
|
|
||||||
|
|
||||||
s = fake_requests.session.return_value
|
fakeclient.metric.add_measures.side_effect = add_measures_side_effect
|
||||||
s.patch.side_effect = patch_responses
|
|
||||||
s.post.side_effect = post_responses
|
|
||||||
|
|
||||||
self.dispatcher.record_metering_data([self.sample])
|
self.dispatcher.record_metering_data([self.sample])
|
||||||
|
|
||||||
# Check that the last log message is the expected one
|
# Check that the last log message is the expected one
|
||||||
if self.measure == 500 or self.measure_retry == 500:
|
if (self.measure == 500 or self.measure_retry == 500 or
|
||||||
logger.error.assert_called_with(
|
self.metric == 500 or self.post_resource == 500 or
|
||||||
"Fail to post measure on metric %s of resource %s "
|
(self.patch_resource == 500 and self.patchable_attributes)):
|
||||||
"with status: %d: Internal Server Error" %
|
logger.error.assert_called_with('boom!', exc_info=True)
|
||||||
(self.sample['counter_name'],
|
|
||||||
self.sample['resource_id'],
|
|
||||||
500))
|
|
||||||
|
|
||||||
elif self.post_resource == 500 or (self.patch_resource == 500 and
|
|
||||||
self.patchable_attributes):
|
|
||||||
logger.error.assert_called_with(
|
|
||||||
"Resource %s %s failed with status: "
|
|
||||||
"%d: Internal Server Error" %
|
|
||||||
(self.sample['resource_id'],
|
|
||||||
'update' if self.patch_resource else 'creation',
|
|
||||||
500))
|
|
||||||
elif self.metric == 500:
|
|
||||||
logger.error.assert_called_with(
|
|
||||||
"Fail to create metric %s of resource %s "
|
|
||||||
"with status: %d: Internal Server Error" %
|
|
||||||
(self.sample['counter_name'],
|
|
||||||
self.sample['resource_id'],
|
|
||||||
500))
|
|
||||||
elif self.patch_resource == 204 and self.patchable_attributes:
|
elif self.patch_resource == 204 and self.patchable_attributes:
|
||||||
client_logger.debug.assert_called_with(
|
logger.debug.assert_called_with(
|
||||||
'Resource %s updated', self.sample['resource_id'])
|
'Resource %s updated', self.sample['resource_id'])
|
||||||
|
self.assertEqual(0, logger.error.call_count)
|
||||||
elif self.measure == 200:
|
elif self.measure == 200:
|
||||||
client_logger.debug.assert_called_with(
|
logger.debug.assert_called_with(
|
||||||
"Measure posted on metric %s of resource %s",
|
"Measure posted on metric %s of resource %s",
|
||||||
self.sample['counter_name'],
|
self.sample['counter_name'],
|
||||||
self.sample['resource_id'])
|
self.sample['resource_id'])
|
||||||
|
self.assertEqual(0, logger.error.call_count)
|
||||||
|
|
||||||
self.assertEqual(expected_calls, fake_requests.mock_calls)
|
self.assertEqual(expected_calls, fakeclient.mock_calls)
|
||||||
|
|
||||||
|
|
||||||
DispatcherWorkflowTest.generate_scenarios()
|
DispatcherWorkflowTest.generate_scenarios()
|
||||||
|
@ -22,6 +22,7 @@ oslo.vmware>=1.16.0 # Apache-2.0
|
|||||||
psycopg2>=2.5
|
psycopg2>=2.5
|
||||||
pylint==1.4.5 # GNU GPL v2
|
pylint==1.4.5 # GNU GPL v2
|
||||||
pymongo>=3.0.2
|
pymongo>=3.0.2
|
||||||
|
gnocchiclient>=2.1.0
|
||||||
python-subunit>=0.0.18
|
python-subunit>=0.0.18
|
||||||
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
|
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
|
||||||
sphinxcontrib-httpdomain
|
sphinxcontrib-httpdomain
|
||||||
|
Loading…
Reference in New Issue
Block a user