monasca-agent/tests_to_fix/test_transaction.py

111 lines
3.7 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from datetime import timedelta, datetime
from monasca_agent.forwarder.transaction import Transaction, TransactionManager
from monasca_agent.forwarder.daemon import MAX_QUEUE_SIZE, THROTTLING_DELAY
class memTransaction(Transaction):
def __init__(self, size, manager):
Transaction.__init__(self)
self._trManager = manager
self._size = size
self._flush_count = 0
self.is_flushable = False
def flush(self):
self._flush_count += 1
if self.is_flushable:
self._trManager.tr_success(self)
else:
self._trManager.tr_error(self)
self._trManager.flush_next()
class TestTransaction(unittest.TestCase):
def setUp(self):
pass
def testMemoryLimit(self):
"""Test memory limit as well as simple flush"""
# No throttling, no delay for replay
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, timedelta(seconds=0), {'dimensions': {}})
step = 10
oneTrSize = (MAX_QUEUE_SIZE / step) - 1
for i in range(step):
tr = memTransaction(oneTrSize, trManager)
trManager.append(tr)
trManager.flush()
# There should be exactly step transaction in the list, with
# a flush count of 1
self.assertEqual(len(trManager._transactions), step)
for tr in trManager._transactions:
self.assertEqual(tr._flush_count, 1)
# Try to add one more
tr = memTransaction(oneTrSize + 10, trManager)
trManager.append(tr)
# At this point, transaction one (the oldest) should have been removed from the list
self.assertEqual(len(trManager._transactions), step)
for tr in trManager._transactions:
self.assertNotEqual(tr._id, 1)
trManager.flush()
self.assertEqual(len(trManager._transactions), step)
# Check and allow transactions to be flushed
for tr in trManager._transactions:
tr.is_flushable = True
# Last transaction has been flushed only once
if tr._id == step + 1:
self.assertEqual(tr._flush_count, 1)
else:
self.assertEqual(tr._flush_count, 2)
trManager.flush()
self.assertEqual(len(trManager._transactions), 0)
def testThrottling(self):
"""Test throttling while flushing"""
# No throttling, no delay for replay
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY, {'dimensions': {}})
trManager._flush_without_ioloop = True # Use blocking API to emulate tornado ioloop
# Add 3 transactions, make sure no memory limit is in the way
oneTrSize = MAX_QUEUE_SIZE / 10
for i in range(3):
tr = memTransaction(oneTrSize, trManager)
trManager.append(tr)
# Try to flush them, time it
before = datetime.now()
trManager.flush()
after = datetime.now()
self.assertTrue((after - before) > 3 * THROTTLING_DELAY - timedelta(microseconds=100000),
"before = %s after = %s" % (before, after))
if __name__ == '__main__':
unittest.main()