Files
monasca-agent/tests/test_transaction.py
Tim Kuhlman 19cd1c053e Remove emit_time from the collector and put it into the forwarder.
Fixed the service dimension for api checks on various openstack services
so that it matches that of the other service checks, allowing for it to
be grouped together.

Modified the config so that a config file in the working dir can be used;
this is helpful for testing. Additionally, I fixed up a few tests and removed
some that are no longer used. There is still a lot of test work left unfinished.

Change-Id: Ic5b127db41c174735f1436bb511f00e271274cb0
2015-02-04 14:54:00 -07:00

99 lines
3.2 KiB
Python

import unittest
from datetime import timedelta, datetime
from monasca_agent.forwarder.transaction import Transaction, TransactionManager
from monasca_agent.forwarder.daemon import MAX_QUEUE_SIZE, THROTTLING_DELAY
class memTransaction(Transaction):
    """In-memory Transaction stub whose flush outcome is set by the test.

    Each call to ``flush`` is counted in ``_flush_count``. The transaction
    reports success to its manager only while ``is_flushable`` is True;
    otherwise it reports an error.
    """

    def __init__(self, size, manager):
        """Record the owning manager and the declared transaction size.

        :param size: value stored in ``_size``.
        :param manager: object that receives the ``tr_success`` /
            ``tr_error`` / ``flush_next`` callbacks issued by ``flush``.
        """
        Transaction.__init__(self)
        self.is_flushable = False  # tests flip this to let the flush succeed
        self._flush_count = 0
        self._size = size
        self._trManager = manager

    def flush(self):
        """Count the attempt, report its outcome, then ask for the next flush."""
        self._flush_count = self._flush_count + 1
        # Pick the callback matching the outcome the test has configured.
        manager = self._trManager
        report = manager.tr_success if self.is_flushable else manager.tr_error
        report(self)
        self._trManager.flush_next()
class TestTransaction(unittest.TestCase):
    """Exercise TransactionManager queue-size eviction and flush throttling."""

    def testMemoryLimit(self):
        """Test memory limit as well as simple flush"""
        # No throttling, no delay for replay
        trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
                                       timedelta(seconds=0), {'dimensions': {}})

        step = 10
        # Floor division keeps the size an integer under both Python 2 and
        # Python 3 division semantics; `step` transactions of this size
        # together stay below MAX_QUEUE_SIZE.
        oneTrSize = (MAX_QUEUE_SIZE // step) - 1
        for _ in range(step):
            tr = memTransaction(oneTrSize, trManager)
            trManager.append(tr)

        trManager.flush()

        # There should be exactly step transaction in the list, with
        # a flush count of 1
        self.assertEqual(len(trManager._transactions), step)
        for tr in trManager._transactions:
            self.assertEqual(tr._flush_count, 1)

        # Try to add one more
        tr = memTransaction(oneTrSize + 10, trManager)
        trManager.append(tr)

        # At this point, transaction one (the oldest) should have been
        # removed from the list
        self.assertEqual(len(trManager._transactions), step)
        for tr in trManager._transactions:
            self.assertNotEqual(tr._id, 1)

        trManager.flush()
        self.assertEqual(len(trManager._transactions), step)

        # Check and allow transactions to be flushed
        for tr in trManager._transactions:
            tr.is_flushable = True
            # Last transaction has been flushed only once
            if tr._id == step + 1:
                self.assertEqual(tr._flush_count, 1)
            else:
                self.assertEqual(tr._flush_count, 2)

        trManager.flush()
        self.assertEqual(len(trManager._transactions), 0)

    def testThrottling(self):
        """Test throttling while flushing"""
        # No delay for replay, but throttle flushes with THROTTLING_DELAY
        trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
                                       THROTTLING_DELAY, {'dimensions': {}})
        trManager._flush_without_ioloop = True  # Use blocking API to emulate tornado ioloop

        # Add 3 transactions, make sure no memory limit is in the way
        oneTrSize = MAX_QUEUE_SIZE // 10
        for _ in range(3):
            tr = memTransaction(oneTrSize, trManager)
            trManager.append(tr)

        # Try to flush them, time it
        before = datetime.now()
        trManager.flush()
        after = datetime.now()

        # Three throttled flushes should take about 3 * THROTTLING_DELAY;
        # allow 100 ms of slack for timer imprecision.
        self.assertTrue((after - before) > 3 * THROTTLING_DELAY - timedelta(microseconds=100000),
                        "before = %s after = %s" % (before, after))
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()