Turn nova notifier into a proper rate limiter
This adjusts the batching logic in the Nova notifier to immediately send and then sleep to allow batching of subsequent calls in the batch interval. So rather than always wait for 2 seconds to elapse while batching, batching will only occur in the 2 second period after a call is made. This turns the batch notifier into a standard queuing rate limiter. The upside to this is a single port create results in an immediate notification to Nova without a delay. The downside is now that a sudden burst of 6 port creations to a previously idle server will result in 2 notification calls to Nova (1 for the first call and another for the other 5). Closes-Bug: #1564648 Change-Id: I82f403441564955345f47877151e0c457712dd2f
This commit is contained in:
parent
7df76e162a
commit
255e8a839d
@ -11,14 +11,17 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from neutron.common import utils
|
||||||
|
|
||||||
|
|
||||||
class BatchNotifier(object):
|
class BatchNotifier(object):
|
||||||
def __init__(self, batch_interval, callback):
|
def __init__(self, batch_interval, callback):
|
||||||
self.pending_events = []
|
self.pending_events = []
|
||||||
self._waiting_to_send = False
|
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
self.batch_interval = batch_interval
|
self.batch_interval = batch_interval
|
||||||
|
self._lock_identifier = 'notifier-%s' % uuidutils.generate_uuid()
|
||||||
|
|
||||||
def queue_event(self, event):
|
def queue_event(self, event):
|
||||||
"""Called to queue sending an event with the next batch of events.
|
"""Called to queue sending an event with the next batch of events.
|
||||||
@ -30,13 +33,12 @@ class BatchNotifier(object):
|
|||||||
problematic.
|
problematic.
|
||||||
|
|
||||||
This replaces the loopingcall with a mechanism that creates a
|
This replaces the loopingcall with a mechanism that creates a
|
||||||
short-lived thread on demand when the first event is queued. That
|
short-lived thread on demand whenever an event is queued. That thread
|
||||||
thread will sleep once for the same batch_duration to allow other
|
will wait for a lock, send all queued events and then sleep for
|
||||||
events to queue up in pending_events and then will send them when it
|
'batch_interval' seconds to allow other events to queue up.
|
||||||
wakes.
|
|
||||||
|
|
||||||
If a thread is already alive and waiting, this call will simply queue
|
This effectively acts as a rate limiter to only allow 1 batch per
|
||||||
the event and return leaving it up to the thread to send it.
|
'batch_interval' seconds.
|
||||||
|
|
||||||
:param event: the event that occurred.
|
:param event: the event that occurred.
|
||||||
"""
|
"""
|
||||||
@ -45,17 +47,14 @@ class BatchNotifier(object):
|
|||||||
|
|
||||||
self.pending_events.append(event)
|
self.pending_events.append(event)
|
||||||
|
|
||||||
if self._waiting_to_send:
|
@utils.synchronized(self._lock_identifier)
|
||||||
return
|
def synced_send():
|
||||||
|
|
||||||
self._waiting_to_send = True
|
|
||||||
|
|
||||||
def last_out_sends():
|
|
||||||
eventlet.sleep(self.batch_interval)
|
|
||||||
self._waiting_to_send = False
|
|
||||||
self._notify()
|
self._notify()
|
||||||
|
# sleeping after send while holding the lock allows subsequent
|
||||||
|
# events to batch up
|
||||||
|
eventlet.sleep(self.batch_interval)
|
||||||
|
|
||||||
eventlet.spawn_n(last_out_sends)
|
eventlet.spawn_n(synced_send)
|
||||||
|
|
||||||
def _notify(self):
|
def _notify(self):
|
||||||
if not self.pending_events:
|
if not self.pending_events:
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron.notifiers import batch_notifier
|
from neutron.notifiers import batch_notifier
|
||||||
@ -23,7 +24,8 @@ class TestBatchNotifier(base.BaseTestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestBatchNotifier, self).setUp()
|
super(TestBatchNotifier, self).setUp()
|
||||||
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
|
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
|
||||||
self.spawn_n = mock.patch('eventlet.spawn_n').start()
|
self.spawn_n_p = mock.patch('eventlet.spawn_n')
|
||||||
|
self.spawn_n = self.spawn_n_p.start()
|
||||||
|
|
||||||
def test_queue_event_no_event(self):
|
def test_queue_event_no_event(self):
|
||||||
self.notifier.queue_event(None)
|
self.notifier.queue_event(None)
|
||||||
@ -36,16 +38,26 @@ class TestBatchNotifier(base.BaseTestCase):
|
|||||||
self.assertEqual(1, self.spawn_n.call_count)
|
self.assertEqual(1, self.spawn_n.call_count)
|
||||||
|
|
||||||
def test_queue_event_multiple_events(self):
|
def test_queue_event_multiple_events(self):
|
||||||
|
self.spawn_n_p.stop()
|
||||||
|
c_mock = mock.patch.object(self.notifier, 'callback').start()
|
||||||
events = 6
|
events = 6
|
||||||
for i in range(0, events):
|
for i in range(0, events):
|
||||||
self.notifier.queue_event(mock.Mock())
|
self.notifier.queue_event(mock.Mock())
|
||||||
self.assertEqual(events, len(self.notifier.pending_events))
|
eventlet.sleep(0) # yield to let coro execute
|
||||||
self.assertEqual(1, self.spawn_n.call_count)
|
|
||||||
|
while self.notifier.pending_events:
|
||||||
|
# wait for coroutines to finish
|
||||||
|
eventlet.sleep(0.1)
|
||||||
|
self.assertEqual(2, c_mock.call_count)
|
||||||
|
self.assertEqual(6, sum(len(c[0][0]) for c in c_mock.call_args_list))
|
||||||
|
self.assertEqual(0, len(self.notifier.pending_events))
|
||||||
|
|
||||||
def test_queue_event_call_send_events(self):
|
def test_queue_event_call_send_events(self):
|
||||||
with mock.patch.object(self.notifier,
|
with mock.patch.object(self.notifier,
|
||||||
'callback') as send_events:
|
'callback') as send_events:
|
||||||
self.spawn_n.side_effect = lambda func: func()
|
self.spawn_n.side_effect = lambda func: func()
|
||||||
self.notifier.queue_event(mock.Mock())
|
self.notifier.queue_event(mock.Mock())
|
||||||
self.assertFalse(self.notifier._waiting_to_send)
|
while self.notifier.pending_events:
|
||||||
|
# wait for coroutines to finish
|
||||||
|
eventlet.sleep(0.1)
|
||||||
self.assertTrue(send_events.called)
|
self.assertTrue(send_events.called)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user