Merge "Turn nova notifier into a proper rate limiter"
This commit is contained in:
commit
3e4dbb5d16
|
@ -11,14 +11,17 @@
|
|||
# under the License.
|
||||
|
||||
import eventlet
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
class BatchNotifier(object):
|
||||
def __init__(self, batch_interval, callback):
|
||||
self.pending_events = []
|
||||
self._waiting_to_send = False
|
||||
self.callback = callback
|
||||
self.batch_interval = batch_interval
|
||||
self._lock_identifier = 'notifier-%s' % uuidutils.generate_uuid()
|
||||
|
||||
def queue_event(self, event):
|
||||
"""Called to queue sending an event with the next batch of events.
|
||||
|
@ -30,13 +33,12 @@ class BatchNotifier(object):
|
|||
problematic.
|
||||
|
||||
This replaces the loopingcall with a mechanism that creates a
|
||||
short-lived thread on demand when the first event is queued. That
|
||||
thread will sleep once for the same batch_duration to allow other
|
||||
events to queue up in pending_events and then will send them when it
|
||||
wakes.
|
||||
short-lived thread on demand whenever an event is queued. That thread
|
||||
will wait for a lock, send all queued events and then sleep for
|
||||
'batch_interval' seconds to allow other events to queue up.
|
||||
|
||||
If a thread is already alive and waiting, this call will simply queue
|
||||
the event and return leaving it up to the thread to send it.
|
||||
This effectively acts as a rate limiter to only allow 1 batch per
|
||||
'batch_interval' seconds.
|
||||
|
||||
:param event: the event that occurred.
|
||||
"""
|
||||
|
@ -45,17 +47,14 @@ class BatchNotifier(object):
|
|||
|
||||
self.pending_events.append(event)
|
||||
|
||||
if self._waiting_to_send:
|
||||
return
|
||||
|
||||
self._waiting_to_send = True
|
||||
|
||||
def last_out_sends():
|
||||
eventlet.sleep(self.batch_interval)
|
||||
self._waiting_to_send = False
|
||||
@utils.synchronized(self._lock_identifier)
|
||||
def synced_send():
|
||||
self._notify()
|
||||
# sleeping after send while holding the lock allows subsequent
|
||||
# events to batch up
|
||||
eventlet.sleep(self.batch_interval)
|
||||
|
||||
eventlet.spawn_n(last_out_sends)
|
||||
eventlet.spawn_n(synced_send)
|
||||
|
||||
def _notify(self):
|
||||
if not self.pending_events:
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from neutron.notifiers import batch_notifier
|
||||
|
@ -23,7 +24,8 @@ class TestBatchNotifier(base.BaseTestCase):
|
|||
def setUp(self):
|
||||
super(TestBatchNotifier, self).setUp()
|
||||
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
|
||||
self.spawn_n = mock.patch('eventlet.spawn_n').start()
|
||||
self.spawn_n_p = mock.patch('eventlet.spawn_n')
|
||||
self.spawn_n = self.spawn_n_p.start()
|
||||
|
||||
def test_queue_event_no_event(self):
|
||||
self.notifier.queue_event(None)
|
||||
|
@ -36,16 +38,26 @@ class TestBatchNotifier(base.BaseTestCase):
|
|||
self.assertEqual(1, self.spawn_n.call_count)
|
||||
|
||||
def test_queue_event_multiple_events(self):
|
||||
self.spawn_n_p.stop()
|
||||
c_mock = mock.patch.object(self.notifier, 'callback').start()
|
||||
events = 6
|
||||
for i in range(0, events):
|
||||
self.notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(events, len(self.notifier.pending_events))
|
||||
self.assertEqual(1, self.spawn_n.call_count)
|
||||
eventlet.sleep(0) # yield to let coro execute
|
||||
|
||||
while self.notifier.pending_events:
|
||||
# wait for coroutines to finish
|
||||
eventlet.sleep(0.1)
|
||||
self.assertEqual(2, c_mock.call_count)
|
||||
self.assertEqual(6, sum(len(c[0][0]) for c in c_mock.call_args_list))
|
||||
self.assertEqual(0, len(self.notifier.pending_events))
|
||||
|
||||
def test_queue_event_call_send_events(self):
|
||||
with mock.patch.object(self.notifier,
|
||||
'callback') as send_events:
|
||||
self.spawn_n.side_effect = lambda func: func()
|
||||
self.notifier.queue_event(mock.Mock())
|
||||
self.assertFalse(self.notifier._waiting_to_send)
|
||||
while self.notifier.pending_events:
|
||||
# wait for coroutines to finish
|
||||
eventlet.sleep(0.1)
|
||||
self.assertTrue(send_events.called)
|
||||
|
|
Loading…
Reference in New Issue