Misc inspection fixes
This commit is contained in:
parent
8674c1cde1
commit
2efcf81565
|
@ -11,7 +11,6 @@ import pickle
|
|||
import platform
|
||||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
import time
|
||||
|
||||
# project
|
||||
|
|
|
@ -38,7 +38,7 @@ from api import MonAPI
|
|||
from monagent.common.check_status import ForwarderStatus
|
||||
from monagent.common.config import get_config
|
||||
from monagent.common.metrics import Measurement
|
||||
from monagent.common.util import Watchdog, get_hostname, get_tornado_ioloop
|
||||
from monagent.common.util import Watchdog, get_tornado_ioloop
|
||||
from transaction import Transaction, TransactionManager
|
||||
|
||||
log = logging.getLogger('forwarder')
|
||||
|
|
|
@ -95,8 +95,8 @@ class TransactionManager(object):
|
|||
return self._transactions
|
||||
|
||||
def print_queue_stats(self):
|
||||
log.debug("Queue size: at %s, %s transaction(s), %s KB" %
|
||||
(time.time(), self._total_count, (self._total_size/1024)))
|
||||
log.debug("Queue size: at %s, %s transaction(s), %s KB" %
|
||||
(time.time(), self._total_count, (self._total_size/1024)))
|
||||
|
||||
def get_tr_id(self):
|
||||
self._counter += 1
|
||||
|
@ -146,22 +146,25 @@ class TransactionManager(object):
|
|||
to_flush.append(tr)
|
||||
|
||||
count = len(to_flush)
|
||||
should_log = self._flush_count +1 <= FLUSH_LOGGING_INITIAL or (self._flush_count + 1) % FLUSH_LOGGING_PERIOD == 0
|
||||
should_log = self._flush_count + 1 <= FLUSH_LOGGING_INITIAL or \
|
||||
(self._flush_count + 1) % FLUSH_LOGGING_PERIOD == 0
|
||||
if count > 0:
|
||||
if should_log:
|
||||
log.info("Flushing %s transaction%s during flush #%s" % (count,plural(count), str(self._flush_count + 1)))
|
||||
log.info("Flushing %s transaction%s during flush #%s" %
|
||||
(count, plural(count), str(self._flush_count + 1)))
|
||||
else:
|
||||
log.debug("Flushing %s transaction%s during flush #%s" % (count,plural(count), str(self._flush_count +1)))
|
||||
log.debug("Flushing %s transaction%s during flush #%s" %
|
||||
(count, plural(count), str(self._flush_count + 1)))
|
||||
|
||||
self._trs_to_flush = to_flush
|
||||
self.flush_next()
|
||||
else:
|
||||
if should_log:
|
||||
log.info("No transaction to flush during flush #%s" % str(self._flush_count +1))
|
||||
log.info("No transaction to flush during flush #%s" % str(self._flush_count + 1))
|
||||
else:
|
||||
log.debug("No transaction to flush during flush #%s" % str(self._flush_count +1))
|
||||
log.debug("No transaction to flush during flush #%s" % str(self._flush_count + 1))
|
||||
|
||||
if self._flush_count +1 == FLUSH_LOGGING_INITIAL:
|
||||
if self._flush_count + 1 == FLUSH_LOGGING_INITIAL:
|
||||
log.info("First flushes done, next flushes will be logged every %s flushes." % FLUSH_LOGGING_PERIOD)
|
||||
|
||||
self._flush_count += 1
|
||||
|
@ -179,7 +182,7 @@ class TransactionManager(object):
|
|||
|
||||
td = self._last_flush + self._THROTTLING_DELAY - datetime.now()
|
||||
# Python 2.7 has this built in, python < 2.7 don't...
|
||||
if hasattr(td,'total_seconds'):
|
||||
if hasattr(td, 'total_seconds'):
|
||||
delay = td.total_seconds()
|
||||
else:
|
||||
delay = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10.0**6
|
||||
|
@ -190,16 +193,15 @@ class TransactionManager(object):
|
|||
log.debug("Flushing transaction %d" % tr.get_id())
|
||||
try:
|
||||
tr.flush()
|
||||
except Exception,e :
|
||||
except Exception, e:
|
||||
log.exception(e)
|
||||
self.tr_error(tr)
|
||||
self.flush_next()
|
||||
else:
|
||||
# Wait a little bit more
|
||||
tornado_ioloop = get_tornado_ioloop()
|
||||
if tornado_ioloop._running:
|
||||
tornado_ioloop.add_timeout(time.time() + delay,
|
||||
lambda: self.flush_next())
|
||||
if tornado_ioloop._running:
|
||||
tornado_ioloop.add_timeout(time.time() + delay, lambda: self.flush_next())
|
||||
elif self._flush_without_ioloop:
|
||||
# Tornado is no started (ie, unittests), do it manually: BLOCKING
|
||||
time.sleep(delay)
|
||||
|
@ -207,17 +209,16 @@ class TransactionManager(object):
|
|||
else:
|
||||
self._trs_to_flush = None
|
||||
|
||||
def tr_error(self,tr):
|
||||
def tr_error(self, tr):
|
||||
tr.inc_error_count()
|
||||
tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY)
|
||||
log.warn("Transaction %d in error (%s error%s), it will be replayed after %s" %
|
||||
(tr.get_id(), tr.get_error_count(), plural(tr.get_error_count()),
|
||||
tr.get_next_flush()))
|
||||
(tr.get_id(), tr.get_error_count(), plural(tr.get_error_count()), tr.get_next_flush()))
|
||||
|
||||
def tr_success(self,tr):
|
||||
def tr_success(self, tr):
|
||||
log.debug("Transaction %d completed" % tr.get_id())
|
||||
self._transactions.remove(tr)
|
||||
self._total_count += -1
|
||||
self._total_count += -1
|
||||
self._total_size += - tr.get_size()
|
||||
self._transactions_flushed += 1
|
||||
self.print_queue_stats()
|
||||
|
|
Loading…
Reference in New Issue