Intergrate with osc-lib based client
Commits integrates monasca-agent with monascaclient, that is currently using osc-lib. That requires some changes in the initialization model of the client. Details: * no need to check if token has expired (underlying library [osc-lib] will request new token if it is about to expire * initializing the client happens once therefore keystone.Keystone instance can be created lazily (it is singleton, so it might happen it is already available) Extra: * marked all variables as private Depends-On: I1712a24739438e2d8331a495f18f357749a633c5 Depends-On: Iec97e50089ed31ae7ad8244b37cec128817871a5 Depends-On: I579f6bcd5975a32af2a255be41c9b6c4043fa1dc Change-Id: Ifee5b88ccb632222310aafb1081ecb9c9d085150
This commit is contained in:
parent
b71fd4bef4
commit
af806edd55
@ -155,18 +155,14 @@ def get_client(**kwargs):
|
|||||||
|
|
||||||
There are two ways to call this method:
|
There are two ways to call this method:
|
||||||
|
|
||||||
using existing session object (:py:class:`keystoneauth1.session.Session`
|
using existing session object (:py:class:`keystoneauth1.session.Session`::
|
||||||
|
|
||||||
.. code-block:: python
|
>>> s = session.Session(**args)
|
||||||
|
>>> c = get_client(session=s)
|
||||||
|
|
||||||
s = session.Session(**args)
|
initializing new keystone client from credentials::
|
||||||
c = get_client(session=s)
|
|
||||||
|
|
||||||
initializing new keystone client from credentials
|
>>> c = get_client({'username':'mini-mon', 'password':'test', ...})
|
||||||
|
|
||||||
.. code-block:: python
|
|
||||||
|
|
||||||
c = get_client({'username':'mini-mon', 'password':'test', ...})
|
|
||||||
|
|
||||||
:param kwargs: list of arguments passed to method
|
:param kwargs: list of arguments passed to method
|
||||||
:type kwargs: dict
|
:type kwargs: dict
|
||||||
@ -273,9 +269,6 @@ class Keystone(object):
|
|||||||
|
|
||||||
return ks
|
return ks
|
||||||
|
|
||||||
def get_credential_args(self):
|
|
||||||
return self._config
|
|
||||||
|
|
||||||
def get_monasca_url(self):
|
def get_monasca_url(self):
|
||||||
"""Retrieves monasca endpoint url.
|
"""Retrieves monasca endpoint url.
|
||||||
|
|
||||||
@ -318,3 +311,12 @@ class Keystone(object):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
return self._init_client().auth_token
|
return self._init_client().auth_token
|
||||||
|
|
||||||
|
def get_session(self):
|
||||||
|
"""Returns session of this client.
|
||||||
|
|
||||||
|
:return: session instance
|
||||||
|
:rtype: keystoneauth1.session.Session
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self._init_client().session
|
||||||
|
@ -5,11 +5,12 @@ import collections
|
|||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import random
|
|
||||||
import time
|
from osc_lib import exceptions
|
||||||
|
|
||||||
|
from monascaclient import client
|
||||||
|
|
||||||
from monasca_agent.common import keystone
|
from monasca_agent.common import keystone
|
||||||
import monascaclient.client
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -22,34 +23,31 @@ class MonascaAPI(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
LOG_INTERVAL = 10 # messages
|
LOG_INTERVAL = 10 # messages
|
||||||
MIN_BACKOFF = 10 # seconds
|
|
||||||
MAX_BACKOFF = 60 # seconds
|
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
"""Initialize Mon api client connection."""
|
"""Initialize Mon api client connection."""
|
||||||
self.config = config
|
self._config = config
|
||||||
self.url = config.get('url', None)
|
self._mon_client = None
|
||||||
self.api_version = '2_0'
|
self._api_version = '2_0'
|
||||||
self.keystone = keystone.Keystone(config)
|
|
||||||
self.mon_client = None
|
|
||||||
self._failure_reason = None
|
self._failure_reason = None
|
||||||
self._resume_time = None
|
|
||||||
self._log_interval_remaining = 1
|
self._log_interval_remaining = 1
|
||||||
self._current_number_measurements = 0
|
self._current_number_measurements = 0
|
||||||
self.max_buffer_size = int(config['max_buffer_size'])
|
self._max_buffer_size = int(config['max_buffer_size'])
|
||||||
self.max_measurement_buffer_size = int(config['max_measurement_buffer_size'])
|
self._max_measurement_buffer_size = int(
|
||||||
|
config['max_measurement_buffer_size'])
|
||||||
|
|
||||||
if self.max_buffer_size > -1:
|
if self._max_buffer_size > -1:
|
||||||
log.debug("'max_buffer_size' is deprecated. Please use"
|
log.debug("'max_buffer_size' is deprecated. Please use"
|
||||||
" 'max_measurement_buffer_size' instead")
|
" 'max_measurement_buffer_size' instead")
|
||||||
if self.max_measurement_buffer_size > -1:
|
if self._max_measurement_buffer_size > -1:
|
||||||
log.debug("Overriding 'max_buffer_size' option with new"
|
log.debug("Overriding 'max_buffer_size' option with new"
|
||||||
" 'max_measurment_buffer_size' option")
|
" 'max_measurment_buffer_size' option")
|
||||||
self.max_buffer_size = -1
|
self._max_buffer_size = -1
|
||||||
|
|
||||||
self.backlog_send_rate = int(config['backlog_send_rate'])
|
self.backlog_send_rate = int(config['backlog_send_rate'])
|
||||||
if self.max_buffer_size > -1:
|
if self._max_buffer_size > -1:
|
||||||
self.message_queue = collections.deque(maxlen=self.max_buffer_size)
|
self.message_queue = collections.deque(maxlen=self._max_buffer_size)
|
||||||
else:
|
else:
|
||||||
self.message_queue = collections.deque()
|
self.message_queue = collections.deque()
|
||||||
self.write_timeout = int(config['write_timeout'])
|
self.write_timeout = int(config['write_timeout'])
|
||||||
@ -70,9 +68,9 @@ class MonascaAPI(object):
|
|||||||
if tenant:
|
if tenant:
|
||||||
kwargs['tenant_id'] = tenant
|
kwargs['tenant_id'] = tenant
|
||||||
|
|
||||||
if not self.mon_client:
|
if not self._mon_client:
|
||||||
self.mon_client = self.get_monclient()
|
self._mon_client = self._get_mon_client()
|
||||||
if not self.mon_client:
|
if not self._mon_client:
|
||||||
# Keystone is down, queue the message
|
# Keystone is down, queue the message
|
||||||
self._queue_message(kwargs.copy(), "Keystone API is down or unreachable")
|
self._queue_message(kwargs.copy(), "Keystone API is down or unreachable")
|
||||||
return
|
return
|
||||||
@ -121,46 +119,28 @@ class MonascaAPI(object):
|
|||||||
for tenant in tenant_group:
|
for tenant in tenant_group:
|
||||||
self._post(tenant_group[tenant], tenant)
|
self._post(tenant_group[tenant], tenant)
|
||||||
|
|
||||||
def get_monclient(self):
|
def _get_mon_client(self):
|
||||||
"""get_monclient
|
k = keystone.Keystone(self._config)
|
||||||
get a new monasca-client object
|
endpoint = k.get_monasca_url()
|
||||||
"""
|
session = k.get_session()
|
||||||
token = self.keystone.get_token()
|
c = client.Client(
|
||||||
if token:
|
api_version=self._api_version,
|
||||||
# Create the client.
|
endpoint=endpoint,
|
||||||
kwargs = self.keystone.get_credential_args()
|
session=session,
|
||||||
kwargs['token'] = token
|
timeout=self.write_timeout,
|
||||||
if not self.url:
|
**keystone.get_args(self._config)
|
||||||
self.url = self.keystone.get_monasca_url()
|
)
|
||||||
|
return c
|
||||||
return monascaclient.client.Client(self.api_version, self.url, write_timeout=self.write_timeout, **kwargs)
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _send_message(self, **kwargs):
|
def _send_message(self, **kwargs):
|
||||||
if self._resume_time:
|
|
||||||
if time.time() > self._resume_time:
|
|
||||||
self._resume_time = None
|
|
||||||
log.debug("Getting new token...")
|
|
||||||
# Get a new keystone client and token
|
|
||||||
if self.keystone.refresh_token():
|
|
||||||
self.mon_client.replace_token(self.keystone.get_token())
|
|
||||||
else:
|
|
||||||
# Return without posting so the monasca client doesn't keep requesting new tokens
|
|
||||||
return False
|
|
||||||
try:
|
try:
|
||||||
self.mon_client.metrics.create(**kwargs)
|
self._mon_client.metrics.create(**kwargs)
|
||||||
return True
|
return True
|
||||||
except monascaclient.exc.HTTPException as ex:
|
except exceptions.ClientException as ex:
|
||||||
if ex.code == 401:
|
log.exception("ClientException: error sending "
|
||||||
# monasca client should already have retried once with a new token before returning this exception
|
"message to monasca-api.")
|
||||||
self._failure_reason = 'Invalid token detected. Waiting to get new token from Keystone'
|
self._failure_reason = ('Error sending message to '
|
||||||
wait_time = random.randint(MonascaAPI.MIN_BACKOFF, MonascaAPI.MAX_BACKOFF + 1)
|
'the Monasca API: {0}').format(str(ex))
|
||||||
self._resume_time = time.time() + wait_time
|
|
||||||
log.info("Invalid token detected. Waiting %d seconds before getting new token.", wait_time)
|
|
||||||
else:
|
|
||||||
log.exception("HTTPException: error sending message to monasca-api.")
|
|
||||||
self._failure_reason = 'Error sending message to the Monasca API: {0}'.format(str(ex.message))
|
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("Error sending message to Monasca API.")
|
log.exception("Error sending message to Monasca API.")
|
||||||
self._failure_reason = 'The Monasca API is DOWN or unreachable'
|
self._failure_reason = 'The Monasca API is DOWN or unreachable'
|
||||||
@ -168,7 +148,7 @@ class MonascaAPI(object):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def _queue_message(self, msg, reason):
|
def _queue_message(self, msg, reason):
|
||||||
if self.max_buffer_size == 0 or self.max_measurement_buffer_size == 0:
|
if self._max_buffer_size == 0 or self._max_measurement_buffer_size == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.message_queue.append(json.dumps(msg))
|
self.message_queue.append(json.dumps(msg))
|
||||||
@ -176,16 +156,16 @@ class MonascaAPI(object):
|
|||||||
for value in msg.values():
|
for value in msg.values():
|
||||||
self._current_number_measurements += len(value)
|
self._current_number_measurements += len(value)
|
||||||
|
|
||||||
if self.max_measurement_buffer_size > -1:
|
if self._max_measurement_buffer_size > -1:
|
||||||
while self._current_number_measurements > self.max_measurement_buffer_size:
|
while self._current_number_measurements > self._max_measurement_buffer_size:
|
||||||
self._remove_oldest_from_queue()
|
self._remove_oldest_from_queue()
|
||||||
|
|
||||||
if self._log_interval_remaining <= 1:
|
if self._log_interval_remaining <= 1:
|
||||||
log.warn("{0}. Queuing the messages to send later...".format(reason))
|
log.warn("{0}. Queuing the messages to send later...".format(reason))
|
||||||
log.info("Current agent queue size: {0} of {1}.".format(len(self.message_queue),
|
log.info("Current agent queue size: {0} of {1}.".format(len(self.message_queue),
|
||||||
self.max_buffer_size))
|
self._max_buffer_size))
|
||||||
log.info("Current measurements in queue: {0} of {1}".format(
|
log.info("Current measurements in queue: {0} of {1}".format(
|
||||||
self._current_number_measurements, self.max_measurement_buffer_size))
|
self._current_number_measurements, self._max_measurement_buffer_size))
|
||||||
|
|
||||||
log.info("A message will be logged for every {0} messages queued.".format(MonascaAPI.LOG_INTERVAL))
|
log.info("A message will be logged for every {0} messages queued.".format(MonascaAPI.LOG_INTERVAL))
|
||||||
self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
|
self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
|
||||||
|
Loading…
Reference in New Issue
Block a user