# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # import copy import uuid from ovsdbapp.backend.ovs_idl import connection from ovsdbapp import constants as const from ovsdbapp import event as ovsdb_event from ovsdbapp.tests.functional import base from ovsdbapp.tests import utils from neutron.common.ovn import constants as ovn_const from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb \ import impl_idl_ovn as impl from neutron.services.portforwarding import constants as pf_const from neutron.tests.functional import base as n_base from neutron.tests.functional.resources.ovsdb import events OWNER = ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY PF_PLUGIN = pf_const.PORT_FORWARDING_PLUGIN class BaseOvnIdlTest(n_base.BaseLoggingTestCase, base.FunctionalTestCase): schemas = ['OVN_Southbound', 'OVN_Northbound'] def setUp(self): super(BaseOvnIdlTest, self).setUp() self.api = impl.OvsdbSbOvnIdl(self.connection['OVN_Southbound']) self.nbapi = impl.OvsdbNbOvnIdl(self.connection['OVN_Northbound']) self.handler = ovsdb_event.RowEventHandler() self.api.idl.notify = self.handler.notify class TestSbApi(BaseOvnIdlTest): def setUp(self): super(TestSbApi, self).setUp() self.data = { 'chassis': [ {'external_ids': {'ovn-bridge-mappings': 'public:br-ex,private:br-0'}}, {'external_ids': {'ovn-bridge-mappings': 'public:br-ex,public2:br-ex'}}, {'external_ids': {'ovn-bridge-mappings': 'public:br-ex'}}, ] } self.load_test_data() def load_test_data(self): with self.api.transaction(check_error=True) as txn: for chassis in self.data['chassis']: chassis['name'] = utils.get_rand_device_name('chassis') chassis['hostname'] = '%s.localdomain.com' % chassis['name'] txn.add(self.api.chassis_add( chassis['name'], ['geneve'], chassis['hostname'], hostname=chassis['hostname'], external_ids=chassis['external_ids'])) def test_get_chassis_hostname_and_physnets(self): mapping = self.api.get_chassis_hostname_and_physnets() self.assertLessEqual(len(self.data['chassis']), len(mapping)) self.assertGreaterEqual(set(mapping.keys()), {c['hostname'] for c in self.data['chassis']}) def test_get_all_chassis(self): chassis_list = set(self.api.get_all_chassis()) our_chassis = {c['name'] for c in self.data['chassis']} self.assertLessEqual(our_chassis, chassis_list) def test_get_chassis_data_for_ml2_bind_port(self): host = self.data['chassis'][0]['hostname'] dp, iface, phys = self.api.get_chassis_data_for_ml2_bind_port(host) self.assertEqual('', dp) self.assertEqual('', iface) self.assertItemsEqual(phys, ['private', 'public']) def test_chassis_exists(self): self.assertTrue(self.api.chassis_exists( self.data['chassis'][0]['hostname'])) self.assertFalse(self.api.chassis_exists("nochassishere")) def test_get_chassis_and_physnets(self): mapping = self.api.get_chassis_and_physnets() self.assertLessEqual(len(self.data['chassis']), len(mapping)) self.assertGreaterEqual(set(mapping.keys()), {c['name'] for c in self.data['chassis']}) def _add_switch_port(self, chassis_name, type='localport'): sname, pname = (utils.get_rand_device_name(prefix=p) for p in ('switch', 'port')) chassis = self.api.lookup('Chassis', chassis_name) row_event = events.WaitForCreatePortBindingEvent(pname) self.handler.watch_event(row_event) with self.nbapi.transaction(check_error=True) as txn: switch = txn.add(self.nbapi.ls_add(sname)) port = txn.add(self.nbapi.lsp_add(sname, pname, type=type)) row_event.wait() return chassis, switch.result, port.result, row_event.row def test_get_metadata_port_network(self): chassis, switch, port, binding = self._add_switch_port( self.data['chassis'][0]['name']) result = self.api.get_metadata_port_network(str(binding.datapath.uuid)) self.assertEqual(binding, result) self.assertEqual(binding.datapath.external_ids['logical-switch'], str(switch.uuid)) def test_get_metadata_port_network_missing(self): val = str(uuid.uuid4()) self.assertIsNone(self.api.get_metadata_port_network(val)) def test_set_get_chassis_metadata_networks(self): name = self.data['chassis'][0]['name'] nets = [str(uuid.uuid4()) for _ in range(3)] self.api.set_chassis_metadata_networks(name, nets).execute( check_error=True) self.assertEqual(nets, self.api.get_chassis_metadata_networks(name)) def test_get_network_port_bindings_by_ip(self): chassis, switch, port, binding = self._add_switch_port( self.data['chassis'][0]['name']) mac = 'de:ad:be:ef:4d:ad' ipaddr = '192.0.2.1' mac_ip = '%s %s' % (mac, ipaddr) pb_update_event = events.WaitForUpdatePortBindingEvent( port.name, mac=[mac_ip]) self.handler.watch_event(pb_update_event) self.nbapi.lsp_set_addresses( port.name, [mac_ip]).execute(check_error=True) self.assertTrue(pb_update_event.wait()) self.api.lsp_bind(port.name, chassis.name).execute(check_error=True) result = self.api.get_network_port_bindings_by_ip( str(binding.datapath.uuid), ipaddr) self.assertIn(binding, result) def test_get_ports_on_chassis(self): chassis, switch, port, binding = self._add_switch_port( self.data['chassis'][0]['name']) self.api.lsp_bind(port.name, chassis.name).execute(check_error=True) self.assertEqual([binding], self.api.get_ports_on_chassis(chassis.name)) def test_get_logical_port_chassis_and_datapath(self): chassis, switch, port, binding = self._add_switch_port( self.data['chassis'][0]['name']) self.api.lsp_bind(port.name, chassis.name).execute(check_error=True) self.assertEqual( (chassis.name, str(binding.datapath.uuid)), self.api.get_logical_port_chassis_and_datapath(port.name)) class TestNbApi(BaseOvnIdlTest): def setUp(self): super(TestNbApi, self).setUp() self.data = { 'lbs': [ {'name': 'pf-floatingip-fip_id1-tcp', 'protocol': const.PROTO_TCP, 'vips': {"172.24.4.8:2020": ["10.0.0.10:22"], "172.24.4.8:2021": ["10.0.0.11:22"]}, 'external_ids': { ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: pf_const.PORT_FORWARDING_PLUGIN, ovn_const.OVN_FIP_EXT_ID_KEY: 'fip_id1', ovn_const.OVN_REV_NUM_EXT_ID_KEY: '1', ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY: 'neutron-rtr1_id'}}, {'name': 'pf-floatingip-fip_id1-udp', 'protocol': const.PROTO_UDP, 'vips': {"172.24.4.8:53": ["10.0.0.10:53"]}, 'external_ids': { ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: pf_const.PORT_FORWARDING_PLUGIN, ovn_const.OVN_FIP_EXT_ID_KEY: 'fip_id1', ovn_const.OVN_REV_NUM_EXT_ID_KEY: '1', ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY: 'neutron-rtr1_id'}}, {'name': 'pf-floatingip-fip_id2-tcp', 'protocol': const.PROTO_TCP, 'vips': {"172.24.4.100:2020": ["10.0.0.10:22"]}, 'external_ids': { ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: pf_const.PORT_FORWARDING_PLUGIN, ovn_const.OVN_FIP_EXT_ID_KEY: 'fip_id2', ovn_const.OVN_REV_NUM_EXT_ID_KEY: '10', ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY: 'neutron-rtr2_id'}}, {'name': 'octavia_lb1_id', 'vips': {"172.24.4.250:80": ["10.0.0.10:8080", "10.0.0.11:8080"]}, 'selection_fields': ['ip_dst', 'ip_src', 'tp_dst', 'tp_src'], 'external_ids': { 'enabled': 'True', 'lr_ref': 'neutron-rtr1_id', 'ls_refs': str({'neutron-net1_id': 1}), ovn_const.LB_EXT_IDS_VIP_KEY: '172.24.4.250', ovn_const.LB_EXT_IDS_VIP_PORT_ID_KEY: 'neutron-port1_id', }}, ], } self.load_test_data() def load_test_data(self): with self.nbapi.transaction(check_error=True) as txn: for lb in copy.deepcopy(self.data['lbs']): lb_name = lb.pop('name') for vip, ips in lb.pop('vips').items(): txn.add(self.nbapi.lb_add( lb_name, vip, ips, may_exist=True, **lb)) def test_lb_list(self): lbs = self.nbapi.lb_list().execute(check_error=True) self.assertEqual(len(self.data['lbs']), len(lbs)) exp_values = [(lb['name'], lb['external_ids']) for lb in self.data['lbs']] lbs_values = [(lb.name, lb.external_ids) for lb in lbs] self.assertItemsEqual(exp_values, lbs_values) def test_get_router_floatingip_lbs(self): f = self.nbapi.get_router_floatingip_lbs self.assertEqual([], f('unused_router_name')) for exp_router_name in ['neutron-rtr1_id', 'neutron-rtr2_id']: exp_values = [ (lb['name'], lb['external_ids']) for lb in self.data['lbs'] if all([lb['external_ids'].get(OWNER) == PF_PLUGIN, lb['external_ids'].get( ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY) == exp_router_name])] lbs_values = [(lb.name, lb.external_ids) for lb in f(exp_router_name)] self.assertTrue(exp_values) self.assertItemsEqual(exp_values, lbs_values) def test_get_floatingip_in_nat_or_lb(self): f = self.nbapi.get_floatingip_in_nat_or_lb self.assertIsNone(f('unused_fip_id')) for exp_fip_id in ['fip_id1', 'fip_id2']: exp_values = [ (lb['name'], lb['external_ids']) for lb in self.data['lbs'] if all([lb['external_ids'].get(OWNER) == PF_PLUGIN, lb['external_ids'].get( ovn_const.OVN_FIP_EXT_ID_KEY) == exp_fip_id])] # get_floatingip_in_nat_or_lb returns the first entry # it finds for the fip_id provided. Make sure that it # is present in exp_values. lb_match = f(exp_fip_id) self.assertIn((lb_match['name'], lb_match['external_ids']), exp_values) class TestIgnoreConnectionTimeout(BaseOvnIdlTest): @classmethod def create_connection(cls, schema): idl = connection.OvsdbIdl.from_server(cls.schema_map[schema], schema) return connection.Connection(idl, 0) def test_setUp_will_fail_if_this_is_broken(self): pass