Merge "Add router ownership check on vpnservice creation"

This commit is contained in:
Jenkins 2013-09-15 23:10:26 +00:00 committed by Gerrit Code Review
commit 289524bd78
3 changed files with 86 additions and 9 deletions

View File

@ -538,9 +538,28 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
'status': vpnservice['status']}
return self._fields(res, fields)
def _check_router(self, context, router_id):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
l3_plugin.get_router(context, router_id)
def _check_subnet_id(self, context, router_id, subnet_id):
core_plugin = manager.NeutronManager.get_plugin()
ports = core_plugin.get_ports(
context,
filters={
'fixed_ips': {'subnet_id': [subnet_id]},
'device_id': [router_id]})
if not ports:
raise vpnaas.SubnetIsNotConnectedToRouter(
subnet_id=subnet_id,
router_id=router_id)
def create_vpnservice(self, context, vpnservice):
vpns = vpnservice['vpnservice']
tenant_id = self._get_tenant_id_for_create(context, vpns)
self._check_router(context, vpns['router_id'])
self._check_subnet_id(context, vpns['router_id'], vpns['subnet_id'])
with context.session.begin(subtransactions=True):
vpnservice_db = VPNService(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,

View File

@ -81,6 +81,11 @@ class DeviceDriverImportError(qexception.NeutronException):
message = _("Can not load driver :%(device_driver)s")
class SubnetIsNotConnectedToRouter(qexception.BadRequest):
message = _("Subnet %(subnet_id)s is not "
"connected to Router %(router_id)s")
vpn_supported_initiators = ['bi-directional', 'response-only']
vpn_supported_encryption_algorithms = ['3des', 'aes-128',
'aes-192', 'aes-256']

View File

@ -217,15 +217,21 @@ class VPNPluginDbTestCase(test_l3_plugin.L3NatTestCaseMixin,
admin_state_up,
router_id, subnet_id,
expected_res_status=None, **kwargs):
tenant_id = kwargs.get('tenant_id', self._tenant_id)
data = {'vpnservice': {'name': name,
'subnet_id': subnet_id,
'router_id': router_id,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
'tenant_id': tenant_id}}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['vpnservice'][arg] = kwargs[arg]
vpnservice_req = self.new_create_request('vpnservices', data, fmt)
if (kwargs.get('set_context') and
'tenant_id' in kwargs):
# create a specific auth context for this request
vpnservice_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
vpnservice_res = vpnservice_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(vpnservice_res.status_int, expected_res_status)
@ -236,12 +242,19 @@ class VPNPluginDbTestCase(test_l3_plugin.L3NatTestCaseMixin,
subnet=None,
router=None,
admin_state_up=True,
no_delete=False, **kwargs):
no_delete=False,
plug_subnet=True, **kwargs):
if not fmt:
fmt = self.fmt
with test_db_plugin.optional_ctx(subnet, self.subnet) as tmp_subnet:
with test_db_plugin.optional_ctx(router,
self.router) as tmp_router:
if plug_subnet:
self._router_interface_action(
'add',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
res = self._create_vpnservice(fmt,
name,
admin_state_up,
@ -259,6 +272,11 @@ class VPNPluginDbTestCase(test_l3_plugin.L3NatTestCaseMixin,
if not no_delete:
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
if plug_subnet:
self._router_interface_action(
'remove',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
@ -768,7 +786,36 @@ class TestVpnaas(VPNPluginDbTestCase):
vpnservice['vpnservice'].items()
if k in expected),
expected)
return vpnservice
def test_create_vpnservice_with_invalid_router(self):
"""Test case to create a vpnservice with invalid router"""
with self.network(
set_context=True,
tenant_id='tenant_a') as network:
with self.subnet(network=network,
cidr='10.2.0.0/24') as subnet:
with self.router(
set_context=True, tenant_id='tenant_a') as router:
router_id = router['router']['id']
subnet_id = subnet['subnet']['id']
self._create_vpnservice(
self.fmt, 'fake',
True, router_id, subnet_id,
expected_res_status=webob.exc.HTTPNotFound.code,
set_context=True, tenant_id='tenant_b')
def test_create_vpnservice_with_nonconnected_subnet(self):
"""Test case to create a vpnservice with nonconnected subnet."""
with self.network() as network:
with self.subnet(network=network,
cidr='10.2.0.0/24') as subnet:
with self.router() as router:
router_id = router['router']['id']
subnet_id = subnet['subnet']['id']
self._create_vpnservice(
self.fmt, 'fake',
True, router_id, subnet_id,
expected_res_status=webob.exc.HTTPBadRequest.code)
def test_delete_router_in_use_by_vpnservice(self):
"""Test delete router in use by vpn service."""
@ -887,10 +934,12 @@ class TestVpnaas(VPNPluginDbTestCase):
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router),
router=router,
plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router)
router=router,
plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_sort('vpnservice', (vpnservice3,
vpnservice2,
@ -907,10 +956,12 @@ class TestVpnaas(VPNPluginDbTestCase):
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router),
router=router,
plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router)
router=router,
plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination('vpnservice',
(vpnservice1,
@ -928,10 +979,12 @@ class TestVpnaas(VPNPluginDbTestCase):
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
router=router),
router=router,
plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
router=router)
router=router,
plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination_reverse('vpnservice',
(vpnservice1,