L2Gateway Service Plugin

This patch set implements RPC calls that are initiated by the
L2 gateway agent destined to the L2 gateway service plugin

Author: Phani Pawan <ppawan@hp.com>
Co-Authored-By: Manjunath Patil <mpatil@hp.com>

Change-Id: If0db38aad3706265b89b972efe7b29cffcb5e10e
stable/ocata
Phani Pawan 8 years ago
parent d7956c9027
commit 79dc418b0c
  1. 5
      etc/l2gw_plugin.ini
  2. 7
      networking_l2gw/services/l2gateway/common/config.py
  3. 232
      networking_l2gw/services/l2gateway/ovsdb/data.py
  4. 56
      networking_l2gw/services/l2gateway/plugin.py
  5. 366
      networking_l2gw/tests/unit/services/l2gateway/ovsdb/test_data.py
  6. 64
      networking_l2gw/tests/unit/services/l2gateway/test_plugin.py

@ -15,3 +15,8 @@
# checks for the monitoring L2 gateway agent
# periodic_monitoring_interval =
# Example: periodic_monitoring_interval = 5
# (StrOpt) Callback class of the l2 gateway plugin
# where the RPCs from the agent are going to get invoked
# l2gw_callback_class =
# Example: l2gw_callback_class = "networking_l2gw.services.l2gateway.ovsdb.data.L2GatewayOVSDBCallbacks"

@ -56,7 +56,12 @@ L2GW_OPTS = [
cfg.IntOpt('periodic_monitoring_interval',
default=5,
help=_('Periodic interval at which the plugin '
'checks for the monitoring L2 gateway agent'))
'checks for the monitoring L2 gateway agent')),
cfg.StrOpt('l2gw_callback_class',
default='networking_l2gw.services.l2gateway.ovsdb.'
'data.L2GatewayOVSDBCallbacks',
help=_('L2 gateway plugin callback class where the'
'RPCs from the agent are going to get invoked'))
]

