vmware-nsx/vmware_nsx/tests/unit/extensions/test_security_group_policy.py

254 lines
11 KiB
Python

# Copyright 2016 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
import webob.exc
from neutron.extensions import securitygroup as ext_sg
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as n_exc
from vmware_nsx.extensions import nsxpolicy
from vmware_nsx.extensions import securitygrouplogging as ext_logging
from vmware_nsx.extensions import securitygrouppolicy as ext_policy
from vmware_nsx.tests.unit.nsx_v import test_plugin
from vmware_nsx.tests.unit.nsx_v.vshield import fake_vcns
PLUGIN_NAME = 'vmware_nsx.plugin.NsxVPlugin'
class SecGroupPolicyExtensionTestCase(
test_plugin.NsxVPluginV2TestCase,
test_securitygroup.SecurityGroupDBTestCase):
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
cfg.CONF.set_override('use_nsx_policies', True, group='nsxv')
cfg.CONF.set_override('default_policy_id', 'policy-1', group='nsxv')
# This feature is enabled only since 6.2
with mock.patch.object(fake_vcns.FakeVcns,
'get_version',
return_value="6.2.3"):
super(SecGroupPolicyExtensionTestCase, self).setUp(
plugin=plugin, ext_mgr=ext_mgr)
self._tenant_id = test_db_base_plugin_v2.TEST_TENANT_ID
# add policy & logging security group attribute
ext_sg.Securitygroup().update_attributes_map(
ext_policy.RESOURCE_ATTRIBUTE_MAP)
ext_sg.Securitygroup().update_attributes_map(
ext_logging.RESOURCE_ATTRIBUTE_MAP)
def _create_secgroup_with_policy(self, policy_id, description=None,
logging=False):
body = {'security_group':
{'name': 'sg-policy',
'tenant_id': self._tenant_id,
'policy': policy_id,
'description': description if description else '',
'logging': logging}}
return self._create_security_group_response(self.fmt, body)
def _get_secgroup_with_policy(self):
policy_id = 'policy-5'
res = self._create_secgroup_with_policy(policy_id)
return self.deserialize(self.fmt, res)
def test_secgroup_create_with_policy(self):
policy_id = 'policy-5'
res = self._create_secgroup_with_policy(policy_id)
sg = self.deserialize(self.fmt, res)
self.assertEqual(policy_id, sg['security_group']['policy'])
self.assertEqual('dummy', sg['security_group']['description'])
def test_secgroup_create_with_policyand_desc(self):
policy_id = 'policy-5'
desc = 'test'
res = self._create_secgroup_with_policy(policy_id, description=desc)
sg = self.deserialize(self.fmt, res)
self.assertEqual(policy_id, sg['security_group']['policy'])
self.assertEqual(desc, sg['security_group']['description'])
def test_secgroup_create_without_policy(self):
res = self._create_secgroup_with_policy(None)
self.assertEqual(400, res.status_int)
def test_secgroup_create_with_illegal_policy(self):
policy_id = 'bad-policy'
with mock.patch(PLUGIN_NAME + '.get_nsx_policy',
side_effect=n_exc.ObjectNotFound(id=policy_id)):
res = self._create_secgroup_with_policy(policy_id)
self.assertEqual(400, res.status_int)
def test_secgroup_create_with_policy_and_logging(self):
# We do not support policy & logging together
policy_id = 'policy-5'
res = self._create_secgroup_with_policy(policy_id, logging=True)
self.assertEqual(400, res.status_int)
def test_secgroup_update_with_policy(self):
# Test that updating the policy is allowed
old_policy = 'policy-5'
new_policy = 'policy-6'
res = self._create_secgroup_with_policy(old_policy)
sg = self.deserialize(self.fmt, res)
data = {'security_group': {'policy': new_policy}}
req = self.new_update_request('security-groups', data,
sg['security_group']['id'])
updated_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(new_policy, updated_sg['security_group']['policy'])
# Verify the same result in 'get'
req = self.new_show_request('security-groups',
sg['security_group']['id'])
shown_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(new_policy, shown_sg['security_group']['policy'])
def test_secgroup_update_no_policy_change(self):
# Test updating without changing the policy
old_policy = 'policy-5'
desc = 'abc'
res = self._create_secgroup_with_policy(old_policy)
sg = self.deserialize(self.fmt, res)
data = {'security_group': {'description': desc}}
req = self.new_update_request('security-groups', data,
sg['security_group']['id'])
updated_sg = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(old_policy, updated_sg['security_group']['policy'])
self.assertEqual(desc, updated_sg['security_group']['description'])
def test_secgroup_update_remove_policy(self):
# removing the policy is not allowed
sg = self._get_secgroup_with_policy()
data = {'security_group': {'policy': None}}
req = self.new_update_request('security-groups', data,
sg['security_group']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
def test_secgroup_update_add_logging(self):
# We do not support policy & logging together
sg = self._get_secgroup_with_policy()
data = {'security_group': {'logging': True}}
req = self.new_update_request('security-groups', data,
sg['security_group']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
def test_non_admin_cannot_delete_policy_sg_and_admin_can(self):
sg = self._get_secgroup_with_policy()
sg_id = sg['security_group']['id']
# Try deleting the request as a normal user returns forbidden
# as a tenant is not allowed to delete this.
ctx = context.Context('', self._tenant_id)
self._delete('security-groups', sg_id,
expected_code=webob.exc.HTTPForbidden.code,
neutron_context=ctx)
# can be deleted though as admin
self._delete('security-groups', sg_id,
expected_code=webob.exc.HTTPNoContent.code)
def test_create_rule(self):
sg = self._get_secgroup_with_policy()
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress',
constants.PROTO_NAME_TCP, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(400, res.status_int)
class SecGroupPolicyExtensionTestCaseWithRules(
SecGroupPolicyExtensionTestCase):
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
cfg.CONF.set_override('allow_tenant_rules_with_policy',
True, group='nsxv')
super(SecGroupPolicyExtensionTestCaseWithRules, self).setUp(
plugin=plugin, ext_mgr=ext_mgr)
def test_secgroup_create_without_policy(self):
# in case allow_tenant_rules_with_policy is True, it is allowed to
# create a regular sg
desc = 'test'
res = self._create_secgroup_with_policy(None, description=desc)
sg = self.deserialize(self.fmt, res)
self.assertIsNone(sg['security_group']['policy'])
self.assertEqual(desc, sg['security_group']['description'])
def test_secgroup_create_without_policy_update_policy(self):
# Create a regular security group. adding the policy later should fail
res = self._create_secgroup_with_policy(None)
sg = self.deserialize(self.fmt, res)
data = {'security_group': {'policy': 'policy-1'}}
req = self.new_update_request('security-groups', data,
sg['security_group']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
def test_secgroup_create_without_policy_and_rule(self):
# Test that regular security groups can have rules
res = self._create_secgroup_with_policy(None)
sg = self.deserialize(self.fmt, res)
self.assertIsNone(sg['security_group']['policy'])
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress',
constants.PROTO_NAME_TCP, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
rule_data = self.deserialize(self.fmt, res)
self.assertEqual(
sg['security_group']['id'],
rule_data['security_group_rule']['security_group_id'])
class NsxPolExtensionManager(object):
def get_resources(self):
return nsxpolicy.Nsxpolicy.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestNsxPolicies(test_plugin.NsxVPluginV2TestCase):
def setUp(self, plugin=None):
super(TestNsxPolicies, self).setUp()
ext_mgr = NsxPolExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def test_get_policy(self):
id = 'policy-1'
req = self.new_show_request('nsx-policies', id)
res = self.deserialize(
self.fmt, req.get_response(self.ext_api)
)
policy = res['nsx_policy']
self.assertEqual(id, policy['id'])
def test_list_policies(self):
req = self.new_list_request('nsx-policies')
res = self.deserialize(
self.fmt, req.get_response(self.ext_api)
)
self.assertIn('nsx_policies', res)
# the fake_vcns api returns 3 policies
self.assertEqual(3, len(res['nsx_policies']))