From b7d39c1360c6eeab8e011299becdc8417a31dc83 Mon Sep 17 00:00:00 2001 From: Assaf Muller Date: Mon, 13 Oct 2014 19:24:01 +0300 Subject: [PATCH] Reuse nova batch notifier Refactor the batch notifier currently used by the Nova notifier into a separate class. It will be reused when batching L3 HA state change events. Partially-Implements: blueprint report-ha-router-master Change-Id: I2f8cf261f48bdb632ac0bd643a337290b5297fce --- neutron/notifiers/batch_notifier.py | 66 +++++++++++++++++++ neutron/notifiers/nova.py | 57 ++-------------- .../unit/notifiers/test_batch_notifier.py | 51 ++++++++++++++ .../unit/notifiers/test_notifiers_nova.py | 62 +++++------------ 4 files changed, 140 insertions(+), 96 deletions(-) create mode 100644 neutron/notifiers/batch_notifier.py create mode 100644 neutron/tests/unit/notifiers/test_batch_notifier.py diff --git a/neutron/notifiers/batch_notifier.py b/neutron/notifiers/batch_notifier.py new file mode 100644 index 00000000000..0396042dd7f --- /dev/null +++ b/neutron/notifiers/batch_notifier.py @@ -0,0 +1,66 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet + + +class BatchNotifier(object): + def __init__(self, batch_interval, callback): + self.pending_events = [] + self._waiting_to_send = False + self.callback = callback + self.batch_interval = batch_interval + + def queue_event(self, event): + """Called to queue sending an event with the next batch of events. + + Sending events individually, as they occur, has been problematic as it + can result in a flood of sends. Previously, there was a loopingcall + thread that would send batched events on a periodic interval. However, + maintaining a persistent thread in the loopingcall was also + problematic. + + This replaces the loopingcall with a mechanism that creates a + short-lived thread on demand when the first event is queued. That + thread will sleep once for the same batch_duration to allow other + events to queue up in pending_events and then will send them when it + wakes. + + If a thread is already alive and waiting, this call will simply queue + the event and return leaving it up to the thread to send it. + + :param event: the event that occurred. + """ + if not event: + return + + self.pending_events.append(event) + + if self._waiting_to_send: + return + + self._waiting_to_send = True + + def last_out_sends(): + eventlet.sleep(self.batch_interval) + self._waiting_to_send = False + self._notify() + + eventlet.spawn_n(last_out_sends) + + def _notify(self): + if not self.pending_events: + return + + batched_events = self.pending_events + self.pending_events = [] + self.callback(batched_events) diff --git a/neutron/notifiers/nova.py b/neutron/notifiers/nova.py index c31111f64de..4bad6dcbadd 100644 --- a/neutron/notifiers/nova.py +++ b/neutron/notifiers/nova.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet from keystoneclient import auth as ks_auth from keystoneclient.auth.identity import v2 as v2_auth from keystoneclient import session as ks_session @@ -28,6 +27,7 @@ from neutron.common import constants from neutron import context from neutron.i18n import _LE, _LI, _LW from neutron import manager +from neutron.notifiers import batch_notifier from neutron.openstack.common import uuidutils @@ -105,45 +105,8 @@ class Notifier(object): session=session, region_name=cfg.CONF.nova.region_name, extensions=[server_external_events]) - self.pending_events = [] - self._waiting_to_send = False - - def queue_event(self, event): - """Called to queue sending an event with the next batch of events. - - Sending events individually, as they occur, has been problematic as it - can result in a flood of sends. Previously, there was a loopingcall - thread that would send batched events on a periodic interval. However, - maintaining a persistent thread in the loopingcall was also - problematic. - - This replaces the loopingcall with a mechanism that creates a - short-lived thread on demand when the first event is queued. That - thread will sleep once for the same send_events_interval to allow other - events to queue up in pending_events and then will send them when it - wakes. - - If a thread is already alive and waiting, this call will simply queue - the event and return leaving it up to the thread to send it. - - :param event: the event that occurred. - """ - if not event: - return - - self.pending_events.append(event) - - if self._waiting_to_send: - return - - self._waiting_to_send = True - - def last_out_sends(): - eventlet.sleep(cfg.CONF.send_events_interval) - self._waiting_to_send = False - self.send_events() - - eventlet.spawn_n(last_out_sends) + self.batch_notifier = batch_notifier.BatchNotifier( + cfg.CONF.send_events_interval, self.send_events) def _is_compute_port(self, port): try: @@ -189,11 +152,11 @@ class Notifier(object): disassociate_returned_obj = {'floatingip': {'port_id': None}} event = self.create_port_changed_event(action, original_obj, disassociate_returned_obj) - self.queue_event(event) + self.batch_notifier.queue_event(event) event = self.create_port_changed_event(action, original_obj, returned_obj) - self.queue_event(event) + self.batch_notifier.queue_event(event) def create_port_changed_event(self, action, original_obj, returned_obj): port = None @@ -270,16 +233,10 @@ class Notifier(object): def send_port_status(self, mapper, connection, port): event = getattr(port, "_notify_event", None) - self.queue_event(event) + self.batch_notifier.queue_event(event) port._notify_event = None - def send_events(self): - if not self.pending_events: - return - - batched_events = self.pending_events - self.pending_events = [] - + def send_events(self, batched_events): LOG.debug("Sending events: %s", batched_events) try: response = self.nclient.server_external_events.create( diff --git a/neutron/tests/unit/notifiers/test_batch_notifier.py b/neutron/tests/unit/notifiers/test_batch_notifier.py new file mode 100644 index 00000000000..23bede8c326 --- /dev/null +++ b/neutron/tests/unit/notifiers/test_batch_notifier.py @@ -0,0 +1,51 @@ +# Copyright (c) 2014 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.notifiers import batch_notifier +from neutron.tests import base + + +class TestBatchNotifier(base.BaseTestCase): + def setUp(self): + super(TestBatchNotifier, self).setUp() + self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x) + self.spawn_n = mock.patch('eventlet.spawn_n').start() + + def test_queue_event_no_event(self): + self.notifier.queue_event(None) + self.assertEqual(0, len(self.notifier.pending_events)) + self.assertEqual(0, self.spawn_n.call_count) + + def test_queue_event_first_event(self): + self.notifier.queue_event(mock.Mock()) + self.assertEqual(1, len(self.notifier.pending_events)) + self.assertEqual(1, self.spawn_n.call_count) + + def test_queue_event_multiple_events(self): + events = 6 + for i in range(0, events): + self.notifier.queue_event(mock.Mock()) + self.assertEqual(events, len(self.notifier.pending_events)) + self.assertEqual(1, self.spawn_n.call_count) + + def test_queue_event_call_send_events(self): + with mock.patch.object(self.notifier, + 'callback') as send_events: + self.spawn_n.side_effect = lambda func: func() + self.notifier.queue_event(mock.Mock()) + self.assertFalse(self.notifier._waiting_to_send) + self.assertTrue(send_events.called) diff --git a/neutron/tests/unit/notifiers/test_notifiers_nova.py b/neutron/tests/unit/notifiers/test_notifiers_nova.py index afae75ec3c5..49ccb975ae7 100644 --- a/neutron/tests/unit/notifiers/test_notifiers_nova.py +++ b/neutron/tests/unit/notifiers/test_notifiers_nova.py @@ -218,21 +218,21 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier.nclient.server_external_events, 'create') as nclient_create: nclient_create.return_value = 'i am a string!' - self.nova_notifier.send_events() + self.nova_notifier.send_events([]) def test_nova_send_event_rasies_404(self): with mock.patch.object( self.nova_notifier.nclient.server_external_events, 'create') as nclient_create: nclient_create.side_effect = nova_exceptions.NotFound - self.nova_notifier.send_events() + self.nova_notifier.send_events([]) def test_nova_send_events_raises(self): with mock.patch.object( self.nova_notifier.nclient.server_external_events, 'create') as nclient_create: nclient_create.side_effect = Exception - self.nova_notifier.send_events() + self.nova_notifier.send_events([]) def test_nova_send_events_returns_non_200(self): device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87' @@ -242,9 +242,8 @@ class TestNovaNotify(base.BaseTestCase): nclient_create.return_value = [{'code': 404, 'name': 'network-changed', 'server_uuid': device_id}] - self.nova_notifier.pending_events.append( - {'name': 'network-changed', 'server_uuid': device_id}) - self.nova_notifier.send_events() + self.nova_notifier.send_events( + [{'name': 'network-changed', 'server_uuid': device_id}]) def test_nova_send_events_return_200(self): device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87' @@ -254,9 +253,8 @@ class TestNovaNotify(base.BaseTestCase): nclient_create.return_value = [{'code': 200, 'name': 'network-changed', 'server_uuid': device_id}] - self.nova_notifier.pending_events.append( - {'name': 'network-changed', 'server_uuid': device_id}) - self.nova_notifier.send_events() + self.nova_notifier.send_events( + [{'name': 'network-changed', 'server_uuid': device_id}]) def test_nova_send_events_multiple(self): device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87' @@ -269,40 +267,9 @@ class TestNovaNotify(base.BaseTestCase): {'code': 200, 'name': 'network-changed', 'server_uuid': device_id}] - self.nova_notifier.pending_events.append( - {'name': 'network-changed', 'server_uuid': device_id}) - self.nova_notifier.pending_events.append( - {'name': 'network-changed', 'server_uuid': device_id}) - self.nova_notifier.send_events() - - def test_queue_event_no_event(self): - with mock.patch('eventlet.spawn_n') as spawn_n: - self.nova_notifier.queue_event(None) - self.assertEqual(0, len(self.nova_notifier.pending_events)) - self.assertEqual(0, spawn_n.call_count) - - def test_queue_event_first_event(self): - with mock.patch('eventlet.spawn_n') as spawn_n: - self.nova_notifier.queue_event(mock.Mock()) - self.assertEqual(1, len(self.nova_notifier.pending_events)) - self.assertEqual(1, spawn_n.call_count) - - def test_queue_event_multiple_events(self): - with mock.patch('eventlet.spawn_n') as spawn_n: - events = 6 - for i in range(0, events): - self.nova_notifier.queue_event(mock.Mock()) - self.assertEqual(events, len(self.nova_notifier.pending_events)) - self.assertEqual(1, spawn_n.call_count) - - def test_queue_event_call_send_events(self): - with mock.patch.object(self.nova_notifier, - 'send_events') as send_events: - with mock.patch('eventlet.spawn_n') as spawn_n: - spawn_n.side_effect = lambda func: func() - self.nova_notifier.queue_event(mock.Mock()) - self.assertFalse(self.nova_notifier._waiting_to_send) - send_events.assert_called_once_with() + self.nova_notifier.send_events([ + {'name': 'network-changed', 'server_uuid': device_id}, + {'name': 'network-changed', 'server_uuid': device_id}]) def test_reassociate_floatingip_without_disassociate_event(self): returned_obj = {'floatingip': @@ -311,12 +278,15 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier._waiting_to_send = True self.nova_notifier.send_network_change( 'update_floatingip', original_obj, returned_obj) - self.assertEqual(2, len(self.nova_notifier.pending_events)) + self.assertEqual( + 2, len(self.nova_notifier.batch_notifier.pending_events)) returned_obj_non = {'floatingip': {'port_id': None}} event_dis = self.nova_notifier.create_port_changed_event( 'update_floatingip', original_obj, returned_obj_non) event_assoc = self.nova_notifier.create_port_changed_event( 'update_floatingip', original_obj, returned_obj) - self.assertEqual(self.nova_notifier.pending_events[0], event_dis) - self.assertEqual(self.nova_notifier.pending_events[1], event_assoc) + self.assertEqual( + self.nova_notifier.batch_notifier.pending_events[0], event_dis) + self.assertEqual( + self.nova_notifier.batch_notifier.pending_events[1], event_assoc)