use neutron notifier correctly

Change-Id: I6060f3e31d2a65ff692f66d52a176dac033e8cd1
Closes-Bug: #1714016
This commit is contained in:
Eyal Leshem 2017-08-30 17:10:04 +03:00
parent 6dda0c3f41
commit caf120bdab
4 changed files with 44 additions and 14 deletions

View File

@ -262,7 +262,10 @@ class DfLocalController(object):
def notify_port_status(self, ovs_port, status):
if self.neutron_notifier:
self.neutron_notifier.notify_port_status(ovs_port, status)
table_name = l2.LogicalPort.table_name
iface_id = ovs_port.lport
self.neutron_notifier.notify_neutron_server(table_name, iface_id,
'update', status)
def _get_delete_handler(self, table):
method_name = 'delete_{0}'.format(table)

View File

@ -31,7 +31,7 @@ class NeutronNotifierDriver(object):
"""
@abc.abstractmethod
def notify_neutron_server(self, table, key, action, value, topic):
def notify_neutron_server(self, table, key, action, value):
"""Notify the change to neutron server. Note that this method
will run in neutron server.
@ -39,6 +39,5 @@ class NeutronNotifierDriver(object):
:param key: the id of db model data
:param action: the action of data, create/update/delete
:param value: the value of db model data
:param topic: the topic of neutron server's corresponding listener
:return: None
"""

View File

@ -72,16 +72,13 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
id=host,
ppid=os.getppid(),
)
self.nb_api.register_listener_callback(self.notify_neutron_server,
update_neutron_cb = self._neutron_server_update_core_plugin
self.nb_api.register_listener_callback(update_neutron_cb,
listener.topic)
LOG.info("Register listener %s", listener.id)
self.heart_beat_reporter = HeartBeatReporter(self.nb_api, listener)
self.heart_beat_reporter.daemonize()
def notify_port_status(self, ovs_port, status):
port = ovs_port.lport
self._send_event(l2.LogicalPort.table_name, port.id, 'update', status)
def _send_event(self, table, key, action, value):
listeners = self.nb_api.get_all(core.Listener)
listeners_num = len(listeners)
@ -107,13 +104,18 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
LOG.info("Publish to neutron %s", topic)
self.nb_api.publisher.send_event(update)
def notify_neutron_server(self, table, key, action, value, topic=None):
@staticmethod
def _neutron_server_update_core_plugin(table, key, action,
value, topic=None):
if l2.LogicalPort.table_name == table and 'update' == action:
LOG.info("Process port %s status update event", key)
core_plugin = directory.get_plugin()
core_plugin.update_port_status(n_context.get_admin_context(),
key, value)
def notify_neutron_server(self, table, key, action, value):
self._send_event(table, key, action, value)
class HeartBeatReporter(object):
"""Updates heartbeat timestamp periodically with a random delay."""

View File

@ -14,6 +14,7 @@ import mock
from oslo_config import cfg
from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db.models import core
from dragonflow.db.models import l2
from dragonflow.tests import base as tests_base
@ -55,13 +56,38 @@ class TestNbApiNeutronNotifier(tests_base.BaseTestCase):
self.notifier.create_heart_beat_reporter('fake_host')
self.notifier.nb_api.register_listener_callback.assert_not_called()
def test_notify_neutron_server(self):
def test_notify_core_plugin(self):
core_plugin = mock.Mock()
with mock.patch("neutron_lib.plugins.directory.get_plugin",
return_value=core_plugin):
self.notifier.notify_neutron_server(l2.LogicalPort.table_name,
"fake_port",
"update",
"up")
tb_name = l2.LogicalPort.table_name
self.notifier._neutron_server_update_core_plugin(tb_name,
"fake_port",
"update",
"up")
core_plugin.update_port_status.assert_called_once_with(
mock.ANY, "fake_port", "up")
def _nofity_neutron_server_call_with_correct_args(self, *args):
db_common.DbUpdate.reset_mock()
self.notifier.notify_neutron_server(*args)
db_common.DbUpdate.assert_called_once_with(*args,
topic='listener_1')
def test_notify_neutron_server(self):
listener = core.Listener(id="1")
self.notifier.nb_api = mock.MagicMock()
self.notifier.nb_api.get_all.return_value = [listener]
with mock.patch("dragonflow.db.db_common.DbUpdate"):
self._nofity_neutron_server_call_with_correct_args(
"fake_table",
"fake_port",
"update",
"up")
self._nofity_neutron_server_call_with_correct_args(
"fake_table1",
"fake_port1",
"create",
"down")