diff --git a/openstackclient/api/compute_v2.py b/openstackclient/api/compute_v2.py index 3bf3a0d81f..065121fc68 100644 --- a/openstackclient/api/compute_v2.py +++ b/openstackclient/api/compute_v2.py @@ -19,6 +19,12 @@ from osc_lib import exceptions from osc_lib.i18n import _ +# TODO(dtroyer): Mingrate this to osc-lib +class InvalidValue(Exception): + """An argument value is not valid: wrong type, out of range, etc""" + message = "Supplied value is not valid" + + class APIv2(api.BaseAPI): """Compute v2 API""" @@ -27,6 +33,29 @@ class APIv2(api.BaseAPI): # Overrides + def _check_integer(self, value, msg=None): + """Attempt to convert value to an integer + + Raises InvalidValue on failure + + :param value: + Convert this to an integer. None is converted to 0 (zero). + :param msg: + An alternate message for the exception, must include exactly + one substitution to receive the attempted value. + """ + + if value is None: + return 0 + + try: + value = int(value) + except (TypeError, ValueError): + if not msg: + msg = "%s is not an integer" % value + raise InvalidValue(msg) + return value + # TODO(dtroyer): Override find() until these fixes get into an osc-lib # minimum release def find( @@ -209,3 +238,71 @@ class APIv2(api.BaseAPI): json={'security_group': security_group}, ).json()['security_group'] return None + + # Security Group Rules + + def security_group_rule_create( + self, + security_group_id=None, + ip_protocol=None, + from_port=None, + to_port=None, + remote_ip=None, + remote_group=None, + ): + """Create a new security group rule + + https://developer.openstack.org/api-ref/compute/#create-security-group-rule + + :param string security_group_id: + Security group ID + :param ip_protocol: + IP protocol, 'tcp', 'udp' or 'icmp' + :param from_port: + Source port + :param to_port: + Destination port + :param remote_ip: + Source IP address in CIDR notation + :param remote_group: + Remote security group + """ + + url = "/os-security-group-rules" + + if ip_protocol.lower() not in ['icmp', 'tcp', 'udp']: + raise InvalidValue( + "%(s) is not one of 'icmp', 'tcp', or 'udp'" % ip_protocol + ) + + params = { + 'parent_group_id': security_group_id, + 'ip_protocol': ip_protocol, + 'from_port': self._check_integer(from_port), + 'to_port': self._check_integer(to_port), + 'cidr': remote_ip, + 'group_id': remote_group, + } + + return self.create( + url, + json={'security_group_rule': params}, + )['security_group_rule'] + + def security_group_rule_delete( + self, + security_group_rule_id=None, + ): + """Delete a security group rule + + https://developer.openstack.org/api-ref/compute/#delete-security-group-rule + + :param string security_group_rule_id: + Security group rule ID + """ + + url = "/os-security-group-rules" + if security_group_rule_id is not None: + return self.delete('/%s/%s' % (url, security_group_rule_id)) + + return None diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py index 33c3ff0211..ad685cf8ae 100644 --- a/openstackclient/network/v2/security_group_rule.py +++ b/openstackclient/network/v2/security_group_rule.py @@ -376,15 +376,15 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne): else: remote_ip = '0.0.0.0/0' - obj = client.security_group_rules.create( - group['id'], - protocol, - from_port, - to_port, - remote_ip, - parsed_args.remote_group, + obj = client.api.security_group_rule_create( + security_group_id=group['id'], + ip_protocol=protocol, + from_port=from_port, + to_port=to_port, + remote_ip=remote_ip, + remote_group=parsed_args.remote_group, ) - return _format_security_group_rule_show(obj._info) + return _format_security_group_rule_show(obj) class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): @@ -409,7 +409,7 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): client.delete_security_group_rule(obj) def take_action_compute(self, client, parsed_args): - client.security_group_rules.delete(self.r) + client.api.security_group_rule_delete(self.r) class ListSecurityGroupRule(common.NetworkAndComputeLister): diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py index f443e810c1..949205fd8b 100644 --- a/openstackclient/tests/unit/api/test_compute_v2.py +++ b/openstackclient/tests/unit/api/test_compute_v2.py @@ -226,3 +226,84 @@ class TestSecurityGroup(TestComputeAPIv2): security_group='sg2', description='desc2') self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) + + +class TestSecurityGroupRule(TestComputeAPIv2): + + FAKE_SECURITY_GROUP_RULE_RESP = { + 'id': '1', + 'name': 'sgr1', + 'tenant_id': 'proj-1', + 'ip_protocol': 'TCP', + 'from_port': 1, + 'to_port': 22, + 'group': {}, + # 'ip_range': , + # 'cidr': , + # 'parent_group_id': , + } + + def test_security_group_create_no_options(self): + self.requests_mock.register_uri( + 'POST', + FAKE_URL + '/os-security-group-rules', + json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, + status_code=200, + ) + ret = self.api.security_group_rule_create( + security_group_id='1', + ip_protocol='tcp', + ) + self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) + + def test_security_group_create_options(self): + self.requests_mock.register_uri( + 'POST', + FAKE_URL + '/os-security-group-rules', + json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, + status_code=200, + ) + ret = self.api.security_group_rule_create( + security_group_id='1', + ip_protocol='tcp', + from_port=22, + to_port=22, + remote_ip='1.2.3.4/24', + ) + self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) + + def test_security_group_create_port_errors(self): + self.requests_mock.register_uri( + 'POST', + FAKE_URL + '/os-security-group-rules', + json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, + status_code=200, + ) + self.assertRaises( + compute.InvalidValue, + self.api.security_group_rule_create, + security_group_id='1', + ip_protocol='tcp', + from_port='', + to_port=22, + remote_ip='1.2.3.4/24', + ) + self.assertRaises( + compute.InvalidValue, + self.api.security_group_rule_create, + security_group_id='1', + ip_protocol='tcp', + from_port=0, + to_port=[], + remote_ip='1.2.3.4/24', + ) + + def test_security_group_rule_delete(self): + self.requests_mock.register_uri( + 'DELETE', + FAKE_URL + '/os-security-group-rules/1', + status_code=202, + ) + ret = self.api.security_group_rule_delete('1') + self.assertEqual(202, ret.status_code) + self.assertEqual("", ret.text) diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py index 05cb507625..f7e9548264 100644 --- a/openstackclient/tests/unit/compute/v2/fakes.py +++ b/openstackclient/tests/unit/compute/v2/fakes.py @@ -556,10 +556,7 @@ class FakeSecurityGroupRule(object): # Overwrite default attributes. security_group_rule_attrs.update(attrs) - security_group_rule = fakes.FakeResource( - info=copy.deepcopy(security_group_rule_attrs), - loaded=True) - return security_group_rule + return security_group_rule_attrs @staticmethod def create_security_group_rules(attrs=None, count=2): diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py index db9831bb2b..c949e2c82d 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_compute.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py @@ -356,7 +356,7 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute): # The security group to be shown. _security_group = \ compute_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'rules': [_security_group_rule._info]} + attrs={'rules': [_security_group_rule]} ) columns = ( @@ -373,7 +373,7 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute): _security_group['name'], _security_group['tenant_id'], security_group._format_compute_security_group_rules( - [_security_group_rule._info]), + [_security_group_rule]), ) def setUp(self): diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py index 06a7a25d45..5c1937e307 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py @@ -32,6 +32,9 @@ class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): self.compute = self.app.client_manager.compute +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_rule_create' +) class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): project = identity_fakes.FakeProject.create_one_project() @@ -48,11 +51,9 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self._security_group_rule = \ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( attrs) - self.compute.security_group_rules.create.return_value = \ - self._security_group_rule expected_columns, expected_data = \ security_group_rule._format_security_group_rule_show( - self._security_group_rule._info) + self._security_group_rule) return expected_columns, expected_data def setUp(self): @@ -67,11 +68,11 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # Get the command object to test self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) - def test_security_group_rule_create_no_options(self): + def test_security_group_rule_create_no_options(self, sgr_mock): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, [], []) - def test_security_group_rule_create_all_source_options(self): + def test_security_group_rule_create_all_source_options(self, sgr_mock): arglist = [ '--src-ip', '10.10.0.0/24', '--src-group', self._security_group['id'], @@ -80,7 +81,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_security_group_rule_create_all_remote_options(self): + def test_security_group_rule_create_all_remote_options(self, sgr_mock): arglist = [ '--remote-ip', '10.10.0.0/24', '--remote-group', self._security_group['id'], @@ -89,7 +90,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_security_group_rule_create_bad_protocol(self): + def test_security_group_rule_create_bad_protocol(self, sgr_mock): arglist = [ '--protocol', 'foo', self._security_group['id'], @@ -97,7 +98,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_security_group_rule_create_all_protocol_options(self): + def test_security_group_rule_create_all_protocol_options(self, sgr_mock): arglist = [ '--protocol', 'tcp', '--proto', 'tcp', @@ -106,7 +107,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_security_group_rule_create_network_options(self): + def test_security_group_rule_create_network_options(self, sgr_mock): arglist = [ '--ingress', '--ethertype', 'IPv4', @@ -119,17 +120,18 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.assertRaises(tests_utils.ParserException, self.check_parser, self.cmd, arglist, []) - def test_security_group_rule_create_default_rule(self): + def test_security_group_rule_create_default_rule(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule() - dst_port = str(self._security_group_rule.from_port) + ':' + \ - str(self._security_group_rule.to_port) + sgr_mock.return_value = self._security_group_rule + dst_port = str(self._security_group_rule['from_port']) + ':' + \ + str(self._security_group_rule['to_port']) arglist = [ '--dst-port', dst_port, self._security_group['id'], ] verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), + ('dst_port', (self._security_group_rule['from_port'], + self._security_group_rule['to_port'])), ('group', self._security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -138,31 +140,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=None, ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) - def test_security_group_rule_create_source_group(self): + def test_security_group_rule_create_source_group(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule({ 'from_port': 22, 'to_port': 22, 'group': {'name': self._security_group['name']}, }) + sgr_mock.return_value = self._security_group_rule arglist = [ - '--dst-port', str(self._security_group_rule.from_port), + '--dst-port', str(self._security_group_rule['from_port']), '--src-group', self._security_group['name'], self._security_group['id'], ] verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), + ('dst_port', (self._security_group_rule['from_port'], + self._security_group_rule['to_port'])), ('src_group', self._security_group['name']), ('group', self._security_group['id']), ] @@ -172,31 +175,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group['id'], + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=self._security_group['id'], ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) - def test_security_group_rule_create_remote_group(self): + def test_security_group_rule_create_remote_group(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule({ 'from_port': 22, 'to_port': 22, 'group': {'name': self._security_group['name']}, }) + sgr_mock.return_value = self._security_group_rule arglist = [ - '--dst-port', str(self._security_group_rule.from_port), + '--dst-port', str(self._security_group_rule['from_port']), '--remote-group', self._security_group['name'], self._security_group['id'], ] verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), + ('dst_port', (self._security_group_rule['from_port'], + self._security_group_rule['to_port'])), ('remote_group', self._security_group['name']), ('group', self._security_group['id']), ] @@ -206,32 +210,33 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group['id'], + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=self._security_group['id'], ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) - def test_security_group_rule_create_source_ip(self): + def test_security_group_rule_create_source_ip(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule({ 'ip_protocol': 'icmp', 'from_port': -1, 'to_port': -1, 'ip_range': {'cidr': '10.0.2.0/24'}, }) + sgr_mock.return_value = self._security_group_rule arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], + '--protocol', self._security_group_rule['ip_protocol'], + '--src-ip', self._security_group_rule['ip_range']['cidr'], self._security_group['id'], ] verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('src_ip', self._security_group_rule.ip_range['cidr']), + ('protocol', self._security_group_rule['ip_protocol']), + ('src_ip', self._security_group_rule['ip_range']['cidr']), ('group', self._security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -240,32 +245,33 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=None, ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) - def test_security_group_rule_create_remote_ip(self): + def test_security_group_rule_create_remote_ip(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule({ 'ip_protocol': 'icmp', 'from_port': -1, 'to_port': -1, 'ip_range': {'cidr': '10.0.2.0/24'}, }) + sgr_mock.return_value = self._security_group_rule arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--remote-ip', self._security_group_rule.ip_range['cidr'], + '--protocol', self._security_group_rule['ip_protocol'], + '--remote-ip', self._security_group_rule['ip_range']['cidr'], self._security_group['id'], ] verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('remote_ip', self._security_group_rule.ip_range['cidr']), + ('protocol', self._security_group_rule['ip_protocol']), + ('remote_ip', self._security_group_rule['ip_range']['cidr']), ('group', self._security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -274,33 +280,34 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=None, ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) - def test_security_group_rule_create_proto_option(self): + def test_security_group_rule_create_proto_option(self, sgr_mock): expected_columns, expected_data = self._setup_security_group_rule({ 'ip_protocol': 'icmp', 'from_port': -1, 'to_port': -1, 'ip_range': {'cidr': '10.0.2.0/24'}, }) + sgr_mock.return_value = self._security_group_rule arglist = [ - '--proto', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], + '--proto', self._security_group_rule['ip_protocol'], + '--src-ip', self._security_group_rule['ip_range']['cidr'], self._security_group['id'], ] verifylist = [ - ('proto', self._security_group_rule.ip_protocol), + ('proto', self._security_group_rule['ip_protocol']), ('protocol', None), - ('src_ip', self._security_group_rule.ip_range['cidr']), + ('src_ip', self._security_group_rule['ip_range']['cidr']), ('group', self._security_group['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -309,18 +316,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # TODO(dtroyer): save this for the security group rule changes # self.compute.api.security_group_rule_create.assert_called_once_with( - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group['id'], - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, + sgr_mock.assert_called_once_with( + security_group_id=self._security_group['id'], + ip_protocol=self._security_group_rule['ip_protocol'], + from_port=self._security_group_rule['from_port'], + to_port=self._security_group_rule['to_port'], + remote_ip=self._security_group_rule['ip_range']['cidr'], + remote_group=None, ) self.assertEqual(expected_columns, columns) self.assertEqual(expected_data, data) +@mock.patch( + 'openstackclient.api.compute_v2.APIv2.security_group_rule_delete' +) class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # The security group rule to be deleted. @@ -336,27 +346,27 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # Get the command object to test self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) - def test_security_group_rule_delete(self): + def test_security_group_rule_delete(self, sgr_mock): arglist = [ - self._security_group_rules[0].id, + self._security_group_rules[0]['id'], ] verifylist = [ - ('rule', [self._security_group_rules[0].id]), + ('rule', [self._security_group_rules[0]['id']]), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.compute.security_group_rules.delete.assert_called_once_with( - self._security_group_rules[0].id) + sgr_mock.assert_called_once_with( + self._security_group_rules[0]['id']) self.assertIsNone(result) - def test_security_group_rule_multi_delete(self): + def test_security_group_rule_delete_multi(self, sgr_mock): arglist = [] verifylist = [] for s in self._security_group_rules: - arglist.append(s.id) + arglist.append(s['id']) verifylist = [ ('rule', arglist), ] @@ -366,25 +376,23 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): calls = [] for s in self._security_group_rules: - calls.append(call(s.id)) - self.compute.security_group_rules.delete.assert_has_calls(calls) + calls.append(call(s['id'])) + sgr_mock.assert_has_calls(calls) self.assertIsNone(result) - def test_security_group_rule_multi_delete_with_exception(self): + def test_security_group_rule_delete_multi_with_exception(self, sgr_mock): arglist = [ - self._security_group_rules[0].id, + self._security_group_rules[0]['id'], 'unexist_rule', ] verifylist = [ ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), + [self._security_group_rules[0]['id'], 'unexist_rule']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) find_mock_result = [None, exceptions.CommandError] - self.compute.security_group_rules.delete = ( - mock.Mock(side_effect=find_mock_result) - ) + sgr_mock.side_effect = find_mock_result try: self.cmd.take_action(parsed_args) @@ -392,9 +400,9 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): except exceptions.CommandError as e: self.assertEqual('1 of 2 rules failed to delete.', str(e)) - self.compute.security_group_rules.delete.assert_any_call( - self._security_group_rules[0].id) - self.compute.security_group_rules.delete.assert_any_call( + sgr_mock.assert_any_call( + self._security_group_rules[0]['id']) + sgr_mock.assert_any_call( 'unexist_rule') @@ -421,8 +429,8 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): 'group': {'name': _security_group['name']}, }) _security_group['rules'] = [ - _security_group_rule_tcp._info, - _security_group_rule_icmp._info, + _security_group_rule_tcp, + _security_group_rule_icmp, ] expected_columns_with_group = ( @@ -535,7 +543,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): columns, data = \ security_group_rule._format_security_group_rule_show( - _security_group_rule._info) + _security_group_rule) def setUp(self): super(TestShowSecurityGroupRuleCompute, self).setUp() @@ -543,7 +551,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): self.app.client_manager.network_endpoint_enabled = False # Build a security group fake customized for this test. - security_group_rules = [self._security_group_rule._info] + security_group_rules = [self._security_group_rule] security_group = {'rules': security_group_rules} self.compute.api.security_group_list = mock.Mock( return_value=[security_group], @@ -558,10 +566,10 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): def test_security_group_rule_show_all_options(self): arglist = [ - self._security_group_rule.id, + self._security_group_rule['id'], ] verifylist = [ - ('rule', self._security_group_rule.id), + ('rule', self._security_group_rule['id']), ] parsed_args = self.check_parser(self.cmd, arglist, verifylist)