Reuse nova batch notifier
Refactor the batch notifier currently used by the Nova notifier into a separate class. It will be reused when batching L3 HA state change events. Partially-Implements: blueprint report-ha-router-master Change-Id: I2f8cf261f48bdb632ac0bd643a337290b5297fce
This commit is contained in:
parent
d8ad3a1a63
commit
b7d39c1360
|
@ -0,0 +1,66 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
|
||||
|
||||
class BatchNotifier(object):
|
||||
def __init__(self, batch_interval, callback):
|
||||
self.pending_events = []
|
||||
self._waiting_to_send = False
|
||||
self.callback = callback
|
||||
self.batch_interval = batch_interval
|
||||
|
||||
def queue_event(self, event):
|
||||
"""Called to queue sending an event with the next batch of events.
|
||||
|
||||
Sending events individually, as they occur, has been problematic as it
|
||||
can result in a flood of sends. Previously, there was a loopingcall
|
||||
thread that would send batched events on a periodic interval. However,
|
||||
maintaining a persistent thread in the loopingcall was also
|
||||
problematic.
|
||||
|
||||
This replaces the loopingcall with a mechanism that creates a
|
||||
short-lived thread on demand when the first event is queued. That
|
||||
thread will sleep once for the same batch_duration to allow other
|
||||
events to queue up in pending_events and then will send them when it
|
||||
wakes.
|
||||
|
||||
If a thread is already alive and waiting, this call will simply queue
|
||||
the event and return leaving it up to the thread to send it.
|
||||
|
||||
:param event: the event that occurred.
|
||||
"""
|
||||
if not event:
|
||||
return
|
||||
|
||||
self.pending_events.append(event)
|
||||
|
||||
if self._waiting_to_send:
|
||||
return
|
||||
|
||||
self._waiting_to_send = True
|
||||
|
||||
def last_out_sends():
|
||||
eventlet.sleep(self.batch_interval)
|
||||
self._waiting_to_send = False
|
||||
self._notify()
|
||||
|
||||
eventlet.spawn_n(last_out_sends)
|
||||
|
||||
def _notify(self):
|
||||
if not self.pending_events:
|
||||
return
|
||||
|
||||
batched_events = self.pending_events
|
||||
self.pending_events = []
|
||||
self.callback(batched_events)
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
from keystoneclient import auth as ks_auth
|
||||
from keystoneclient.auth.identity import v2 as v2_auth
|
||||
from keystoneclient import session as ks_session
|
||||
|
@ -28,6 +27,7 @@ from neutron.common import constants
|
|||
from neutron import context
|
||||
from neutron.i18n import _LE, _LI, _LW
|
||||
from neutron import manager
|
||||
from neutron.notifiers import batch_notifier
|
||||
from neutron.openstack.common import uuidutils
|
||||
|
||||
|
||||
|
@ -105,45 +105,8 @@ class Notifier(object):
|
|||
session=session,
|
||||
region_name=cfg.CONF.nova.region_name,
|
||||
extensions=[server_external_events])
|
||||
self.pending_events = []
|
||||
self._waiting_to_send = False
|
||||
|
||||
def queue_event(self, event):
|
||||
"""Called to queue sending an event with the next batch of events.
|
||||
|
||||
Sending events individually, as they occur, has been problematic as it
|
||||
can result in a flood of sends. Previously, there was a loopingcall
|
||||
thread that would send batched events on a periodic interval. However,
|
||||
maintaining a persistent thread in the loopingcall was also
|
||||
problematic.
|
||||
|
||||
This replaces the loopingcall with a mechanism that creates a
|
||||
short-lived thread on demand when the first event is queued. That
|
||||
thread will sleep once for the same send_events_interval to allow other
|
||||
events to queue up in pending_events and then will send them when it
|
||||
wakes.
|
||||
|
||||
If a thread is already alive and waiting, this call will simply queue
|
||||
the event and return leaving it up to the thread to send it.
|
||||
|
||||
:param event: the event that occurred.
|
||||
"""
|
||||
if not event:
|
||||
return
|
||||
|
||||
self.pending_events.append(event)
|
||||
|
||||
if self._waiting_to_send:
|
||||
return
|
||||
|
||||
self._waiting_to_send = True
|
||||
|
||||
def last_out_sends():
|
||||
eventlet.sleep(cfg.CONF.send_events_interval)
|
||||
self._waiting_to_send = False
|
||||
self.send_events()
|
||||
|
||||
eventlet.spawn_n(last_out_sends)
|
||||
self.batch_notifier = batch_notifier.BatchNotifier(
|
||||
cfg.CONF.send_events_interval, self.send_events)
|
||||
|
||||
def _is_compute_port(self, port):
|
||||
try:
|
||||
|
@ -189,11 +152,11 @@ class Notifier(object):
|
|||
disassociate_returned_obj = {'floatingip': {'port_id': None}}
|
||||
event = self.create_port_changed_event(action, original_obj,
|
||||
disassociate_returned_obj)
|
||||
self.queue_event(event)
|
||||
self.batch_notifier.queue_event(event)
|
||||
|
||||
event = self.create_port_changed_event(action, original_obj,
|
||||
returned_obj)
|
||||
self.queue_event(event)
|
||||
self.batch_notifier.queue_event(event)
|
||||
|
||||
def create_port_changed_event(self, action, original_obj, returned_obj):
|
||||
port = None
|
||||
|
@ -270,16 +233,10 @@ class Notifier(object):
|
|||
|
||||
def send_port_status(self, mapper, connection, port):
|
||||
event = getattr(port, "_notify_event", None)
|
||||
self.queue_event(event)
|
||||
self.batch_notifier.queue_event(event)
|
||||
port._notify_event = None
|
||||
|
||||
def send_events(self):
|
||||
if not self.pending_events:
|
||||
return
|
||||
|
||||
batched_events = self.pending_events
|
||||
self.pending_events = []
|
||||
|
||||
def send_events(self, batched_events):
|
||||
LOG.debug("Sending events: %s", batched_events)
|
||||
try:
|
||||
response = self.nclient.server_external_events.create(
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
# Copyright (c) 2014 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.notifiers import batch_notifier
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestBatchNotifier(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestBatchNotifier, self).setUp()
|
||||
self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
|
||||
self.spawn_n = mock.patch('eventlet.spawn_n').start()
|
||||
|
||||
def test_queue_event_no_event(self):
|
||||
self.notifier.queue_event(None)
|
||||
self.assertEqual(0, len(self.notifier.pending_events))
|
||||
self.assertEqual(0, self.spawn_n.call_count)
|
||||
|
||||
def test_queue_event_first_event(self):
|
||||
self.notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(1, len(self.notifier.pending_events))
|
||||
self.assertEqual(1, self.spawn_n.call_count)
|
||||
|
||||
def test_queue_event_multiple_events(self):
|
||||
events = 6
|
||||
for i in range(0, events):
|
||||
self.notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(events, len(self.notifier.pending_events))
|
||||
self.assertEqual(1, self.spawn_n.call_count)
|
||||
|
||||
def test_queue_event_call_send_events(self):
|
||||
with mock.patch.object(self.notifier,
|
||||
'callback') as send_events:
|
||||
self.spawn_n.side_effect = lambda func: func()
|
||||
self.notifier.queue_event(mock.Mock())
|
||||
self.assertFalse(self.notifier._waiting_to_send)
|
||||
self.assertTrue(send_events.called)
|
|
@ -218,21 +218,21 @@ class TestNovaNotify(base.BaseTestCase):
|
|||
self.nova_notifier.nclient.server_external_events,
|
||||
'create') as nclient_create:
|
||||
nclient_create.return_value = 'i am a string!'
|
||||
self.nova_notifier.send_events()
|
||||
self.nova_notifier.send_events([])
|
||||
|
||||
def test_nova_send_event_rasies_404(self):
|
||||
with mock.patch.object(
|
||||
self.nova_notifier.nclient.server_external_events,
|
||||
'create') as nclient_create:
|
||||
nclient_create.side_effect = nova_exceptions.NotFound
|
||||
self.nova_notifier.send_events()
|
||||
self.nova_notifier.send_events([])
|
||||
|
||||
def test_nova_send_events_raises(self):
|
||||
with mock.patch.object(
|
||||
self.nova_notifier.nclient.server_external_events,
|
||||
'create') as nclient_create:
|
||||
nclient_create.side_effect = Exception
|
||||
self.nova_notifier.send_events()
|
||||
self.nova_notifier.send_events([])
|
||||
|
||||
def test_nova_send_events_returns_non_200(self):
|
||||
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
|
||||
|
@ -242,9 +242,8 @@ class TestNovaNotify(base.BaseTestCase):
|
|||
nclient_create.return_value = [{'code': 404,
|
||||
'name': 'network-changed',
|
||||
'server_uuid': device_id}]
|
||||
self.nova_notifier.pending_events.append(
|
||||
{'name': 'network-changed', 'server_uuid': device_id})
|
||||
self.nova_notifier.send_events()
|
||||
self.nova_notifier.send_events(
|
||||
[{'name': 'network-changed', 'server_uuid': device_id}])
|
||||
|
||||
def test_nova_send_events_return_200(self):
|
||||
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
|
||||
|
@ -254,9 +253,8 @@ class TestNovaNotify(base.BaseTestCase):
|
|||
nclient_create.return_value = [{'code': 200,
|
||||
'name': 'network-changed',
|
||||
'server_uuid': device_id}]
|
||||
self.nova_notifier.pending_events.append(
|
||||
{'name': 'network-changed', 'server_uuid': device_id})
|
||||
self.nova_notifier.send_events()
|
||||
self.nova_notifier.send_events(
|
||||
[{'name': 'network-changed', 'server_uuid': device_id}])
|
||||
|
||||
def test_nova_send_events_multiple(self):
|
||||
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
|
||||
|
@ -269,40 +267,9 @@ class TestNovaNotify(base.BaseTestCase):
|
|||
{'code': 200,
|
||||
'name': 'network-changed',
|
||||
'server_uuid': device_id}]
|
||||
self.nova_notifier.pending_events.append(
|
||||
{'name': 'network-changed', 'server_uuid': device_id})
|
||||
self.nova_notifier.pending_events.append(
|
||||
{'name': 'network-changed', 'server_uuid': device_id})
|
||||
self.nova_notifier.send_events()
|
||||
|
||||
def test_queue_event_no_event(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
self.nova_notifier.queue_event(None)
|
||||
self.assertEqual(0, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(0, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_first_event(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(1, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(1, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_multiple_events(self):
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
events = 6
|
||||
for i in range(0, events):
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertEqual(events, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(1, spawn_n.call_count)
|
||||
|
||||
def test_queue_event_call_send_events(self):
|
||||
with mock.patch.object(self.nova_notifier,
|
||||
'send_events') as send_events:
|
||||
with mock.patch('eventlet.spawn_n') as spawn_n:
|
||||
spawn_n.side_effect = lambda func: func()
|
||||
self.nova_notifier.queue_event(mock.Mock())
|
||||
self.assertFalse(self.nova_notifier._waiting_to_send)
|
||||
send_events.assert_called_once_with()
|
||||
self.nova_notifier.send_events([
|
||||
{'name': 'network-changed', 'server_uuid': device_id},
|
||||
{'name': 'network-changed', 'server_uuid': device_id}])
|
||||
|
||||
def test_reassociate_floatingip_without_disassociate_event(self):
|
||||
returned_obj = {'floatingip':
|
||||
|
@ -311,12 +278,15 @@ class TestNovaNotify(base.BaseTestCase):
|
|||
self.nova_notifier._waiting_to_send = True
|
||||
self.nova_notifier.send_network_change(
|
||||
'update_floatingip', original_obj, returned_obj)
|
||||
self.assertEqual(2, len(self.nova_notifier.pending_events))
|
||||
self.assertEqual(
|
||||
2, len(self.nova_notifier.batch_notifier.pending_events))
|
||||
|
||||
returned_obj_non = {'floatingip': {'port_id': None}}
|
||||
event_dis = self.nova_notifier.create_port_changed_event(
|
||||
'update_floatingip', original_obj, returned_obj_non)
|
||||
event_assoc = self.nova_notifier.create_port_changed_event(
|
||||
'update_floatingip', original_obj, returned_obj)
|
||||
self.assertEqual(self.nova_notifier.pending_events[0], event_dis)
|
||||
self.assertEqual(self.nova_notifier.pending_events[1], event_assoc)
|
||||
self.assertEqual(
|
||||
self.nova_notifier.batch_notifier.pending_events[0], event_dis)
|
||||
self.assertEqual(
|
||||
self.nova_notifier.batch_notifier.pending_events[1], event_assoc)
|
||||
|
|
Loading…
Reference in New Issue