diff --git a/neutron/agent/l3/ha.py b/neutron/agent/l3/ha.py index 79587910d23..899be9b7f00 100644 --- a/neutron/agent/l3/ha.py +++ b/neutron/agent/l3/ha.py @@ -182,9 +182,6 @@ class AgentMixin(object): ri.disable_radvd() def notify_server(self, batched_events): - eventlet.spawn_n(self._notify_server, batched_events) - - def _notify_server(self, batched_events): translated_states = dict((router_id, TRANSLATION_MAP[state]) for router_id, state in batched_events) LOG.debug('Updating server with HA routers states %s', diff --git a/neutron/notifiers/batch_notifier.py b/neutron/notifiers/batch_notifier.py index 3e5c6f9666e..6bdbaa8b091 100644 --- a/neutron/notifiers/batch_notifier.py +++ b/neutron/notifiers/batch_notifier.py @@ -10,17 +10,17 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + import eventlet -from neutron_lib.utils import runtime -from oslo_utils import uuidutils class BatchNotifier(object): def __init__(self, batch_interval, callback): - self.pending_events = [] + self._pending_events = eventlet.Queue() self.callback = callback self.batch_interval = batch_interval - self._lock_identifier = 'notifier-%s' % uuidutils.generate_uuid() + self._mutex = threading.Lock() def queue_event(self, event): """Called to queue sending an event with the next batch of events. @@ -33,32 +33,35 @@ class BatchNotifier(object): This replaces the loopingcall with a mechanism that creates a short-lived thread on demand whenever an event is queued. That thread - will wait for a lock, send all queued events and then sleep for - 'batch_interval' seconds to allow other events to queue up. + will check if the lock is released, send all queued events and then + sleep for 'batch_interval' seconds. If at the end of this sleep time, + other threads have added new events to the event queue, the same thread + will process them. - This effectively acts as a rate limiter to only allow 1 batch per - 'batch_interval' seconds. + At the same time, other threads will be able to add new events to the + queue and will spawn new "synced_send" threads to process them. But if + the mutex is locked, the spawned thread will end immediately. :param event: the event that occurred. """ if not event: return - self.pending_events.append(event) + self._pending_events.put(event) - @runtime.synchronized(self._lock_identifier) def synced_send(): - self._notify() - # sleeping after send while holding the lock allows subsequent - # events to batch up - eventlet.sleep(self.batch_interval) + if not self._mutex.locked(): + with self._mutex: + while not self._pending_events.empty(): + self._notify() + # sleeping after send while holding the lock allows + # subsequent events to batch up + eventlet.sleep(self.batch_interval) eventlet.spawn_n(synced_send) def _notify(self): - if not self.pending_events: - return - - batched_events = self.pending_events - self.pending_events = [] + batched_events = [] + while not self._pending_events.empty(): + batched_events.append(self._pending_events.get()) self.callback(batched_events) diff --git a/neutron/tests/unit/notifiers/test_batch_notifier.py b/neutron/tests/unit/notifiers/test_batch_notifier.py index b33f5c1c3ec..83e000b9c21 100644 --- a/neutron/tests/unit/notifiers/test_batch_notifier.py +++ b/neutron/tests/unit/notifiers/test_batch_notifier.py @@ -16,6 +16,7 @@ import eventlet import mock +from neutron.common import utils from neutron.notifiers import batch_notifier from neutron.tests import base @@ -23,41 +24,54 @@ from neutron.tests import base class TestBatchNotifier(base.BaseTestCase): def setUp(self): super(TestBatchNotifier, self).setUp() - self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x) - self.spawn_n_p = mock.patch('eventlet.spawn_n') - self.spawn_n = self.spawn_n_p.start() + self._received_events = eventlet.Queue() + self.notifier = batch_notifier.BatchNotifier(2, self._queue_events) + self.spawn_n_p = mock.patch.object(eventlet, 'spawn_n') + + def _queue_events(self, events): + for event in events: + self._received_events.put(event) def test_queue_event_no_event(self): + spawn_n = self.spawn_n_p.start() self.notifier.queue_event(None) - self.assertEqual(0, len(self.notifier.pending_events)) - self.assertEqual(0, self.spawn_n.call_count) + self.assertEqual(0, len(self.notifier._pending_events.queue)) + self.assertEqual(0, spawn_n.call_count) def test_queue_event_first_event(self): + spawn_n = self.spawn_n_p.start() self.notifier.queue_event(mock.Mock()) - self.assertEqual(1, len(self.notifier.pending_events)) - self.assertEqual(1, self.spawn_n.call_count) + self.assertEqual(1, len(self.notifier._pending_events.queue)) + self.assertEqual(1, spawn_n.call_count) - def test_queue_event_multiple_events(self): - self.spawn_n_p.stop() - c_mock = mock.patch.object(self.notifier, 'callback').start() - events = 6 - for i in range(0, events): - self.notifier.queue_event(mock.Mock()) + def test_queue_event_multiple_events_notify_method(self): + def _batch_notifier_dequeue(): + while not self.notifier._pending_events.empty(): + self.notifier._pending_events.get() + + c_mock = mock.patch.object(self.notifier, '_notify', + side_effect=_batch_notifier_dequeue).start() + events = 20 + for i in range(events): + self.notifier.queue_event('Event %s' % i) eventlet.sleep(0) # yield to let coro execute - while self.notifier.pending_events: - # wait for coroutines to finish - eventlet.sleep(0.1) + utils.wait_until_true(self.notifier._pending_events.empty, + timeout=5) + # Called twice: when the first thread calls "synced_send" and then, + # in the same loop, when self._pending_events is not empty(). All + # self.notifier.queue_event calls are done in just one + # "batch_interval" (2 secs). self.assertEqual(2, c_mock.call_count) - self.assertEqual(6, sum(len(c[0][0]) for c in c_mock.call_args_list)) - self.assertEqual(0, len(self.notifier.pending_events)) - def test_queue_event_call_send_events(self): - with mock.patch.object(self.notifier, - 'callback') as send_events: - self.spawn_n.side_effect = lambda func: func() - self.notifier.queue_event(mock.Mock()) - while self.notifier.pending_events: - # wait for coroutines to finish - eventlet.sleep(0.1) - self.assertTrue(send_events.called) + def test_queue_event_multiple_events_callback_method(self): + events = 20 + for i in range(events): + self.notifier.queue_event('Event %s' % i) + eventlet.sleep(0) # yield to let coro execute + + utils.wait_until_true(self.notifier._pending_events.empty, + timeout=5) + expected = ['Event %s' % i for i in range(events)] + # Check the events have been handled in the same input order. + self.assertEqual(expected, list(self._received_events.queue)) diff --git a/neutron/tests/unit/notifiers/test_nova.py b/neutron/tests/unit/notifiers/test_nova.py index f07bc77b5e0..6ea3571d41f 100644 --- a/neutron/tests/unit/notifiers/test_nova.py +++ b/neutron/tests/unit/notifiers/test_nova.py @@ -301,7 +301,7 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier.send_network_change( 'update_floatingip', original_obj, returned_obj) self.assertEqual( - 2, len(self.nova_notifier.batch_notifier.pending_events)) + 2, len(self.nova_notifier.batch_notifier._pending_events.queue)) returned_obj_non = {'floatingip': {'port_id': None}} event_dis = self.nova_notifier.create_port_changed_event( @@ -309,9 +309,10 @@ class TestNovaNotify(base.BaseTestCase): event_assoc = self.nova_notifier.create_port_changed_event( 'update_floatingip', original_obj, returned_obj) self.assertEqual( - self.nova_notifier.batch_notifier.pending_events[0], event_dis) + self.nova_notifier.batch_notifier._pending_events.get(), event_dis) self.assertEqual( - self.nova_notifier.batch_notifier.pending_events[1], event_assoc) + self.nova_notifier.batch_notifier._pending_events.get(), + event_assoc) def test_delete_port_notify(self): device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87' @@ -360,6 +361,7 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier.notify_port_active_direct(port) self.assertEqual( - 1, len(self.nova_notifier.batch_notifier.pending_events)) - self.assertEqual(expected_event, - self.nova_notifier.batch_notifier.pending_events[0]) + 1, len(self.nova_notifier.batch_notifier._pending_events.queue)) + self.assertEqual( + expected_event, + self.nova_notifier.batch_notifier._pending_events.get())