Refactor forwarder
Remove the use of the TransactionManager and MetricTransaction classes. The forwarder now queues incoming measurements in a list and flushes them to the API once a configured batch size or maximum wait time is reached, whichever comes first.

Change-Id: Icbb4681b578c12dd17b53ba5c582dc64412ad533
This commit is contained in:
parent 673f3cdf74
commit 633dd18196
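In outline, the new batch-and-flush behavior looks like the sketch below. The constants, `message_batch`, and `post_metrics()` are taken from the diff; the `Endpoint` class and the standalone `flush()` function are hypothetical stand-ins for the `MonascaAPI` client and `Forwarder.flush()`.

```python
# Minimal sketch of the batching logic this commit introduces, under the
# assumption that Endpoint stands in for the real MonascaAPI client; in the
# forwarder itself this runs inside Forwarder.flush() on a PeriodicCallback.
MIN_BATCH_SIZE = 200    # flush as soon as this many measurements are queued...
MAX_FLUSH_ATTEMPTS = 3  # ...or after this many flush ticks, whichever is first

message_batch = []
unflushed_iterations = 0


class Endpoint(object):
    """Hypothetical stand-in for the Monasca API client."""

    def post_metrics(self, batch):
        print("posting %d measurements" % len(batch))


def flush(endpoint):
    """Called once per flush interval by the periodic callback."""
    global message_batch, unflushed_iterations
    if not message_batch:
        return
    if (len(message_batch) >= MIN_BATCH_SIZE or
            unflushed_iterations >= MAX_FLUSH_ATTEMPTS):
        endpoint.post_metrics(message_batch)
        message_batch = []
        unflushed_iterations = 0
    else:
        # Batch is still small and we have not waited long enough; skip this tick.
        unflushed_iterations += 1
```

As the diff shows, a failed post is no longer replayed: the per-transaction retry, throttling, and queue-size accounting all go away with the deleted transaction module.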
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
 """
 Licensed under Simplified BSD License (see LICENSE)
 (C) Boxed Ice 2010 all rights reserved
@@ -7,7 +7,6 @@
 """

 # Standard imports
-import datetime
 import logging
 import signal
 import socket
@@ -35,81 +34,50 @@ import monasca_agent.common.config as cfg
 import monasca_agent.common.metrics as metrics
 import monasca_agent.common.util as util
 import monasca_agent.forwarder.api.monasca_api as mon
-import monasca_agent.forwarder.transaction as transaction

 log = logging.getLogger('forwarder')

-# Maximum delay before replaying a transaction
-MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90)
+# Max amount of iterations to wait to meet min batch size before flushing
+MAX_FLUSH_ATTEMPTS = 3

-# Maximum queue size in bytes (when this is reached, old messages are dropped)
-MAX_QUEUE_SIZE = 30 * 1024 * 1024  # 30MB
+MIN_BATCH_SIZE = 200

-THROTTLING_DELAY = datetime.timedelta(microseconds=1000000 / 2)  # 2 msg/second
+message_batch = []

-
-class StatusHandler(tornado.web.RequestHandler):
-
-    def get(self):
-        threshold = int(self.get_argument('threshold', -1))
-
-        m = transaction.MetricTransaction.get_tr_manager()
-
-        self.write(
-            "<table><tr><td>Id</td><td>Size</td><td>Error count</td><td>Next flush</td></tr>")
-        transactions = m.get_transactions()
-        for tr in transactions:
-            self.write("<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>" %
-                       (tr.get_id(), tr.get_size(), tr.get_error_count(), tr.get_next_flush()))
-        self.write("</table>")
-
-        if threshold >= 0:
-            if len(transactions) > threshold:
-                self.set_status(503)
+# In seconds
+FLUSH_INTERVAL = 1


 class AgentInputHandler(tornado.web.RequestHandler):

     def post(self):
-        """Read the message and forward it to the intake
-        The message is expected to follow the format:
-
+        """Read the message and add it to the batch.
+        Batch will be sent to the Monasca API once the batch size or max
+        wait time has been reached, whichever comes first.
         """
         # read the message it should be a list of
         # monasca_agent.common.metrics.Measurements expressed as a dict
+        global message_batch

         msg = tornado.escape.json_decode(self.request.body)
         try:
-            measurements = [metrics.Measurement(**m) for m in msg]
+            message_batch.extend([metrics.Measurement(**m) for m in msg])
         except Exception:
             log.exception('Error parsing body of Agent Input')
             raise tornado.web.HTTPError(500)
-
-        headers = self.request.headers
-
-        if len(measurements) > 0:
-            # Setup a transaction for this message
-            tr = transaction.MetricTransaction(measurements, headers)
-        else:
-            raise tornado.web.HTTPError(500)
-
-        self.write("Transaction: %s" % tr.get_id())


 class Forwarder(tornado.web.Application):

     def __init__(self, port, agent_config, skip_ssl_validation=False,
                  use_simple_http_client=False):
+        self._unflushed_iterations = 0
+        self._endpoint = mon.MonascaAPI(agent_config)
+
+        self._ioloop = None
+
         self._port = int(port)
         self._agent_config = agent_config
-        self.flush_interval = (int(agent_config.get('check_freq')) / 2) * 1000
-        self._metrics = {}
-        transaction.MetricTransaction.set_application(self)
-        transaction.MetricTransaction.set_endpoints(mon.MonascaAPI(agent_config))
-        self._tr_manager = transaction.TransactionManager(MAX_WAIT_FOR_REPLAY,
-                                                          MAX_QUEUE_SIZE,
-                                                          THROTTLING_DELAY,
-                                                          agent_config)
-        transaction.MetricTransaction.set_tr_manager(self._tr_manager)
+        self._flush_interval = FLUSH_INTERVAL * 1000
+        self._non_local_traffic = agent_config.get("non_local_traffic", False)

         logging.getLogger().setLevel(agent_config.get('log_level', logging.INFO))

         self.skip_ssl_validation = skip_ssl_validation or agent_config.get(
             'skip_ssl_validation', False)
@@ -117,13 +85,6 @@ class Forwarder(tornado.web.Application):
         if self.skip_ssl_validation:
             log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

-    def _post_metrics(self):
-
-        if len(self._metrics) > 0:
-            transaction.MetricTransaction(self._metrics, headers={'Content-Type': 'application/json'})
-            self._metrics = {}
-
-    # todo why is the tornado logging method overridden? Perhaps ditch this.
     def log_request(self, handler):
         """Override the tornado logging method.
         If everything goes well, log level is DEBUG.
@@ -139,11 +100,9 @@ class Forwarder(tornado.web.Application):
         log_method("%d %s %.2fms", handler.get_status(),
                    handler._request_summary(), request_time)

-    def run(self):
+    def _add_tornado_handlers(self):
         handlers = [
+            (r"/intake/?", AgentInputHandler),
             (r"/api/v1/series/?", AgentInputHandler),
-            (r"/status/?", StatusHandler),
-            (r"/intake/?", AgentInputHandler)
         ]

         settings = dict(
@@ -153,14 +112,12 @@ class Forwarder(tornado.web.Application):
             log_function=self.log_request
         )

-        non_local_traffic = self._agent_config.get("non_local_traffic", False)
-
         tornado.web.Application.__init__(self, handlers, **settings)
-        http_server = tornado.httpserver.HTTPServer(self)

+    def _bind_http_server(self, http_server):
         try:
             # non_local_traffic must be == True to match, not just some non-false value
-            if non_local_traffic is True:
+            if self._non_local_traffic is True:
                 http_server.listen(self._port)
             else:
                 # localhost in lieu of 127.0.0.1 to support IPv6
@@ -186,26 +143,43 @@ class Forwarder(tornado.web.Application):

         log.info("Listening on port %d" % self._port)

-        # Register callbacks
-        self.mloop = util.get_tornado_ioloop()
+    def _post_metrics(self):
+        global message_batch
+        self._endpoint.post_metrics(message_batch)
+        log.info("wrote {}".format(len(message_batch)))
+        message_batch = []
+        self._unflushed_iterations = 0

-        logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO))
-
-        def flush_trs():
-            self._tr_manager.flush()
+    def flush(self):
+        if not message_batch:
+            return
+        if len(message_batch) >= MIN_BATCH_SIZE or self._unflushed_iterations >= MAX_FLUSH_ATTEMPTS:
+            self._post_metrics()
+        else:
+            self._unflushed_iterations += 1

-        tr_sched = tornado.ioloop.PeriodicCallback(
-            flush_trs, self.flush_interval, io_loop=self.mloop)
+    def run(self):
+        log.info("Forwarder RUN")
+        self._add_tornado_handlers()

-        # Start everything
-        tr_sched.start()
+        http_server = tornado.httpserver.HTTPServer(self)
+        self._bind_http_server(http_server)
+
+        self._ioloop = util.get_tornado_ioloop()
+
+        callback = tornado.ioloop.PeriodicCallback(self.flush,
+                                                   self._flush_interval,
+                                                   io_loop=self._ioloop)
+
+        callback.start()

-        self.mloop.start()
+        self._ioloop.start()
         log.info("Stopped")

     def stop(self):
-        self.mloop.stop()
+        if self._ioloop:
+            self._ioloop.stop()


 def init_forwarder(skip_ssl_validation=False, use_simple_http_client=False):
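For context on the module deleted below: its TransactionManager scheduled replays of failed transactions with a linear backoff of 20 seconds per accumulated error, capped at MAX_WAIT_FOR_REPLAY (90 seconds). A standalone sketch of that `compute_next_flush` logic (the function name `next_flush` here is illustrative, not from the source):

```python
from datetime import datetime, timedelta

MAX_WAIT_FOR_REPLAY = timedelta(seconds=90)


def next_flush(error_count, now=None):
    """Sketch of Transaction.compute_next_flush: linear backoff, capped."""
    now = now or datetime.now()
    delay = timedelta(seconds=error_count * 20)
    if delay > MAX_WAIT_FOR_REPLAY:
        delay = MAX_WAIT_FOR_REPLAY
    # Truncate to whole seconds, as the original did.
    return (now + delay).replace(microsecond=0)


# 1 error -> replay ~20 s later; 5 or more errors -> capped at the 90 s maximum.
```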
@@ -1,289 +0,0 @@
-# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
-# stdlib
-from datetime import datetime
-from datetime import timedelta
-import logging
-from operator import attrgetter
-import sys
-import time
-
-# project
-import monasca_agent.common.check_status as check_status
-import monasca_agent.common.metrics as metrics
-import monasca_agent.common.util as util
-
-log = logging.getLogger(__name__)
-
-FLUSH_LOGGING_PERIOD = 20
-FLUSH_LOGGING_INITIAL = 5
-
-
-class Transaction(object):
-
-    def __init__(self):
-
-        self._id = None
-        self._error_count = 0
-        self._next_flush = datetime.now()
-        self._size = None
-
-    def get_id(self):
-        return self._id
-
-    def set_id(self, new_id):
-        assert self._id is None
-        self._id = new_id
-
-    def inc_error_count(self):
-        self._error_count += 1
-
-    def get_error_count(self):
-        return self._error_count
-
-    def get_size(self):
-        if self._size is None:
-            self._size = sys.getsizeof(self)
-
-        return self._size
-
-    def get_next_flush(self):
-        return self._next_flush
-
-    def compute_next_flush(self, max_delay):
-        # Transactions are replayed, try to send them faster for newer transactions
-        # Send them every MAX_WAIT_FOR_REPLAY at most
-        td = timedelta(seconds=self._error_count * 20)
-        if td > max_delay:
-            td = max_delay
-
-        newdate = datetime.now() + td
-        self._next_flush = newdate.replace(microsecond=0)
-
-    def time_to_flush(self, now=datetime.now()):
-        return self._next_flush < now
-
-    def flush(self):
-        raise NotImplementedError("To be implemented in a subclass")
-
-
-class MetricTransaction(Transaction):
-
-    _application = None
-    _trManager = None
-    _endpoints = []
-
-    @classmethod
-    def set_application(cls, app):
-        cls._application = app
-
-    @classmethod
-    def set_tr_manager(cls, manager):
-        cls._trManager = manager
-
-    @classmethod
-    def get_tr_manager(cls):
-        return cls._trManager
-
-    @classmethod
-    def set_endpoints(cls, endpoint):
-        # todo we only have one endpoint option, generalize it better
-        # the immediate use case for two endpoints could be our own monitoring boxes, they could send to
-        # the main api and mini-mon api
-        cls._endpoints.append(endpoint)
-
-    def __init__(self, data, headers):
-        self._data = data
-        self._headers = headers
-
-        # Call after data has been set (size is computed in Transaction's init)
-        Transaction.__init__(self)
-
-        # Insert the transaction in the Manager
-        self._trManager.append(self)
-        log.debug("Created transaction %d" % self.get_id())
-
-    def __sizeof__(self):
-        return sys.getsizeof(self._data)
-
-    def flush(self):
-        try:
-            for endpoint in self._endpoints:
-                endpoint.post_metrics(self._data)
-        except Exception:
-            log.exception('Error flushing metrics to remote endpoints')
-            self._trManager.tr_error(self)
-        else:
-            self._trManager.tr_success(self)
-        self._trManager.flush_next()
-
-
-class TransactionManager(util.Dimensions):
-    """Holds any transaction derived object list and make sure they
-       are all committed, without exceeding parameters (throttling, memory consumption)
-    """
-
-    def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay, agent_config):
-        super(TransactionManager, self).__init__(agent_config)
-        self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay
-        self._MAX_QUEUE_SIZE = max_queue_size
-        self._THROTTLING_DELAY = throttling_delay
-
-        self._flush_without_ioloop = False  # useful for tests
-
-        self._transactions = []  # List of all non committed transactions
-        self._total_count = 0  # Maintain size/count not to recompute it everytime
-        self._total_size = 0
-        self._flush_count = 0
-        self._transactions_received = 0
-        self._transactions_flushed = 0
-
-        # Global counter to assign a number to each transaction: we may have an issue
-        # if this overlaps
-        self._counter = 0
-
-        self._trs_to_flush = None  # Current transactions being flushed
-        self._last_flush = datetime.now()  # Last flush (for throttling)
-
-        # Track an initial status message.
-        check_status.ForwarderStatus().persist()
-
-    def get_transactions(self):
-        return self._transactions
-
-    def print_queue_stats(self):
-        log.debug("Queue size: at %s, %s transaction(s), %s KB" %
-                  (time.time(), self._total_count, (self._total_size / 1024)))
-
-    def get_tr_id(self):
-        self._counter += 1
-        return self._counter
-
-    def append(self, tr):
-
-        # Give the transaction an id
-        tr.set_id(self.get_tr_id())
-
-        # Check the size
-        tr_size = tr.get_size()
-
-        log.debug("New transaction to add, total size of queue would be: %s KB" %
-                  ((self._total_size + tr_size) / 1024))
-
-        if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
-            log.warn("Queue is too big, removing old transactions...")
-            new_trs = sorted(self._transactions, key=attrgetter('_next_flush'), reverse=True)
-            for tr2 in new_trs:
-                if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
-                    self._transactions.remove(tr2)
-                    self._total_count -= 1
-                    self._total_size = self._total_size - tr2.get_size()
-                    log.warn("Removed transaction %s from queue" % tr2.get_id())
-
-        # Done
-        self._transactions.append(tr)
-        self._total_count += 1
-        self._transactions_received += 1
-        self._total_size = self._total_size + tr_size
-
-        log.debug("Transaction %s added" % (tr.get_id()))
-        self.print_queue_stats()
-
-    def flush(self):
-
-        if self._trs_to_flush is not None:
-            log.debug("A flush is already in progress, not doing anything")
-            return
-
-        to_flush = []
-        # Do we have something to do ?
-        now = datetime.now()
-        for tr in self._transactions:
-            if tr.time_to_flush(now):
-                to_flush.append(tr)
-
-        count = len(to_flush)
-        should_log = self._flush_count + 1 <= FLUSH_LOGGING_INITIAL or \
-            (self._flush_count + 1) % FLUSH_LOGGING_PERIOD == 0
-        if count > 0:
-            if should_log:
-                log.info("Flushing %s transaction%s during flush #%s" %
-                         (count, util.plural(count), str(self._flush_count + 1)))
-            else:
-                log.debug("Flushing %s transaction%s during flush #%s" %
-                          (count, util.plural(count), str(self._flush_count + 1)))
-
-            timer = util.Timer()
-            self._trs_to_flush = to_flush
-            self.flush_next()
-            # The emit time is reported on the next run.
-            dimensions = self._set_dimensions({'component': 'monasca-agent', 'service': 'monitoring'})
-            emit_measurement = metrics.Measurement('monasca.emit_time_sec', time.time(), timer.step(), dimensions)
-            MetricTransaction([emit_measurement], headers={'Content-Type': 'application/json'})
-        else:
-            if should_log:
-                log.info("No transaction to flush during flush #%s" % str(self._flush_count + 1))
-            else:
-                log.debug("No transaction to flush during flush #%s" % str(self._flush_count + 1))
-
-        if self._flush_count + 1 == FLUSH_LOGGING_INITIAL:
-            log.info("First flushes done, next flushes will be logged every %s flushes." %
-                     FLUSH_LOGGING_PERIOD)
-
-        self._flush_count += 1
-
-        check_status.ForwarderStatus(queue_length=self._total_count,
-                                     queue_size=self._total_size,
-                                     flush_count=self._flush_count,
-                                     transactions_received=self._transactions_received,
-                                     transactions_flushed=self._transactions_flushed).persist()
-
-    def flush_next(self):
-
-        if len(self._trs_to_flush) > 0:
-
-            td = self._last_flush + self._THROTTLING_DELAY - datetime.now()
-            # Python 2.7 has this built in, python < 2.7 don't...
-            if hasattr(td, 'total_seconds'):
-                delay = td.total_seconds()
-            else:
-                delay = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
-
-            if delay <= 0:
-                tr = self._trs_to_flush.pop()
-                self._last_flush = datetime.now()
-                log.debug("Flushing transaction %d" % tr.get_id())
-                try:
-                    tr.flush()
-                except Exception as e:
-                    log.exception(e)
-                    self.tr_error(tr)
-                    self.flush_next()
-            else:
-                # Wait a little bit more
-                tornado_ioloop = util.get_tornado_ioloop()
-                if tornado_ioloop._running:
-                    tornado_ioloop.add_timeout(time.time() + delay, lambda: self.flush_next())
-                elif self._flush_without_ioloop:
-                    # Tornado is not started (ie, unittests), do it manually: BLOCKING
-                    time.sleep(delay)
-                    self.flush_next()
-        else:
-            self._trs_to_flush = None
-
-    def tr_error(self, tr):
-        tr.inc_error_count()
-        tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY)
-        log.warn(
-            "Transaction %d in error (%s error%s), it will be replayed after %s" %
-            (tr.get_id(),
-             tr.get_error_count(),
-             util.plural(tr.get_error_count()),
-             tr.get_next_flush()))
-
-    def tr_success(self, tr):
-        log.debug("Transaction %d completed" % tr.get_id())
-        self._transactions.remove(tr)
-        self._total_count += -1
-        self._total_size += - tr.get_size()
-        self._transactions_flushed += 1
-        self.print_queue_stats()