# neutron/neutron/tests/unit/services/portforwarding/test_pf_plugin.py
#
# 444 lines
# 20 KiB
# Python

# Copyright (C) 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from unittest import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants as lib_plugin_conts
from neutron_lib.plugins import directory
from oslo_config import cfg
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resource_manager
from neutron.api.rpc.handlers import resources_rpc
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron import manager
from neutron.objects import base as obj_base
from neutron.objects import port_forwarding
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import constants as pf_consts
from neutron.services.portforwarding import pf_plugin
from neutron.tests.unit import testlib_api
# Dotted import path of the core plugin class; the test case's setUp passes
# it as the "core_plugin" configuration override before initialising the
# plugin manager.
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class TestPortForwardingPlugin(testlib_api.SqlTestCase):
    """Unit tests for the port forwarding service plugin.

    The object-layer DB API functions are replaced with mocks in ``setUp``,
    so the tests exercise the plugin's control flow (which objects are
    fetched, which notifications and RPC pushes are emitted, which
    exceptions are raised) rather than real database state.
    """

    def setUp(self):
        """Build isolated callback managers, mock the DB API, load plugins."""
        super().setUp()

        # Create private consumer/producer callback managers while the
        # '_singleton' attribute is patched, then route the registries'
        # manager lookups to those private instances.
        with mock.patch.object(
                resource_manager.ResourceCallbacksManager, '_singleton',
                new_callable=mock.PropertyMock(return_value=False)):
            self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
            self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()
            for mgr in (self.cons_mgr, self.prod_mgr):
                mgr.clear()

        mock.patch.object(
            cons_registry, '_get_manager', return_value=self.cons_mgr).start()
        mock.patch.object(
            prod_registry, '_get_manager', return_value=self.prod_mgr).start()

        self.setup_coreplugin(load_plugins=False)

        mock.patch('neutron_lib.callbacks.registry.publish').start()
        mock.patch('neutron.objects.db.api.create_object').start()
        mock.patch('neutron.objects.db.api.update_object').start()
        mock.patch('neutron.objects.db.api.delete_object').start()
        mock.patch('neutron.objects.db.api.get_object').start()

        # We don't use real models as per mocks above. We also need to
        # mock-out methods that work with real data types.
        mock.patch(
            'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
        ).start()
        mock.patch.object(router.FloatingIP, 'from_db_object').start()

        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins", ["router", "port_forwarding"])
        manager.init()

        self.pf_plugin = directory.get_plugin(lib_plugin_conts.PORTFORWARDING)
        self.ctxt = context.Context('admin', 'fake_tenant')
        mock.patch.object(self.ctxt.session, 'refresh').start()
        mock.patch.object(self.ctxt.session, 'expunge').start()

    @mock.patch.object(port_forwarding.PortForwarding, 'get_object')
    def test_get_floatingip_port_forwarding(self, get_object_mock):
        """Fetching one port forwarding looks the object up by its id."""
        self.pf_plugin.get_floatingip_port_forwarding(
            self.ctxt, 'pf_id', 'test-fip-id', fields=None)
        get_object_mock.assert_called_once_with(self.ctxt, id='pf_id')

    @mock.patch.object(port_forwarding.PortForwarding, 'get_object',
                       return_value=None)
    def test_negative_get_floatingip_port_forwarding(self, get_object_mock):
        """A missing port forwarding raises PortForwardingNotFound."""
        self.assertRaises(
            pf_exc.PortForwardingNotFound,
            self.pf_plugin.get_floatingip_port_forwarding,
            self.ctxt, 'pf_id', 'test-fip-id', fields=None)

    @mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
    def test_get_floatingip_port_forwardings(self, get_objects_mock):
        """Listing port forwardings queries with a pager and no fip filter."""
        self.pf_plugin.get_floatingip_port_forwardings(self.ctxt)
        get_objects_mock.assert_called_once_with(
            self.ctxt, _pager=mock.ANY, floatingip_id=None)

    @mock.patch.object(registry, 'notify')
    @mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
    @mock.patch.object(port_forwarding.PortForwarding, 'get_object')
    @mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
    @mock.patch.object(router.FloatingIP, 'get_object')
    def test_delete_floatingip_port_forwarding(
            self, fip_get_object_mock, pf_get_objects_mock,
            pf_get_object_mock, push_api_mock, registry_notify_mock):
        """Deleting pushes DELETED and notifies AFTER_DELETE.

        Two scenarios are covered: the floating IP still has other port
        forwardings, and the deleted one was the only entry returned for
        the floating IP (in which case the floating IP's router_id is
        reset to None).
        """
        # After delete, not empty resource list
        pf_get_objects_mock.return_value = [mock.Mock(id='pf_id'),
                                            mock.Mock(id='pf_id2')]
        pf_obj = mock.Mock(id='pf_id', floatingip_id='fip_id')
        pf_get_object_mock.return_value = pf_obj
        self.pf_plugin.delete_floatingip_port_forwarding(
            self.ctxt, 'pf_id', 'fip_id')
        pf_get_objects_mock.assert_called_once_with(
            self.ctxt, floatingip_id='fip_id')
        pf_obj.delete.assert_called()
        push_api_mock.assert_called_once_with(
            self.ctxt, mock.ANY, rpc_events.DELETED)
        registry_notify_mock.assert_called_once_with(
            pf_consts.PORT_FORWARDING,
            events.AFTER_DELETE, self.pf_plugin, payload=mock.ANY)

        # After delete, empty resource list
        pf_get_objects_mock.reset_mock()
        pf_get_object_mock.reset_mock()
        push_api_mock.reset_mock()
        registry_notify_mock.reset_mock()
        pf_obj = mock.Mock(id='need_to_delete_pf_id', floatingip_id='fip_id')
        fip_obj = mock.Mock(id='fip_id')
        fip_get_object_mock.return_value = fip_obj
        pf_get_object_mock.return_value = pf_obj
        pf_get_objects_mock.return_value = [
            mock.Mock(id='need_to_delete_pf_id')]
        self.pf_plugin.delete_floatingip_port_forwarding(
            self.ctxt, 'need_to_delete_pf_id', 'fip_id')
        pf_get_objects_mock.assert_called_once_with(
            self.ctxt, floatingip_id='fip_id')
        pf_get_object_mock.assert_called_once_with(
            self.ctxt, id='need_to_delete_pf_id')
        fip_obj.update_fields.assert_called_once_with({'router_id': None})
        fip_obj.update.assert_called()
        push_api_mock.assert_called_once_with(
            self.ctxt, mock.ANY, rpc_events.DELETED)
        registry_notify_mock.assert_called_once_with(
            pf_consts.PORT_FORWARDING,
            events.AFTER_DELETE, self.pf_plugin, payload=mock.ANY)

    @mock.patch.object(port_forwarding.PortForwarding, 'get_object')
    def test_negative_delete_floatingip_port_forwarding(
            self, pf_get_object_mock):
        """Deleting a missing port forwarding raises the not-found error."""
        pf_get_object_mock.return_value = None
        self.assertRaises(
            pf_exc.PortForwardingNotFound,
            self.pf_plugin.delete_floatingip_port_forwarding,
            self.ctxt, 'pf_id', floatingip_id='fip_id')

    @mock.patch.object(registry, 'notify')
    @mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
    @mock.patch.object(port_forwarding.PortForwarding, 'get_object')
    def test_update_floatingip_port_forwarding(
            self, mock_pf_get_object, mock_rpc_push, mock_registry_notify):
        """Updating changes the object, pushes UPDATED and notifies."""
        pf_input = {
            'port_forwarding':
                {'port_forwarding': {
                    'internal_ip_address': '1.1.1.1',
                    'floatingip_id': 'fip_id'}},
            'floatingip_id': 'fip_id'}
        pf_obj = mock.Mock()
        mock_pf_get_object.return_value = pf_obj
        self.pf_plugin.update_floatingip_port_forwarding(
            self.ctxt, 'pf_id', **pf_input)
        mock_pf_get_object.assert_called_once_with(self.ctxt, id='pf_id')
        # Check '.called': a bare Mock attribute is always truthy, so
        # asserting on the attribute itself could never fail.
        self.assertTrue(pf_obj.update_fields.called)
        self.assertTrue(pf_obj.update.called)
        mock_rpc_push.assert_called_once_with(
            self.ctxt, mock.ANY, rpc_events.UPDATED)
        mock_registry_notify.assert_called_once_with(
            pf_consts.PORT_FORWARDING,
            events.AFTER_UPDATE, self.pf_plugin, payload=mock.ANY)

    @mock.patch.object(port_forwarding.PortForwarding, 'get_object')
    def test_negative_update_floatingip_port_forwarding(
            self, mock_pf_get_object):
        """Updating a missing port forwarding raises the not-found error."""
        pf_input = {
            'port_forwarding':
                {'port_forwarding': {
                    'internal_ip_address': '1.1.1.1',
                    'floatingip_id': 'fip_id'}},
            'floatingip_id': 'fip_id'}
        mock_pf_get_object.return_value = None
        self.assertRaises(
            pf_exc.PortForwardingNotFound,
            self.pf_plugin.update_floatingip_port_forwarding,
            self.ctxt, 'pf_id', **pf_input)

    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_check_port_has_binding_floating_ip')
    @mock.patch.object(obj_base.NeutronDbObject, 'update_objects')
    @mock.patch.object(registry, 'notify')
    @mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
    @mock.patch.object(pf_plugin.PortForwardingPlugin, '_check_router_match')
    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_find_a_router_for_fip_port_forwarding',
                       return_value='target_router_id')
    @mock.patch.object(router.FloatingIP, 'get_object')
    @mock.patch('neutron.objects.port_forwarding.PortForwarding')
    def test_create_floatingip_port_forwarding(
            self, mock_port_forwarding, mock_fip_get_object, mock_find_router,
            mock_check_router_match, mock_push_api, mock_registry_notify,
            mock_update_objects, mock_check_bind_fip):
        """Creating builds the object, pushes CREATED and notifies.

        ``update_objects`` is expected to be called only when the floating
        IP has no router_id yet; when a router_id is already set it must
        not be called.
        """
        # Update fip
        pf_input = {
            'port_forwarding':
                {'port_forwarding': {
                    'internal_ip_address': '1.1.1.1',
                    'floatingip_id': 'fip_id'}},
            'floatingip_id': 'fip_id'}
        pf_obj = mock.Mock()
        fip_obj = mock.Mock()
        mock_port_forwarding.return_value = pf_obj
        mock_fip_get_object.return_value = fip_obj
        fip_obj.router_id = ''
        fip_obj.fixed_port_id = ''
        self.pf_plugin.create_floatingip_port_forwarding(
            self.ctxt, **pf_input)
        mock_port_forwarding.assert_called_once_with(
            self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
        self.assertTrue(mock_update_objects.called)
        self.assertTrue(pf_obj.create.called)
        mock_push_api.assert_called_once_with(
            self.ctxt, mock.ANY, rpc_events.CREATED)
        mock_registry_notify.assert_called_once_with(
            pf_consts.PORT_FORWARDING,
            events.AFTER_CREATE, self.pf_plugin, payload=mock.ANY)

        # Not update fip
        pf_obj.reset_mock()
        fip_obj.reset_mock()
        mock_port_forwarding.reset_mock()
        mock_update_objects.reset_mock()
        mock_push_api.reset_mock()
        mock_registry_notify.reset_mock()
        mock_port_forwarding.return_value = pf_obj
        fip_obj.router_id = 'router_id'
        fip_obj.fixed_port_id = ''
        self.pf_plugin.create_floatingip_port_forwarding(
            self.ctxt, **pf_input)
        mock_port_forwarding.assert_called_once_with(
            self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
        self.assertTrue(pf_obj.create.called)
        self.assertFalse(mock_update_objects.called)
        mock_push_api.assert_called_once_with(
            self.ctxt, mock.ANY, rpc_events.CREATED)
        mock_registry_notify.assert_called_once_with(
            pf_consts.PORT_FORWARDING,
            events.AFTER_CREATE, self.pf_plugin, payload=mock.ANY)

    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_check_port_has_binding_floating_ip')
    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_find_existing_port_forwarding')
    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_check_router_match')
    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_find_a_router_for_fip_port_forwarding',
                       return_value='target_router_id')
    @mock.patch.object(router.FloatingIP, 'get_object')
    @mock.patch('neutron.objects.port_forwarding.PortForwarding')
    def test_negative_create_floatingip_port_forwarding(
            self, mock_port_forwarding, mock_fip_get_object,
            mock_find_router,
            mock_check_router_match, mock_try_find_exist,
            mock_check_bind_fip):
        """A duplicate-entry error on create surfaces as BadRequest."""
        pf_input = {
            'port_forwarding': {
                'internal_ip_address': '1.1.1.1',
                'floatingip_id': 'fip_id'}}
        pf_obj = mock.Mock()
        fip_obj = mock.Mock()
        mock_port_forwarding.return_value = pf_obj
        mock_fip_get_object.return_value = fip_obj
        fip_obj.fixed_port_id = ''
        pf_obj.create.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
            mock.Mock(), mock.Mock())
        mock_try_find_exist.return_value = ('pf_obj', 'conflict_param')
        self.assertRaises(
            lib_exc.BadRequest,
            self.pf_plugin.create_floatingip_port_forwarding,
            self.ctxt, 'fip_id', pf_input)

    @mock.patch.object(pf_plugin.PortForwardingPlugin,
                       '_get_internal_ip_subnet')
    @mock.patch.object(l3_db.L3_NAT_dbonly_mixin, 'get_router_for_floatingip')
    @mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
    @mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_subnet')
    def test_negative_find_a_router_for_fip_port_forwarding(
            self, mock_get_subnet, mock_get_port, mock_get_router,
            mock_get_ip_subnet):
        """Router lookup failures are reported as BadRequest.

        Covered cases: no internal subnet found for the internal IP, the
        router lookup raising ExternalGatewayForFloatingIPNotFound, and a
        port whose only fixed IP is an IPv6 address.
        """
        fip_obj = mock.Mock()
        pf_dict = {'internal_port_id': 'internal_neutron_port',
                   'internal_ip_address': '10.0.0.1'}
        port_dict = {'id': 'ID',
                     'fixed_ips': [{"subnet_id": "test-subnet-id",
                                    "ip_address": "10.0.0.1"}]}
        mock_get_port.return_value = port_dict
        mock_get_ip_subnet.return_value = None
        self.assertRaises(
            lib_exc.BadRequest,
            self.pf_plugin._find_a_router_for_fip_port_forwarding,
            self.ctxt, pf_dict, fip_obj)
        self.assertFalse(mock_get_subnet.called)

        mock_get_ip_subnet.return_value = 'internal_subnet_id'
        mock_get_router.side_effect = (
            lib_l3_exc.ExternalGatewayForFloatingIPNotFound(
                external_network_id=mock.Mock(),
                subnet_id=mock.Mock(), port_id=mock.Mock()))
        self.assertRaises(
            lib_exc.BadRequest,
            self.pf_plugin._find_a_router_for_fip_port_forwarding,
            self.ctxt, pf_dict, fip_obj)
        self.assertTrue(mock_get_subnet.called)

        ipv6_port_dict = {'id': 'ID',
                          'fixed_ips': [{"subnet_id": "test-subnet-id",
                                         "ip_address": "1::1"}]}
        mock_get_port.return_value = ipv6_port_dict
        self.assertRaises(
            lib_exc.BadRequest,
            self.pf_plugin._find_a_router_for_fip_port_forwarding,
            self.ctxt, pf_dict, fip_obj)

    @mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
    def test_negative_check_router_match(self, mock_pf_get_objects):
        """_check_router_match raises BadRequest in both lookup outcomes.

        The check is exercised once with an existing port forwarding
        returned by ``get_objects`` and once with an empty result.
        """
        pf_dict = {
            'internal_port_id': 'internal_neutron_port',
            'internal_ip_address': 'internal_fixed_ip',
            'internal_port': 'internal protocol port num'}
        fip_obj = mock.Mock()
        mock_pf_get_objects.return_value = ['Exist port forwardings']
        router_id = 'selected router id'
        self.assertRaises(lib_exc.BadRequest,
                          self.pf_plugin._check_router_match,
                          self.ctxt, fip_obj, router_id, pf_dict)

        mock_pf_get_objects.return_value = []
        self.assertRaises(lib_exc.BadRequest,
                          self.pf_plugin._check_router_match,
                          self.ctxt, fip_obj, router_id, pf_dict)

    @mock.patch.object(router.FloatingIP, 'get_objects')
    def test_create_floatingip_port_forwarding_port_in_use(
            self, mock_fip_get_objects):
        """Create fails when a floating IP is returned for the port."""
        pf_input = {
            'port_forwarding':
                {'port_forwarding': {
                    'internal_ip_address': '1.1.1.1',
                    'internal_port_id': 'internal_neutron_port',
                    'floatingip_id': 'fip_id_1'}},
            'floatingip_id': 'fip_id_1'}
        fip_obj = mock.Mock(floating_ip_address="10.10.10.10")
        mock_fip_get_objects.return_value = [fip_obj]
        self.assertRaises(pf_exc.PortHasBindingFloatingIP,
                          self.pf_plugin.create_floatingip_port_forwarding,
                          self.ctxt, **pf_input)

    @mock.patch.object(router.FloatingIP, 'get_objects')
    def test_update_floatingip_port_forwarding_port_in_use(
            self, mock_fip_get_objects):
        """Update fails when a floating IP is returned for the port."""
        pf_input = {
            'port_forwarding':
                {'port_forwarding': {
                    'internal_ip_address': '1.1.1.1',
                    'internal_port_id': 'internal_neutron_port',
                    'floatingip_id': 'fip_id_2'}}}
        fip_obj = mock.Mock(floating_ip_address="10.10.10.11")
        mock_fip_get_objects.return_value = [fip_obj]
        self.assertRaises(pf_exc.PortHasBindingFloatingIP,
                          self.pf_plugin.update_floatingip_port_forwarding,
                          self.ctxt, 'fake-pf-id', 'fip_id_2', **pf_input)

    def test_service_plugins_values(self):
        """_required_service_plugins maps cfg values to (plugins, uses_rpc).

        Each case overrides the ``service_plugins`` option and compares the
        returned tuple with the expected plugin list and RPC flag.
        """
        exp_default_plugins = ['router']
        supported_plugins = ['router', 'ovn-router']
        # Marker meaning "the expected plugin list equals the case's input".
        same_as_input = 'same_as_input'
        TC = namedtuple('TC', 'input exp_plugins exp_uses_rpc description')
        test_cases = [
            TC([], exp_default_plugins, True, "default from empty cfg"),
            TC(['foo'], exp_default_plugins, True,
               "default from unexpected cfg"),
            TC(['foo', 123], exp_default_plugins, True,
               "default from unexpected cfg"),
            TC(['foo', 'router'], exp_default_plugins, True,
               "default from valid cfg"),
            TC(['router'], same_as_input, True, "valid cfg 1"),
            TC(['ovn-router'], same_as_input, False, "valid cfg 2"),
            TC(['ovn-router', 'router'], supported_plugins, True,
               "valid cfg 3"),
            TC(['router', 'ovn-router'], supported_plugins, True,
               "valid cfg 4"),
            TC(['bar', 'router', 'foo'], ['router'], True, "valid cfg 5"),
            TC(['bar', 'ovn-router', 'foo'], ['ovn-router'], False,
               "valid cfg 6"),
            TC(['bar', 'router', 123, 'ovn-router', 'foo', 'kitchen', 'sink'],
               supported_plugins, True, "valid cfg 7"),
        ]
        for tc in test_cases:
            cfg.CONF.set_override("service_plugins", tc.input)
            plugins, rpc_required = pf_plugin._required_service_plugins()
            exp_plugins = (tc.input if tc.exp_plugins == same_as_input
                           else tc.exp_plugins)
            self.assertEqual(
                (exp_plugins, tc.exp_uses_rpc), (plugins, rpc_required),
                tc.description)

    @mock.patch.object(cfg.ConfigOpts, '__getattr__')
    def test_service_plugins_no_such_opt(self, mock_config_opts_get):
        """NoSuchOptError on config access yields (['router'], True)."""
        description = "test cfg.NoSuchOptError exception"
        mock_config_opts_get.side_effect = cfg.NoSuchOptError('test_svc_plug')
        plugins, rpc_required = pf_plugin._required_service_plugins()
        mock_config_opts_get.assert_called_once()
        expected = (['router'], True)
        self.assertEqual(
            expected, (plugins, rpc_required), description)