Merge "Replace loopingcall in notifier with a delayed send" into stable/icehouse
This commit is contained in:
commit
01fa7d1381
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import novaclient.v1_1.client as nclient
|
||||
from novaclient.v1_1.contrib import server_external_events
|
||||
from oslo.config import cfg
|
||||
@ -22,7 +23,6 @@ from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common import uuidutils
|
||||
|
||||
|
||||
@ -52,8 +52,44 @@ class Notifier(object):
|
||||
region_name=cfg.CONF.nova_region_name,
|
||||
extensions=[server_external_events])
|
||||
self.pending_events = []
|
||||
event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
|
||||
event_sender.start(interval=cfg.CONF.send_events_interval)
|
||||
self._waiting_to_send = False
|
||||
|
||||
def queue_event(self, event):
|
||||
"""Called to queue sending an event with the next batch of events.
|
||||
|
||||
Sending events individually, as they occur, has been problematic as it
|
||||
can result in a flood of sends. Previously, there was a loopingcall
|
||||
thread that would send batched events on a periodic interval. However,
|
||||
maintaining a persistent thread in the loopingcall was also
|
||||
problematic.
|
||||
|
||||
This replaces the loopingcall with a mechanism that creates a
|
||||
short-lived thread on demand when the first event is queued. That
|
||||
thread will sleep once for the same send_events_interval to allow other
|
||||
events to queue up in pending_events and then will send them when it
|
||||
wakes.
|
||||
|
||||
If a thread is already alive and waiting, this call will simply queue
|
||||
the event and return leaving it up to the thread to send it.
|
||||
|
||||
:param event: the event that occured.
|
||||
"""
|
||||
if not event:
|
||||
return
|
||||
|
||||
self.pending_events.append(event)
|
||||
|
||||
if self._waiting_to_send:
|
||||
return
|
||||
|
||||
self._waiting_to_send = True
|
||||
|
||||
def last_out_sends():
|
||||
eventlet.sleep(cfg.CONF.send_events_interval)
|
||||
self._waiting_to_send = False
|
||||
self.send_events()
|
||||
|
||||
eventlet.spawn_n(last_out_sends)
|
||||
|
||||
def _is_compute_port(self, port):
|
||||
try:
|
||||
@ -91,8 +127,7 @@ class Notifier(object):
|
||||
|
||||
event = self.create_port_changed_event(action, original_obj,
|
||||
returned_obj)
|
||||
if event:
|
||||
self.pending_events.append(event)
|
||||
self.queue_event(event)
|
||||
|
||||
def create_port_changed_event(self, action, original_obj, returned_obj):
|
||||
port = None
|
||||
@ -169,8 +204,7 @@ class Notifier(object):
|
||||
|
||||
def send_port_status(self, mapper, connection, port):
|
||||
event = getattr(port, "_notify_event", None)
|
||||
if event:
|
||||
self.pending_events.append(event)
|
||||
self.queue_event(event)
|
||||
port._notify_event = None
|
||||
|
||||
def send_events(self):
|
||||
|
@ -266,3 +266,32 @@ class TestNovaNotify(base.BaseTestCase):
|
||||
self.nova_notifier.pending_events.append(
|
||||
{'name': 'network-changed', 'server_uuid': device_id})
|
||||
self.nova_notifier.send_events()
|
||||
|
||||
def test_queue_event_no_event(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
self.nova_notifier.queue_event(None)
|
||||
self.assertEqual(0, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(0, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_first_event(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(1, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(1, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_multiple_events(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
events = 6
|
||||
for i in range(0, events):
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(events, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(1, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_call_send_events(self):
|
||||
with mock.patch.object(self.nova_notifier,
|
||||
'send_events') as send_events:
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
spawn_n.side_effect = lambda func: func()
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertFalse(self.nova_notifier._waiting_to_send)
|
||||
send_events.assert_called_once_with()
|
||||
|
Loading…
Reference in New Issue
Block a user