Merge "Initialize correctly collector"
This commit is contained in:
commit
6d301dd952
@ -59,9 +59,8 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
class CollectorService(cotyledon.Service):
|
||||
"""Listener for the collector service."""
|
||||
def run(self):
|
||||
"""Bind the UDP socket and handle incoming data."""
|
||||
super(CollectorService, self).run()
|
||||
def __init__(self, worker_id):
|
||||
super(CollectorService, self).__init__(worker_id)
|
||||
# ensure dispatcher is configured before starting other services
|
||||
dispatcher_managers = dispatcher.load_dispatcher_manager()
|
||||
(self.meter_manager, self.event_manager) = dispatcher_managers
|
||||
@ -69,6 +68,7 @@ class CollectorService(cotyledon.Service):
|
||||
self.event_listener = None
|
||||
self.udp_thread = None
|
||||
|
||||
def run(self):
|
||||
if cfg.CONF.collector.udp_address:
|
||||
self.udp_thread = utils.spawn_thread(self.start_udp)
|
||||
|
||||
|
@ -76,6 +76,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
),
|
||||
'not-so-secret')
|
||||
|
||||
self.mock_dispatcher = self._setup_fake_dispatcher()
|
||||
self.srv = collector.CollectorService(0)
|
||||
|
||||
def _setup_messaging(self, enabled=True):
|
||||
@ -115,7 +116,6 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_udp_receive_base(self):
|
||||
self._setup_messaging(False)
|
||||
mock_dispatcher = self._setup_fake_dispatcher()
|
||||
|
||||
udp_socket = self._make_fake_socket(self.sample)
|
||||
|
||||
@ -128,13 +128,12 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
self._verify_udp_socket(udp_socket)
|
||||
mock_record = mock_dispatcher.record_metering_data
|
||||
mock_record = self.mock_dispatcher.record_metering_data
|
||||
mock_record.assert_called_once_with(self.sample)
|
||||
|
||||
def test_udp_socket_ipv6(self):
|
||||
self._setup_messaging(False)
|
||||
self.CONF.set_override('udp_address', '::1', group='collector')
|
||||
self._setup_fake_dispatcher()
|
||||
sock = self._make_fake_socket(self.sample)
|
||||
|
||||
with mock.patch.object(socket, 'socket') as mock_socket:
|
||||
@ -147,8 +146,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_udp_receive_storage_error(self):
|
||||
self._setup_messaging(False)
|
||||
mock_dispatcher = self._setup_fake_dispatcher()
|
||||
mock_record = mock_dispatcher.record_metering_data
|
||||
mock_record = self.mock_dispatcher.record_metering_data
|
||||
mock_record.side_effect = self._raise_error
|
||||
|
||||
udp_socket = self._make_fake_socket(self.sample)
|
||||
@ -168,7 +166,6 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_udp_receive_bad_decoding(self):
|
||||
self._setup_messaging(False)
|
||||
self._setup_fake_dispatcher()
|
||||
udp_socket = self._make_fake_socket(self.sample)
|
||||
with mock.patch('socket.socket', return_value=udp_socket):
|
||||
with mock.patch('msgpack.loads', self._raise_error):
|
||||
@ -183,7 +180,6 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
def test_only_udp(self, udp_start):
|
||||
"""Check that only UDP is started if messaging transport is unset."""
|
||||
self._setup_messaging(False)
|
||||
self._setup_fake_dispatcher()
|
||||
udp_socket = self._make_fake_socket(self.sample)
|
||||
real_start = oslo_messaging.MessageHandlingServer.start
|
||||
with mock.patch.object(oslo_messaging.MessageHandlingServer,
|
||||
@ -198,7 +194,6 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_udp_receive_valid_encoding(self):
|
||||
self._setup_messaging(False)
|
||||
mock_dispatcher = self._setup_fake_dispatcher()
|
||||
self.data_sent = []
|
||||
with mock.patch('socket.socket',
|
||||
return_value=self._make_fake_socket(self.utf8_msg)):
|
||||
@ -207,16 +202,15 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
self.srv.udp_thread.join(5)
|
||||
self.assertFalse(self.srv.udp_thread.is_alive())
|
||||
self.assertTrue(utils.verify_signature(
|
||||
mock_dispatcher.method_calls[0][1][0],
|
||||
self.mock_dispatcher.method_calls[0][1][0],
|
||||
"not-so-secret"))
|
||||
|
||||
def _test_collector_requeue(self, listener, batch_listener=False):
|
||||
|
||||
mock_dispatcher = self._setup_fake_dispatcher()
|
||||
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
|
||||
mock_record = mock_dispatcher.record_metering_data
|
||||
mock_record = self.mock_dispatcher.record_metering_data
|
||||
mock_record.side_effect = Exception('boom')
|
||||
mock_dispatcher.record_events.side_effect = Exception('boom')
|
||||
self.mock_dispatcher.record_events.side_effect = Exception('boom')
|
||||
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
|
Loading…
Reference in New Issue
Block a user