Remove emit_time from the collector and put it into the forwarder.
Fixed the service dimension for api checks on various openstack services so that it matches that of the other service checks, allowing for it to be grouped together. Modified the config so a config in the working dir can be used this is helpful for testing. Additionally I fixed up few tests and removed some no longer used, there is a lot of work with test left unfinished Change-Id: Ic5b127db41c174735f1436bb511f00e271274cb0
This commit is contained in:
parent
f37150d0a8
commit
19cd1c053e
|
@ -17,7 +17,6 @@ import yaml
|
|||
|
||||
import monasca_agent.common.aggregator as aggregator
|
||||
import monasca_agent.common.check_status as check_status
|
||||
import monasca_agent.common.config as config
|
||||
import monasca_agent.common.exceptions as exceptions
|
||||
import monasca_agent.common.util as util
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@ log = logging.getLogger(__name__)
|
|||
|
||||
MAX_THREADS_COUNT = 50
|
||||
MAX_COLLECTION_TIME = 30
|
||||
MAX_EMIT_TIME = 5
|
||||
MAX_CPU_PCT = 10
|
||||
FLUSH_LOGGING_PERIOD = 10
|
||||
FLUSH_LOGGING_INITIAL = 5
|
||||
|
@ -32,7 +31,6 @@ class Collector(util.Dimensions):
|
|||
def __init__(self, agent_config, emitter, checksd=None):
|
||||
super(Collector, self).__init__(agent_config)
|
||||
self.agent_config = agent_config
|
||||
self.emit_duration = None
|
||||
self.os = util.get_os()
|
||||
self.plugins = None
|
||||
self.emitter = emitter
|
||||
|
@ -90,17 +88,17 @@ class Collector(util.Dimensions):
|
|||
log.exception("Error persisting collector status")
|
||||
|
||||
if self.run_count <= FLUSH_LOGGING_INITIAL or self.run_count % FLUSH_LOGGING_PERIOD == 0:
|
||||
log.info("Finished run #%s. Collection time: %ss. Emit time: %ss" %
|
||||
(self.run_count, round(collect_duration, 2), round(self.emit_duration, 2)))
|
||||
log.info("Finished run #%s. Collection time: %ss." %
|
||||
(self.run_count, round(collect_duration, 2)))
|
||||
if self.run_count == FLUSH_LOGGING_INITIAL:
|
||||
log.info("First flushes done, next flushes will be logged every %s flushes." %
|
||||
FLUSH_LOGGING_PERIOD)
|
||||
|
||||
else:
|
||||
log.debug("Finished run #%s. Collection time: %ss. Emit time: %ss" %
|
||||
(self.run_count, round(collect_duration, 2), round(self.emit_duration, 2)))
|
||||
log.debug("Finished run #%s. Collection time: %ss." %
|
||||
(self.run_count, round(collect_duration, 2),))
|
||||
|
||||
def collector_stats(self, num_metrics, num_events, collection_time, emit_time):
|
||||
def collector_stats(self, num_metrics, num_events, collection_time):
|
||||
metrics = {}
|
||||
thread_count = threading.active_count()
|
||||
metrics['monasca.thread_count'] = thread_count
|
||||
|
@ -112,11 +110,6 @@ class Collector(util.Dimensions):
|
|||
log.info("Collection time (s) is high: %.1f, metrics count: %d, events count: %d" %
|
||||
(collection_time, num_metrics, num_events))
|
||||
|
||||
metrics['monasca.emit_time_sec'] = emit_time
|
||||
if emit_time is not None and emit_time > MAX_EMIT_TIME:
|
||||
log.info("Emit time (s) is high: %.1f, metrics count: %d, events count: %d" %
|
||||
(emit_time, num_metrics, num_events))
|
||||
|
||||
return metrics
|
||||
|
||||
def run(self):
|
||||
|
@ -157,16 +150,15 @@ class Collector(util.Dimensions):
|
|||
collect_duration = timer.step()
|
||||
|
||||
dimensions = {'component': 'monasca-agent'}
|
||||
# Add in metrics on the collector run, emit_duration is from the previous run
|
||||
# Add in metrics on the collector run
|
||||
for name, value in self.collector_stats(len(metrics_list), len(events),
|
||||
collect_duration, self.emit_duration).iteritems():
|
||||
collect_duration).iteritems():
|
||||
metrics_list.append(metrics.Measurement(name,
|
||||
timestamp,
|
||||
value,
|
||||
self._set_dimensions(dimensions),
|
||||
None))
|
||||
emitter_statuses = self._emit(metrics_list)
|
||||
self.emit_duration = timer.step()
|
||||
|
||||
# Persist the status of the collection run.
|
||||
self._set_status(checks_statuses, emitter_statuses, collect_duration)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import ConfigParser as parser
|
||||
import logging
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
import six
|
||||
|
@ -14,14 +15,12 @@ except ImportError:
|
|||
|
||||
import monasca_agent.common.singleton as singleton
|
||||
|
||||
DEFAULT_CONFIG_DIR = '/etc/monasca/agent'
|
||||
DEFAULT_CONFIG_FILE = DEFAULT_CONFIG_DIR + '/agent.conf'
|
||||
DEFAULT_CONFIG_FILE = '/etc/monasca/agent/agent.conf'
|
||||
DEFAULT_CHECK_FREQUENCY = 15 # seconds
|
||||
DEFAULT_LOG_DIR = '/var/log/monasca/agent'
|
||||
DEFAULT_STATSD_FREQUENCY = 20 # seconds
|
||||
DEFAULT_STATSD_INTERVAL = 10 # seconds
|
||||
LOGGING_MAX_BYTES = 5 * 1024 * 1024
|
||||
DEFAULT_CONFIG_DIR = '/etc/monasca/agent'
|
||||
DEFAULT_LOG_DIR = '/var/log/monasca/agent'
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -32,7 +31,15 @@ class Config(object):
|
|||
|
||||
def __init__(self, configFile=None):
|
||||
self._config = None
|
||||
self._configFile = configFile
|
||||
if configFile is not None:
|
||||
self._configFile = configFile
|
||||
elif os.path.exists(DEFAULT_CONFIG_FILE):
|
||||
self._configFile = DEFAULT_CONFIG_FILE
|
||||
elif os.path.exists(os.getcwd() + '/agent.conf'):
|
||||
self._configFile = os.getcwd() + '/agent.conf'
|
||||
else:
|
||||
log.error('No config file found at {} nor in the working directory.'.format(DEFAULT_CONFIG_FILE))
|
||||
|
||||
self._read_config()
|
||||
|
||||
def get_config(self, sections='Main'):
|
||||
|
@ -59,11 +66,8 @@ class Config(object):
|
|||
'''Read in the config file.'''
|
||||
|
||||
file_config = parser.SafeConfigParser()
|
||||
configFile = DEFAULT_CONFIG_FILE
|
||||
if self._configFile:
|
||||
configFile = self._configFile
|
||||
log.debug("Loading config file from {}".format(configFile))
|
||||
file_config.readfp(self._skip_leading_wsp(open(configFile)))
|
||||
log.debug("Loading config file from {}".format(self._configFile))
|
||||
file_config.readfp(self._skip_leading_wsp(open(self._configFile)))
|
||||
self._config = self._retrieve_sections(file_config)
|
||||
|
||||
# Process and update any special case configuration
|
||||
|
@ -80,7 +84,7 @@ class Config(object):
|
|||
'dimensions': None,
|
||||
'listen_port': None,
|
||||
'version': self.get_version(),
|
||||
'additional_checksd': DEFAULT_CONFIG_DIR + '/checks_d/',
|
||||
'additional_checksd': os.path.join(os.path.dirname(self._configFile), '/checks_d/'),
|
||||
'system_metrics': None,
|
||||
'ignore_filesystem_types': None,
|
||||
'device_blacklist_re': None,
|
||||
|
@ -188,7 +192,7 @@ class Config(object):
|
|||
self._config['Main']['ignore_filesystem_types'] = file_system_list
|
||||
|
||||
def get_confd_path(self):
|
||||
path = os.path.join(DEFAULT_CONFIG_DIR, 'conf.d')
|
||||
path = os.path.join(os.path.dirname(self._configFile), 'conf.d')
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
raise PathNotFound(path)
|
||||
|
|
|
@ -301,9 +301,12 @@ class Paths(object):
|
|||
raise PathNotFound(bad_path)
|
||||
|
||||
def _unix_confd_path(self):
|
||||
path = os.path.join(configuration.DEFAULT_CONFIG_DIR, 'conf.d')
|
||||
path = os.path.join(os.path.dirname(configuration.DEFAULT_CONFIG_FILE), 'conf.d')
|
||||
path2 = os.path.join(os.getcwd(), 'conf.d')
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
elif os.path.exists(path2):
|
||||
return path2
|
||||
raise PathNotFound(path)
|
||||
|
||||
def _windows_confd_path(self):
|
||||
|
|
|
@ -14,8 +14,6 @@ import datetime
|
|||
|
||||
# set up logging before importing any other components
|
||||
import monasca_agent.common.util as util
|
||||
import monasca_agent.forwarder.api.mon as mon
|
||||
import monasca_agent.common.config as cfg
|
||||
|
||||
util.initialize_logging('forwarder')
|
||||
|
||||
|
@ -31,10 +29,12 @@ import tornado.escape
|
|||
import tornado.options
|
||||
|
||||
# agent import
|
||||
import monasca_agent.common.config as cfg
|
||||
import monasca_agent.common.check_status as check_status
|
||||
import monasca_agent.common.metrics as metrics
|
||||
import monasca_agent.common.util as util
|
||||
import transaction
|
||||
import monasca_agent.forwarder.api.mon as mon
|
||||
import monasca_agent.forwarder.transaction as transaction
|
||||
|
||||
log = logging.getLogger('forwarder')
|
||||
|
||||
|
@ -50,64 +50,12 @@ MAX_QUEUE_SIZE = 30 * 1024 * 1024 # 30MB
|
|||
THROTTLING_DELAY = datetime.timedelta(microseconds=1000000 / 2) # 2 msg/second
|
||||
|
||||
|
||||
class MetricTransaction(transaction.Transaction):
|
||||
|
||||
_application = None
|
||||
_trManager = None
|
||||
_endpoints = []
|
||||
|
||||
@classmethod
|
||||
def set_application(cls, app):
|
||||
cls._application = app
|
||||
|
||||
@classmethod
|
||||
def set_tr_manager(cls, manager):
|
||||
cls._trManager = manager
|
||||
|
||||
@classmethod
|
||||
def get_tr_manager(cls):
|
||||
return cls._trManager
|
||||
|
||||
@classmethod
|
||||
def set_endpoints(cls, endpoint):
|
||||
# todo we only have one endpoint option, generalize it better
|
||||
# the immediate use case for two endpoints could be our own monitoring boxes, they could send to
|
||||
# the main api and mini-mon api
|
||||
cls._endpoints.append(endpoint)
|
||||
|
||||
def __init__(self, data, headers):
|
||||
self._data = data
|
||||
self._headers = headers
|
||||
|
||||
# Call after data has been set (size is computed in Transaction's init)
|
||||
transaction.Transaction.__init__(self)
|
||||
|
||||
# Insert the transaction in the Manager
|
||||
self._trManager.append(self)
|
||||
log.debug("Created transaction %d" % self.get_id())
|
||||
self._trManager.flush()
|
||||
|
||||
def __sizeof__(self):
|
||||
return sys.getsizeof(self._data)
|
||||
|
||||
def flush(self):
|
||||
try:
|
||||
for endpoint in self._endpoints:
|
||||
endpoint.post_metrics(self._data)
|
||||
except Exception:
|
||||
log.exception('Error flushing metrics to remote endpoints')
|
||||
self._trManager.tr_error(self)
|
||||
else:
|
||||
self._trManager.tr_success(self)
|
||||
self._trManager.flush_next()
|
||||
|
||||
|
||||
class StatusHandler(tornado.web.RequestHandler):
|
||||
|
||||
def get(self):
|
||||
threshold = int(self.get_argument('threshold', -1))
|
||||
|
||||
m = MetricTransaction.get_tr_manager()
|
||||
m = transaction.MetricTransaction.get_tr_manager()
|
||||
|
||||
self.write(
|
||||
"<table><tr><td>Id</td><td>Size</td><td>Error count</td><td>Next flush</td></tr>")
|
||||
|
@ -142,7 +90,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
|
|||
|
||||
if len(measurements) > 0:
|
||||
# Setup a transaction for this message
|
||||
tr = MetricTransaction(measurements, headers)
|
||||
tr = transaction.MetricTransaction(measurements, headers)
|
||||
else:
|
||||
raise tornado.web.HTTPError(500)
|
||||
|
||||
|
@ -156,10 +104,13 @@ class Forwarder(tornado.web.Application):
|
|||
self._port = int(port)
|
||||
self._agent_config = agent_config
|
||||
self._metrics = {}
|
||||
MetricTransaction.set_application(self)
|
||||
MetricTransaction.set_endpoints(mon.MonAPI(agent_config))
|
||||
self._tr_manager = transaction.TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
|
||||
MetricTransaction.set_tr_manager(self._tr_manager)
|
||||
transaction.MetricTransaction.set_application(self)
|
||||
transaction.MetricTransaction.set_endpoints(mon.MonAPI(agent_config))
|
||||
self._tr_manager = transaction.TransactionManager(MAX_WAIT_FOR_REPLAY,
|
||||
MAX_QUEUE_SIZE,
|
||||
THROTTLING_DELAY,
|
||||
agent_config)
|
||||
transaction.MetricTransaction.set_tr_manager(self._tr_manager)
|
||||
|
||||
self._watchdog = None
|
||||
self.skip_ssl_validation = skip_ssl_validation or agent_config.get(
|
||||
|
@ -176,7 +127,7 @@ class Forwarder(tornado.web.Application):
|
|||
def _post_metrics(self):
|
||||
|
||||
if len(self._metrics) > 0:
|
||||
MetricTransaction(self._metrics, headers={'Content-Type': 'application/json'})
|
||||
transaction.MetricTransaction(self._metrics, headers={'Content-Type': 'application/json'})
|
||||
self._metrics = {}
|
||||
|
||||
# todo why is the tornado logging method overridden? Perhaps ditch this.
|
||||
|
|
|
@ -6,8 +6,9 @@ import logging
|
|||
from operator import attrgetter
|
||||
|
||||
# project
|
||||
from monasca_agent.common.check_status import ForwarderStatus
|
||||
from monasca_agent.common.util import get_tornado_ioloop, plural
|
||||
import monasca_agent.common.check_status as check_status
|
||||
import monasca_agent.common.metrics as metrics
|
||||
import monasca_agent.common.util as util
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -63,12 +64,64 @@ class Transaction(object):
|
|||
raise NotImplementedError("To be implemented in a subclass")
|
||||
|
||||
|
||||
class TransactionManager(object):
|
||||
class MetricTransaction(Transaction):
|
||||
|
||||
_application = None
|
||||
_trManager = None
|
||||
_endpoints = []
|
||||
|
||||
@classmethod
|
||||
def set_application(cls, app):
|
||||
cls._application = app
|
||||
|
||||
@classmethod
|
||||
def set_tr_manager(cls, manager):
|
||||
cls._trManager = manager
|
||||
|
||||
@classmethod
|
||||
def get_tr_manager(cls):
|
||||
return cls._trManager
|
||||
|
||||
@classmethod
|
||||
def set_endpoints(cls, endpoint):
|
||||
# todo we only have one endpoint option, generalize it better
|
||||
# the immediate use case for two endpoints could be our own monitoring boxes, they could send to
|
||||
# the main api and mini-mon api
|
||||
cls._endpoints.append(endpoint)
|
||||
|
||||
def __init__(self, data, headers):
|
||||
self._data = data
|
||||
self._headers = headers
|
||||
|
||||
# Call after data has been set (size is computed in Transaction's init)
|
||||
Transaction.__init__(self)
|
||||
|
||||
# Insert the transaction in the Manager
|
||||
self._trManager.append(self)
|
||||
log.debug("Created transaction %d" % self.get_id())
|
||||
|
||||
def __sizeof__(self):
|
||||
return sys.getsizeof(self._data)
|
||||
|
||||
def flush(self):
|
||||
try:
|
||||
for endpoint in self._endpoints:
|
||||
endpoint.post_metrics(self._data)
|
||||
except Exception:
|
||||
log.exception('Error flushing metrics to remote endpoints')
|
||||
self._trManager.tr_error(self)
|
||||
else:
|
||||
self._trManager.tr_success(self)
|
||||
self._trManager.flush_next()
|
||||
|
||||
|
||||
class TransactionManager(util.Dimensions):
|
||||
|
||||
"""Holds any transaction derived object list and make sure they
|
||||
are all commited, without exceeding parameters (throttling, memory consumption) """
|
||||
|
||||
def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay):
|
||||
def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay, agent_config):
|
||||
super(TransactionManager, self).__init__(agent_config)
|
||||
self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay
|
||||
self._MAX_QUEUE_SIZE = max_queue_size
|
||||
self._THROTTLING_DELAY = throttling_delay
|
||||
|
@ -90,7 +143,7 @@ class TransactionManager(object):
|
|||
self._last_flush = datetime.now() # Last flush (for throttling)
|
||||
|
||||
# Track an initial status message.
|
||||
ForwarderStatus().persist()
|
||||
check_status.ForwarderStatus().persist()
|
||||
|
||||
def get_transactions(self):
|
||||
return self._transactions
|
||||
|
@ -152,13 +205,18 @@ class TransactionManager(object):
|
|||
if count > 0:
|
||||
if should_log:
|
||||
log.info("Flushing %s transaction%s during flush #%s" %
|
||||
(count, plural(count), str(self._flush_count + 1)))
|
||||
(count, util.plural(count), str(self._flush_count + 1)))
|
||||
else:
|
||||
log.debug("Flushing %s transaction%s during flush #%s" %
|
||||
(count, plural(count), str(self._flush_count + 1)))
|
||||
(count, util.plural(count), str(self._flush_count + 1)))
|
||||
|
||||
timer = util.Timer()
|
||||
self._trs_to_flush = to_flush
|
||||
self.flush_next()
|
||||
# The emit time is reported on the next run.
|
||||
dimensions = self._set_dimensions({'component': 'monasca-agent'})
|
||||
emit_measurement = metrics.Measurement('monasca.emit_time_sec', time.time(), timer.step(), dimensions)
|
||||
MetricTransaction([emit_measurement], headers={'Content-Type': 'application/json'})
|
||||
else:
|
||||
if should_log:
|
||||
log.info("No transaction to flush during flush #%s" % str(self._flush_count + 1))
|
||||
|
@ -171,12 +229,11 @@ class TransactionManager(object):
|
|||
|
||||
self._flush_count += 1
|
||||
|
||||
ForwarderStatus(
|
||||
queue_length=self._total_count,
|
||||
queue_size=self._total_size,
|
||||
flush_count=self._flush_count,
|
||||
transactions_received=self._transactions_received,
|
||||
transactions_flushed=self._transactions_flushed).persist()
|
||||
check_status.ForwarderStatus(queue_length=self._total_count,
|
||||
queue_size=self._total_size,
|
||||
flush_count=self._flush_count,
|
||||
transactions_received=self._transactions_received,
|
||||
transactions_flushed=self._transactions_flushed).persist()
|
||||
|
||||
def flush_next(self):
|
||||
|
||||
|
@ -201,7 +258,7 @@ class TransactionManager(object):
|
|||
self.flush_next()
|
||||
else:
|
||||
# Wait a little bit more
|
||||
tornado_ioloop = get_tornado_ioloop()
|
||||
tornado_ioloop = util.get_tornado_ioloop()
|
||||
if tornado_ioloop._running:
|
||||
tornado_ioloop.add_timeout(time.time() + delay, lambda: self.flush_next())
|
||||
elif self._flush_without_ioloop:
|
||||
|
@ -218,9 +275,8 @@ class TransactionManager(object):
|
|||
"Transaction %d in error (%s error%s), it will be replayed after %s" %
|
||||
(tr.get_id(),
|
||||
tr.get_error_count(),
|
||||
plural(
|
||||
tr.get_error_count()),
|
||||
tr.get_next_flush()))
|
||||
util.plural(tr.get_error_count()),
|
||||
tr.get_next_flush()))
|
||||
|
||||
def tr_success(self, tr):
|
||||
log.debug("Transaction %d completed" % tr.get_id())
|
||||
|
|
|
@ -52,7 +52,7 @@ class ServicePlugin(Plugin):
|
|||
# Setup an active http_status check on the API
|
||||
log.info("\tConfiguring an http_check for the {0} API.".format(self.service_name))
|
||||
config.merge(service_api_check(self.service_name + '-api', self.service_api_url,
|
||||
self.search_pattern, self.service_name + '_api'))
|
||||
self.search_pattern, self.service_name))
|
||||
|
||||
return config
|
||||
|
||||
|
|
|
@ -4,12 +4,12 @@ import os
|
|||
import signal
|
||||
|
||||
from monasca_agent.collector.checks import AgentCheck
|
||||
from monasca_agent.common.config import get_checksd_path
|
||||
from monasca_agent.common.util import load_check_directory
|
||||
from monasca_agent.common.util import get_os
|
||||
|
||||
|
||||
def load_check(name, config, agent_config):
|
||||
checksd_path = get_checksd_path(get_os())
|
||||
checksd_path = load_check_directory()
|
||||
if checksd_path not in sys.path:
|
||||
sys.path.append(checksd_path)
|
||||
|
||||
|
@ -61,7 +61,7 @@ def kill_subprocess(process_obj):
|
|||
|
||||
|
||||
def get_check(name, config_str):
|
||||
checksd_path = get_checksd_path(get_os())
|
||||
checksd_path = load_check_directory()
|
||||
if checksd_path not in sys.path:
|
||||
sys.path.append(checksd_path)
|
||||
check_module = __import__(name)
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
import logging
|
||||
import unittest
|
||||
|
||||
from nose.plugins.attrib import attr
|
||||
|
||||
from collector.dogstream.cassandra import parse_cassandra
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestCassandraDogstream(unittest.TestCase):
|
||||
|
||||
@attr('cassandra')
|
||||
def testStart(self):
|
||||
events = parse_cassandra(
|
||||
logger,
|
||||
" INFO [main] 2012-12-11 21:46:26,995 StorageService.java (line 687) Bootstrap/Replace/Move completed! Now serving reads.")
|
||||
self.assertTrue(events is None)
|
||||
|
||||
@attr('cassandra')
|
||||
def testInfo(self):
|
||||
events = parse_cassandra(
|
||||
logger,
|
||||
" INFO [CompactionExecutor:35] 2012-12-02 21:15:03,738 AutoSavingCache.java (line 268) Saved KeyCache (5 items) in 3 ms")
|
||||
self.assertTrue(events is None)
|
||||
|
||||
@attr('cassandra')
|
||||
def testWarn(self):
|
||||
events = parse_cassandra(
|
||||
logger,
|
||||
" WARN [MemoryMeter:1] 2012-12-03 20:07:47,158 Memtable.java (line 197) setting live ratio to minimum of 1.0 instead of 0.9416553595658074")
|
||||
self.assertTrue(events is None)
|
||||
|
||||
@attr('cassandra')
|
||||
def testError(self):
|
||||
for line in """\
|
||||
ERROR [CompactionExecutor:518] 2012-12-11 21:35:29,686 AbstractCassandraDaemon.java (line 135) Exception in thread Thread[CompactionExecutor:518,1,RMI Runtime]
|
||||
java.util.concurrent.RejectedExecutionException
|
||||
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
|
||||
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
|
||||
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:215)
|
||||
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:397)
|
||||
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:470)
|
||||
at org.apache.cassandra.io.sstable.SSTableDeletingTask.schedule(SSTableDeletingTask.java:67)
|
||||
at org.apache.cassandra.io.sstable.SSTableReader.releaseReference(SSTableReader.java:806)
|
||||
at org.apache.cassandra.db.DataTracker.removeOldSSTablesSize(DataTracker.java:358)
|
||||
at org.apache.cassandra.db.DataTracker.postReplace(DataTracker.java:330)
|
||||
at org.apache.cassandra.db.DataTracker.replace(DataTracker.java:324)
|
||||
at org.apache.cassandra.db.DataTracker.replaceCompactedSSTables(DataTracker.java:253)
|
||||
at org.apache.cassandra.db.ColumnFamilyStore.replaceCompactedSSTables(ColumnFamilyStore.java:992)
|
||||
at org.apache.cassandra.db.compaction.CompactionTask.execute(CompactionTask.java:200)
|
||||
at org.apache.cassandra.db.compaction.CompactionManager$1.runMayThrow(CompactionManager.java:154)
|
||||
at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:30)
|
||||
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
|
||||
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
|
||||
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
|
||||
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
|
||||
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
|
||||
at java.lang.Thread.run(Thread.java:662)""".splitlines():
|
||||
events = parse_cassandra(logger, line)
|
||||
self.assertTrue(events is None)
|
||||
|
||||
@attr('cassandra')
|
||||
def testCompactionStart(self):
|
||||
events = parse_cassandra(
|
||||
logger,
|
||||
" INFO [CompactionExecutor:2] 2012-12-11 21:46:27,012 CompactionTask.java (line 109) Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-11-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-9-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-12-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-10-Data.db')]")
|
||||
self.assertEqual(events,
|
||||
[{'alert_type': 'info',
|
||||
'event_type': 'cassandra.compaction',
|
||||
'timestamp': 1355262387,
|
||||
'msg_title': "Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-1",
|
||||
'msg_text': "Compacting [SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-11-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-9-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-12-Data.db'), SSTableReader(path='/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-10-Data.db')]",
|
||||
'auto_priority': 0}])
|
||||
|
||||
@attr('cassandra')
|
||||
def testCompactionEnd(self):
|
||||
events = parse_cassandra(
|
||||
logger,
|
||||
"INFO [CompactionExecutor:2] 2012-12-11 21:46:27,095 CompactionTask.java (line 221) Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 to 583 (~66% of original) bytes for 4 keys at 0.007831MB/s. Time: 71ms.")
|
||||
self.assertEqual(events,
|
||||
[{'alert_type': 'info',
|
||||
'event_type': 'cassandra.compaction',
|
||||
'timestamp': 1355262387,
|
||||
'msg_title': 'Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 ',
|
||||
'msg_text': 'Compacted to [/var/lib/cassandra/data/system/LocationInfo/system-LocationInfo-he-13-Data.db,]. 880 to 583 (~66% of original) bytes for 4 keys at 0.007831MB/s. Time: 71ms.',
|
||||
'auto_priority': 0}])
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -7,7 +7,6 @@ from nose.plugins.skip import SkipTest
|
|||
|
||||
from monasca_agent.common.aggregator import MetricsAggregator
|
||||
from monasca_agent.statsd import Server
|
||||
from monasca_agent.common.util import PidFile
|
||||
from monasca_agent.common.config import get_logging_config
|
||||
from monasca_agent.collector.jmxfetch import JMXFetch
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ import unittest
|
|||
import logging
|
||||
logger = logging.getLogger()
|
||||
from monasca_agent.common.exceptions import UnknownValue, CheckException, Infinity
|
||||
from monasca_agent.collector.checks import Check
|
||||
from monasca_agent.collector.checks.check import Check
|
||||
from monasca_agent.common.aggregator import MetricsAggregator
|
||||
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ import unittest
|
|||
import os.path
|
||||
import tempfile
|
||||
|
||||
from monasca_agent.common.config import get_config
|
||||
from monasca_agent.common.config import Config
|
||||
from monasca_agent.common.util import PidFile, is_valid_hostname
|
||||
|
||||
|
||||
|
@ -12,7 +12,7 @@ class TestConfig(unittest.TestCase):
|
|||
def testWhiteSpaceConfig(self):
|
||||
"""Leading whitespace confuse ConfigParser
|
||||
"""
|
||||
agent_config = get_config(
|
||||
agent_config = Config.get_config(
|
||||
cfg_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "badconfig.conf"))
|
||||
self.assertEqual(agent_config["api_key"], "1234")
|
||||
|
||||
|
|
|
@ -1,795 +0,0 @@
|
|||
from __future__ import print_function
|
||||
import logging
|
||||
import unittest
|
||||
from tempfile import NamedTemporaryFile
|
||||
import re
|
||||
import os
|
||||
|
||||
from collector.checks.datadog import Dogstreams, EventDefaults, point_sorter
|
||||
from collector.dogstream import cassandra, supervisord_log, common
|
||||
|
||||
|
||||
log = logging.getLogger('datadog.test')
|
||||
NAGIOS_TEST_HOST = os.path.join(os.path.dirname(__file__), "host-perfdata")
|
||||
NAGIOS_TEST_SVC = os.path.join(os.path.dirname(__file__), "service-perfdata")
|
||||
NAGIOS_TEST_HOST_TEMPLATE = "[HOSTPERFDATA]\t$TIMET$\t$HOSTNAME$\t$HOSTEXECUTIONTIME$\t$HOSTOUTPUT$\t$HOSTPERFDATA$"
|
||||
NAGIOS_TEST_SVC_TEMPLATE = "[SERVICEPERFDATA]\t$TIMET$\t$HOSTNAME$\t$SERVICEDESC$\t$SERVICEEXECUTIONTIME$\t$SERVICELATENCY$\t$SERVICEOUTPUT$\t$SERVICEPERFDATA$"
|
||||
|
||||
|
||||
def parse_ancient_function_plugin(logger, line):
|
||||
"""Ancient stateless parser"""
|
||||
res = line.split()
|
||||
res[3] = {'metric_type': 'gauge'}
|
||||
|
||||
|
||||
def parse_function_plugin(logger, line, state):
|
||||
"""Simple stateful parser"""
|
||||
try:
|
||||
acc = state["test_acc"] + 1
|
||||
except KeyError:
|
||||
acc = 1
|
||||
state["test_acc"] = acc
|
||||
res = line.split()
|
||||
res[2] = acc
|
||||
res[3] = {'metric_type': 'counter'}
|
||||
return tuple(res)
|
||||
|
||||
|
||||
class ParseClassPlugin(object):
|
||||
|
||||
"""Class-based stateful parser"""
|
||||
|
||||
def __init__(self, logger=None, user_args=(), **kwargs):
|
||||
self.logger = logger
|
||||
self.args = '.'.join(user_args)
|
||||
self.acc = 0
|
||||
self.logger.info('Completed initialization')
|
||||
|
||||
def parse_line(self, line):
|
||||
self.logger.info('Parsing line %r; counter is %r', line, self.acc)
|
||||
self.acc += 1
|
||||
res = line.split()
|
||||
res[0] = self.args + ':' + res[0]
|
||||
res[2] = self.acc
|
||||
res[3] = {'metric_type': 'counter'}
|
||||
return tuple(res)
|
||||
|
||||
|
||||
import time
|
||||
from datetime import datetime
|
||||
import calendar
|
||||
|
||||
log_event_pattern = re.compile("".join([
|
||||
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ", # iso timestamp
|
||||
r"\[(?P<alert_type>(ERROR)|(RECOVERY))\] - ", # alert type
|
||||
r"(?P<msg_title>(?P<host>[^ ]*).*)"
|
||||
]))
|
||||
alert_types = {
|
||||
"ERROR": "error",
|
||||
"RECOVERY": "success"
|
||||
}
|
||||
|
||||
|
||||
def parse_events(logger, line):
|
||||
""" Expecting lines like this:
|
||||
2012-05-14 12:46:01 [ERROR] - host0 is down (broke its collarbone)
|
||||
"""
|
||||
match = log_event_pattern.match(line)
|
||||
if match:
|
||||
groups = match.groupdict()
|
||||
groups.update({
|
||||
'alert_type': alert_types.get(groups['alert_type'], ''),
|
||||
'timestamp': calendar.timegm(
|
||||
datetime.strptime(groups['timestamp'], '%Y-%m-%d %H:%M:%S').timetuple()),
|
||||
'msg_text': line
|
||||
})
|
||||
|
||||
return groups
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def repr_event_parser(logger, line):
|
||||
return eval(line)
|
||||
|
||||
|
||||
class TailTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.log_file = NamedTemporaryFile()
|
||||
self.logger = logging.getLogger('test.dogstream')
|
||||
|
||||
def _write_log(self, log_data):
|
||||
for data in log_data:
|
||||
print(data, file=self.log_file)
|
||||
self.log_file.flush()
|
||||
|
||||
def tearDown(self):
|
||||
self.log_file.close()
|
||||
|
||||
|
||||
class TestDogstream(TailTestCase):
|
||||
gauge = {'metric_type': 'gauge'}
|
||||
counter = {'metric_type': 'counter'}
|
||||
|
||||
def setUp(self):
|
||||
TailTestCase.setUp(self)
|
||||
|
||||
self.config = {
|
||||
'dogstreams': self.log_file.name,
|
||||
'check_freq': 5,
|
||||
}
|
||||
log.info("Test config: %s" % self.config)
|
||||
self.dogstream = Dogstreams.init(self.logger, self.config)
|
||||
self.maxDiff = None
|
||||
|
||||
def test_dogstream_gauge(self):
|
||||
log_data = [
|
||||
# bucket 0
|
||||
('test.metric.a', '1000000000', '10', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000001', '20', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000002', '3', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000003', '4', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000004', '5', 'metric_type=gauge'),
|
||||
|
||||
# bucket 1
|
||||
('test.metric.a', '1000000005', '12', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000006', '7', 'metric_type=gauge'),
|
||||
('test.metric.a', '1000000007', '8', 'metric_type=gauge'),
|
||||
]
|
||||
|
||||
expected_output = {
|
||||
"dogstream": [
|
||||
('test.metric.a', 1000000000, 5.0, self.gauge),
|
||||
('test.metric.a', 1000000005, 8.0, self.gauge),
|
||||
]
|
||||
}
|
||||
|
||||
self._write_log((' '.join(data) for data in log_data))
|
||||
|
||||
actual_output = self.dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
for metric, timestamp, val, attr in expected_output['dogstream']:
|
||||
assert isinstance(val, float)
|
||||
|
||||
def test_dogstream_counter(self):
|
||||
log_data = [
|
||||
# bucket 0
|
||||
('test.metric.a', '1000000000', '10', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000001', '20', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000002', '3', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000003', '4', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000004', '5', 'metric_type=counter'),
|
||||
|
||||
# bucket 1
|
||||
('test.metric.a', '1000000005', '12', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000006', '7', 'metric_type=counter'),
|
||||
('test.metric.a', '1000000007', '8', 'metric_type=counter'),
|
||||
]
|
||||
|
||||
expected_output = {
|
||||
"dogstream": [
|
||||
('test.metric.a', 1000000000, 42, self.counter),
|
||||
('test.metric.a', 1000000005, 27, self.counter),
|
||||
]
|
||||
}
|
||||
|
||||
self._write_log((' '.join(data) for data in log_data))
|
||||
|
||||
actual_output = self.dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
for metric, timestamp, val, attr in expected_output['dogstream']:
|
||||
assert isinstance(val, (int, long))
|
||||
|
||||
def test_dogstream_bad_input(self):
|
||||
log_data = [
|
||||
('test.metric.e1000000000 1metric_type=gauge'),
|
||||
('1000000001 1 metric_type=gauge tag=staging'),
|
||||
('test_metric.e 1 1000000002 metric_type=gauge'),
|
||||
('test_metric.e 1000000002 10 metric_type=gauge'),
|
||||
]
|
||||
expected_output = {"dogstream": [('test_metric.e', 1000000000, 10, self.gauge)]}
|
||||
|
||||
self._write_log(log_data)
|
||||
|
||||
actual_output = self.dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_dogstream_ancient_function_plugin(self):
|
||||
"""Ensure that pre-stateful plugins still work"""
|
||||
log_data = [
|
||||
'test.metric.simple 1000000000 1 metric_type=gauge',
|
||||
'test.metric.simple 1100000000 1 metric_type=gauge'
|
||||
]
|
||||
expected_output = {
|
||||
"dogstream": [
|
||||
('test.metric.simple', 1000000000, 1, self.gauge),
|
||||
('test.metric.simple', 1100000000, 1, self.gauge)]
|
||||
}
|
||||
self._write_log(log_data)
|
||||
plugdog = Dogstreams.init(
|
||||
self.logger, {
|
||||
'dogstreams': '%s:tests.test_datadog:parse_ancient_function_plugin' %
|
||||
self.log_file.name})
|
||||
actual_output = plugdog.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_dogstream_function_plugin(self):
|
||||
"""Ensure that non-class-based stateful plugins work"""
|
||||
log_data = [
|
||||
'test.metric.accumulator 1000000000 1 metric_type=counter',
|
||||
'test.metric.accumulator 1100000000 1 metric_type=counter'
|
||||
]
|
||||
expected_output = {
|
||||
"dogstream": [
|
||||
('test.metric.accumulator', 1000000000, 1, self.counter),
|
||||
('test.metric.accumulator', 1100000000, 2, self.counter)]
|
||||
}
|
||||
self._write_log(log_data)
|
||||
|
||||
statedog = Dogstreams.init(
|
||||
self.logger,
|
||||
{'dogstreams': '%s:tests.test_datadog:parse_function_plugin' % self.log_file.name})
|
||||
actual_output = statedog.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_dogstream_new_plugin(self):
|
||||
"""Ensure that class-based stateful plugins work"""
|
||||
log_data = [
|
||||
'test.metric.accumulator 1000000000 1 metric_type=counter',
|
||||
'test.metric.accumulator 1100000000 1 metric_type=counter'
|
||||
]
|
||||
expected_output = {
|
||||
"dogstream": [
|
||||
('foo.bar:test.metric.accumulator', 1000000000, 1, self.counter),
|
||||
('foo.bar:test.metric.accumulator', 1100000000, 2, self.counter)]
|
||||
}
|
||||
self._write_log(log_data)
|
||||
|
||||
statedog = Dogstreams.init(
|
||||
self.logger,
|
||||
{'dogstreams': '%s:tests.test_datadog:ParseClassPlugin:foo:bar' % self.log_file.name})
|
||||
actual_output = statedog.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_dogstream_events(self):
|
||||
log_data = [
|
||||
'2012-05-14 12:46:01 [ERROR] - host0 is down (broke its collarbone)',
|
||||
'2012-05-14 12:48:07 [ERROR] - host1 is down (got a bloody nose)',
|
||||
'2012-05-14 12:52:03 [RECOVERY] - host0 is up (collarbone healed)',
|
||||
'2012-05-14 12:59:09 [RECOVERY] - host1 is up (nose stopped bleeding)',
|
||||
]
|
||||
expected_output = {"dogstreamEvents": [{"timestamp": 1336999561,
|
||||
"alert_type": "error",
|
||||
"host": "host0",
|
||||
"msg_title": "host0 is down (broke its collarbone)",
|
||||
"msg_text": "2012-05-14 12:46:01 [ERROR] - host0 is down (broke its collarbone)",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
{"timestamp": 1336999687,
|
||||
"alert_type": "error",
|
||||
"host": "host1",
|
||||
"msg_title": "host1 is down (got a bloody nose)",
|
||||
"msg_text": "2012-05-14 12:48:07 [ERROR] - host1 is down (got a bloody nose)",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
{"timestamp": 1336999923,
|
||||
"alert_type": "success",
|
||||
"host": "host0",
|
||||
"msg_title": "host0 is up (collarbone healed)",
|
||||
"msg_text": "2012-05-14 12:52:03 [RECOVERY] - host0 is up (collarbone healed)",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
{"timestamp": 1337000349,
|
||||
"alert_type": "success",
|
||||
"host": "host1",
|
||||
"msg_title": "host1 is up (nose stopped bleeding)",
|
||||
"msg_text": "2012-05-14 12:59:09 [RECOVERY] - host1 is up (nose stopped bleeding)",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
]}
|
||||
|
||||
self._write_log(log_data)
|
||||
|
||||
dogstream = Dogstreams.init(
|
||||
self.logger, {'dogstreams': '%s:tests.test_datadog:parse_events' % self.log_file.name})
|
||||
actual_output = dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_dogstream_events_validation(self):
|
||||
log_data = [
|
||||
{"msg_title": "title", "timestamp": 1336999561},
|
||||
{"msg_text": "body", "timestamp": 1336999561},
|
||||
{"none of the above": "should get filtered out", "timestamp": 1336999561},
|
||||
]
|
||||
|
||||
expected_output = {
|
||||
"dogstreamEvents": [
|
||||
{
|
||||
"timestamp": 1336999561,
|
||||
"msg_title": "title",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
|
||||
{
|
||||
"timestamp": 1336999561,
|
||||
"msg_text": "body",
|
||||
"event_type": EventDefaults.EVENT_TYPE,
|
||||
"aggregation_key": EventDefaults.EVENT_OBJECT,
|
||||
"event_object": EventDefaults.EVENT_OBJECT,
|
||||
},
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
self._write_log([repr(d) for d in log_data])
|
||||
|
||||
dogstream = Dogstreams.init(
|
||||
self.logger,
|
||||
{'dogstreams': '%s:tests.test_datadog:repr_event_parser' % self.log_file.name})
|
||||
actual_output = dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_cassandra_parser(self):
|
||||
|
||||
log_data = """ INFO [CompactionExecutor:1594] 2012-05-12 21:05:12,924 Saved test_data-Encodings-KeyCache (86400 items) in 85 ms
|
||||
INFO [CompactionExecutor:1595] 2012-05-12 21:05:15,144 Saved test_data-Metrics-KeyCache (86400 items) in 96 ms
|
||||
INFO [CompactionExecutor:1596] 2012-05-12 21:10:48,058 Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6528-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6531-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6529-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6530-Data.db')]
|
||||
INFO [CompactionExecutor:1596] 2012-05-12 21:10:54,851 Compacted to [/var/cassandra/a-hc-65-Data.db,]. 102,079,134 to 101,546,397
|
||||
INFO [CompactionExecutor:1598] 2012-05-12 22:05:04,313 Saved test_data-ResourcesMetadata-KeyCache (1 items) in 10 ms
|
||||
INFO [CompactionExecutor:1599] 2012-05-12 22:05:14,813 Saved test_data-Encodings-KeyCache (86400 items) in 83 ms
|
||||
INFO [CompactionExecutor:1630] 2012-05-13 13:05:44,963 Saved test_data-Metrics-KeyCache (86400 items) in 77 ms
|
||||
INFO [CompactionExecutor:1631] 2012-05-13 13:15:01,923 Nothing to compact in data_log. Use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)
|
||||
INFO [CompactionExecutor:1632] 2012-05-13 13:15:01,927 Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6527-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6522-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6532-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6517-Data.db')]
|
||||
INFO [CompactionExecutor:1632] 2012-05-13 13:27:17,685 Compacting large row test_data/series:6c6f677c32 (782001077 bytes) incrementally
|
||||
INFO [CompactionExecutor:34] 2012-05-14 18:00:41,281 Saved test_data-Encodings-KeyCache (86400 items) in 78 ms
|
||||
INFO 13:27:17,685 Compacting large row test_data/series:6c6f677c32 (782001077 bytes) incrementally
|
||||
"""
|
||||
alert_type = cassandra.ALERT_TYPES["INFO"]
|
||||
event_type = cassandra.EVENT_TYPE
|
||||
event_object = EventDefaults.EVENT_OBJECT
|
||||
|
||||
expected_output = {"dogstreamEvents": [{"timestamp": cassandra.parse_date("2012-05-12 21:10:48,058"),
|
||||
"msg_title": "Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6528-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6531-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6529-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6530-Data.db')]"[0:common.MAX_TITLE_LEN],
|
||||
"msg_text": "Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6528-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6531-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6529-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6530-Data.db')]",
|
||||
"alert_type": alert_type,
|
||||
"auto_priority": 0,
|
||||
"event_type": event_type,
|
||||
"aggregation_key": event_object,
|
||||
"event_object": event_object,
|
||||
},
|
||||
{"timestamp": cassandra.parse_date("2012-05-12 21:10:54,851"),
|
||||
"msg_title": "Compacted to [/var/cassandra/a-hc-65-Data.db,]. 102,079,134 to 101,546,397",
|
||||
"alert_type": alert_type,
|
||||
"auto_priority": 0,
|
||||
"event_type": event_type,
|
||||
"aggregation_key": event_object,
|
||||
"event_object": event_object,
|
||||
},
|
||||
{"timestamp": cassandra.parse_date("2012-05-13 13:15:01,927"),
|
||||
"msg_title": "Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6527-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6522-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6532-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6517-Data.db')]"[0:common.MAX_TITLE_LEN],
|
||||
"msg_text": "Compacting [SSTableReader(path='/var/cassandra/data/test_data/series-hc-6527-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6522-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6532-Data.db'), SSTableReader(path='/var/cassandra/data/test_data/series-hc-6517-Data.db')]",
|
||||
"alert_type": alert_type,
|
||||
"event_type": event_type,
|
||||
"auto_priority": 0,
|
||||
"aggregation_key": event_object,
|
||||
"event_object": event_object,
|
||||
},
|
||||
{"timestamp": cassandra.parse_date("2012-05-13 13:27:17,685"),
|
||||
"msg_title": "Compacting large row test_data/series:6c6f677c32 (782001077 bytes) incrementally",
|
||||
"alert_type": alert_type,
|
||||
"event_type": event_type,
|
||||
"auto_priority": 0,
|
||||
"aggregation_key": event_object,
|
||||
"event_object": event_object,
|
||||
},
|
||||
{"timestamp": cassandra.parse_date(datetime.utcnow().strftime("%Y-%m-%d") + " 13:27:17,685"),
|
||||
"msg_title": "Compacting large row test_data/series:6c6f677c32 (782001077 bytes) incrementally",
|
||||
"alert_type": alert_type,
|
||||
"event_type": event_type,
|
||||
"auto_priority": 0,
|
||||
"aggregation_key": event_object,
|
||||
"event_object": event_object,
|
||||
},
|
||||
]}
|
||||
|
||||
self._write_log(log_data.split("\n"))
|
||||
|
||||
dogstream = Dogstreams.init(
|
||||
self.logger,
|
||||
{'dogstreams': '%s:dogstream.cassandra:parse_cassandra' % self.log_file.name})
|
||||
actual_output = dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_supervisord_parser(self):
|
||||
log_data = """2012-07-16 22:30:48,335 INFO spawned: 'monitor' with pid 20216
|
||||
2012-07-14 03:02:47,325 INFO success: foo_bar entered RUNNING state, process has stayed up for > than 2 seconds (startsecs)
|
||||
2012-07-17 02:53:04,600 CRIT Server 'inet_http_server' running without any HTTP authentication checking
|
||||
2012-07-14 04:54:34,193 WARN received SIGTERM indicating exit request
|
||||
"""
|
||||
event_type = supervisord_log.EVENT_TYPE
|
||||
|
||||
expected_output = {
|
||||
"dogstreamEvents": [
|
||||
{
|
||||
"alert_type": "info", "event_type": event_type,
|
||||
"aggregation_key": "monitor",
|
||||
"event_object": "monitor",
|
||||
"msg_title": "spawned: 'monitor' with pid 20216",
|
||||
"timestamp": int(time.mktime(datetime(2012, 7, 16, 22, 30, 48).timetuple())),
|
||||
}, {
|
||||
"alert_type": "success", "event_type": event_type,
|
||||
"aggregation_key": "foo_bar",
|
||||
"event_object": "foo_bar",
|
||||
"msg_title": "success: foo_bar entered RUNNING state, "
|
||||
"process has stayed up for > than 2 seconds (startsecs)",
|
||||
"timestamp": int(time.mktime(datetime(2012, 7, 14, 3, 2, 47).timetuple())),
|
||||
}, {
|
||||
"alert_type": "error", "event_type": event_type,
|
||||
"aggregation_key": "inet_http_server",
|
||||
"event_object": "inet_http_server",
|
||||
"msg_title": "Server 'inet_http_server' running without any HTTP authentication checking",
|
||||
"timestamp": int(time.mktime(datetime(2012, 7, 17, 2, 53, 4).timetuple())),
|
||||
}, {
|
||||
"alert_type": "warning", "event_type": event_type,
|
||||
"aggregation_key": "SIGTERM",
|
||||
"event_object": "SIGTERM",
|
||||
"msg_title": "received SIGTERM indicating exit request",
|
||||
"timestamp": int(time.mktime(datetime(2012, 7, 14, 4, 54, 34).timetuple())),
|
||||
},
|
||||
]}
|
||||
self._write_log(log_data.split("\n"))
|
||||
|
||||
dogstream = Dogstreams.init(
|
||||
self.logger,
|
||||
{'dogstreams': '%s:dogstream.supervisord_log:parse_supervisord' % self.log_file.name})
|
||||
actual_output = dogstream.check(self.config, move_end=False)
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
|
||||
class TestNagiosPerfData(TailTestCase):
|
||||
|
||||
def setUp(self):
|
||||
TailTestCase.setUp(self)
|
||||
self.nagios_config = NamedTemporaryFile()
|
||||
self.nagios_config.flush()
|
||||
|
||||
self.agent_config = {
|
||||
'nagios_perf_cfg': self.nagios_config.name,
|
||||
'check_freq': 5,
|
||||
}
|
||||
|
||||
def _write_nagios_config(self, config_data):
|
||||
for data in config_data:
|
||||
print(data, file=self.nagios_config)
|
||||
self.nagios_config.flush()
|
||||
|
||||
def tearDown(self):
|
||||
TailTestCase.tearDown(self)
|
||||
self.nagios_config.close()
|
||||
|
||||
def test_service_perfdata(self):
|
||||
from collector.checks.datadog import NagiosServicePerfData
|
||||
|
||||
self._write_nagios_config([
|
||||
"service_perfdata_file=%s" % self.log_file.name,
|
||||
"service_perfdata_file_template=DATATYPE::SERVICEPERFDATA\tTIMET::$TIMET$\tHOSTNAME::$HOSTNAME$\tSERVICEDESC::$SERVICEDESC$\tSERVICEPERFDATA::$SERVICEPERFDATA$\tSERVICECHECKCOMMAND::$SERVICECHECKCOMMAND$\tHOSTSTATE::$HOSTSTATE$\tHOSTSTATETYPE::$HOSTSTATETYPE$\tSERVICESTATE::$SERVICESTATE$\tSERVICESTATETYPE::$SERVICESTATETYPE$",
|
||||
])
|
||||
|
||||
dogstream = Dogstreams.init(self.logger, self.agent_config)
|
||||
self.assertEqual([NagiosServicePerfData], [d.__class__ for d in dogstream.dogstreams])
|
||||
|
||||
log_data = [
|
||||
(
|
||||
"DATATYPE::SERVICEPERFDATA",
|
||||
"TIMET::1000000000",
|
||||
"HOSTNAME::myhost0",
|
||||
"SERVICEDESC::Pgsql Backends",
|
||||
"SERVICEPERFDATA::" + " ".join([
|
||||
"time=0.06",
|
||||
"db0=33;180;190;0;200",
|
||||
"db1=1;150;190;0;200",
|
||||
"db2=0;120;290;1;200",
|
||||
"db3=0;110;195;5;100"
|
||||
]),
|
||||
"SERVICECHECKCOMMAND::check_nrpe_1arg!check_postgres_backends",
|
||||
"HOSTSTATE::UP",
|
||||
"HOSTSTATETYPE::HARD",
|
||||
"SERVICESTATE::OK",
|
||||
"SERVICESTATETYPE::HARD",
|
||||
),
|
||||
]
|
||||
|
||||
expected_output = [
|
||||
('nagios.pgsql_backends.time', 1000000000, 0.06, {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost0',
|
||||
}),
|
||||
('nagios.pgsql_backends.db0', 1000000000, 33., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost0',
|
||||
'warn': '180',
|
||||
'crit': '190',
|
||||
'min': '0',
|
||||
'max': '200',
|
||||
}),
|
||||
('nagios.pgsql_backends.db1', 1000000000, 1., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost0',
|
||||
'warn': '150',
|
||||
'crit': '190',
|
||||
'min': '0',
|
||||
'max': '200',
|
||||
}),
|
||||
('nagios.pgsql_backends.db2', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost0',
|
||||
'warn': '120',
|
||||
'crit': '290',
|
||||
'min': '1',
|
||||
'max': '200',
|
||||
}),
|
||||
('nagios.pgsql_backends.db3', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost0',
|
||||
'warn': '110',
|
||||
'crit': '195',
|
||||
'min': '5',
|
||||
'max': '100',
|
||||
}),
|
||||
]
|
||||
expected_output.sort(key=point_sorter)
|
||||
|
||||
self._write_log(('\t'.join(data) for data in log_data))
|
||||
|
||||
actual_output = dogstream.check(self.agent_config, move_end=False)['dogstream']
|
||||
actual_output.sort(key=point_sorter)
|
||||
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_service_perfdata_special_cases(self):
|
||||
from collector.checks.datadog import NagiosServicePerfData
|
||||
|
||||
self._write_nagios_config([
|
||||
"service_perfdata_file=%s" % self.log_file.name,
|
||||
"service_perfdata_file_template=DATATYPE::SERVICEPERFDATA\tTIMET::$TIMET$\tHOSTNAME::$HOSTNAME$\tSERVICEDESC::$SERVICEDESC$\tSERVICEPERFDATA::$SERVICEPERFDATA$\tSERVICECHECKCOMMAND::$SERVICECHECKCOMMAND$\tHOSTSTATE::$HOSTSTATE$\tHOSTSTATETYPE::$HOSTSTATETYPE$\tSERVICESTATE::$SERVICESTATE$\tSERVICESTATETYPE::$SERVICESTATETYPE$",
|
||||
])
|
||||
|
||||
dogstream = Dogstreams.init(self.logger, self.agent_config)
|
||||
self.assertEqual([NagiosServicePerfData], [d.__class__ for d in dogstream.dogstreams])
|
||||
|
||||
log_data = [
|
||||
(
|
||||
"DATATYPE::SERVICEPERFDATA",
|
||||
"TIMET::1000000000",
|
||||
"HOSTNAME::myhost2",
|
||||
"SERVICEDESC::Disk Space",
|
||||
"SERVICEPERFDATA::" + " ".join([
|
||||
"/=5477MB;6450;7256;0;8063",
|
||||
"/dev=0MB;2970;3341;0;3713",
|
||||
"/dev/shm=0MB;3080;3465;0;3851",
|
||||
"/var/run=0MB;3080;3465;0;3851",
|
||||
"/var/lock=0MB;3080;3465;0;3851",
|
||||
"/lib/init/rw=0MB;3080;3465;0;3851",
|
||||
"/mnt=290MB;338636;380966;0;423296",
|
||||
"/data=39812MB;40940;46057;0;51175",
|
||||
]),
|
||||
"SERVICECHECKCOMMAND::check_all_disks!20%!10%",
|
||||
"HOSTSTATE::UP",
|
||||
"HOSTSTATETYPE::HARD",
|
||||
"SERVICESTATE::OK",
|
||||
"SERVICESTATETYPE::HARD",
|
||||
)
|
||||
]
|
||||
|
||||
expected_output = [
|
||||
('nagios.disk_space', 1000000000, 5477., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/',
|
||||
'unit': 'MB',
|
||||
'warn': '6450',
|
||||
'crit': '7256',
|
||||
'min': '0',
|
||||
'max': '8063',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/dev',
|
||||
'unit': 'MB',
|
||||
'warn': '2970',
|
||||
'crit': '3341',
|
||||
'min': '0',
|
||||
'max': '3713',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/dev/shm',
|
||||
'unit': 'MB',
|
||||
'warn': '3080',
|
||||
'crit': '3465',
|
||||
'min': '0',
|
||||
'max': '3851',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/var/run',
|
||||
'unit': 'MB',
|
||||
'warn': '3080',
|
||||
'crit': '3465',
|
||||
'min': '0',
|
||||
'max': '3851',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/var/lock',
|
||||
'unit': 'MB',
|
||||
'warn': '3080',
|
||||
'crit': '3465',
|
||||
'min': '0',
|
||||
'max': '3851',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/lib/init/rw',
|
||||
'unit': 'MB',
|
||||
'warn': '3080',
|
||||
'crit': '3465',
|
||||
'min': '0',
|
||||
'max': '3851',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 290., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/mnt',
|
||||
'unit': 'MB',
|
||||
'warn': '338636',
|
||||
'crit': '380966',
|
||||
'min': '0',
|
||||
'max': '423296',
|
||||
}),
|
||||
('nagios.disk_space', 1000000000, 39812., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost2',
|
||||
'device_name': '/data',
|
||||
'unit': 'MB',
|
||||
'warn': '40940',
|
||||
'crit': '46057',
|
||||
'min': '0',
|
||||
'max': '51175',
|
||||
}),
|
||||
]
|
||||
expected_output.sort(key=point_sorter)
|
||||
|
||||
self._write_log(('\t'.join(data) for data in log_data))
|
||||
|
||||
actual_output = dogstream.check(self.agent_config, move_end=False)['dogstream']
|
||||
actual_output.sort(key=point_sorter)
|
||||
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_host_perfdata(self):
|
||||
from collector.checks.datadog import NagiosHostPerfData
|
||||
|
||||
self._write_nagios_config([
|
||||
"host_perfdata_file=%s" % self.log_file.name,
|
||||
"host_perfdata_file_template=DATATYPE::HOSTPERFDATA\tTIMET::$TIMET$\tHOSTNAME::$HOSTNAME$\tHOSTPERFDATA::$HOSTPERFDATA$\tHOSTCHECKCOMMAND::$HOSTCHECKCOMMAND$\tHOSTSTATE::$HOSTSTATE$\tHOSTSTATETYPE::$HOSTSTATETYPE$",
|
||||
])
|
||||
|
||||
dogstream = Dogstreams.init(self.logger, self.agent_config)
|
||||
self.assertEqual([NagiosHostPerfData], [d.__class__ for d in dogstream.dogstreams])
|
||||
|
||||
log_data = [
|
||||
(
|
||||
"DATATYPE::HOSTPERFDATA",
|
||||
"TIMET::1000000010",
|
||||
"HOSTNAME::myhost1",
|
||||
"HOSTPERFDATA::" + " ".join([
|
||||
"rta=0.978000ms;5000.000000;5000.000000;0.000000",
|
||||
"pl=0%;100;100;0",
|
||||
]),
|
||||
"HOSTCHECKCOMMAND::check-host-alive",
|
||||
"HOSTSTATE::UP",
|
||||
"HOSTSTATETYPE::HARD",
|
||||
),
|
||||
]
|
||||
|
||||
expected_output = [
|
||||
('nagios.host.rta', 1000000010, 0.978, {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost1',
|
||||
'unit': 'ms',
|
||||
'warn': '5000.000000',
|
||||
'crit': '5000.000000',
|
||||
'min': '0.000000'
|
||||
}),
|
||||
('nagios.host.pl', 1000000010, 0., {
|
||||
'metric_type': 'gauge',
|
||||
'host_name': 'myhost1',
|
||||
'unit': '%',
|
||||
'warn': '100',
|
||||
'crit': '100',
|
||||
'min': '0'
|
||||
}),
|
||||
]
|
||||
expected_output.sort(key=point_sorter)
|
||||
|
||||
self._write_log(('\t'.join(data) for data in log_data))
|
||||
|
||||
actual_output = dogstream.check(self.agent_config, move_end=False)['dogstream']
|
||||
actual_output.sort(key=point_sorter)
|
||||
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_alt_service_perfdata(self):
|
||||
from collector.checks.datadog import NagiosServicePerfData
|
||||
|
||||
self._write_nagios_config([
|
||||
"service_perfdata_file=%s" % NAGIOS_TEST_SVC,
|
||||
"service_perfdata_file_template=%s" % NAGIOS_TEST_SVC_TEMPLATE,
|
||||
])
|
||||
|
||||
dogstream = Dogstreams.init(self.logger, self.agent_config)
|
||||
self.assertEqual([NagiosServicePerfData], [d.__class__ for d in dogstream.dogstreams])
|
||||
actual_output = dogstream.check(self.agent_config, move_end=False)
|
||||
|
||||
expected_output = {'dogstream': [('nagios.current_users.users', 1339511440, 1.0,
|
||||
{'metric_type': 'gauge', 'warn': '20',
|
||||
'host_name': 'localhost', 'crit': '50', 'min': '0'}),
|
||||
('nagios.ping.pl', 1339511500, 0.0,
|
||||
{'warn': '20', 'metric_type': 'gauge',
|
||||
'host_name': 'localhost', 'min': '0', 'crit': '60',
|
||||
'unit': '%'}),
|
||||
('nagios.ping.rta', 1339511500, 0.065,
|
||||
{'warn': '100.000000', 'metric_type': 'gauge',
|
||||
'host_name': 'localhost',
|
||||
'min': '0.000000', 'crit': '500.000000',
|
||||
'unit': 'ms'}),
|
||||
('nagios.root_partition', 1339511560, 2470.0,
|
||||
{'min': '0', 'max': '7315', 'device_name': '/',
|
||||
'warn': '5852', 'metric_type': 'gauge',
|
||||
'host_name': 'localhost', 'crit': '6583',
|
||||
'unit': 'MB'})]}
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
def test_alt_host_perfdata(self):
|
||||
from collector.checks.datadog import NagiosHostPerfData
|
||||
|
||||
self._write_nagios_config([
|
||||
"host_perfdata_file=%s" % NAGIOS_TEST_HOST,
|
||||
"host_perfdata_file_template=%s" % NAGIOS_TEST_HOST_TEMPLATE,
|
||||
])
|
||||
|
||||
dogstream = Dogstreams.init(self.logger, self.agent_config)
|
||||
self.assertEqual([NagiosHostPerfData], [d.__class__ for d in dogstream.dogstreams])
|
||||
actual_output = dogstream.check(self.agent_config, move_end=False)
|
||||
|
||||
expected_output = {'dogstream': [('nagios.host.pl', 1339511440, 0.0,
|
||||
{'warn': '80', 'metric_type': 'gauge',
|
||||
'host_name': 'localhost', 'min': '0', 'crit': '100',
|
||||
'unit': '%'}),
|
||||
('nagios.host.rta', 1339511440, 0.048,
|
||||
{'warn': '3000.000000', 'metric_type': 'gauge',
|
||||
'host_name': 'localhost', 'min': '0.000000',
|
||||
'crit': '5000.000000', 'unit': 'ms'})]}
|
||||
self.assertEqual(expected_output, actual_output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
|
||||
unittest.main()
|
|
@ -2,7 +2,7 @@ import unittest
|
|||
from datetime import timedelta, datetime
|
||||
|
||||
from monasca_agent.forwarder.transaction import Transaction, TransactionManager
|
||||
from monasca_agent.forwarder import MAX_QUEUE_SIZE, THROTTLING_DELAY
|
||||
from monasca_agent.forwarder.daemon import MAX_QUEUE_SIZE, THROTTLING_DELAY
|
||||
|
||||
|
||||
class memTransaction(Transaction):
|
||||
|
@ -34,7 +34,7 @@ class TestTransaction(unittest.TestCase):
|
|||
"""Test memory limit as well as simple flush"""
|
||||
|
||||
# No throttling, no delay for replay
|
||||
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, timedelta(seconds=0))
|
||||
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, timedelta(seconds=0), {'dimensions': {}})
|
||||
|
||||
step = 10
|
||||
oneTrSize = (MAX_QUEUE_SIZE / step) - 1
|
||||
|
@ -77,7 +77,7 @@ class TestTransaction(unittest.TestCase):
|
|||
"""Test throttling while flushing"""
|
||||
|
||||
# No throttling, no delay for replay
|
||||
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
|
||||
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY, {'dimensions': {}})
|
||||
trManager._flush_without_ioloop = True # Use blocking API to emulate tornado ioloop
|
||||
|
||||
# Add 3 transactions, make sure no memory limit is in the way
|
||||
|
|
Loading…
Reference in New Issue