From 39bcf6f02db0dc99fe41d97e94fe2f6543a440e4 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Tue, 26 Nov 2019 16:48:20 +0000 Subject: [PATCH] nova-net: Drop nova-network-base security group tests A future change will drop the actual security group driver. For now, just focus on merging the two disparate test cases, which is easier said than done. Change-Id: Idd0b741760601aaccca4adfc5101c3539ba9ec09 Signed-off-by: Stephen Finucane --- .../compute/test_neutron_security_groups.py | 849 --------------- .../openstack/compute/test_security_groups.py | 971 +++++++++++++----- 2 files changed, 695 insertions(+), 1125 deletions(-) delete mode 100644 nova/tests/unit/api/openstack/compute/test_neutron_security_groups.py diff --git a/nova/tests/unit/api/openstack/compute/test_neutron_security_groups.py b/nova/tests/unit/api/openstack/compute/test_neutron_security_groups.py deleted file mode 100644 index af0b2bcd2d39..000000000000 --- a/nova/tests/unit/api/openstack/compute/test_neutron_security_groups.py +++ /dev/null @@ -1,849 +0,0 @@ -# Copyright 2013 Nicira, Inc. -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import six - -import mock -from neutronclient.common import exceptions as n_exc -from oslo_config import cfg -from oslo_serialization import jsonutils -from oslo_utils import encodeutils -from oslo_utils.fixture import uuidsentinel as uuids -from oslo_utils import uuidutils -import webob - -from nova.api.openstack.compute import security_groups -from nova import context -import nova.db.api -from nova import exception -from nova.network import model -from nova.network.neutronv2 import api as neutron_api -from nova.objects import instance as instance_obj -from nova import test -from nova.tests.unit.api.openstack.compute import test_security_groups -from nova.tests.unit.api.openstack import fakes - - -UUID_SERVER = uuids.server - - -class TestNeutronSecurityGroupsV21(test_security_groups.TestSecurityGroupsV21): - # Used to override set config in the base test in test_security_groups. - use_neutron = True - - def setUp(self): - super(TestNeutronSecurityGroupsV21, self).setUp() - cfg.CONF.set_override('use_neutron', True) - self.original_client = neutron_api.get_client - neutron_api.get_client = get_client - - def tearDown(self): - neutron_api.get_client = self.original_client - get_client()._reset() - super(TestNeutronSecurityGroupsV21, self).tearDown() - - def _create_sg_template(self, **kwargs): - sg = test_security_groups.security_group_request_template(**kwargs) - return self.controller.create(self.req, body={'security_group': sg}) - - def _create_network(self): - body = {'network': {'name': 'net1'}} - neutron = get_client() - net = neutron.create_network(body) - body = {'subnet': {'network_id': net['network']['id'], - 'cidr': '10.0.0.0/24'}} - neutron.create_subnet(body) - return net - - def _create_port(self, **kwargs): - body = {'port': {'binding:vnic_type': model.VNIC_TYPE_NORMAL}} - fields = ['security_groups', 'device_id', 'network_id', - 'port_security_enabled', 'ip_allocation'] - for field in fields: - if field in kwargs: - body['port'][field] = kwargs[field] - neutron = get_client() - return neutron.create_port(body) - - def _create_security_group(self, **kwargs): - body = {'security_group': {}} - fields = ['name', 'description'] - for field in fields: - if field in kwargs: - body['security_group'][field] = kwargs[field] - neutron = get_client() - return neutron.create_security_group(body) - - def test_get_security_group_list(self): - self._create_sg_template().get('security_group') - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) - list_dict = self.controller.index(req) - self.assertEqual(len(list_dict['security_groups']), 2) - - def test_get_security_group_list_offset_and_limit(self): - path = ('/v2/%s/os-security-groups?offset=1&limit=1' % - fakes.FAKE_PROJECT_ID) - self._create_sg_template().get('security_group') - req = fakes.HTTPRequest.blank(path) - list_dict = self.controller.index(req) - self.assertEqual(len(list_dict['security_groups']), 1) - - def test_get_security_group_by_instance(self): - sg = self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg['id']], - device_id=test_security_groups.UUID_SERVER) - expected = [{'rules': [], 'tenant_id': fakes.FAKE_PROJECT_ID, - 'id': sg['id'], 'name': 'test', - 'description': 'test-description'}] - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/os-security-groups' - % (fakes.FAKE_PROJECT_ID, test_security_groups.UUID_SERVER)) - res_dict = self.server_controller.index( - req, test_security_groups.UUID_SERVER)['security_groups'] - self.assertEqual(expected, res_dict) - - def test_get_security_group_by_id(self): - sg = self._create_sg_template().get('security_group') - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups/%s' - % (fakes.FAKE_PROJECT_ID, sg['id'])) - res_dict = self.controller.show(req, sg['id']) - expected = {'security_group': sg} - self.assertEqual(res_dict, expected) - - def test_delete_security_group_by_id(self): - sg = self._create_sg_template().get('security_group') - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups/%s' % - (fakes.FAKE_PROJECT_ID, sg['id'])) - self.controller.delete(req, sg['id']) - - def test_delete_security_group_by_admin(self): - sg = self._create_sg_template().get('security_group') - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups/%s' % - (fakes.FAKE_PROJECT_ID, sg['id']), use_admin_context=True) - self.controller.delete(req, sg['id']) - - @mock.patch('nova.compute.utils.refresh_info_cache_for_instance') - def test_delete_security_group_in_use(self, refresh_info_cache_mock): - sg = self._create_sg_template().get('security_group') - self._create_network() - db_inst = fakes.stub_instance(id=1, nw_cache=[], security_groups=[]) - _context = context.get_admin_context() - instance = instance_obj.Instance._from_db_object( - _context, instance_obj.Instance(), db_inst, - expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS) - neutron = neutron_api.API() - with mock.patch.object(nova.db.api, 'instance_get_by_uuid', - return_value=db_inst): - neutron.allocate_for_instance(_context, instance, False, None, - security_groups=[sg['id']]) - - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups/%s' - % (fakes.FAKE_PROJECT_ID, sg['id'])) - self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, - req, sg['id']) - - def test_associate(self): - sg = self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg['id']], - device_id=UUID_SERVER) - - body = dict(addSecurityGroup=dict(name="test")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.manager._addSecurityGroup(req, UUID_SERVER, body) - - def test_associate_duplicate_names(self): - sg1 = self._create_security_group(name='sg1', - description='sg1')['security_group'] - self._create_security_group(name='sg1', - description='sg1')['security_group'] - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg1['id']], - device_id=UUID_SERVER) - - body = dict(addSecurityGroup=dict(name="sg1")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.assertRaises(webob.exc.HTTPConflict, - self.manager._addSecurityGroup, - req, UUID_SERVER, body) - - def test_associate_port_security_enabled_true(self): - sg = self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg['id']], - port_security_enabled=True, - device_id=UUID_SERVER) - - body = dict(addSecurityGroup=dict(name="test")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.manager._addSecurityGroup(req, UUID_SERVER, body) - - def test_associate_port_security_enabled_false(self): - self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], port_security_enabled=False, - device_id=UUID_SERVER) - - body = dict(addSecurityGroup=dict(name="test")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.assertRaises(webob.exc.HTTPBadRequest, - self.manager._addSecurityGroup, - req, UUID_SERVER, body) - - def test_associate_deferred_ip_port(self): - sg = self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg['id']], - port_security_enabled=True, ip_allocation='deferred', - device_id=UUID_SERVER) - - body = dict(addSecurityGroup=dict(name="test")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.manager._addSecurityGroup(req, UUID_SERVER, body) - - def test_disassociate_by_non_existing_security_group_name(self): - body = dict(removeSecurityGroup=dict(name='non-existing')) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.assertRaises(webob.exc.HTTPNotFound, - self.manager._removeSecurityGroup, - req, UUID_SERVER, body) - - def test_disassociate(self): - sg = self._create_sg_template().get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg['id']], - device_id=UUID_SERVER) - - body = dict(removeSecurityGroup=dict(name="test")) - - req = fakes.HTTPRequest.blank( - '/v2/%s/servers/%s/action' % - (fakes.FAKE_PROJECT_ID, UUID_SERVER)) - self.manager._removeSecurityGroup(req, UUID_SERVER, body) - - def test_get_instances_security_groups_bindings(self): - servers = [{'id': test_security_groups.FAKE_UUID1}, - {'id': test_security_groups.FAKE_UUID2}] - sg1 = self._create_sg_template(name='test1').get('security_group') - sg2 = self._create_sg_template(name='test2').get('security_group') - # test name='' is replaced with id - sg3 = self._create_sg_template(name='').get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg1['id'], - sg2['id']], - device_id=test_security_groups.FAKE_UUID1) - self._create_port( - network_id=net['network']['id'], security_groups=[sg2['id'], - sg3['id']], - device_id=test_security_groups.FAKE_UUID2) - expected = {test_security_groups.FAKE_UUID1: [{'name': sg1['name']}, - {'name': sg2['name']}], - test_security_groups.FAKE_UUID2: [{'name': sg2['name']}, - {'name': sg3['id']}]} - security_group_api = self.controller.security_group_api - bindings = ( - security_group_api.get_instances_security_groups_bindings( - context.get_admin_context(), servers)) - self.assertEqual(bindings, expected) - - def test_get_instance_security_groups(self): - sg1 = self._create_sg_template(name='test1').get('security_group') - sg2 = self._create_sg_template(name='test2').get('security_group') - # test name='' is replaced with id - sg3 = self._create_sg_template(name='').get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg1['id'], - sg2['id'], - sg3['id']], - device_id=test_security_groups.FAKE_UUID1) - - expected = [{'name': sg1['name']}, {'name': sg2['name']}, - {'name': sg3['id']}] - security_group_api = self.controller.security_group_api - sgs = security_group_api.get_instance_security_groups( - context.get_admin_context(), - instance_obj.Instance(uuid=test_security_groups.FAKE_UUID1)) - self.assertEqual(sgs, expected) - - @mock.patch('nova.network.security_group.neutron_driver.SecurityGroupAPI.' - 'get_instances_security_groups_bindings') - def test_get_security_group_empty_for_instance(self, neutron_sg_bind_mock): - servers = [{'id': test_security_groups.FAKE_UUID1}] - neutron_sg_bind_mock.return_value = {} - - security_group_api = self.controller.security_group_api - ctx = context.get_admin_context() - sgs = security_group_api.get_instance_security_groups(ctx, - instance_obj.Instance(uuid=test_security_groups.FAKE_UUID1)) - - neutron_sg_bind_mock.assert_called_once_with(ctx, servers, False) - self.assertEqual([], sgs) - - def test_create_port_with_sg_and_port_security_enabled_true(self): - sg1 = self._create_sg_template(name='test1').get('security_group') - net = self._create_network() - self._create_port( - network_id=net['network']['id'], security_groups=[sg1['id']], - port_security_enabled=True, - device_id=test_security_groups.FAKE_UUID1) - security_group_api = self.controller.security_group_api - sgs = security_group_api.get_instance_security_groups( - context.get_admin_context(), - instance_obj.Instance(uuid=test_security_groups.FAKE_UUID1)) - self.assertEqual(sgs, [{'name': 'test1'}]) - - def test_create_port_with_sg_and_port_security_enabled_false(self): - sg1 = self._create_sg_template(name='test1').get('security_group') - net = self._create_network() - self.assertRaises(exception.SecurityGroupCannotBeApplied, - self._create_port, - network_id=net['network']['id'], - security_groups=[sg1['id']], - port_security_enabled=False, - device_id=test_security_groups.FAKE_UUID1) - - -class TestNeutronSecurityGroupRulesV21( - test_security_groups.TestSecurityGroupRulesV21): - # Used to override set config in the base test in test_security_groups. - use_neutron = True - - def setUp(self): - super(TestNeutronSecurityGroupRulesV21, self).setUp() - - cfg.CONF.set_override('use_neutron', True) - self.original_client = neutron_api.get_client - neutron_api.get_client = get_client - - id1 = '11111111-1111-1111-1111-111111111111' - sg_template1 = test_security_groups.security_group_template( - security_group_rules=[], id=id1) - id2 = '22222222-2222-2222-2222-222222222222' - sg_template2 = test_security_groups.security_group_template( - security_group_rules=[], id=id2) - self.controller_sg = security_groups.SecurityGroupController() - neutron = get_client() - neutron._fake_security_groups[id1] = sg_template1 - neutron._fake_security_groups[id2] = sg_template2 - - def tearDown(self): - neutron_api.get_client = self.original_client - get_client()._reset() - super(TestNeutronSecurityGroupRulesV21, self).tearDown() - - def test_create_add_existing_rules_by_cidr(self): - sg = test_security_groups.security_group_template() - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) - self.controller_sg.create(req, {'security_group': sg}) - rule = test_security_groups.security_group_rule_template( - cidr='15.0.0.0/8', parent_group_id=self.sg2['id']) - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) - self.controller.create(req, {'security_group_rule': rule}) - self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, - req, {'security_group_rule': rule}) - - def test_create_add_existing_rules_by_group_id(self): - sg = test_security_groups.security_group_template() - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) - self.controller_sg.create(req, {'security_group': sg}) - rule = test_security_groups.security_group_rule_template( - group=self.sg1['id'], parent_group_id=self.sg2['id']) - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) - self.controller.create(req, {'security_group_rule': rule}) - self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, - req, {'security_group_rule': rule}) - - def test_delete(self): - rule = test_security_groups.security_group_rule_template( - parent_group_id=self.sg2['id']) - - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) - res_dict = self.controller.create(req, {'security_group_rule': rule}) - security_group_rule = res_dict['security_group_rule'] - req = fakes.HTTPRequest.blank('/v2/%s/os-security-group-rules/%s' % ( - fakes.FAKE_PROJECT_ID, security_group_rule['id'])) - self.controller.delete(req, security_group_rule['id']) - - -class TestNeutronSecurityGroupsOutputTest(test.TestCase): - content_type = 'application/json' - - def setUp(self): - super(TestNeutronSecurityGroupsOutputTest, self).setUp() - - cfg.CONF.set_override('use_neutron', True) - self.original_client = neutron_api.get_client - neutron_api.get_client = get_client - - fakes.stub_out_nw_api(self) - self.controller = security_groups.SecurityGroupController() - self.stub_out('nova.compute.api.API.get', - test_security_groups.fake_compute_get) - self.stub_out('nova.compute.api.API.get_all', - test_security_groups.fake_compute_get_all) - self.stub_out('nova.compute.api.API.create', - test_security_groups.fake_compute_create) - self.stub_out( - 'nova.network.security_group.neutron_driver.SecurityGroupAPI.' - 'get_instances_security_groups_bindings', - self._fake_get_instances_security_groups_bindings) - - def tearDown(self): - neutron_api.get_client = self.original_client - get_client()._reset() - super(TestNeutronSecurityGroupsOutputTest, self).tearDown() - - def _fake_get_instances_security_groups_bindings(self, inst, context, - servers): - groups = { - '00000000-0000-0000-0000-000000000001': [{'name': 'fake-0-0'}, - {'name': 'fake-0-1'}], - '00000000-0000-0000-0000-000000000002': [{'name': 'fake-1-0'}, - {'name': 'fake-1-1'}], - '00000000-0000-0000-0000-000000000003': [{'name': 'fake-2-0'}, - {'name': 'fake-2-1'}]} - result = {} - for server in servers: - result[server['id']] = groups.get(server['id']) - return result - - def _make_request(self, url, body=None): - req = fakes.HTTPRequest.blank(url) - if body: - req.method = 'POST' - req.body = encodeutils.safe_encode(self._encode_body(body)) - req.content_type = self.content_type - req.headers['Accept'] = self.content_type - - # NOTE: This 'os-security-groups' is for enabling security_groups - # attribute on response body. - res = req.get_response(fakes.wsgi_app_v21()) - return res - - def _encode_body(self, body): - return jsonutils.dumps(body) - - def _get_server(self, body): - return jsonutils.loads(body).get('server') - - def _get_servers(self, body): - return jsonutils.loads(body).get('servers') - - def _get_groups(self, server): - return server.get('security_groups') - - def test_create(self): - url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID - image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) - security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}] - for security_group in security_groups: - sg = test_security_groups.security_group_template( - name=security_group['name']) - self.controller.create(req, {'security_group': sg}) - - server = dict(name='server_test', imageRef=image_uuid, flavorRef=2, - security_groups=security_groups) - res = self._make_request(url, {'server': server}) - self.assertEqual(res.status_int, 202) - server = self._get_server(res.body) - for i, group in enumerate(self._get_groups(server)): - name = 'fake-2-%s' % i - self.assertEqual(group.get('name'), name) - - def test_create_server_get_default_security_group(self): - url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID - image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' - server = dict(name='server_test', imageRef=image_uuid, flavorRef=2) - res = self._make_request(url, {'server': server}) - self.assertEqual(res.status_int, 202) - server = self._get_server(res.body) - group = self._get_groups(server)[0] - self.assertEqual(group.get('name'), 'default') - - def test_show(self): - self.stub_out('nova.network.security_group.neutron_driver.' - 'SecurityGroupAPI.get_instance_security_groups', - lambda self, inst, context, id: - [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]) - - url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID - image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' - req = fakes.HTTPRequest.blank( - '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) - security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}] - for security_group in security_groups: - sg = test_security_groups.security_group_template( - name=security_group['name']) - self.controller.create(req, {'security_group': sg}) - server = dict(name='server_test', imageRef=image_uuid, flavorRef=2, - security_groups=security_groups) - - res = self._make_request(url, {'server': server}) - self.assertEqual(res.status_int, 202) - server = self._get_server(res.body) - for i, group in enumerate(self._get_groups(server)): - name = 'fake-2-%s' % i - self.assertEqual(group.get('name'), name) - - # Test that show (GET) returns the same information as create (POST) - url = ('/v2/%s/servers/%s' % (fakes.FAKE_PROJECT_ID, - test_security_groups.UUID3)) - res = self._make_request(url) - self.assertEqual(res.status_int, 200) - server = self._get_server(res.body) - - for i, group in enumerate(self._get_groups(server)): - name = 'fake-2-%s' % i - self.assertEqual(group.get('name'), name) - - def test_detail(self): - url = '/v2/%s/servers/detail' % fakes.FAKE_PROJECT_ID - res = self._make_request(url) - - self.assertEqual(res.status_int, 200) - for i, server in enumerate(self._get_servers(res.body)): - for j, group in enumerate(self._get_groups(server)): - name = 'fake-%s-%s' % (i, j) - self.assertEqual(group.get('name'), name) - - @mock.patch('nova.compute.api.API.get', - side_effect=exception.InstanceNotFound(instance_id='fake')) - def test_no_instance_passthrough_404(self, mock_get): - url = ('/v2/%s/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115' % - fakes.FAKE_PROJECT_ID) - res = self._make_request(url) - - self.assertEqual(res.status_int, 404) - - -def get_client(context=None, admin=False): - return MockClient() - - -class MockClient(object): - - # Needs to be global to survive multiple calls to get_client. - _fake_security_groups = {} - _fake_ports = {} - _fake_networks = {} - _fake_subnets = {} - _fake_security_group_rules = {} - - def __init__(self): - # add default security group - if not len(self._fake_security_groups): - ret = {'name': 'default', 'description': 'default', - 'tenant_id': fakes.FAKE_PROJECT_ID, - 'security_group_rules': [], - 'id': uuidutils.generate_uuid()} - self._fake_security_groups[ret['id']] = ret - - def _reset(self): - self._fake_security_groups.clear() - self._fake_ports.clear() - self._fake_networks.clear() - self._fake_subnets.clear() - self._fake_security_group_rules.clear() - - def create_security_group(self, body=None): - s = body.get('security_group') - if not isinstance(s.get('name', ''), six.string_types): - msg = ('BadRequest: Invalid input for name. Reason: ' - 'None is not a valid string.') - raise n_exc.BadRequest(message=msg) - if not isinstance(s.get('description.', ''), six.string_types): - msg = ('BadRequest: Invalid input for description. Reason: ' - 'None is not a valid string.') - raise n_exc.BadRequest(message=msg) - if len(s.get('name')) > 255 or len(s.get('description')) > 255: - msg = 'Security Group name great than 255' - raise n_exc.NeutronClientException(message=msg, status_code=401) - ret = {'name': s.get('name'), 'description': s.get('description'), - 'tenant_id': fakes.FAKE_PROJECT_ID, 'security_group_rules': [], - 'id': uuidutils.generate_uuid()} - - self._fake_security_groups[ret['id']] = ret - return {'security_group': ret} - - def create_network(self, body): - n = body.get('network') - ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'), - 'admin_state_up': n.get('admin_state_up', True), - 'tenant_id': fakes.FAKE_PROJECT_ID, - 'id': uuidutils.generate_uuid()} - if 'port_security_enabled' in n: - ret['port_security_enabled'] = n['port_security_enabled'] - self._fake_networks[ret['id']] = ret - return {'network': ret} - - def create_subnet(self, body): - s = body.get('subnet') - try: - net = self._fake_networks[s.get('network_id')] - except KeyError: - msg = 'Network %s not found' % s.get('network_id') - raise n_exc.NeutronClientException(message=msg, status_code=404) - ret = {'name': s.get('name'), 'network_id': s.get('network_id'), - 'tenant_id': fakes.FAKE_PROJECT_ID, 'cidr': s.get('cidr'), - 'id': uuidutils.generate_uuid(), 'gateway_ip': '10.0.0.1'} - net['subnets'].append(ret['id']) - self._fake_networks[net['id']] = net - self._fake_subnets[ret['id']] = ret - return {'subnet': ret} - - def create_port(self, body): - p = body.get('port') - ret = {'status': 'ACTIVE', 'id': uuidutils.generate_uuid(), - 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'), - 'device_id': p.get('device_id', uuidutils.generate_uuid()), - 'admin_state_up': p.get('admin_state_up', True), - 'security_groups': p.get('security_groups', []), - 'network_id': p.get('network_id'), - 'ip_allocation': p.get('ip_allocation'), - 'binding:vnic_type': - p.get('binding:vnic_type') or model.VNIC_TYPE_NORMAL} - - network = self._fake_networks[p['network_id']] - if 'port_security_enabled' in p: - ret['port_security_enabled'] = p['port_security_enabled'] - elif 'port_security_enabled' in network: - ret['port_security_enabled'] = network['port_security_enabled'] - - port_security = ret.get('port_security_enabled', True) - # port_security must be True if security groups are present - if not port_security and ret['security_groups']: - raise exception.SecurityGroupCannotBeApplied() - - if network['subnets'] and p.get('ip_allocation') != 'deferred': - ret['fixed_ips'] = [{'subnet_id': network['subnets'][0], - 'ip_address': '10.0.0.1'}] - if not ret['security_groups'] and (port_security is None or - port_security is True): - for security_group in self._fake_security_groups.values(): - if security_group['name'] == 'default': - ret['security_groups'] = [security_group['id']] - break - self._fake_ports[ret['id']] = ret - return {'port': ret} - - def create_security_group_rule(self, body): - # does not handle bulk case so just picks rule[0] - r = body.get('security_group_rules')[0] - fields = ['direction', 'protocol', 'port_range_min', 'port_range_max', - 'ethertype', 'remote_ip_prefix', 'tenant_id', - 'security_group_id', 'remote_group_id'] - ret = {} - for field in fields: - ret[field] = r.get(field) - ret['id'] = uuidutils.generate_uuid() - self._fake_security_group_rules[ret['id']] = ret - return {'security_group_rules': [ret]} - - def show_security_group(self, security_group, **_params): - try: - sg = self._fake_security_groups[security_group] - except KeyError: - msg = 'Security Group %s not found' % security_group - raise n_exc.NeutronClientException(message=msg, status_code=404) - for security_group_rule in self._fake_security_group_rules.values(): - if security_group_rule['security_group_id'] == sg['id']: - sg['security_group_rules'].append(security_group_rule) - - return {'security_group': sg} - - def show_security_group_rule(self, security_group_rule, **_params): - try: - return {'security_group_rule': - self._fake_security_group_rules[security_group_rule]} - except KeyError: - msg = 'Security Group rule %s not found' % security_group_rule - raise n_exc.NeutronClientException(message=msg, status_code=404) - - def show_network(self, network, **_params): - try: - return {'network': - self._fake_networks[network]} - except KeyError: - msg = 'Network %s not found' % network - raise n_exc.NeutronClientException(message=msg, status_code=404) - - def show_port(self, port, **_params): - try: - return {'port': - self._fake_ports[port]} - except KeyError: - msg = 'Port %s not found' % port - raise n_exc.NeutronClientException(message=msg, status_code=404) - - def show_subnet(self, subnet, **_params): - try: - return {'subnet': - self._fake_subnets[subnet]} - except KeyError: - msg = 'Port %s not found' % subnet - raise n_exc.NeutronClientException(message=msg, status_code=404) - - def list_security_groups(self, **_params): - ret = [] - for security_group in self._fake_security_groups.values(): - names = _params.get('name') - if names: - if not isinstance(names, list): - names = [names] - for name in names: - if security_group.get('name') == name: - ret.append(security_group) - ids = _params.get('id') - if ids: - if not isinstance(ids, list): - ids = [ids] - for id in ids: - if security_group.get('id') == id: - ret.append(security_group) - elif not (names or ids): - ret.append(security_group) - return {'security_groups': ret} - - def list_networks(self, **_params): - # neutronv2/api.py _get_available_networks calls this assuming - # search_opts filter "shared" is implemented and not ignored - shared = _params.get("shared", None) - if shared: - return {'networks': []} - else: - return {'networks': - [network for network in self._fake_networks.values()]} - - def list_ports(self, **_params): - ret = [] - device_id = _params.get('device_id') - for port in self._fake_ports.values(): - if device_id: - if port['device_id'] in device_id: - ret.append(port) - else: - ret.append(port) - return {'ports': ret} - - def list_subnets(self, **_params): - return {'subnets': - [subnet for subnet in self._fake_subnets.values()]} - - def list_floatingips(self, **_params): - return {'floatingips': []} - - def delete_security_group(self, security_group): - self.show_security_group(security_group) - ports = self.list_ports() - for port in ports.get('ports'): - for sg_port in port['security_groups']: - if sg_port == security_group: - msg = ('Unable to delete Security group %s in use' - % security_group) - raise n_exc.NeutronClientException(message=msg, - status_code=409) - del self._fake_security_groups[security_group] - - def delete_security_group_rule(self, security_group_rule): - self.show_security_group_rule(security_group_rule) - del self._fake_security_group_rules[security_group_rule] - - def delete_network(self, network): - self.show_network(network) - self._check_ports_on_network(network) - for subnet in self._fake_subnets.values(): - if subnet['network_id'] == network: - del self._fake_subnets[subnet['id']] - del self._fake_networks[network] - - def delete_port(self, port): - self.show_port(port) - del self._fake_ports[port] - - def update_port(self, port, body=None): - self.show_port(port) - self._fake_ports[port].update(body['port']) - return {'port': self._fake_ports[port]} - - def list_extensions(self, **_parms): - return {'extensions': []} - - def _check_ports_on_network(self, network): - ports = self.list_ports() - for port in ports: - if port['network_id'] == network: - msg = ('Unable to complete operation on network %s. There is ' - 'one or more ports still in use on the network' - % network) - raise n_exc.NeutronClientException(message=msg, status_code=409) - - def find_resource(self, resource, name_or_id, project_id=None, - cmd_resource=None, parent_id=None, fields=None): - if resource == 'security_group': - # lookup first by unique id - sg = self._fake_security_groups.get(name_or_id) - if sg: - return sg - # lookup by name, raise an exception on duplicates - res = None - for sg in self._fake_security_groups.values(): - if sg['name'] == name_or_id: - if res: - raise n_exc.NeutronClientNoUniqueMatch( - resource=resource, name=name_or_id) - res = sg - if res: - return res - raise n_exc.NotFound("Fake %s '%s' not found." % - (resource, name_or_id)) diff --git a/nova/tests/unit/api/openstack/compute/test_security_groups.py b/nova/tests/unit/api/openstack/compute/test_security_groups.py index 10c5141b0a74..f8529d8600f6 100644 --- a/nova/tests/unit/api/openstack/compute/test_security_groups.py +++ b/nova/tests/unit/api/openstack/compute/test_security_groups.py @@ -14,19 +14,23 @@ # under the License. import mock +from neutronclient.common import exceptions as n_exc from oslo_config import cfg from oslo_serialization import jsonutils from oslo_utils import encodeutils from oslo_utils.fixture import uuidsentinel as uuids +from oslo_utils import uuidutils +import six import webob -from nova.api.openstack.compute import security_groups as \ - secgroups_v21 -from nova import compute +from nova.api.openstack.compute import security_groups as secgroups_v21 from nova import context as context_maker import nova.db.api from nova import exception +from nova.network import model +from nova.network.neutronv2 import api as neutron_api from nova import objects +from nova.objects import instance as instance_obj from nova import test from nova.tests.unit.api.openstack import fakes @@ -90,22 +94,296 @@ def return_security_group_by_name(context, project_id, group_name, "instances": [{'id': 1, 'uuid': UUID_SERVER}]} -def return_security_group_without_instances(context, project_id, group_name): - return {'id': 1, 'name': group_name} +class MockClient(object): + + # Needs to be global to survive multiple calls to get_client. + _fake_security_groups = {} + _fake_ports = {} + _fake_networks = {} + _fake_subnets = {} + _fake_security_group_rules = {} + + def __init__(self): + # add default security group + if not len(self._fake_security_groups): + ret = {'name': 'default', 'description': 'default', + 'tenant_id': fakes.FAKE_PROJECT_ID, + 'security_group_rules': [], + 'id': uuidutils.generate_uuid()} + self._fake_security_groups[ret['id']] = ret + + def _reset(self): + self._fake_security_groups.clear() + self._fake_ports.clear() + self._fake_networks.clear() + self._fake_subnets.clear() + self._fake_security_group_rules.clear() + + def create_security_group(self, body=None): + s = body.get('security_group') + if not isinstance(s.get('name', ''), six.string_types): + msg = ('BadRequest: Invalid input for name. Reason: ' + 'None is not a valid string.') + raise n_exc.BadRequest(message=msg) + if not isinstance(s.get('description.', ''), six.string_types): + msg = ('BadRequest: Invalid input for description. Reason: ' + 'None is not a valid string.') + raise n_exc.BadRequest(message=msg) + if len(s.get('name')) > 255 or len(s.get('description')) > 255: + msg = 'Security Group name great than 255' + raise n_exc.NeutronClientException(message=msg, status_code=401) + ret = {'name': s.get('name'), 'description': s.get('description'), + 'tenant_id': fakes.FAKE_PROJECT_ID, 'security_group_rules': [], + 'id': uuidutils.generate_uuid()} + + self._fake_security_groups[ret['id']] = ret + return {'security_group': ret} + + def create_network(self, body): + n = body.get('network') + ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'), + 'admin_state_up': n.get('admin_state_up', True), + 'tenant_id': fakes.FAKE_PROJECT_ID, + 'id': uuidutils.generate_uuid()} + if 'port_security_enabled' in n: + ret['port_security_enabled'] = n['port_security_enabled'] + self._fake_networks[ret['id']] = ret + return {'network': ret} + + def create_subnet(self, body): + s = body.get('subnet') + try: + net = self._fake_networks[s.get('network_id')] + except KeyError: + msg = 'Network %s not found' % s.get('network_id') + raise n_exc.NeutronClientException(message=msg, status_code=404) + ret = {'name': s.get('name'), 'network_id': s.get('network_id'), + 'tenant_id': fakes.FAKE_PROJECT_ID, 'cidr': s.get('cidr'), + 'id': uuidutils.generate_uuid(), 'gateway_ip': '10.0.0.1'} + net['subnets'].append(ret['id']) + self._fake_networks[net['id']] = net + self._fake_subnets[ret['id']] = ret + return {'subnet': ret} + + def create_port(self, body): + p = body.get('port') + ret = {'status': 'ACTIVE', 'id': uuidutils.generate_uuid(), + 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'), + 'device_id': p.get('device_id', uuidutils.generate_uuid()), + 'admin_state_up': p.get('admin_state_up', True), + 'security_groups': p.get('security_groups', []), + 'network_id': p.get('network_id'), + 'ip_allocation': p.get('ip_allocation'), + 'binding:vnic_type': + p.get('binding:vnic_type') or model.VNIC_TYPE_NORMAL} + + network = self._fake_networks[p['network_id']] + if 'port_security_enabled' in p: + ret['port_security_enabled'] = p['port_security_enabled'] + elif 'port_security_enabled' in network: + ret['port_security_enabled'] = network['port_security_enabled'] + + port_security = ret.get('port_security_enabled', True) + # port_security must be True if security groups are present + if not port_security and ret['security_groups']: + raise exception.SecurityGroupCannotBeApplied() + + if network['subnets'] and p.get('ip_allocation') != 'deferred': + ret['fixed_ips'] = [{'subnet_id': network['subnets'][0], + 'ip_address': '10.0.0.1'}] + if not ret['security_groups'] and (port_security is None or + port_security is True): + for security_group in self._fake_security_groups.values(): + if security_group['name'] == 'default': + ret['security_groups'] = [security_group['id']] + break + self._fake_ports[ret['id']] = ret + return {'port': ret} + + def create_security_group_rule(self, body): + # does not handle bulk case so just picks rule[0] + r = body.get('security_group_rules')[0] + fields = ['direction', 'protocol', 'port_range_min', 'port_range_max', + 'ethertype', 'remote_ip_prefix', 'tenant_id', + 'security_group_id', 'remote_group_id'] + ret = {} + for field in fields: + ret[field] = r.get(field) + ret['id'] = uuidutils.generate_uuid() + self._fake_security_group_rules[ret['id']] = ret + return {'security_group_rules': [ret]} + + def show_security_group(self, security_group, **_params): + try: + sg = self._fake_security_groups[security_group] + except KeyError: + msg = 'Security Group %s not found' % security_group + raise n_exc.NeutronClientException(message=msg, status_code=404) + for security_group_rule in self._fake_security_group_rules.values(): + if security_group_rule['security_group_id'] == sg['id']: + sg['security_group_rules'].append(security_group_rule) + + return {'security_group': sg} + + def show_security_group_rule(self, security_group_rule, **_params): + try: + return {'security_group_rule': + self._fake_security_group_rules[security_group_rule]} + except KeyError: + msg = 'Security Group rule %s not found' % security_group_rule + raise n_exc.NeutronClientException(message=msg, status_code=404) + + def show_network(self, network, **_params): + try: + return {'network': + self._fake_networks[network]} + except KeyError: + msg = 'Network %s not found' % network + raise n_exc.NeutronClientException(message=msg, status_code=404) + + def show_port(self, port, **_params): + try: + return {'port': + self._fake_ports[port]} + except KeyError: + msg = 'Port %s not found' % port + raise n_exc.NeutronClientException(message=msg, status_code=404) + + def show_subnet(self, subnet, **_params): + try: + return {'subnet': + self._fake_subnets[subnet]} + except KeyError: + msg = 'Port %s not found' % subnet + raise n_exc.NeutronClientException(message=msg, status_code=404) + + def list_security_groups(self, **_params): + ret = [] + for security_group in self._fake_security_groups.values(): + names = _params.get('name') + if names: + if not isinstance(names, list): + names = [names] + for name in names: + if security_group.get('name') == name: + ret.append(security_group) + ids = _params.get('id') + if ids: + if not isinstance(ids, list): + ids = [ids] + for id in ids: + if security_group.get('id') == id: + ret.append(security_group) + elif not (names or ids): + ret.append(security_group) + return {'security_groups': ret} + + def list_networks(self, **_params): + # neutronv2/api.py _get_available_networks calls this assuming + # search_opts filter "shared" is implemented and not ignored + shared = _params.get("shared", None) + if shared: + return {'networks': []} + else: + return {'networks': + [network for network in self._fake_networks.values()]} + + def list_ports(self, **_params): + ret = [] + device_id = _params.get('device_id') + for port in self._fake_ports.values(): + if device_id: + if port['device_id'] in device_id: + ret.append(port) + else: + ret.append(port) + return {'ports': ret} + + def list_subnets(self, **_params): + return {'subnets': + [subnet for subnet in self._fake_subnets.values()]} + + def list_floatingips(self, **_params): + return {'floatingips': []} + + def delete_security_group(self, security_group): + self.show_security_group(security_group) + ports = self.list_ports() + for port in ports.get('ports'): + for sg_port in port['security_groups']: + if sg_port == security_group: + msg = ('Unable to delete Security group %s in use' + % security_group) + raise n_exc.NeutronClientException(message=msg, + status_code=409) + del self._fake_security_groups[security_group] + + def delete_security_group_rule(self, security_group_rule): + self.show_security_group_rule(security_group_rule) + del self._fake_security_group_rules[security_group_rule] + + def delete_network(self, network): + self.show_network(network) + self._check_ports_on_network(network) + for subnet in self._fake_subnets.values(): + if subnet['network_id'] == network: + del self._fake_subnets[subnet['id']] + del self._fake_networks[network] + + def delete_port(self, port): + self.show_port(port) + del self._fake_ports[port] + + def update_port(self, port, body=None): + self.show_port(port) + self._fake_ports[port].update(body['port']) + return {'port': self._fake_ports[port]} + + def list_extensions(self, **_parms): + return {'extensions': []} + + def _check_ports_on_network(self, network): + ports = self.list_ports() + for port in ports: + if port['network_id'] == network: + msg = ('Unable to complete operation on network %s. There is ' + 'one or more ports still in use on the network' + % network) + raise n_exc.NeutronClientException(message=msg, status_code=409) + + def find_resource(self, resource, name_or_id, project_id=None, + cmd_resource=None, parent_id=None, fields=None): + if resource == 'security_group': + # lookup first by unique id + sg = self._fake_security_groups.get(name_or_id) + if sg: + return sg + # lookup by name, raise an exception on duplicates + res = None + for sg in self._fake_security_groups.values(): + if sg['name'] == name_or_id: + if res: + raise n_exc.NeutronClientNoUniqueMatch( + resource=resource, name=name_or_id) + res = sg + if res: + return res + raise n_exc.NotFound("Fake %s '%s' not found." % + (resource, name_or_id)) + + +def get_client(context=None, admin=False): + return MockClient() class TestSecurityGroupsV21(test.TestCase): secgrp_ctl_cls = secgroups_v21.SecurityGroupController server_secgrp_ctl_cls = secgroups_v21.ServerSecurityGroupController secgrp_act_ctl_cls = secgroups_v21.SecurityGroupActionController - # This class is subclassed by Neutron security group API tests so we need - # to be able to override this before creating the controller object. - use_neutron = False def setUp(self): super(TestSecurityGroupsV21, self).setUp() - # Neutron security groups are tested in test_neutron_security_groups.py - self.flags(use_neutron=self.use_neutron) + self.controller = self.secgrp_ctl_cls() self.server_controller = self.server_secgrp_ctl_cls() self.manager = self.secgrp_act_ctl_cls() @@ -113,10 +391,7 @@ class TestSecurityGroupsV21(test.TestCase): # This needs to be done here to set fake_id because the derived # class needs to be called first if it wants to set # 'security_group_api' and this setUp method needs to be called. - if self.controller.security_group_api.id_is_uuid: - self.fake_id = '11111111-1111-1111-1111-111111111111' - else: - self.fake_id = '11111111' + self.fake_id = '11111111-1111-1111-1111-111111111111' self.req = fakes.HTTPRequest.blank('') self.admin_req = fakes.HTTPRequest.blank('', use_admin_context=True) @@ -127,6 +402,46 @@ class TestSecurityGroupsV21(test.TestCase): 'uuid': UUID_SERVER, 'name': 'asdf'})) + self.original_client = neutron_api.get_client + neutron_api.get_client = get_client + + def tearDown(self): + neutron_api.get_client = self.original_client + get_client()._reset() + super(TestSecurityGroupsV21, self).tearDown() + + def _create_sg_template(self, **kwargs): + sg = security_group_request_template(**kwargs) + return self.controller.create(self.req, body={'security_group': sg}) + + def _create_network(self): + body = {'network': {'name': 'net1'}} + neutron = get_client() + net = neutron.create_network(body) + body = {'subnet': {'network_id': net['network']['id'], + 'cidr': '10.0.0.0/24'}} + neutron.create_subnet(body) + return net + + def _create_port(self, **kwargs): + body = {'port': {'binding:vnic_type': model.VNIC_TYPE_NORMAL}} + fields = ['security_groups', 'device_id', 'network_id', + 'port_security_enabled', 'ip_allocation'] + for field in fields: + if field in kwargs: + body['port'][field] = kwargs[field] + neutron = get_client() + return neutron.create_port(body) + + def _create_security_group(self, **kwargs): + body = {'security_group': {}} + fields = ['name', 'description'] + for field in fields: + if field in kwargs: + body['security_group'][field] = kwargs[field] + neutron = get_client() + return neutron.create_security_group(body) + def _assert_security_groups_in_use(self, project_id, user_id, in_use): context = context_maker.get_admin_context() count = objects.Quotas.count_as_dict(context, 'security_groups', @@ -173,38 +488,19 @@ class TestSecurityGroupsV21(test.TestCase): self.req, {'security_group': sg}) def test_get_security_group_list(self): - self._test_get_security_group_list() + self._create_sg_template().get('security_group') + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) + list_dict = self.controller.index(req) + self.assertEqual(len(list_dict['security_groups']), 2) def test_get_security_group_list_offset_and_limit(self): - self._test_get_security_group_list(limited=True) - - def _test_get_security_group_list(self, limited=False): - groups = [] - for i, name in enumerate(['default', 'test']): - sg = security_group_template(id=i + 1, - name=name, - description=name + '-desc', - rules=[]) - groups.append(sg) - if limited: - expected = {'security_groups': [groups[1]]} - else: - expected = {'security_groups': groups} - - def return_security_groups(context, project_id): - return [security_group_db(sg) for sg in groups] - - self.stub_out('nova.db.api.security_group_get_by_project', - return_security_groups) - - path = '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID - if limited: - path += '?offset=1&limit=1' - req = fakes.HTTPRequest.blank(path, use_admin_context=True) - - res_dict = self.controller.index(req) - - self.assertEqual(res_dict, expected) + path = ('/v2/%s/os-security-groups?offset=1&limit=1' % + fakes.FAKE_PROJECT_ID) + self._create_sg_template().get('security_group') + req = fakes.HTTPRequest.blank(path) + list_dict = self.controller.index(req) + self.assertEqual(len(list_dict['security_groups']), 1) def test_get_security_group_list_missing_group_id_rule(self): groups = [] @@ -249,78 +545,34 @@ class TestSecurityGroupsV21(test.TestCase): search_opts={}) def test_get_security_group_by_instance(self): - groups = [] - for i, name in enumerate(['default', 'test']): - # Create two rules per group to test that we don't perform - # redundant group lookups. For the default group, the rule group_id - # is the group itself. For the test group, the rule group_id points - # to a non-existent group. - group_id = i + 1 if name == 'default' else 'HAS_BEEN_DELETED' - rule1 = security_group_rule_template( - cidr='10.2.3.125/24', parent_group_id=1, id=99, protocol='TCP', - group_id=group_id) - rule2 = security_group_rule_template( - cidr='10.2.3.126/24', parent_group_id=1, id=77, protocol='UDP', - group_id=group_id) - sg = security_group_template( - id=i + 1, name=name, description=name + '-desc', - rules=[rule1, rule2], tenant_id=fakes.FAKE_PROJECT_ID) - groups.append(sg) - - # An expected rule here needs to be created as the api returns - # different attributes on the rule for a response than what was - # passed in. - expected_rule1 = security_group_rule_template( - ip_range={}, parent_group_id=1, ip_protocol='TCP', - group={'name': 'default', 'tenant_id': fakes.FAKE_PROJECT_ID}, - id=99) - expected_rule2 = security_group_rule_template( - ip_range={}, parent_group_id=1, ip_protocol='UDP', - group={'name': 'default', 'tenant_id': fakes.FAKE_PROJECT_ID}, - id=77) - expected_group1 = security_group_template( - id=1, name='default', description='default-desc', - rules=[expected_rule1, expected_rule2], - tenant_id=fakes.FAKE_PROJECT_ID) - expected_group2 = security_group_template( - id=2, name='test', description='test-desc', rules=[], - tenant_id=fakes.FAKE_PROJECT_ID) - - expected = {'security_groups': [expected_group1, expected_group2]} - - def return_security_groups(context, instance_uuid): - self.assertEqual(instance_uuid, UUID_SERVER) - return [security_group_db(sg) for sg in groups] - - self.stub_out('nova.db.api.security_group_get_by_instance', - return_security_groups) - - # Stub out the security group API get() method to assert that we only - # call it at most once per group ID. - original_sg_get = self.server_controller.security_group_api.get - queried_group_ids = [] - - def fake_security_group_api_get(_self, context, name=None, id=None, - map_exception=False): - if id in queried_group_ids: - self.fail('Queried security group %s more than once.' % id) - queried_group_ids.append(id) - return original_sg_get(context, id=id) - - self.stub_out('nova.compute.api.SecurityGroupAPI.get', - fake_security_group_api_get) - - res_dict = self.server_controller.index(self.req, FAKE_UUID1) - + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + device_id=UUID_SERVER) + expected = [{'rules': [], 'tenant_id': fakes.FAKE_PROJECT_ID, + 'id': sg['id'], 'name': 'test', + 'description': 'test-description'}] + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/os-security-groups' + % (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + res_dict = self.server_controller.index( + req, UUID_SERVER)['security_groups'] self.assertEqual(expected, res_dict) - @mock.patch('nova.db.api.security_group_get_by_instance', return_value=[]) - def test_get_security_group_empty_for_instance(self, mock_sec_group): - expected = {'security_groups': []} - res_dict = self.server_controller.index(self.req, UUID_SERVER) - self.assertEqual(expected, res_dict) - mock_sec_group.assert_called_once_with( - self.req.environ['nova.context'], UUID_SERVER) + @mock.patch('nova.network.security_group.neutron_driver.SecurityGroupAPI.' + 'get_instances_security_groups_bindings') + def test_get_security_group_empty_for_instance(self, neutron_sg_bind_mock): + servers = [{'id': FAKE_UUID1}] + neutron_sg_bind_mock.return_value = {} + + security_group_api = self.controller.security_group_api + ctx = context_maker.get_admin_context() + sgs = security_group_api.get_instance_security_groups(ctx, + instance_obj.Instance(uuid=FAKE_UUID1)) + + neutron_sg_bind_mock.assert_called_once_with(ctx, servers, False) + self.assertEqual([], sgs) def test_get_security_group_by_instance_non_existing(self): with mock.patch('nova.compute.api.API.get', @@ -330,17 +582,11 @@ class TestSecurityGroupsV21(test.TestCase): self.server_controller.index, self.req, '1') def test_get_security_group_by_id(self): - sg = security_group_template(id=2, rules=[]) - - def return_security_group(context, group_id, columns_to_join=None): - self.assertEqual(sg['id'], group_id) - return security_group_db(sg) - - self.stub_out('nova.db.api.security_group_get', - return_security_group) - - res_dict = self.controller.show(self.req, '2') - + sg = self._create_sg_template().get('security_group') + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups/%s' + % (fakes.FAKE_PROJECT_ID, sg['id'])) + res_dict = self.controller.show(req, sg['id']) expected = {'security_group': sg} self.assertEqual(res_dict, expected) @@ -359,43 +605,18 @@ class TestSecurityGroupsV21(test.TestCase): self.req, '1', {'security_group': sg}) def test_delete_security_group_by_id(self): - sg = security_group_template(id=1, project_id=fakes.FAKE_PROJECT_ID, - user_id='fake_user', rules=[]) - - self.called = False - - def security_group_destroy(context, id): - self.called = True - - def return_security_group(context, group_id, columns_to_join=None): - self.assertEqual(sg['id'], group_id) - return security_group_db(sg) - - self.stub_out('nova.db.api.security_group_destroy', - security_group_destroy) - self.stub_out('nova.db.api.security_group_get', - return_security_group) - - self.controller.delete(self.req, '1') - - self.assertTrue(self.called) + sg = self._create_sg_template().get('security_group') + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups/%s' % + (fakes.FAKE_PROJECT_ID, sg['id'])) + self.controller.delete(req, sg['id']) def test_delete_security_group_by_admin(self): - sg = security_group_request_template() - - self.controller.create(self.req, {'security_group': sg}) - context = self.req.environ['nova.context'] - - # Ensure quota usage for security group is correct. - self._assert_security_groups_in_use(context.project_id, - context.user_id, 2) - - # Delete the security group by admin. - self.controller.delete(self.admin_req, '2') - - # Ensure quota for security group in use is released. - self._assert_security_groups_in_use(context.project_id, - context.user_id, 1) + sg = self._create_sg_template().get('security_group') + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups/%s' % + (fakes.FAKE_PROJECT_ID, sg['id']), use_admin_context=True) + self.controller.delete(req, sg['id']) def test_delete_security_group_by_invalid_id(self): self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, @@ -405,23 +626,26 @@ class TestSecurityGroupsV21(test.TestCase): self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, self.req, self.fake_id) - def test_delete_security_group_in_use(self): - sg = security_group_template(id=1, rules=[]) - - def security_group_in_use(context, id): - return True - - def return_security_group(context, group_id, columns_to_join=None): - self.assertEqual(sg['id'], group_id) - return security_group_db(sg) - - self.stub_out('nova.db.api.security_group_in_use', - security_group_in_use) - self.stub_out('nova.db.api.security_group_get', - return_security_group) + @mock.patch('nova.compute.utils.refresh_info_cache_for_instance') + def test_delete_security_group_in_use(self, refresh_info_cache_mock): + sg = self._create_sg_template().get('security_group') + self._create_network() + db_inst = fakes.stub_instance(id=1, nw_cache=[], security_groups=[]) + _context = context_maker.get_admin_context() + instance = instance_obj.Instance._from_db_object( + _context, instance_obj.Instance(), db_inst, + expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS) + neutron = neutron_api.API() + with mock.patch.object(nova.db.api, 'instance_get_by_uuid', + return_value=db_inst): + neutron.allocate_for_instance(_context, instance, False, None, + security_groups=[sg['id']]) + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups/%s' + % (fakes.FAKE_PROJECT_ID, sg['id'])) self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, - self.req, '1') + req, sg['id']) def _test_list_with_invalid_filter( self, url, expected_exception=exception.ValidationError): @@ -516,24 +740,94 @@ class TestSecurityGroupsV21(test.TestCase): self.manager._addSecurityGroup, self.req, '1', body) - @mock.patch.object(nova.db.api, 'instance_add_security_group') - def test_associate(self, mock_add_security_group): - self.stub_out('nova.db.api.security_group_get_by_name', - return_security_group_without_instances) + def test_associate_duplicate_names(self): + sg1 = self._create_security_group(name='sg1', + description='sg1')['security_group'] + self._create_security_group(name='sg1', + description='sg1')['security_group'] + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg1['id']], + device_id=UUID_SERVER) + + body = dict(addSecurityGroup=dict(name="sg1")) + + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.assertRaises(webob.exc.HTTPConflict, + self.manager._addSecurityGroup, + req, UUID_SERVER, body) + + def test_associate_port_security_enabled_true(self): + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + port_security_enabled=True, + device_id=UUID_SERVER) body = dict(addSecurityGroup=dict(name="test")) - self.manager._addSecurityGroup(self.req, UUID_SERVER, body) - mock_add_security_group.assert_called_once_with(mock.ANY, - mock.ANY, - mock.ANY) + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.manager._addSecurityGroup(req, UUID_SERVER, body) + + def test_associate_port_security_enabled_false(self): + self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], port_security_enabled=False, + device_id=UUID_SERVER) + + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, + req, UUID_SERVER, body) + + def test_associate_deferred_ip_port(self): + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + port_security_enabled=True, ip_allocation='deferred', + device_id=UUID_SERVER) + + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.manager._addSecurityGroup(req, UUID_SERVER, body) + + def test_associate(self): + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + device_id=UUID_SERVER) + + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.manager._addSecurityGroup(req, UUID_SERVER, body) def test_disassociate_by_non_existing_security_group_name(self): body = dict(removeSecurityGroup=dict(name='non-existing')) + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) self.assertRaises(webob.exc.HTTPNotFound, - self.manager._removeSecurityGroup, self.req, - UUID_SERVER, body) + self.manager._removeSecurityGroup, + req, UUID_SERVER, body) def test_disassociate_without_body(self): body = dict(removeSecurityGroup=None) @@ -557,8 +851,6 @@ class TestSecurityGroupsV21(test.TestCase): '1', body) def test_disassociate_non_existing_instance(self): - self.stub_out('nova.db.api.security_group_get_by_name', - return_security_group_by_name) body = dict(removeSecurityGroup=dict(name="test")) with mock.patch('nova.compute.api.API.get', side_effect=exception.InstanceNotFound( @@ -567,60 +859,119 @@ class TestSecurityGroupsV21(test.TestCase): self.manager._removeSecurityGroup, self.req, '1', body) - @mock.patch.object(nova.db.api, 'instance_remove_security_group') - def test_disassociate(self, mock_remove_sec_group): - self.stub_out('nova.db.api.security_group_get_by_name', - return_security_group_by_name) + def test_disassociate(self): + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + device_id=UUID_SERVER) body = dict(removeSecurityGroup=dict(name="test")) - self.manager._removeSecurityGroup(self.req, UUID_SERVER, body) - mock_remove_sec_group.assert_called_once_with(mock.ANY, - mock.ANY, - mock.ANY) + req = fakes.HTTPRequest.blank( + '/v2/%s/servers/%s/action' % + (fakes.FAKE_PROJECT_ID, UUID_SERVER)) + self.manager._removeSecurityGroup(req, UUID_SERVER, body) + + def test_get_instances_security_groups_bindings(self): + servers = [{'id': FAKE_UUID1}, {'id': FAKE_UUID2}] + sg1 = self._create_sg_template(name='test1').get('security_group') + sg2 = self._create_sg_template(name='test2').get('security_group') + # test name='' is replaced with id + sg3 = self._create_sg_template(name='').get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg1['id'], + sg2['id']], + device_id=FAKE_UUID1) + self._create_port( + network_id=net['network']['id'], security_groups=[sg2['id'], + sg3['id']], + device_id=FAKE_UUID2) + expected = {FAKE_UUID1: [{'name': sg1['name']}, {'name': sg2['name']}], + FAKE_UUID2: [{'name': sg2['name']}, {'name': sg3['id']}]} + security_group_api = self.controller.security_group_api + bindings = ( + security_group_api.get_instances_security_groups_bindings( + context_maker.get_admin_context(), servers)) + self.assertEqual(bindings, expected) + + def test_get_instance_security_groups(self): + sg1 = self._create_sg_template(name='test1').get('security_group') + sg2 = self._create_sg_template(name='test2').get('security_group') + # test name='' is replaced with id + sg3 = self._create_sg_template(name='').get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], + security_groups=[sg1['id'], sg2['id'], sg3['id']], + device_id=FAKE_UUID1) + + expected = [{'name': sg1['name']}, {'name': sg2['name']}, + {'name': sg3['id']}] + security_group_api = self.controller.security_group_api + sgs = security_group_api.get_instance_security_groups( + context_maker.get_admin_context(), + instance_obj.Instance(uuid=FAKE_UUID1)) + self.assertEqual(sgs, expected) + + def test_create_port_with_sg_and_port_security_enabled_true(self): + sg1 = self._create_sg_template(name='test1').get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg1['id']], + port_security_enabled=True, + device_id=FAKE_UUID1) + security_group_api = self.controller.security_group_api + sgs = security_group_api.get_instance_security_groups( + context_maker.get_admin_context(), + instance_obj.Instance(uuid=FAKE_UUID1)) + self.assertEqual(sgs, [{'name': 'test1'}]) + + def test_create_port_with_sg_and_port_security_enabled_false(self): + sg1 = self._create_sg_template(name='test1').get('security_group') + net = self._create_network() + self.assertRaises(exception.SecurityGroupCannotBeApplied, + self._create_port, + network_id=net['network']['id'], + security_groups=[sg1['id']], + port_security_enabled=False, + device_id=FAKE_UUID1) class TestSecurityGroupRulesV21(test.TestCase): secgrp_ctl_cls = secgroups_v21.SecurityGroupRulesController - # This class is subclassed by Neutron security group API tests so we need - # to be able to override this before creating the controller object. - use_neutron = False def setUp(self): super(TestSecurityGroupRulesV21, self).setUp() - # Neutron security groups are tested in test_neutron_security_groups.py - self.flags(use_neutron=self.use_neutron) self.controller = self.secgrp_ctl_cls() - if self.controller.security_group_api.id_is_uuid: - id1 = '11111111-1111-1111-1111-111111111111' - id2 = '22222222-2222-2222-2222-222222222222' - self.invalid_id = '33333333-3333-3333-3333-333333333333' - else: - id1 = 1 - id2 = 2 - self.invalid_id = '33333333' + self.controller_sg = secgroups_v21.SecurityGroupController() - self.sg1 = security_group_template(id=id1) + id1 = '11111111-1111-1111-1111-111111111111' + id2 = '22222222-2222-2222-2222-222222222222' + self.invalid_id = '33333333-3333-3333-3333-333333333333' + + self.sg1 = security_group_template( + id=id1, security_group_rules=[]) self.sg2 = security_group_template( - id=id2, name='authorize_revoke', + id=id2, security_group_rules=[], name='authorize_revoke', description='authorize-revoke testing') - db1 = security_group_db(self.sg1) - db2 = security_group_db(self.sg2) - - def return_security_group(context, group_id, columns_to_join=None): - if group_id == db1['id']: - return db1 - if group_id == db2['id']: - return db2 - raise exception.SecurityGroupNotFound(security_group_id=group_id) - - self.stub_out('nova.db.api.security_group_get', - return_security_group) - - self.parent_security_group = db2 + self.parent_security_group = security_group_db(self.sg2) self.req = fakes.HTTPRequest.blank('') + self.original_client = neutron_api.get_client + neutron_api.get_client = get_client + + neutron = get_client() + neutron._fake_security_groups[id1] = self.sg1 + neutron._fake_security_groups[id2] = self.sg2 + + def tearDown(self): + neutron_api.get_client = self.original_client + get_client()._reset() + super(TestSecurityGroupRulesV21, self).tearDown() + def test_create_by_cidr(self): rule = security_group_rule_template(cidr='10.2.3.124/24', parent_group_id=self.sg2['id']) @@ -738,21 +1089,30 @@ class TestSecurityGroupRulesV21(test.TestCase): self.req, {'security_group_rule': rule}) def test_create_add_existing_rules_by_cidr(self): - rule = security_group_rule_template(cidr='10.0.0.0/24', - parent_group_id=self.sg2['id']) - - self.parent_security_group['rules'] = [security_group_rule_db(rule)] - + sg = security_group_template() + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) + self.controller_sg.create(req, {'security_group': sg}) + rule = security_group_rule_template( + cidr='15.0.0.0/8', parent_group_id=self.sg2['id']) + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) + self.controller.create(req, {'security_group_rule': rule}) self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, - self.req, {'security_group_rule': rule}) + req, {'security_group_rule': rule}) def test_create_add_existing_rules_by_group_id(self): - rule = security_group_rule_template(group_id=1) - - self.parent_security_group['rules'] = [security_group_rule_db(rule)] - + sg = security_group_template() + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) + self.controller_sg.create(req, {'security_group': sg}) + rule = security_group_rule_template( + group=self.sg1['id'], parent_group_id=self.sg2['id']) + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) + self.controller.create(req, {'security_group_rule': rule}) self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, - self.req, {'security_group_rule': rule}) + req, {'security_group_rule': rule}) def test_create_with_no_body(self): self.assertRaises(webob.exc.HTTPBadRequest, @@ -971,21 +1331,15 @@ class TestSecurityGroupRulesV21(test.TestCase): self._test_create_with_ports('udp', 65535, 65535) def test_delete(self): - rule = security_group_rule_template(id=self.sg2['id'], - parent_group_id=self.sg2['id']) + rule = security_group_rule_template(parent_group_id=self.sg2['id']) - def security_group_rule_get(context, id): - return security_group_rule_db(rule) - - def security_group_rule_destroy(context, id): - pass - - self.stub_out('nova.db.api.security_group_rule_get', - security_group_rule_get) - self.stub_out('nova.db.api.security_group_rule_destroy', - security_group_rule_destroy) - - self.controller.delete(self.req, self.sg2['id']) + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-group-rules' % fakes.FAKE_PROJECT_ID) + res_dict = self.controller.create(req, {'security_group_rule': rule}) + security_group_rule = res_dict['security_group_rule'] + req = fakes.HTTPRequest.blank('/v2/%s/os-security-group-rules/%s' % ( + fakes.FAKE_PROJECT_ID, security_group_rule['id'])) + self.controller.delete(req, security_group_rule['id']) def test_delete_invalid_rule_id(self): self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, @@ -1081,22 +1435,43 @@ def fake_compute_create(*args, **kwargs): return ([fake_compute_get(*args, **kwargs)], '') -class SecurityGroupsOutputTestV21(test.TestCase): - base_url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID +class SecurityGroupsOutputTest(test.TestCase): content_type = 'application/json' def setUp(self): - super(SecurityGroupsOutputTestV21, self).setUp() - # Neutron security groups are tested in test_neutron_security_groups.py - self.flags(use_neutron=False) - fakes.stub_out_nw_api(self) - self.stubs.Set(compute.api.API, 'get', fake_compute_get) - self.stubs.Set(compute.api.API, 'get_all', fake_compute_get_all) - self.stubs.Set(compute.api.API, 'create', fake_compute_create) - self.app = self._setup_app() + super(SecurityGroupsOutputTest, self).setUp() - def _setup_app(self): - return fakes.wsgi_app_v21() + self.controller = secgroups_v21.SecurityGroupController() + self.original_client = neutron_api.get_client + neutron_api.get_client = get_client + + fakes.stub_out_nw_api(self) + self.stub_out('nova.compute.api.API.get', fake_compute_get) + self.stub_out('nova.compute.api.API.get_all', fake_compute_get_all) + self.stub_out('nova.compute.api.API.create', fake_compute_create) + self.stub_out( + 'nova.network.security_group.neutron_driver.SecurityGroupAPI.' + 'get_instances_security_groups_bindings', + self._fake_get_instances_security_groups_bindings) + + def tearDown(self): + neutron_api.get_client = self.original_client + get_client()._reset() + super(SecurityGroupsOutputTest, self).tearDown() + + def _fake_get_instances_security_groups_bindings(self, inst, context, + servers): + groups = { + '00000000-0000-0000-0000-000000000001': [{'name': 'fake-0-0'}, + {'name': 'fake-0-1'}], + '00000000-0000-0000-0000-000000000002': [{'name': 'fake-1-0'}, + {'name': 'fake-1-1'}], + '00000000-0000-0000-0000-000000000003': [{'name': 'fake-2-0'}, + {'name': 'fake-2-1'}]} + result = {} + for server in servers: + result[server['id']] = groups.get(server['id']) + return result def _make_request(self, url, body=None): req = fakes.HTTPRequest.blank(url) @@ -1105,7 +1480,10 @@ class SecurityGroupsOutputTestV21(test.TestCase): req.body = encodeutils.safe_encode(self._encode_body(body)) req.content_type = self.content_type req.headers['Accept'] = self.content_type - res = req.get_response(self.app) + + # NOTE: This 'os-security-groups' is for enabling security_groups + # attribute on response body. + res = req.get_response(fakes.wsgi_app_v21()) return res def _encode_body(self, body): @@ -1121,27 +1499,70 @@ class SecurityGroupsOutputTestV21(test.TestCase): return server.get('security_groups') def test_create(self): + url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' - server = dict(name='server_test', imageRef=image_uuid, flavorRef=2) - res = self._make_request(self.base_url, {'server': server}) + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) + security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}] + for security_group in security_groups: + sg = security_group_template(name=security_group['name']) + self.controller.create(req, {'security_group': sg}) + + server = dict(name='server_test', imageRef=image_uuid, flavorRef=2, + security_groups=security_groups) + res = self._make_request(url, {'server': server}) self.assertEqual(res.status_int, 202) server = self._get_server(res.body) for i, group in enumerate(self._get_groups(server)): name = 'fake-2-%s' % i self.assertEqual(group.get('name'), name) - def test_show(self): - url = self.base_url + '/' + UUID3 - res = self._make_request(url) + def test_create_server_get_default_security_group(self): + url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID + image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' + server = dict(name='server_test', imageRef=image_uuid, flavorRef=2) + res = self._make_request(url, {'server': server}) + self.assertEqual(res.status_int, 202) + server = self._get_server(res.body) + group = self._get_groups(server)[0] + self.assertEqual(group.get('name'), 'default') - self.assertEqual(res.status_int, 200) + def test_show(self): + self.stub_out('nova.network.security_group.neutron_driver.' + 'SecurityGroupAPI.get_instance_security_groups', + lambda self, inst, context, id: + [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]) + + url = '/v2/%s/servers' % fakes.FAKE_PROJECT_ID + image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' + req = fakes.HTTPRequest.blank( + '/v2/%s/os-security-groups' % fakes.FAKE_PROJECT_ID) + security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}] + for security_group in security_groups: + sg = security_group_template(name=security_group['name']) + self.controller.create(req, {'security_group': sg}) + server = dict(name='server_test', imageRef=image_uuid, flavorRef=2, + security_groups=security_groups) + + res = self._make_request(url, {'server': server}) + self.assertEqual(res.status_int, 202) server = self._get_server(res.body) for i, group in enumerate(self._get_groups(server)): name = 'fake-2-%s' % i self.assertEqual(group.get('name'), name) + # Test that show (GET) returns the same information as create (POST) + url = '/v2/%s/servers/%s' % (fakes.FAKE_PROJECT_ID, UUID3) + res = self._make_request(url) + self.assertEqual(res.status_int, 200) + server = self._get_server(res.body) + + for i, group in enumerate(self._get_groups(server)): + name = 'fake-2-%s' % i + self.assertEqual(group.get('name'), name) + def test_detail(self): - url = self.base_url + '/detail' + url = '/v2/%s/servers/detail' % fakes.FAKE_PROJECT_ID res = self._make_request(url) self.assertEqual(res.status_int, 200) @@ -1150,13 +1571,11 @@ class SecurityGroupsOutputTestV21(test.TestCase): name = 'fake-%s-%s' % (i, j) self.assertEqual(group.get('name'), name) - def test_no_instance_passthrough_404(self): - - def fake_compute_get(*args, **kwargs): - raise exception.InstanceNotFound(instance_id='fake') - - self.stubs.Set(compute.api.API, 'get', fake_compute_get) - url = self.base_url + '/70f6db34-de8d-4fbd-aafb-4065bdfa6115' + @mock.patch('nova.compute.api.API.get', + side_effect=exception.InstanceNotFound(instance_id='fake')) + def test_no_instance_passthrough_404(self, mock_get): + url = ('/v2/%s/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115' % + fakes.FAKE_PROJECT_ID) res = self._make_request(url) self.assertEqual(res.status_int, 404)