Fixing issue with API reconnection
The agent had issues with the token when the API would go down and come back up again Added a message queue to buffer metrics when the API was down or unreachable Removed some excess logging of messages in debug Change-Id: Ib1300bc0c66e00b06302b2b2f875f991520c9f1f
This commit is contained in:
parent
f45da9f4b8
commit
95b38bcd39
@ -9,8 +9,19 @@ password: {args.password}
|
||||
# Keystone API URL: URL for the Keystone server to use
|
||||
# Example: https://region-a.geo-1.identity.hpcloudsvc.com:35357/v3/
|
||||
keystone_url: {args.keystone_url}
|
||||
# Project name to be used by this agent
|
||||
project_name: {args.project_name}
|
||||
|
||||
# The following 2 options are for handling buffering and reconnection to the monasca-api
|
||||
# If you want the messages to be sent as fast as possible, set these two options to
|
||||
# the same number. If you have a larger system with many agents, you may want to throttle
|
||||
# the number of messages sent to the API by setting the backlog_send_rate to a lower number.
|
||||
|
||||
# Maximum number of messages to buffer when unable to communicate with the monasca-api
|
||||
max_buffer_size: 1000
|
||||
# Maximum number of messages to send at one time when communication with the monasca-api is restored
|
||||
backlog_send_rate: 1000
|
||||
|
||||
[Main]
|
||||
|
||||
# Force the hostname to whatever you want.
|
||||
|
@ -858,7 +858,9 @@ def get_mon_api_config(config):
|
||||
'password': False,
|
||||
'use_keystone': True,
|
||||
'keystone_url': '',
|
||||
'dimensions': None}
|
||||
'dimensions': None,
|
||||
'max_buffer_size': 1000,
|
||||
'backlog_send_rate': 5}
|
||||
|
||||
if config.has_option("Main", "dimensions"):
|
||||
# parse comma separated dimensions into a dimension list
|
||||
@ -874,7 +876,9 @@ def get_mon_api_config(config):
|
||||
"username": config.get,
|
||||
"password": config.get,
|
||||
"use_keystone": config.getboolean,
|
||||
"keystone_url": config.get}
|
||||
"keystone_url": config.get,
|
||||
"max_buffer_size": config.getint,
|
||||
"backlog_send_rate": config.getint}
|
||||
|
||||
for name, func in options.iteritems():
|
||||
if config.has_option("Api", name):
|
||||
|
@ -1,11 +1,13 @@
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
from monascaclient import exc as exc, client
|
||||
from monagent.common.keystone import Keystone
|
||||
from monagent.common.util import get_hostname
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
import requests
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class MonAPI(object):
|
||||
|
||||
@ -14,6 +16,8 @@ class MonAPI(object):
|
||||
this is not committed
|
||||
"""
|
||||
|
||||
LOG_INTERVAL = 10
|
||||
|
||||
def __init__(self, config):
|
||||
"""
|
||||
Initialize Mon api client connection.
|
||||
@ -22,7 +26,6 @@ class MonAPI(object):
|
||||
self.url = config['url']
|
||||
self.api_version = '2_0'
|
||||
self.default_dimensions = config['dimensions']
|
||||
self.token_expiration = 1438
|
||||
# Verify the hostname is set as a dimension
|
||||
if 'hostname' not in self.default_dimensions:
|
||||
self.default_dimensions['hostname'] = get_hostname()
|
||||
@ -38,6 +41,9 @@ class MonAPI(object):
|
||||
self.password,
|
||||
self.project_name)
|
||||
self.mon_client = None
|
||||
self.max_buffer_size = config['max_buffer_size']
|
||||
self.backlog_send_rate = config['backlog_send_rate']
|
||||
self.message_queue = deque(maxlen=self.max_buffer_size)
|
||||
|
||||
def _post(self, measurements):
|
||||
"""Does the actual http post
|
||||
@ -47,38 +53,29 @@ class MonAPI(object):
|
||||
kwargs = {
|
||||
'jsonbody': data
|
||||
}
|
||||
try:
|
||||
if not self.mon_client:
|
||||
# construct the mon client
|
||||
self.mon_client = self.get_client()
|
||||
|
||||
done = False
|
||||
while not done:
|
||||
response = self.mon_client.metrics.create(**kwargs)
|
||||
if 200 <= response.status_code <= 299:
|
||||
# Good status from web service
|
||||
log.debug("Message sent successfully: {0}"
|
||||
.format(str(data)))
|
||||
elif 400 <= response.status_code <= 499:
|
||||
# Good status from web service but some type of issue
|
||||
# with the data
|
||||
if response.status_code == 401:
|
||||
# Get a new token/client and retry
|
||||
self.mon_client.replace_token(self.keystone.refresh_token())
|
||||
continue
|
||||
if not self.mon_client:
|
||||
# construct the monasca client
|
||||
self.mon_client = self.get_client()
|
||||
|
||||
if self._send_message(**kwargs):
|
||||
if len(self.message_queue) > 0:
|
||||
messages_sent = 0
|
||||
for index in range(0, len(self.message_queue)):
|
||||
if index < self.backlog_send_rate:
|
||||
msg = self.message_queue.pop()
|
||||
|
||||
if self._send_message(**msg):
|
||||
messages_sent += 1
|
||||
else:
|
||||
self._queue_message(msg)
|
||||
break
|
||||
else:
|
||||
error_msg = "Successful web service call but there" + \
|
||||
" were issues (Status: {0}, Status Message: " + \
|
||||
"{1}, Message Content: {1})"
|
||||
log.error(error_msg.format(response.status_code,
|
||||
response.reason, response.text))
|
||||
response.raise_for_status()
|
||||
else: # Not a good status
|
||||
response.raise_for_status()
|
||||
done = True
|
||||
except exc.HTTPException as he:
|
||||
log.error("Error sending message to mon-api: {0}"
|
||||
.format(str(he.message)))
|
||||
break
|
||||
log.info("Sent {0} messages from the backlog.".format(messages_sent))
|
||||
log.info("{0} messages remaining in the queue.".format(len(self.message_queue)))
|
||||
else:
|
||||
self._queue_message(kwargs.copy())
|
||||
|
||||
def post_metrics(self, measurements):
|
||||
"""post_metrics
|
||||
@ -98,7 +95,7 @@ class MonAPI(object):
|
||||
|
||||
def get_client(self):
|
||||
"""get_client
|
||||
get a new mon-client object
|
||||
get a new monasca-client object
|
||||
"""
|
||||
token = self.keystone.refresh_token()
|
||||
# Re-create the client. This is temporary until
|
||||
@ -108,3 +105,30 @@ class MonAPI(object):
|
||||
'token': token
|
||||
}
|
||||
return client.Client(self.api_version, self.url, **kwargs)
|
||||
|
||||
def _send_message(self, **kwargs):
|
||||
try:
|
||||
self.mon_client.metrics.create(**kwargs)
|
||||
return True
|
||||
except exc.HTTPException as he:
|
||||
if 'unauthorized' in str(he):
|
||||
log.info("Invalid token detected. Getting a new token...")
|
||||
# Get a new token
|
||||
self.mon_client.replace_token(self.keystone.refresh_token())
|
||||
else:
|
||||
log.debug("Error sending message to monasca-api. Error is {0}."
|
||||
.format(str(he.message)))
|
||||
except Exception as ex:
|
||||
log.debug("Error sending message to monasca-api. Error is {0}."
|
||||
.format(str(ex.message)))
|
||||
|
||||
return False
|
||||
|
||||
def _queue_message(self, msg):
|
||||
self.message_queue.append(msg)
|
||||
queue_size = len(self.message_queue)
|
||||
if queue_size is 1 or queue_size % MonAPI.LOG_INTERVAL == 0:
|
||||
log.warn("API is down or unreachable. Queuing the messages to send later...")
|
||||
log.info("Current agent queue size: {0} of {1}.".format(len(self.message_queue),
|
||||
self.max_buffer_size))
|
||||
log.info("A message will be logged for every {0} messages queued.".format(MonAPI.LOG_INTERVAL))
|
||||
|
@ -140,7 +140,6 @@ class AgentInputHandler(tornado.web.RequestHandler):
|
||||
# monagent.common.metrics.Measurements expressed as a dict
|
||||
msg = tornado.escape.json_decode(self.request.body)
|
||||
try:
|
||||
log.debug(msg)
|
||||
measurements = [Measurement(**m) for m in msg]
|
||||
except Exception:
|
||||
log.exception('Error parsing body of Agent Input')
|
||||
|
Loading…
Reference in New Issue
Block a user