Browse Source

Merge "Refactor the L3 agent batch notifier" into stable/queens

changes/15/722215/1
Zuul 2 months ago
committed by Gerrit Code Review
parent
commit
cf9bd62d4d
4 changed files with 73 additions and 57 deletions
  1. +0
    -3
      neutron/agent/l3/ha.py
  2. +22
    -19
      neutron/notifiers/batch_notifier.py
  3. +43
    -29
      neutron/tests/unit/notifiers/test_batch_notifier.py
  4. +8
    -6
      neutron/tests/unit/notifiers/test_nova.py

+ 0
- 3
neutron/agent/l3/ha.py View File

@@ -178,9 +178,6 @@ class AgentMixin(object):
ri.disable_radvd()

def notify_server(self, batched_events):
eventlet.spawn_n(self._notify_server, batched_events)

def _notify_server(self, batched_events):
translated_states = dict((router_id, TRANSLATION_MAP[state]) for
router_id, state in batched_events)
LOG.debug('Updating server with HA routers states %s',


+ 22
- 19
neutron/notifiers/batch_notifier.py View File

@@ -10,17 +10,17 @@
# License for the specific language governing permissions and limitations
# under the License.

import threading

import eventlet
from neutron_lib.utils import runtime
from oslo_utils import uuidutils


class BatchNotifier(object):
def __init__(self, batch_interval, callback):
self.pending_events = []
self._pending_events = eventlet.Queue()
self.callback = callback
self.batch_interval = batch_interval
self._lock_identifier = 'notifier-%s' % uuidutils.generate_uuid()
self._mutex = threading.Lock()

def queue_event(self, event):
"""Called to queue sending an event with the next batch of events.
@@ -33,32 +33,35 @@ class BatchNotifier(object):

This replaces the loopingcall with a mechanism that creates a
short-lived thread on demand whenever an event is queued. That thread
will wait for a lock, send all queued events and then sleep for
'batch_interval' seconds to allow other events to queue up.
will check if the lock is released, send all queued events and then
sleep for 'batch_interval' seconds. If at the end of this sleep time,
other threads have added new events to the event queue, the same thread
will process them.

This effectively acts as a rate limiter to only allow 1 batch per
'batch_interval' seconds.
At the same time, other threads will be able to add new events to the
queue and will spawn new "synced_send" threads to process them. But if
the mutex is locked, the spawned thread will end immediately.

:param event: the event that occurred.
"""
if not event:
return

self.pending_events.append(event)
self._pending_events.put(event)

@runtime.synchronized(self._lock_identifier)
def synced_send():
self._notify()
# sleeping after send while holding the lock allows subsequent
# events to batch up
eventlet.sleep(self.batch_interval)
if not self._mutex.locked():
with self._mutex:
while not self._pending_events.empty():
self._notify()
# sleeping after send while holding the lock allows
# subsequent events to batch up
eventlet.sleep(self.batch_interval)

eventlet.spawn_n(synced_send)

def _notify(self):
if not self.pending_events:
return

batched_events = self.pending_events
self.pending_events = []
batched_events = []
while not self._pending_events.empty():
batched_events.append(self._pending_events.get())
self.callback(batched_events)

+ 43
- 29
neutron/tests/unit/notifiers/test_batch_notifier.py View File

@@ -16,6 +16,7 @@
import eventlet
import mock

from neutron.common import utils
from neutron.notifiers import batch_notifier
from neutron.tests import base

@@ -23,41 +24,54 @@ from neutron.tests import base
class TestBatchNotifier(base.BaseTestCase):
def setUp(self):
super(TestBatchNotifier, self).setUp()
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
self.spawn_n_p = mock.patch('eventlet.spawn_n')
self.spawn_n = self.spawn_n_p.start()
self._received_events = eventlet.Queue()
self.notifier = batch_notifier.BatchNotifier(2, self._queue_events)
self.spawn_n_p = mock.patch.object(eventlet, 'spawn_n')

def _queue_events(self, events):
for event in events:
self._received_events.put(event)

def test_queue_event_no_event(self):
spawn_n = self.spawn_n_p.start()
self.notifier.queue_event(None)
self.assertEqual(0, len(self.notifier.pending_events))
self.assertEqual(0, self.spawn_n.call_count)
self.assertEqual(0, len(self.notifier._pending_events.queue))
self.assertEqual(0, spawn_n.call_count)

def test_queue_event_first_event(self):
spawn_n = self.spawn_n_p.start()
self.notifier.queue_event(mock.Mock())
self.assertEqual(1, len(self.notifier.pending_events))
self.assertEqual(1, self.spawn_n.call_count)

def test_queue_event_multiple_events(self):
self.spawn_n_p.stop()
c_mock = mock.patch.object(self.notifier, 'callback').start()
events = 6
for i in range(0, events):
self.notifier.queue_event(mock.Mock())
self.assertEqual(1, len(self.notifier._pending_events.queue))
self.assertEqual(1, spawn_n.call_count)

def test_queue_event_multiple_events_notify_method(self):
def _batch_notifier_dequeue():
while not self.notifier._pending_events.empty():
self.notifier._pending_events.get()

c_mock = mock.patch.object(self.notifier, '_notify',
side_effect=_batch_notifier_dequeue).start()
events = 20
for i in range(events):
self.notifier.queue_event('Event %s' % i)
eventlet.sleep(0) # yield to let coro execute

while self.notifier.pending_events:
# wait for coroutines to finish
eventlet.sleep(0.1)
utils.wait_until_true(self.notifier._pending_events.empty,
timeout=5)
# Called twice: when the first thread calls "synced_send" and then,
# in the same loop, when self._pending_events is not empty(). All
# self.notifier.queue_event calls are done in just one
# "batch_interval" (2 secs).
self.assertEqual(2, c_mock.call_count)
self.assertEqual(6, sum(len(c[0][0]) for c in c_mock.call_args_list))
self.assertEqual(0, len(self.notifier.pending_events))
def test_queue_event_call_send_events(self):
with mock.patch.object(self.notifier,
'callback') as send_events:
self.spawn_n.side_effect = lambda func: func()
self.notifier.queue_event(mock.Mock())
while self.notifier.pending_events:
# wait for coroutines to finish
eventlet.sleep(0.1)
self.assertTrue(send_events.called)
def test_queue_event_multiple_events_callback_method(self):
events = 20
for i in range(events):
self.notifier.queue_event('Event %s' % i)
eventlet.sleep(0) # yield to let coro execute
utils.wait_until_true(self.notifier._pending_events.empty,
timeout=5)
expected = ['Event %s' % i for i in range(events)]
# Check the events have been handled in the same input order.
self.assertEqual(expected, list(self._received_events.queue))

+ 8
- 6
neutron/tests/unit/notifiers/test_nova.py View File

@@ -301,7 +301,7 @@ class TestNovaNotify(base.BaseTestCase):
self.nova_notifier.send_network_change(
'update_floatingip', original_obj, returned_obj)
self.assertEqual(
2, len(self.nova_notifier.batch_notifier.pending_events))
2, len(self.nova_notifier.batch_notifier._pending_events.queue))

returned_obj_non = {'floatingip': {'port_id': None}}
event_dis = self.nova_notifier.create_port_changed_event(
@@ -309,9 +309,10 @@ class TestNovaNotify(base.BaseTestCase):
event_assoc = self.nova_notifier.create_port_changed_event(
'update_floatingip', original_obj, returned_obj)
self.assertEqual(
self.nova_notifier.batch_notifier.pending_events[0], event_dis)
self.nova_notifier.batch_notifier._pending_events.get(), event_dis)
self.assertEqual(
self.nova_notifier.batch_notifier.pending_events[1], event_assoc)
self.nova_notifier.batch_notifier._pending_events.get(),
event_assoc)

def test_delete_port_notify(self):
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
@@ -360,6 +361,7 @@ class TestNovaNotify(base.BaseTestCase):
self.nova_notifier.notify_port_active_direct(port)

self.assertEqual(
1, len(self.nova_notifier.batch_notifier.pending_events))
self.assertEqual(expected_event,
self.nova_notifier.batch_notifier.pending_events[0])
1, len(self.nova_notifier.batch_notifier._pending_events.queue))
self.assertEqual(
expected_event,
self.nova_notifier.batch_notifier._pending_events.get())

Loading…
Cancel
Save