gnocchi: retry with a new token on 401
The gnocchi dispatcher just bail out on any unexpected Gnocchi HTTP code. This change moves all Gnocchi API and HTTP stuffs in the gnocchi_client module. And improves the handling of 401 error code to retry requests when the authentication fail, usualy because the token have expired. Change-Id: I9d7e29337de2cb480c64af03032dcf9fa2a3829c
This commit is contained in:
parent
c21f377128
commit
aa67c96b68
@ -17,7 +17,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
import fnmatch
|
import fnmatch
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
|
||||||
import operator
|
import operator
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
@ -25,11 +24,11 @@ import threading
|
|||||||
import jsonpath_rw
|
import jsonpath_rw
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import requests
|
|
||||||
import six
|
import six
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from ceilometer import dispatcher
|
from ceilometer import dispatcher
|
||||||
|
from ceilometer.dispatcher import gnocchi_client
|
||||||
from ceilometer.i18n import _, _LE
|
from ceilometer.i18n import _, _LE
|
||||||
from ceilometer import keystone_client
|
from ceilometer import keystone_client
|
||||||
|
|
||||||
@ -65,35 +64,11 @@ dispatcher_opts = [
|
|||||||
cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi")
|
cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi")
|
||||||
|
|
||||||
|
|
||||||
class UnexpectedWorkflowError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NoSuchMetric(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class MetricAlreadyExists(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NoSuchResource(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ResourceAlreadyExists(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def log_and_ignore_unexpected_workflow_error(func):
|
def log_and_ignore_unexpected_workflow_error(func):
|
||||||
def log_and_ignore(self, *args, **kwargs):
|
def log_and_ignore(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
func(self, *args, **kwargs)
|
func(self, *args, **kwargs)
|
||||||
except requests.ConnectionError as e:
|
except gnocchi_client.UnexpectedError as e:
|
||||||
with self._gnocchi_api_lock:
|
|
||||||
self._gnocchi_api = None
|
|
||||||
LOG.warn("Connection error, reconnecting...")
|
|
||||||
except UnexpectedWorkflowError as e:
|
|
||||||
LOG.error(six.text_type(e))
|
LOG.error(six.text_type(e))
|
||||||
return log_and_ignore
|
return log_and_ignore
|
||||||
|
|
||||||
@ -197,20 +172,13 @@ class GnocchiDispatcher(dispatcher.Base):
|
|||||||
self.filter_service_activity = (
|
self.filter_service_activity = (
|
||||||
conf.dispatcher_gnocchi.filter_service_activity)
|
conf.dispatcher_gnocchi.filter_service_activity)
|
||||||
self._ks_client = keystone_client.get_client()
|
self._ks_client = keystone_client.get_client()
|
||||||
self.gnocchi_url = conf.dispatcher_gnocchi.url
|
|
||||||
self.gnocchi_archive_policy_data = self._load_archive_policy(conf)
|
self.gnocchi_archive_policy_data = self._load_archive_policy(conf)
|
||||||
self.resources_definition = self._load_resources_definitions(conf)
|
self.resources_definition = self._load_resources_definitions(conf)
|
||||||
|
|
||||||
self._gnocchi_project_id = None
|
self._gnocchi_project_id = None
|
||||||
self._gnocchi_project_id_lock = threading.Lock()
|
self._gnocchi_project_id_lock = threading.Lock()
|
||||||
self._gnocchi_api = None
|
|
||||||
self._gnocchi_api_lock = threading.Lock()
|
|
||||||
|
|
||||||
def _get_headers(self, content_type="application/json"):
|
self._gnocchi = gnocchi_client.Client(conf.dispatcher_gnocchi.url)
|
||||||
return {
|
|
||||||
'Content-Type': content_type,
|
|
||||||
'X-Auth-Token': self._ks_client.auth_token,
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO(sileht): Share yaml loading with
|
# TODO(sileht): Share yaml loading with
|
||||||
# event converter and declarative notification
|
# event converter and declarative notification
|
||||||
@ -268,23 +236,6 @@ class GnocchiDispatcher(dispatcher.Base):
|
|||||||
self.gnocchi_project_id)
|
self.gnocchi_project_id)
|
||||||
return self._gnocchi_project_id
|
return self._gnocchi_project_id
|
||||||
|
|
||||||
@property
|
|
||||||
def gnocchi_api(self):
|
|
||||||
"""return a working requests session object"""
|
|
||||||
if self._gnocchi_api is not None:
|
|
||||||
return self._gnocchi_api
|
|
||||||
|
|
||||||
with self._gnocchi_api_lock:
|
|
||||||
if self._gnocchi_api is None:
|
|
||||||
self._gnocchi_api = requests.session()
|
|
||||||
# NOTE(sileht): wait when the pool is empty
|
|
||||||
# instead of raising errors.
|
|
||||||
adapter = requests.adapters.HTTPAdapter(pool_block=True)
|
|
||||||
self._gnocchi_api.mount("http://", adapter)
|
|
||||||
self._gnocchi_api.mount("https://", adapter)
|
|
||||||
|
|
||||||
return self._gnocchi_api
|
|
||||||
|
|
||||||
def _is_swift_account_sample(self, sample):
|
def _is_swift_account_sample(self, sample):
|
||||||
return bool([rd for rd in self.resources_definition
|
return bool([rd for rd in self.resources_definition
|
||||||
if rd.cfg['resource_type'] == 'swift_account'
|
if rd.cfg['resource_type'] == 'swift_account'
|
||||||
@ -328,11 +279,6 @@ class GnocchiDispatcher(dispatcher.Base):
|
|||||||
|
|
||||||
@log_and_ignore_unexpected_workflow_error
|
@log_and_ignore_unexpected_workflow_error
|
||||||
def _process_resource(self, resource_id, metric_grouped_samples):
|
def _process_resource(self, resource_id, metric_grouped_samples):
|
||||||
# TODO(sileht): Any HTTP 50X/401 error is just logged and this method
|
|
||||||
# stop, perhaps we can be smarter and retry later in case of 50X and
|
|
||||||
# directly in case of 401. A gnocchiclient would help a lot for the
|
|
||||||
# latest.
|
|
||||||
|
|
||||||
resource_extra = {}
|
resource_extra = {}
|
||||||
for metric_name, samples in metric_grouped_samples:
|
for metric_name, samples in metric_grouped_samples:
|
||||||
samples = list(samples)
|
samples = list(samples)
|
||||||
@ -359,9 +305,9 @@ class GnocchiDispatcher(dispatcher.Base):
|
|||||||
resource.update(resource_extra)
|
resource.update(resource_extra)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._post_measure(resource_type, resource_id, metric_name,
|
self._gnocchi.post_measure(resource_type, resource_id,
|
||||||
measures)
|
metric_name, measures)
|
||||||
except NoSuchMetric:
|
except gnocchi_client.NoSuchMetric:
|
||||||
# TODO(sileht): Make gnocchi smarter to be able to detect 404
|
# TODO(sileht): Make gnocchi smarter to be able to detect 404
|
||||||
# for 'resource doesn't exist' and for 'metric doesn't exist'
|
# for 'resource doesn't exist' and for 'metric doesn't exist'
|
||||||
# https://bugs.launchpad.net/gnocchi/+bug/1476186
|
# https://bugs.launchpad.net/gnocchi/+bug/1476186
|
||||||
@ -369,122 +315,32 @@ class GnocchiDispatcher(dispatcher.Base):
|
|||||||
metric_name)
|
metric_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._post_measure(resource_type, resource_id,
|
self._gnocchi.post_measure(resource_type, resource_id,
|
||||||
metric_name, measures)
|
metric_name, measures)
|
||||||
except NoSuchMetric:
|
except gnocchi_client.NoSuchMetric:
|
||||||
LOG.error(_LE("Fail to post measures for "
|
LOG.error(_LE("Fail to post measures for "
|
||||||
"%(resource_id)s/%(metric_name)s") %
|
"%(resource_id)s/%(metric_name)s") %
|
||||||
dict(resource_id=resource_id,
|
dict(resource_id=resource_id,
|
||||||
metric_name=metric_name))
|
metric_name=metric_name))
|
||||||
|
|
||||||
if resource_extra:
|
if resource_extra:
|
||||||
self._update_resource(resource_type, resource_id, resource_extra)
|
self._gnocchi.update_resource(resource_type, resource_id,
|
||||||
|
resource_extra)
|
||||||
|
|
||||||
def _ensure_resource_and_metric(self, resource_type, resource,
|
def _ensure_resource_and_metric(self, resource_type, resource,
|
||||||
metric_name):
|
metric_name):
|
||||||
try:
|
try:
|
||||||
self._create_resource(resource_type, resource)
|
self._gnocchi.create_resource(resource_type, resource)
|
||||||
except ResourceAlreadyExists:
|
except gnocchi_client.ResourceAlreadyExists:
|
||||||
try:
|
try:
|
||||||
archive_policy = resource['metrics'][metric_name]
|
archive_policy = resource['metrics'][metric_name]
|
||||||
self._create_metric(resource_type, resource['id'],
|
self._gnocchi.create_metric(resource_type, resource['id'],
|
||||||
metric_name, archive_policy)
|
metric_name, archive_policy)
|
||||||
except MetricAlreadyExists:
|
except gnocchi_client.MetricAlreadyExists:
|
||||||
# NOTE(sileht): Just ignore the metric have been
|
# NOTE(sileht): Just ignore the metric have been
|
||||||
# created in the meantime.
|
# created in the meantime.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _post_measure(self, resource_type, resource_id, metric_name,
|
|
||||||
measure_attributes):
|
|
||||||
r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric/%s/measures"
|
|
||||||
% (self.gnocchi_url, resource_type,
|
|
||||||
resource_id, metric_name),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(measure_attributes))
|
|
||||||
if r.status_code == 404:
|
|
||||||
LOG.debug(_("The metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s doesn't exists: "
|
|
||||||
"%(status_code)d"),
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code})
|
|
||||||
raise NoSuchMetric
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedWorkflowError(
|
|
||||||
_("Fail to post measure on metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s with status: "
|
|
||||||
"%(status_code)d: %(msg)s") %
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Measure posted on metric %s of resource %s",
|
|
||||||
metric_name, resource_id)
|
|
||||||
|
|
||||||
def _create_resource(self, resource_type, resource):
|
|
||||||
r = self.gnocchi_api.post("%s/v1/resource/%s"
|
|
||||||
% (self.gnocchi_url, resource_type),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(resource))
|
|
||||||
if r.status_code == 409:
|
|
||||||
LOG.debug("Resource %s already exists", resource['id'])
|
|
||||||
raise ResourceAlreadyExists
|
|
||||||
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedWorkflowError(
|
|
||||||
_("Resource %(resource_id)s creation failed with "
|
|
||||||
"status: %(status_code)d: %(msg)s") %
|
|
||||||
{'resource_id': resource['id'],
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Resource %s created", resource['id'])
|
|
||||||
|
|
||||||
def _update_resource(self, resource_type, resource_id,
|
|
||||||
resource_extra):
|
|
||||||
r = self.gnocchi_api.patch(
|
|
||||||
"%s/v1/resource/%s/%s"
|
|
||||||
% (self.gnocchi_url, resource_type, resource_id),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(resource_extra))
|
|
||||||
|
|
||||||
if r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedWorkflowError(
|
|
||||||
_("Resource %(resource_id)s update failed with "
|
|
||||||
"status: %(status_code)d: %(msg)s") %
|
|
||||||
{'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Resource %s updated", resource_id)
|
|
||||||
|
|
||||||
def _create_metric(self, resource_type, resource_id, metric_name,
|
|
||||||
archive_policy):
|
|
||||||
params = {metric_name: archive_policy}
|
|
||||||
r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric"
|
|
||||||
% (self.gnocchi_url, resource_type,
|
|
||||||
resource_id),
|
|
||||||
headers=self._get_headers(),
|
|
||||||
data=json.dumps(params))
|
|
||||||
if r.status_code == 409:
|
|
||||||
LOG.debug("Metric %s of resource %s already exists",
|
|
||||||
metric_name, resource_id)
|
|
||||||
raise MetricAlreadyExists
|
|
||||||
|
|
||||||
elif r.status_code // 100 != 2:
|
|
||||||
raise UnexpectedWorkflowError(
|
|
||||||
_("Fail to create metric %(metric_name)s of "
|
|
||||||
"resource %(resource_id)s with status: "
|
|
||||||
"%(status_code)d: %(msg)s") %
|
|
||||||
{'metric_name': metric_name,
|
|
||||||
'resource_id': resource_id,
|
|
||||||
'status_code': r.status_code,
|
|
||||||
'msg': r.text})
|
|
||||||
else:
|
|
||||||
LOG.debug("Metric %s of resource %s created",
|
|
||||||
metric_name, resource_id)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def record_events(events):
|
def record_events(events):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
197
ceilometer/dispatcher/gnocchi_client.py
Normal file
197
ceilometer/dispatcher/gnocchi_client.py
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
#
|
||||||
|
# Copyright 2015 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import json
|
||||||
|
|
||||||
|
from oslo_log import log
|
||||||
|
import requests
|
||||||
|
import retrying
|
||||||
|
|
||||||
|
from ceilometer.i18n import _
|
||||||
|
from ceilometer import keystone_client
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class UnexpectedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AuthenticationError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NoSuchMetric(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MetricAlreadyExists(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NoSuchResource(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceAlreadyExists(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def retry_if_authentication_error(exception):
|
||||||
|
return isinstance(exception, AuthenticationError)
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_retry_if_authentication_error():
|
||||||
|
return retrying.retry(retry_on_exception=retry_if_authentication_error,
|
||||||
|
wait_fixed=2000,
|
||||||
|
stop_max_delay=60000)
|
||||||
|
|
||||||
|
|
||||||
|
class GnocchiSession(object):
|
||||||
|
def __init__(self):
|
||||||
|
self._session = requests.session()
|
||||||
|
# NOTE(sileht): wait when the pool is empty
|
||||||
|
# instead of raising errors.
|
||||||
|
adapter = requests.adapters.HTTPAdapter(
|
||||||
|
pool_block=True)
|
||||||
|
self._session.mount("http://", adapter)
|
||||||
|
self._session.mount("https://", adapter)
|
||||||
|
|
||||||
|
self.post = functools.partial(self._do_method, method='post')
|
||||||
|
self.patch = functools.partial(self._do_method, method='patch')
|
||||||
|
|
||||||
|
def _do_method(self, *args, **kwargs):
|
||||||
|
method = kwargs.pop('method')
|
||||||
|
try:
|
||||||
|
response = getattr(self._session, method)(*args, **kwargs)
|
||||||
|
except requests.ConnectionError as e:
|
||||||
|
raise UnexpectedError("Connection error: %s " % e)
|
||||||
|
|
||||||
|
if response.status_code == 401:
|
||||||
|
LOG.info("Authentication failure, retrying...")
|
||||||
|
raise AuthenticationError()
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
class Client(object):
|
||||||
|
def __init__(self, url):
|
||||||
|
self._gnocchi_url = url
|
||||||
|
self._ks_client = keystone_client.get_client()
|
||||||
|
self._session = GnocchiSession()
|
||||||
|
|
||||||
|
def _get_headers(self, content_type="application/json"):
|
||||||
|
return {
|
||||||
|
'Content-Type': content_type,
|
||||||
|
'X-Auth-Token': self._ks_client.auth_token,
|
||||||
|
}
|
||||||
|
|
||||||
|
@maybe_retry_if_authentication_error()
|
||||||
|
def post_measure(self, resource_type, resource_id, metric_name,
|
||||||
|
measure_attributes):
|
||||||
|
r = self._session.post("%s/v1/resource/%s/%s/metric/%s/measures"
|
||||||
|
% (self._gnocchi_url, resource_type,
|
||||||
|
resource_id, metric_name),
|
||||||
|
headers=self._get_headers(),
|
||||||
|
data=json.dumps(measure_attributes))
|
||||||
|
|
||||||
|
if r.status_code == 404:
|
||||||
|
LOG.debug(_("The metric %(metric_name)s of "
|
||||||
|
"resource %(resource_id)s doesn't exists: "
|
||||||
|
"%(status_code)d"),
|
||||||
|
{'metric_name': metric_name,
|
||||||
|
'resource_id': resource_id,
|
||||||
|
'status_code': r.status_code})
|
||||||
|
raise NoSuchMetric
|
||||||
|
elif r.status_code // 100 != 2:
|
||||||
|
raise UnexpectedError(
|
||||||
|
_("Fail to post measure on metric %(metric_name)s of "
|
||||||
|
"resource %(resource_id)s with status: "
|
||||||
|
"%(status_code)d: %(msg)s") %
|
||||||
|
{'metric_name': metric_name,
|
||||||
|
'resource_id': resource_id,
|
||||||
|
'status_code': r.status_code,
|
||||||
|
'msg': r.text})
|
||||||
|
else:
|
||||||
|
LOG.debug("Measure posted on metric %s of resource %s",
|
||||||
|
metric_name, resource_id)
|
||||||
|
|
||||||
|
@maybe_retry_if_authentication_error()
|
||||||
|
def create_resource(self, resource_type, resource):
|
||||||
|
r = self._session.post("%s/v1/resource/%s"
|
||||||
|
% (self._gnocchi_url, resource_type),
|
||||||
|
headers=self._get_headers(),
|
||||||
|
data=json.dumps(resource))
|
||||||
|
|
||||||
|
if r.status_code == 409:
|
||||||
|
LOG.debug("Resource %s already exists", resource['id'])
|
||||||
|
raise ResourceAlreadyExists
|
||||||
|
|
||||||
|
elif r.status_code // 100 != 2:
|
||||||
|
raise UnexpectedError(
|
||||||
|
_("Resource %(resource_id)s creation failed with "
|
||||||
|
"status: %(status_code)d: %(msg)s") %
|
||||||
|
{'resource_id': resource['id'],
|
||||||
|
'status_code': r.status_code,
|
||||||
|
'msg': r.text})
|
||||||
|
else:
|
||||||
|
LOG.debug("Resource %s created", resource['id'])
|
||||||
|
|
||||||
|
@maybe_retry_if_authentication_error()
|
||||||
|
def update_resource(self, resource_type, resource_id,
|
||||||
|
resource_extra):
|
||||||
|
r = self._session.patch(
|
||||||
|
"%s/v1/resource/%s/%s"
|
||||||
|
% (self._gnocchi_url, resource_type, resource_id),
|
||||||
|
headers=self._get_headers(),
|
||||||
|
data=json.dumps(resource_extra))
|
||||||
|
|
||||||
|
if r.status_code // 100 != 2:
|
||||||
|
raise UnexpectedError(
|
||||||
|
_("Resource %(resource_id)s update failed with "
|
||||||
|
"status: %(status_code)d: %(msg)s") %
|
||||||
|
{'resource_id': resource_id,
|
||||||
|
'status_code': r.status_code,
|
||||||
|
'msg': r.text})
|
||||||
|
else:
|
||||||
|
LOG.debug("Resource %s updated", resource_id)
|
||||||
|
|
||||||
|
@maybe_retry_if_authentication_error()
|
||||||
|
def create_metric(self, resource_type, resource_id, metric_name,
|
||||||
|
archive_policy):
|
||||||
|
params = {metric_name: archive_policy}
|
||||||
|
r = self._session.post("%s/v1/resource/%s/%s/metric"
|
||||||
|
% (self._gnocchi_url, resource_type,
|
||||||
|
resource_id),
|
||||||
|
headers=self._get_headers(),
|
||||||
|
data=json.dumps(params))
|
||||||
|
if r.status_code == 409:
|
||||||
|
LOG.debug("Metric %s of resource %s already exists",
|
||||||
|
metric_name, resource_id)
|
||||||
|
raise MetricAlreadyExists
|
||||||
|
|
||||||
|
elif r.status_code // 100 != 2:
|
||||||
|
raise UnexpectedError(
|
||||||
|
_("Fail to create metric %(metric_name)s of "
|
||||||
|
"resource %(resource_id)s with status: "
|
||||||
|
"%(status_code)d: %(msg)s") %
|
||||||
|
{'metric_name': metric_name,
|
||||||
|
'resource_id': resource_id,
|
||||||
|
'status_code': r.status_code,
|
||||||
|
'msg': r.text})
|
||||||
|
else:
|
||||||
|
LOG.debug("Metric %s of resource %s created",
|
||||||
|
metric_name, resource_id)
|
@ -306,6 +306,8 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
measure_retry=500, patch_resource=None)),
|
measure_retry=500, patch_resource=None)),
|
||||||
('measure_fail', dict(measure=500, post_resource=None, metric=None,
|
('measure_fail', dict(measure=500, post_resource=None, metric=None,
|
||||||
measure_retry=None, patch_resource=None)),
|
measure_retry=None, patch_resource=None)),
|
||||||
|
('measure_auth', dict(measure=401, post_resource=None, metric=None,
|
||||||
|
measure_retry=None, patch_resource=204)),
|
||||||
]
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -326,6 +328,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'ceilometer.keystone_client.get_client',
|
'ceilometer.keystone_client.get_client',
|
||||||
return_value=ks_client))
|
return_value=ks_client))
|
||||||
|
self.ks_client = ks_client
|
||||||
|
|
||||||
ceilometer_service.prepare_service([])
|
ceilometer_service.prepare_service([])
|
||||||
self.conf.config(
|
self.conf.config(
|
||||||
@ -334,12 +337,14 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
group="dispatcher_gnocchi"
|
group="dispatcher_gnocchi"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
|
|
||||||
self.sample['resource_id'] = str(uuid.uuid4())
|
self.sample['resource_id'] = str(uuid.uuid4())
|
||||||
|
|
||||||
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
|
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
|
||||||
@mock.patch('ceilometer.dispatcher.gnocchi.requests')
|
@mock.patch('ceilometer.dispatcher.gnocchi_client.LOG')
|
||||||
def test_workflow(self, fake_requests, logger):
|
@mock.patch('ceilometer.dispatcher.gnocchi_client.requests')
|
||||||
|
def test_workflow(self, fake_requests, client_logger, logger):
|
||||||
|
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
|
||||||
|
|
||||||
base_url = self.dispatcher.conf.dispatcher_gnocchi.url
|
base_url = self.dispatcher.conf.dispatcher_gnocchi.url
|
||||||
url_params = {
|
url_params = {
|
||||||
'url': urlparse.urljoin(base_url, '/v1/resource'),
|
'url': urlparse.urljoin(base_url, '/v1/resource'),
|
||||||
@ -350,14 +355,13 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
headers = {'Content-Type': 'application/json',
|
headers = {'Content-Type': 'application/json',
|
||||||
'X-Auth-Token': 'fake_token'}
|
'X-Auth-Token': 'fake_token'}
|
||||||
|
|
||||||
expected_calls = []
|
|
||||||
patch_responses = []
|
patch_responses = []
|
||||||
post_responses = []
|
post_responses = []
|
||||||
|
|
||||||
# This is needed to mock Exception in py3
|
# This is needed to mock Exception in py3
|
||||||
fake_requests.ConnectionError = requests.ConnectionError
|
fake_requests.ConnectionError = requests.ConnectionError
|
||||||
|
|
||||||
expected_calls.extend([
|
expected_calls = [
|
||||||
mock.call.session(),
|
mock.call.session(),
|
||||||
mock.call.adapters.HTTPAdapter(pool_block=True),
|
mock.call.adapters.HTTPAdapter(pool_block=True),
|
||||||
mock.call.session().mount('http://', mock.ANY),
|
mock.call.session().mount('http://', mock.ANY),
|
||||||
@ -367,9 +371,26 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
"metric/%(metric_name)s/measures" % url_params,
|
"metric/%(metric_name)s/measures" % url_params,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
data=json_matcher(self.measures_attributes))
|
data=json_matcher(self.measures_attributes))
|
||||||
])
|
|
||||||
|
]
|
||||||
post_responses.append(MockResponse(self.measure))
|
post_responses.append(MockResponse(self.measure))
|
||||||
|
|
||||||
|
if self.measure == 401:
|
||||||
|
type(self.ks_client).auth_token = mock.PropertyMock(
|
||||||
|
side_effect=['fake_token', 'new_token', 'new_token',
|
||||||
|
'new_token', 'new_token'])
|
||||||
|
headers = {'Content-Type': 'application/json',
|
||||||
|
'X-Auth-Token': 'new_token'}
|
||||||
|
|
||||||
|
expected_calls.append(
|
||||||
|
mock.call.session().post(
|
||||||
|
"%(url)s/%(resource_type)s/%(resource_id)s/"
|
||||||
|
"metric/%(metric_name)s/measures" % url_params,
|
||||||
|
headers=headers,
|
||||||
|
data=json_matcher(self.measures_attributes)))
|
||||||
|
|
||||||
|
post_responses.append(MockResponse(200))
|
||||||
|
|
||||||
if self.post_resource:
|
if self.post_resource:
|
||||||
attributes = self.postable_attributes.copy()
|
attributes = self.postable_attributes.copy()
|
||||||
attributes.update(self.patchable_attributes)
|
attributes.update(self.patchable_attributes)
|
||||||
@ -442,10 +463,10 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
|||||||
self.sample['resource_id'],
|
self.sample['resource_id'],
|
||||||
500))
|
500))
|
||||||
elif self.patch_resource == 204 and self.patchable_attributes:
|
elif self.patch_resource == 204 and self.patchable_attributes:
|
||||||
logger.debug.assert_called_with(
|
client_logger.debug.assert_called_with(
|
||||||
'Resource %s updated', self.sample['resource_id'])
|
'Resource %s updated', self.sample['resource_id'])
|
||||||
else:
|
elif self.measure == 200:
|
||||||
logger.debug.assert_called_with(
|
client_logger.debug.assert_called_with(
|
||||||
"Measure posted on metric %s of resource %s",
|
"Measure posted on metric %s of resource %s",
|
||||||
self.sample['counter_name'],
|
self.sample['counter_name'],
|
||||||
self.sample['resource_id'])
|
self.sample['resource_id'])
|
||||||
|
Loading…
Reference in New Issue
Block a user