@ -0,0 +1,232 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron import context as ctx
from neutron.openstack.common import log as logging
from networking_l2gw.services.l2gateway.common import constants as n_const
from networking_l2gw.services.l2gateway.common import topics
from networking_l2gw.services.l2gateway.ovsdb import lib as db
from networking_l2gw.services.l2gateway import plugin as l2gw_plugin
from oslo.config import cfg
from oslo import messaging
LOG = logging.getLogger(__name__)
class L2GatewayOVSDBCallbacks(object):
"""Implement the rpc call back functions from OVSDB."""
target = messaging.Target(version='1.0')
def __init__(self, plugin):
super(L2GatewayOVSDBCallbacks, self).__init__()
self.plugin = plugin
context = ctx.get_admin_context()
self.agent_rpc = l2gw_plugin.L2gatewayAgentApi(
topics.L2GATEWAY_AGENT, context, cfg.CONF.host)
def update_ovsdb_changes(self, context, ovsdb_data):
"""RPC to update the changes from OVSDB in the database."""
self.ovsdb = OVSDBData(
ovsdb_data.get(n_const.OVSDB_IDENTIFIER))
self.ovsdb.update_ovsdb_changes(context, ovsdb_data)
class OVSDBData(object):
"""Process the data coming from OVSDB."""
def __init__(self, ovsdb_identifier=None):
self.ovsdb_identifier = ovsdb_identifier
self._setup_entry_table()
def update_ovsdb_changes(self, context, ovsdb_data):
"""RPC to update the changes from OVSDB in the database."""
for item, value in ovsdb_data.items():
if item != n_const.OVSDB_IDENTIFIER:
self.entry_table.get(item)(context, value)
def _setup_entry_table(self):
self.entry_table = {'new_logical_switches':
self._process_new_logical_switches,
'new_physical_ports':
self._process_new_physical_ports,
'new_physical_switches':
self._process_new_physical_switches,
'new_physical_locators':
self._process_new_physical_locators,
'new_local_macs':
self._process_new_local_macs,
'new_remote_macs':
self._process_new_remote_macs,
'modified_physical_ports':
self._process_modified_physical_ports,
'deleted_logical_switches':
self._process_deleted_logical_switches,
'deleted_physical_ports':
self._process_deleted_physical_ports,
'deleted_physical_switches':
self._process_deleted_physical_switches,
'deleted_physical_locators':
self._process_deleted_physical_locators,
'deleted_local_macs':
self._process_deleted_local_macs,
'deleted_remote_macs':
self._process_deleted_remote_macs,
}
return
def _process_new_logical_switches(self,
context,
new_logical_switches):
for logical_switch in new_logical_switches:
ls_dict = logical_switch
ls_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
l_switch = db.get_logical_switch(context, ls_dict)
if not l_switch:
db.add_logical_switch(context, ls_dict)
def _process_new_physical_switches(self,
context,
new_physical_switches):
for physical_switch in new_physical_switches:
ps_dict = physical_switch
ps_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
if (ps_dict.get('tunnel_ip'))[0] == 'set':
ps_dict['tunnel_ip'] = None
p_switch = db.get_physical_switch(context, ps_dict)
if not p_switch:
db.add_physical_switch(context, ps_dict)
def _process_new_physical_ports(self,
context,
new_physical_ports):
for physical_port in new_physical_ports:
pp_dict = physical_port
pp_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
p_port = db.get_physical_port(context, pp_dict)
if not p_port:
db.add_physical_port(context, pp_dict)
if pp_dict.get('vlan_bindings'):
for vlan_binding in pp_dict.get('vlan_bindings'):
vlan_binding[
n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
vlan_binding['port_uuid'] = pp_dict.get('uuid')
v_binding = db.get_vlan_binding(context, vlan_binding)
if not v_binding:
db.add_vlan_binding(context, vlan_binding)
def _process_new_physical_locators(self,
context,
new_physical_locators):
for physical_locator in new_physical_locators:
pl_dict = physical_locator
pl_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
p_locator = db.get_physical_locator(context, pl_dict)
if not p_locator:
db.add_physical_locator(context, pl_dict)
def _process_new_local_macs(self,
context,
new_local_macs):
for local_mac in new_local_macs:
lm_dict = local_mac
lm_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
lm_dict['logical_switch_uuid'] = local_mac.get('logical_switch_id')
l_mac = db.get_ucast_mac_local(context, lm_dict)
if not l_mac:
db.add_ucast_mac_local(context, lm_dict)
def _process_new_remote_macs(self,
context,
new_remote_macs):
for remote_mac in new_remote_macs:
rm_dict = remote_mac
rm_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
r_mac = db.get_ucast_mac_remote(context, rm_dict)
if not r_mac:
db.add_ucast_mac_remote(context, rm_dict)
def _process_modified_physical_ports(self,
context,
modified_physical_ports):
for physical_port in modified_physical_ports:
pp_dict = physical_port
pp_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
modified_port = db.get_physical_port(context, pp_dict)
if modified_port:
port_vlan_bindings = physical_port.get('vlan_bindings')
vlan_bindings = db.get_all_vlan_bindings_by_physical_port(
context, pp_dict)
for vlan_binding in vlan_bindings:
db.delete_vlan_binding(context, vlan_binding)
for port_vlan_binding in port_vlan_bindings:
port_vlan_binding['port_uuid'] = pp_dict['uuid']
port_vlan_binding[
n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.add_vlan_binding(context, port_vlan_binding)
else:
db.add_physical_port(context, pp_dict)
def _process_deleted_logical_switches(self,
context,
deleted_logical_switches):
for logical_switch in deleted_logical_switches:
ls_dict = logical_switch
ls_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_logical_switch(context, ls_dict)
def _process_deleted_physical_switches(self,
context,
deleted_physical_switches):
for physical_switch in deleted_physical_switches:
ps_dict = physical_switch
ps_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_physical_switch(context, ps_dict)
def _process_deleted_physical_ports(self,
context,
deleted_physical_ports):
for physical_port in deleted_physical_ports:
pp_dict = physical_port
pp_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_physical_port(context, pp_dict)
def _process_deleted_physical_locators(self,
context,
deleted_physical_locators):
for physical_locator in deleted_physical_locators:
pl_dict = physical_locator
pl_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_physical_locator(context, pl_dict)
def _process_deleted_local_macs(self,
context,
deleted_local_macs):
for local_mac in deleted_local_macs:
lm_dict = local_mac
lm_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_ucast_mac_local(context, lm_dict)
def _process_deleted_remote_macs(self,
context,
deleted_remote_macs):
for remote_mac in deleted_remote_macs:
rm_dict = remote_mac
rm_dict[n_const.OVSDB_IDENTIFIER] = self.ovsdb_identifier
db.delete_ucast_mac_remote(context, rm_dict)

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.from oslo.config import cfg
from neutron.common import rpc as n_rpc
from neutron import context as ctx
from neutron.db import agents_db
@ -19,23 +20,17 @@ from neutron.openstack.common import log as logging
from networking_l2gw.db.l2gateway import l2gateway_db
from networking_l2gw.services.l2gateway import agent_scheduler
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants
from networking_l2gw.services.l2gateway.common import topics
from oslo.config import cfg
from oslo import messaging
from oslo_utils import importutils
LOG = logging.getLogger(__name__)
class L2GatewayCallbacks(object):
"""RPC call back functions for L2gateway."""
def __init__(self, plugin):
super(L2GatewayCallbacks, self).__init__()
self.plugin = plugin
class L2gatewayAgentApi(object):
"""L2gateway plugin to agent RPC API."""
@ -44,6 +39,7 @@ class L2gatewayAgentApi(object):
def __init__(self, topic, context, host):
"""Initialize L2gateway plugin."""
self.context = context
self.host = host
target = messaging.Target(topic=topic, version=self.API_VERSION)
self.client = n_rpc.get_client(target)
@ -54,6 +50,34 @@ class L2gatewayAgentApi(object):
'set_monitor_agent',
hostname=hostname)
def add_vif_to_gateway(self, context, record_dict):
"""RPC to enter the VM MAC details to gateway."""
cctxt = self.client.prepare()
return cctxt.cast(self.context,
'add_vif_to_gateway',
record_dict=record_dict)
def delete_vif_from_gateway(self, context, record_dict):
"""RPC to delete the VM MAC details from gateway."""
cctxt = self.client.prepare()
return cctxt.cast(self.context,
'delete_vif_from_gateway',
record_dict=record_dict)
def delete_network(self, context, record_dict):
"""RPC to delete the Network from gateway."""
cctxt = self.client.prepare()
return cctxt.cast(self.context,
'delete_network',
record_dict=record_dict)
def update_connection_to_gateway(self, context, record_dict):
"""RPC to update the connection to gateway."""
cctxt = self.client.prepare()
return cctxt.cast(self.context,
'update_connection_to_gateway',
record_dict=record_dict)
class L2GatewayPlugin(l2gateway_db.L2GatewayMixin):
@ -67,7 +91,9 @@ class L2GatewayPlugin(l2gateway_db.L2GatewayMixin):
def __init__(self):
"""Do the initialization for the l2 gateway service plugin here."""
self.endpoints = [L2GatewayCallbacks(self),
config.register_l2gw_opts_helper()
l2gatewaycallback = cfg.CONF.default_callback_class
self.endpoints = [importutils.import_object(l2gatewaycallback, self),
agents_db.AgentExtRpcCallback()]
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(topics.L2GATEWAY_PLUGIN,
@ -87,6 +113,18 @@ class L2GatewayPlugin(l2gateway_db.L2GatewayMixin):
self.agentscheduler = agent_scheduler.L2GatewayAgentScheduler()
self.agentscheduler.initialize_thread()
def add_vif_to_gateway(self, context, port_dict):
pass
def delete_vif_from_gateway(self, context, port_dict):
pass
def create_l2_gateway_connection(self, context, l2_gateway_connection):
pass
def delete_l2_gateway_connection(self, context, l2_gateway_connection):
pass
def get_plugin_type(self):
"""Get type of the plugin."""
return constants.L2GW

@ -0,0 +1,366 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import contextlib
from neutron import context
from neutron.tests import base
from networking_l2gw.services.l2gateway.common import constants as n_const
from networking_l2gw.services.l2gateway.ovsdb import data
from networking_l2gw.services.l2gateway.ovsdb import lib
class TestL2GatewayOVSDBCallbacks():
def setUp(self):
super(TestL2GatewayOVSDBCallbacks, self).setUp()
self.context = context.get_admin_context()
def test_update_ovsdb_changes(self):
fake_ovsdb_data = {n_const.OVSDB_IDENTIFIER: 'fake_id'}
with mock.patch.object(data, 'OVSDBData') as ovs_data:
self.l2gw_callbacks.update_ovsdb_changes(self.context,
fake_ovsdb_data)
ovsdb_return_value = ovs_data.return_value
ovsdb_return_value.update_ovsdb_changes.assert_called_with(
self.context, fake_ovsdb_data)
class TestOVSDBData(base.BaseTestCase):
def setUp(self):
super(TestOVSDBData, self).setUp()
self.context = context.get_admin_context()
self.ovsdb_identifier = 'fake_ovsdb_id'
self.ovsdb_data = data.OVSDBData(self.ovsdb_identifier)
def test_init(self):
with mock.patch.object(data.OVSDBData,
'_setup_entry_table') as setup_entry_table:
self.ovsdb_data.__init__(self.ovsdb_identifier)
self.assertEqual(self.ovsdb_data.ovsdb_identifier,
'fake_ovsdb_id')
self.assertTrue(setup_entry_table.called)
def test_update_ovsdb_changes(self):
fake_dict = {}
fake_new_logical_switches = [fake_dict]
fake_new_physical_port = [fake_dict]
fake_new_physical_switches = [fake_dict]
fake_new_physical_locators = [fake_dict]
fake_new_local_macs = [fake_dict]
fake_new_remote_macs = [fake_dict]
fake_modified_physical_ports = [fake_dict]
fake_deleted_logical_switches = [fake_dict]
fake_deleted_physical_ports = [fake_dict]
fake_deleted_physical_switches = [fake_dict]
fake_deleted_physical_locators = [fake_dict]
fake_deleted_local_macs = [fake_dict]
fake_deleted_remote_macs = [fake_dict]
fake_ovsdb_data = {
n_const.OVSDB_IDENTIFIER: 'fake_ovsdb_id',
'new_logical_switches': fake_new_logical_switches,
'new_physical_ports': fake_new_physical_port,
'new_physical_switches': fake_new_physical_switches,
'new_physical_locators': fake_new_physical_locators,
'new_local_macs': fake_new_local_macs,
'new_remote_macs': fake_new_remote_macs,
'modified_physical_ports': fake_modified_physical_ports,
'deleted_logical_switches': fake_deleted_logical_switches,
'deleted_physical_switches': fake_deleted_physical_switches,
'deleted_physical_ports': fake_deleted_physical_ports,
'deleted_physical_locators': fake_deleted_physical_locators,
'deleted_local_macs': fake_deleted_local_macs,
'deleted_remote_macs': fake_deleted_remote_macs}
with contextlib.nested(
mock.patch.object(self.ovsdb_data,
'_process_new_logical_switches'),
mock.patch.object(self.ovsdb_data,
'_process_new_physical_ports'),
mock.patch.object(self.ovsdb_data,
'_process_new_physical_switches'),
mock.patch.object(self.ovsdb_data,
'_process_new_physical_locators'),
mock.patch.object(self.ovsdb_data,
'_process_new_local_macs'),
mock.patch.object(self.ovsdb_data,
'_process_new_remote_macs'),
mock.patch.object(self.ovsdb_data,
'_process_modified_physical_ports'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_logical_switches'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_physical_switches'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_physical_ports'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_physical_locators'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_local_macs'),
mock.patch.object(self.ovsdb_data,
'_process_deleted_remote_macs')
) as (process_new_logical_switches,
process_new_physical_ports,
process_new_physical_switches,
process_new_physical_locators,
process_new_local_macs,
process_new_remote_macs,
process_modified_physical_ports,
process_deleted_logical_switches,
process_deleted_physical_switches,
process_deleted_physical_ports,
process_deleted_physical_locators,
process_deleted_local_macs,
process_deleted_remote_macs):
self.ovsdb_data.entry_table = {
'new_logical_switches': process_new_logical_switches,
'new_physical_ports': process_new_physical_ports,
'new_physical_switches': process_new_physical_switches,
'new_physical_locators': process_new_physical_locators,
'new_local_macs': process_new_local_macs,
'new_remote_macs': process_new_remote_macs,
'modified_physical_ports': process_modified_physical_ports,
'deleted_logical_switches': process_deleted_logical_switches,
'deleted_physical_switches': process_deleted_physical_switches,
'deleted_physical_ports': process_deleted_physical_ports,
'deleted_physical_locators': process_deleted_physical_locators,
'deleted_local_macs': process_deleted_local_macs,
'deleted_remote_macs': process_deleted_remote_macs}
self.ovsdb_data.update_ovsdb_changes(
self.context, fake_ovsdb_data)
process_new_logical_switches.assert_called_with(
self.context, fake_new_logical_switches)
process_new_physical_ports.assert_called_with(
self.context, fake_new_physical_port)
process_new_physical_switches.assert_called_with(
self.context, fake_new_physical_switches)
process_new_physical_locators.assert_called_with(
self.context, fake_new_physical_locators)
process_new_local_macs.assert_called_with(
self.context, fake_new_local_macs)
process_new_remote_macs.assert_called_with(
self.context, fake_new_remote_macs)
process_modified_physical_ports.assert_called_with(
self.context, fake_modified_physical_ports)
process_deleted_logical_switches.assert_called_with(
self.context, fake_deleted_logical_switches)
process_deleted_physical_switches.assert_called_with(
self.context, fake_deleted_physical_switches)
process_deleted_physical_ports.assert_called_with(
self.context, fake_deleted_physical_ports)
process_deleted_physical_locators.assert_called_with(
self.context, fake_deleted_physical_locators)
process_deleted_local_macs.assert_called_with(
self.context, fake_deleted_local_macs)
process_deleted_remote_macs.assert_called_with(
self.context, fake_deleted_remote_macs)
def test_process_new_logical_switches(self):
fake_dict = {}
fake_new_logical_switches = [fake_dict]
with mock.patch.object(lib, 'get_logical_switch',
return_value=None) as get_ls:
with mock.patch.object(lib,
'add_logical_switch') as add_ls:
self.ovsdb_data._process_new_logical_switches(
self.context, fake_new_logical_switches)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_ls.assert_called_with(self.context, fake_dict)
add_ls.assert_called_with(self.context, fake_dict)
def test_process_new_physical_switches(self):
fake_dict = {'tunnel_ip': ['set']}
fake_new_physical_switches = [fake_dict]
with mock.patch.object(lib, 'get_physical_switch',
return_value=None) as get_ps:
with mock.patch.object(lib,
'add_physical_switch') as add_ps:
self.ovsdb_data._process_new_physical_switches(
self.context, fake_new_physical_switches)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertIsNone(fake_dict['tunnel_ip'])
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_ps.assert_called_with(self.context, fake_dict)
add_ps.assert_called_with(self.context, fake_dict)
def test_process_new_physical_ports(self):
fake_dict1 = {}
fake_dict2 = {'vlan_bindings': [fake_dict1]}
fake_new_physical_ports = [fake_dict2]
with contextlib.nested(
mock.patch.object(lib, 'get_physical_port',
return_value=None),
mock.patch.object(lib,
'add_physical_port'),
mock.patch.object(lib,
'get_vlan_binding', return_value=None),
mock.patch.object(lib,
'add_vlan_binding')) as (
get_pp, add_pp, get_vlan, add_vlan):
self.ovsdb_data._process_new_physical_ports(
self.context, fake_new_physical_ports)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict2)
self.assertEqual(fake_dict2[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_pp.assert_called_with(self.context, fake_dict2)
add_pp.assert_called_with(self.context, fake_dict2)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict1)
self.assertIn('port_uuid', fake_dict1)
get_vlan.assert_called_with(self.context, fake_dict1)
add_vlan.assert_called_with(self.context, fake_dict1)
def test_process_new_physical_locators(self):
fake_dict = {}
fake_new_physical_locators = [fake_dict]
with mock.patch.object(lib, 'get_physical_locator',
return_value=None) as get_pl:
with mock.patch.object(lib,
'add_physical_locator') as add_pl:
self.ovsdb_data._process_new_physical_locators(
self.context, fake_new_physical_locators)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_pl.assert_called_with(self.context, fake_dict)
add_pl.assert_called_with(self.context, fake_dict)
def test_process_new_local_macs(self):
fake_dict = {}
fake_new_local_macs = [fake_dict]
with mock.patch.object(lib, 'get_ucast_mac_local',
return_value=None) as get_lm:
with mock.patch.object(lib,
'add_ucast_mac_local') as add_lm:
self.ovsdb_data._process_new_local_macs(
self.context, fake_new_local_macs)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_lm.assert_called_with(self.context, fake_dict)
add_lm.assert_called_with(self.context, fake_dict)
def test_process_new_remote_macs(self):
fake_dict = {}
fake_new_remote_macs = [fake_dict]
with mock.patch.object(lib, 'get_ucast_mac_remote',
return_value=None) as get_mr:
with mock.patch.object(lib,
'add_ucast_mac_remote') as add_mr:
self.ovsdb_data._process_new_remote_macs(
self.context, fake_new_remote_macs)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_mr.assert_called_with(self.context, fake_dict)
add_mr.assert_called_with(self.context, fake_dict)
def test_process_deleted_logical_switches(self):
fake_dict = {}
fake_deleted_logical_switches = [fake_dict]
with mock.patch.object(lib, 'delete_logical_switch') as delete_ls:
self.ovsdb_data._process_deleted_logical_switches(
self.context, fake_deleted_logical_switches)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_ls.assert_called_with(self.context, fake_dict)
def test_process_deleted_physical_switches(self):
fake_dict = {}
fake_deleted_physical_switches = [fake_dict]
with mock.patch.object(lib, 'delete_physical_switch') as delete_ps:
self.ovsdb_data._process_deleted_physical_switches(
self.context, fake_deleted_physical_switches)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_ps.assert_called_with(self.context, fake_dict)
def test_process_deleted_physical_ports(self):
fake_dict = {}
fake_deleted_physical_ports = [fake_dict]
with mock.patch.object(lib, 'delete_physical_port') as delete_pp:
self.ovsdb_data._process_deleted_physical_ports(
self.context, fake_deleted_physical_ports)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_pp.assert_called_with(self.context, fake_dict)
def test_process_deleted_physical_locators(self):
fake_dict = {}
fake_deleted_physical_locators = [fake_dict]
with mock.patch.object(lib, 'delete_physical_locator') as delete_pl:
self.ovsdb_data._process_deleted_physical_locators(
self.context, fake_deleted_physical_locators)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_pl.assert_called_with(self.context, fake_dict)
def test_process_deleted_local_macs(self):
fake_dict = {}
fake_deleted_local_macs = [fake_dict]
with mock.patch.object(lib, 'delete_ucast_mac_local') as delete_ml:
self.ovsdb_data._process_deleted_local_macs(
self.context, fake_deleted_local_macs)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_ml.assert_called_with(self.context, fake_dict)
def test_process_deleted_remote_macs(self):
fake_dict = {}
fake_deleted_remote_macs = [fake_dict]
with mock.patch.object(lib, 'delete_ucast_mac_remote') as delete_mr:
self.ovsdb_data._process_deleted_remote_macs(
self.context, fake_deleted_remote_macs)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict)
self.assertEqual(fake_dict[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
delete_mr.assert_called_with(self.context, fake_dict)
def test_process_modified_physical_ports(self):
fake_dict1 = {}
fake_dict2 = {'vlan_bindings': [fake_dict1],
'uuid': 'fake_uuid'}
fake_modified_physical_ports = [fake_dict2]
with contextlib.nested(
mock.patch.object(lib, 'get_physical_port'),
mock.patch.object(lib,
'add_physical_port'),
mock.patch.object(lib,
'get_all_vlan_bindings_by_physical_port'),
mock.patch.object(lib,
'add_vlan_binding')) as (
get_pp, add_pp, get_vlan, add_vlan):
self.ovsdb_data._process_modified_physical_ports(
self.context, fake_modified_physical_ports)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict2)
self.assertEqual(fake_dict2[n_const.OVSDB_IDENTIFIER],
'fake_ovsdb_id')
get_pp.assert_called_with(self.context, fake_dict2)
self.assertFalse(add_pp.called)
get_vlan.assert_called_with(self.context, fake_dict2)
self.assertIn(n_const.OVSDB_IDENTIFIER, fake_dict1)
self.assertIn('port_uuid', fake_dict1)
add_vlan.assert_called_with(self.context, fake_dict1)

@ -0,0 +1,64 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron.common import rpc as n_rpc
from neutron.tests import base
from networking_l2gw.services.l2gateway import plugin as l2gw_plugin
class TestL2GatewayAgentApi(base.BaseTestCase):
def setUp(self):
self.client_mock_p = mock.patch.object(n_rpc, 'get_client')
self.client_mock = self.client_mock_p.start()
self.context = mock.ANY
self.topic = 'foo_topic'
self.host = 'foo_host'
self.plugin_rpc = l2gw_plugin.L2gatewayAgentApi(
self.topic, self.context, self.host)
super(TestL2GatewayAgentApi, self).setUp()
def test_add_vif_to_gateway(self):
cctxt = mock.Mock()
self.plugin_rpc.client.prepare.return_value = cctxt
self.plugin_rpc.add_vif_to_gateway(self.context, mock.ANY)
cctxt.cast.assert_called_with(
self.context, 'add_vif_to_gateway', record_dict=mock.ANY)
def test_delete_vif_from_gateway(self):
cctxt = mock.Mock()
self.plugin_rpc.client.prepare.return_value = cctxt
self.plugin_rpc.delete_vif_from_gateway(self.context, mock.ANY)
cctxt.cast.assert_called_with(
self.context, 'delete_vif_from_gateway', record_dict=mock.ANY)
def test_delete_network(self):
cctxt = mock.Mock()
self.plugin_rpc.client.prepare.return_value = cctxt
self.plugin_rpc.delete_network(self.context, mock.ANY)
cctxt.cast.assert_called_with(
self.context, 'delete_network', record_dict=mock.ANY)
def test_update_connection_to_gateway(self):
cctxt = mock.Mock()
self.plugin_rpc.client.prepare.return_value = cctxt
self.plugin_rpc.update_connection_to_gateway(self.context, mock.ANY)
cctxt.cast.assert_called_with(
self.context, 'update_connection_to_gateway',
record_dict=mock.ANY)
Loading…
Cancel
Save