diff --git a/openstackclient/api/compute_v2.py b/openstackclient/api/compute_v2.py index 7429a859d7..af0c69e587 100644 --- a/openstackclient/api/compute_v2.py +++ b/openstackclient/api/compute_v2.py @@ -11,9 +11,16 @@ # under the License. # -"""Compute v2 API Library""" +"""Compute v2 API Library + +A collection of wrappers for deprecated Compute v2 APIs that are not +intentionally supported by SDK. Most of these are proxy APIs. +""" + +import http from keystoneauth1 import exceptions as ksa_exceptions +from openstack import exceptions as sdk_exceptions from osc_lib.api import api from osc_lib import exceptions from osc_lib.i18n import _ @@ -577,3 +584,40 @@ class APIv2(api.BaseAPI): return self.delete(f'/{url}/{security_group_rule_id}') return None + + +def find_security_group(compute_client, name_or_id): + """Find the name for a given security group name or ID + + https://docs.openstack.org/api-ref/compute/#show-security-group-details + + :param compute_client: A compute client + :param name_or_id: The name or ID of the security group to look up + :returns: A security group object + :raises exception.NotFound: If a matching security group could not be + found or more than one match was found + """ + response = compute_client.get( + f'/os-security-groups/{name_or_id}', microversion='2.1' + ) + if response.status_code != http.HTTPStatus.NOT_FOUND: + # there might be other, non-404 errors + sdk_exceptions.raise_from_response(response) + return response.json()['security_group'] + + response = compute_client.get('/os-security-groups', microversion='2.1') + sdk_exceptions.raise_from_response(response) + found = None + security_groups = response.json()['security_groups'] + for security_group in security_groups: + if security_group['name'] == name_or_id: + if found: + raise exceptions.NotFound( + f'multiple matches found for {name_or_id}' + ) + found = security_group + + if not found: + raise exceptions.NotFound(f'{name_or_id} not found') + + return found diff --git a/openstackclient/compute/v2/server.py b/openstackclient/compute/v2/server.py index 42caafead4..63640a0e1c 100644 --- a/openstackclient/compute/v2/server.py +++ b/openstackclient/compute/v2/server.py @@ -32,6 +32,7 @@ from osc_lib.command import command from osc_lib import exceptions from osc_lib import utils +from openstackclient.api import compute_v2 from openstackclient.common import pagination from openstackclient.i18n import _ from openstackclient.identity import common as identity_common @@ -677,17 +678,21 @@ class AddServerSecurityGroup(command.Command): return parser def take_action(self, parsed_args): - compute_client = self.app.client_manager.compute + compute_client = self.app.client_manager.sdk_connection.compute - server = utils.find_resource( - compute_client.servers, - parsed_args.server, + server = compute_client.find_server( + parsed_args.server, ignore_missing=False ) - security_group = compute_client.api.security_group_find( - parsed_args.group, - ) - - server.add_security_group(security_group['id']) + if self.app.client_manager.is_network_endpoint_enabled(): + # the server handles both names and IDs for neutron SGs, so just + # pass things through + security_group = parsed_args.group + else: + # however, if using nova-network then it needs a name, not an ID + security_group = compute_v2.find_security_group( + compute_client, parsed_args.group + )['name'] + compute_client.add_security_group_to_server(server, security_group) class AddServerVolume(command.ShowOne): @@ -3960,28 +3965,34 @@ class RemoveServerSecurityGroup(command.Command): parser.add_argument( 'server', metavar='<server>', - help=_('Name or ID of server to use'), + help=_('Server (name or ID)'), ) parser.add_argument( 'group', metavar='<group>', - help=_('Name or ID of security group to remove from server'), + help=_('Security group to remove (name or ID)'), ) return parser def take_action(self, parsed_args): - compute_client = self.app.client_manager.compute + compute_client = self.app.client_manager.sdk_connection.compute - server = utils.find_resource( - compute_client.servers, - parsed_args.server, + server = compute_client.find_server( + parsed_args.server, ignore_missing=False ) - security_group = compute_client.api.security_group_find( - parsed_args.group, + if self.app.client_manager.is_network_endpoint_enabled(): + # the server handles both names and IDs for neutron SGs, so just + # pass things through + security_group = parsed_args.group + else: + # however, if using nova-network then it needs a name, not an ID + security_group = compute_v2.find_security_group( + compute_client, parsed_args.group + )['name'] + compute_client.remove_security_group_from_server( + server, security_group ) - server.remove_security_group(security_group['id']) - class RemoveServerVolume(command.Command): _description = _( diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py index 3bcd2a912a..5a8442602b 100644 --- a/openstackclient/tests/unit/api/test_compute_v2.py +++ b/openstackclient/tests/unit/api/test_compute_v2.py @@ -13,11 +13,17 @@ """Compute v2 API Library Tests""" +import http +from unittest import mock +import uuid + from keystoneauth1 import session +from openstack.compute.v2 import _proxy from osc_lib import exceptions as osc_lib_exceptions from requests_mock.contrib import fixture from openstackclient.api import compute_v2 as compute +from openstackclient.tests.unit import fakes from openstackclient.tests.unit import utils @@ -648,3 +654,112 @@ class TestSecurityGroupRule(TestComputeAPIv2): ret = self.api.security_group_rule_delete('1') self.assertEqual(202, ret.status_code) self.assertEqual("", ret.text) + + +class TestFindSecurityGroup(utils.TestCase): + + def setUp(self): + super().setUp() + + self.compute_sdk_client = mock.Mock(_proxy.Proxy) + + def test_find_security_group_by_id(self): + sg_id = uuid.uuid4().hex + sg_name = 'name-' + uuid.uuid4().hex + data = { + 'security_group': { + 'id': sg_id, + 'name': sg_name, + 'description': 'description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + } + } + self.compute_sdk_client.get.side_effect = [ + fakes.FakeResponse(data=data), + ] + + result = compute.find_security_group(self.compute_sdk_client, sg_id) + + self.compute_sdk_client.get.assert_has_calls( + [ + mock.call(f'/os-security-groups/{sg_id}', microversion='2.1'), + ] + ) + self.assertEqual(data['security_group'], result) + + def test_find_security_group_by_name(self): + sg_id = uuid.uuid4().hex + sg_name = 'name-' + uuid.uuid4().hex + data = { + 'security_groups': [ + { + 'id': sg_id, + 'name': sg_name, + 'description': 'description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + } + ], + } + self.compute_sdk_client.get.side_effect = [ + fakes.FakeResponse(status_code=http.HTTPStatus.NOT_FOUND), + fakes.FakeResponse(data=data), + ] + + result = compute.find_security_group(self.compute_sdk_client, sg_name) + + self.compute_sdk_client.get.assert_has_calls( + [ + mock.call( + f'/os-security-groups/{sg_name}', microversion='2.1' + ), + mock.call('/os-security-groups', microversion='2.1'), + ] + ) + self.assertEqual(data['security_groups'][0], result) + + def test_find_security_group_not_found(self): + data = {'security_groups': []} + self.compute_sdk_client.get.side_effect = [ + fakes.FakeResponse(status_code=http.HTTPStatus.NOT_FOUND), + fakes.FakeResponse(data=data), + ] + self.assertRaises( + osc_lib_exceptions.NotFound, + compute.find_security_group, + self.compute_sdk_client, + 'invalid-sg', + ) + + def test_find_security_group_by_name_duplicate(self): + sg_name = 'name-' + uuid.uuid4().hex + data = { + 'security_groups': [ + { + 'id': uuid.uuid4().hex, + 'name': sg_name, + 'description': 'description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + }, + { + 'id': uuid.uuid4().hex, + 'name': sg_name, + 'description': 'description-' + uuid.uuid4().hex, + 'tenant_id': 'project-id-' + uuid.uuid4().hex, + 'rules': [], + }, + ], + } + self.compute_sdk_client.get.side_effect = [ + fakes.FakeResponse(status_code=http.HTTPStatus.NOT_FOUND), + fakes.FakeResponse(data=data), + ] + + self.assertRaises( + osc_lib_exceptions.NotFound, + compute.find_security_group, + self.compute_sdk_client, + sg_name, + ) diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py index 441d69f5c5..ec1190c7e9 100644 --- a/openstackclient/tests/unit/compute/v2/test_server.py +++ b/openstackclient/tests/unit/compute/v2/test_server.py @@ -18,7 +18,6 @@ import getpass import json import tempfile from unittest import mock -from unittest.mock import call import iso8601 from openstack import exceptions as sdk_exceptions @@ -26,6 +25,7 @@ from osc_lib.cli import format_columns from osc_lib import exceptions from osc_lib import utils as common_utils +from openstackclient.api import compute_v2 from openstackclient.compute.v2 import server from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit.image.v2 import fakes as image_fakes @@ -148,7 +148,7 @@ class TestServer(compute_fakes.TestComputev2): result = self.cmd.take_action(parsed_args) - calls = [call(s.id) for s in servers] + calls = [mock.call(s.id) for s in servers] method = getattr(self.compute_sdk_client, method_name) method.assert_has_calls(calls) self.assertIsNone(result) @@ -1221,42 +1221,65 @@ class TestServerAddNetwork(TestServer): ) -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_find') -class TestServerAddSecurityGroup(TestServer): +class TestServerAddSecurityGroup(compute_fakes.TestComputev2): def setUp(self): super().setUp() - self.security_group = compute_fakes.create_one_security_group() - - attrs = {'security_groups': [{'name': self.security_group['id']}]} - methods = { - 'add_security_group': None, - } - - self.server = compute_fakes.create_one_server( - attrs=attrs, methods=methods + self.server = compute_fakes.create_one_sdk_server() + self.compute_sdk_client.find_server.return_value = self.server + self.compute_sdk_client.add_security_group_to_server.return_value = ( + None ) - # This is the return value for utils.find_resource() for server - self.servers_mock.get.return_value = self.server # Get the command object to test self.cmd = server.AddServerSecurityGroup(self.app, None) - def test_server_add_security_group(self, sg_find_mock): - sg_find_mock.return_value = self.security_group - arglist = [self.server.id, self.security_group['id']] + def test_server_add_security_group__nova_network(self): + arglist = [self.server.id, 'fake_sg'] verifylist = [ ('server', self.server.id), - ('group', self.security_group['id']), + ('group', 'fake_sg'), ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + with mock.patch.object( + self.app.client_manager, + 'is_network_endpoint_enabled', + return_value=False, + ): + with mock.patch.object( + compute_v2, + 'find_security_group', + return_value={'name': 'fake_sg'}, + ) as mock_find_nova_net_sg: + result = self.cmd.take_action(parsed_args) + + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.id, ignore_missing=False + ) + self.compute_sdk_client.add_security_group_to_server.assert_called_once_with( + self.server, 'fake_sg' + ) + mock_find_nova_net_sg.assert_called_once_with( + self.compute_sdk_client, 'fake_sg' + ) + self.assertIsNone(result) + + def test_server_add_security_group(self): + arglist = [self.server.id, 'fake_sg'] + verifylist = [ + ('server', self.server.id), + ('group', 'fake_sg'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - sg_find_mock.assert_called_with( - self.security_group['id'], + + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.id, ignore_missing=False ) - self.servers_mock.get.assert_called_with(self.server.id) - self.server.add_security_group.assert_called_with( - self.security_group['id'], + self.compute_sdk_client.add_security_group_to_server.assert_called_once_with( + self.server, 'fake_sg' ) self.assertIsNone(result) @@ -4452,7 +4475,7 @@ class TestServerDelete(TestServer): calls = [] for s in servers: - calls.append(call(s.id)) + calls.append(mock.call(s.id)) self.servers_mock.delete.assert_has_calls(calls) self.assertIsNone(result) @@ -7260,42 +7283,65 @@ class TestServerRemoveNetwork(TestServer): self.find_network.assert_not_called() -@mock.patch('openstackclient.api.compute_v2.APIv2.security_group_find') class TestServerRemoveSecurityGroup(TestServer): def setUp(self): super().setUp() - self.security_group = compute_fakes.create_one_security_group() - - attrs = {'security_groups': [{'name': self.security_group['id']}]} - methods = { - 'remove_security_group': None, - } - - self.server = compute_fakes.create_one_server( - attrs=attrs, methods=methods + self.server = compute_fakes.create_one_sdk_server() + self.compute_sdk_client.find_server.return_value = self.server + self.compute_sdk_client.remove_security_group_from_server.return_value = ( + None ) - # This is the return value for utils.find_resource() for server - self.servers_mock.get.return_value = self.server # Get the command object to test self.cmd = server.RemoveServerSecurityGroup(self.app, None) - def test_server_remove_security_group(self, sg_find_mock): - sg_find_mock.return_value = self.security_group - arglist = [self.server.id, self.security_group['id']] + def test_server_remove_security_group__nova_network(self): + arglist = [self.server.id, 'fake_sg'] verifylist = [ ('server', self.server.id), - ('group', self.security_group['id']), + ('group', 'fake_sg'), ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + with mock.patch.object( + self.app.client_manager, + 'is_network_endpoint_enabled', + return_value=False, + ): + with mock.patch.object( + compute_v2, + 'find_security_group', + return_value={'name': 'fake_sg'}, + ) as mock_find_nova_net_sg: + result = self.cmd.take_action(parsed_args) + + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.id, ignore_missing=False + ) + self.compute_sdk_client.remove_security_group_from_server.assert_called_once_with( + self.server, 'fake_sg' + ) + mock_find_nova_net_sg.assert_called_once_with( + self.compute_sdk_client, 'fake_sg' + ) + self.assertIsNone(result) + + def test_server_remove_security_group(self): + arglist = [self.server.id, 'fake_sg'] + verifylist = [ + ('server', self.server.id), + ('group', 'fake_sg'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - sg_find_mock.assert_called_with( - self.security_group['id'], + + self.compute_sdk_client.find_server.assert_called_once_with( + self.server.id, ignore_missing=False ) - self.servers_mock.get.assert_called_with(self.server.id) - self.server.remove_security_group.assert_called_with( - self.security_group['id'], + self.compute_sdk_client.remove_security_group_from_server.assert_called_once_with( + self.server, 'fake_sg' ) self.assertIsNone(result)