Low-level Compute v2 API: security group rules
api.compute.APIv2 security group rule functions. novaclient 8.0 is now released without support for the previously deprecated nova-net functions, so include a new low-level REST implementation of the removed APIs. Change-Id: Ieabd61113bc6d3562738686f52bb06aa84fca765
This commit is contained in:
		| @@ -19,6 +19,12 @@ from osc_lib import exceptions | ||||
| from osc_lib.i18n import _ | ||||
|  | ||||
|  | ||||
| # TODO(dtroyer): Mingrate this to osc-lib | ||||
| class InvalidValue(Exception): | ||||
|     """An argument value is not valid: wrong type, out of range, etc""" | ||||
|     message = "Supplied value is not valid" | ||||
|  | ||||
|  | ||||
| class APIv2(api.BaseAPI): | ||||
|     """Compute v2 API""" | ||||
|  | ||||
| @@ -27,6 +33,29 @@ class APIv2(api.BaseAPI): | ||||
|  | ||||
|     # Overrides | ||||
|  | ||||
|     def _check_integer(self, value, msg=None): | ||||
|         """Attempt to convert value to an integer | ||||
|  | ||||
|         Raises InvalidValue on failure | ||||
|  | ||||
|         :param value: | ||||
|             Convert this to an integer.  None is converted to 0 (zero). | ||||
|         :param msg: | ||||
|             An alternate message for the exception, must include exactly | ||||
|             one substitution to receive the attempted value. | ||||
|         """ | ||||
|  | ||||
|         if value is None: | ||||
|             return 0 | ||||
|  | ||||
|         try: | ||||
|             value = int(value) | ||||
|         except (TypeError, ValueError): | ||||
|             if not msg: | ||||
|                 msg = "%s is not an integer" % value | ||||
|             raise InvalidValue(msg) | ||||
|         return value | ||||
|  | ||||
|     # TODO(dtroyer): Override find() until these fixes get into an osc-lib | ||||
|     #                minimum release | ||||
|     def find( | ||||
| @@ -209,3 +238,71 @@ class APIv2(api.BaseAPI): | ||||
|                 json={'security_group': security_group}, | ||||
|             ).json()['security_group'] | ||||
|         return None | ||||
|  | ||||
|     # Security Group Rules | ||||
|  | ||||
|     def security_group_rule_create( | ||||
|         self, | ||||
|         security_group_id=None, | ||||
|         ip_protocol=None, | ||||
|         from_port=None, | ||||
|         to_port=None, | ||||
|         remote_ip=None, | ||||
|         remote_group=None, | ||||
|     ): | ||||
|         """Create a new security group rule | ||||
|  | ||||
|         https://developer.openstack.org/api-ref/compute/#create-security-group-rule | ||||
|  | ||||
|         :param string security_group_id: | ||||
|             Security group ID | ||||
|         :param ip_protocol: | ||||
|             IP protocol, 'tcp', 'udp' or 'icmp' | ||||
|         :param from_port: | ||||
|             Source port | ||||
|         :param to_port: | ||||
|             Destination port | ||||
|         :param remote_ip: | ||||
|             Source IP address in CIDR notation | ||||
|         :param remote_group: | ||||
|             Remote security group | ||||
|         """ | ||||
|  | ||||
|         url = "/os-security-group-rules" | ||||
|  | ||||
|         if ip_protocol.lower() not in ['icmp', 'tcp', 'udp']: | ||||
|             raise InvalidValue( | ||||
|                 "%(s) is not one of 'icmp', 'tcp', or 'udp'" % ip_protocol | ||||
|             ) | ||||
|  | ||||
|         params = { | ||||
|             'parent_group_id': security_group_id, | ||||
|             'ip_protocol': ip_protocol, | ||||
|             'from_port': self._check_integer(from_port), | ||||
|             'to_port': self._check_integer(to_port), | ||||
|             'cidr': remote_ip, | ||||
|             'group_id': remote_group, | ||||
|         } | ||||
|  | ||||
|         return self.create( | ||||
|             url, | ||||
|             json={'security_group_rule': params}, | ||||
|         )['security_group_rule'] | ||||
|  | ||||
|     def security_group_rule_delete( | ||||
|         self, | ||||
|         security_group_rule_id=None, | ||||
|     ): | ||||
|         """Delete a security group rule | ||||
|  | ||||
|         https://developer.openstack.org/api-ref/compute/#delete-security-group-rule | ||||
|  | ||||
|         :param string security_group_rule_id: | ||||
|             Security group rule ID | ||||
|         """ | ||||
|  | ||||
|         url = "/os-security-group-rules" | ||||
|         if security_group_rule_id is not None: | ||||
|             return self.delete('/%s/%s' % (url, security_group_rule_id)) | ||||
|  | ||||
|         return None | ||||
|   | ||||
| @@ -376,15 +376,15 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne): | ||||
|         else: | ||||
|             remote_ip = '0.0.0.0/0' | ||||
|  | ||||
|         obj = client.security_group_rules.create( | ||||
|             group['id'], | ||||
|             protocol, | ||||
|             from_port, | ||||
|             to_port, | ||||
|             remote_ip, | ||||
|             parsed_args.remote_group, | ||||
|         obj = client.api.security_group_rule_create( | ||||
|             security_group_id=group['id'], | ||||
|             ip_protocol=protocol, | ||||
|             from_port=from_port, | ||||
|             to_port=to_port, | ||||
|             remote_ip=remote_ip, | ||||
|             remote_group=parsed_args.remote_group, | ||||
|         ) | ||||
|         return _format_security_group_rule_show(obj._info) | ||||
|         return _format_security_group_rule_show(obj) | ||||
|  | ||||
|  | ||||
| class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): | ||||
| @@ -409,7 +409,7 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): | ||||
|         client.delete_security_group_rule(obj) | ||||
|  | ||||
|     def take_action_compute(self, client, parsed_args): | ||||
|         client.security_group_rules.delete(self.r) | ||||
|         client.api.security_group_rule_delete(self.r) | ||||
|  | ||||
|  | ||||
| class ListSecurityGroupRule(common.NetworkAndComputeLister): | ||||
|   | ||||
| @@ -226,3 +226,84 @@ class TestSecurityGroup(TestComputeAPIv2): | ||||
|             security_group='sg2', | ||||
|             description='desc2') | ||||
|         self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret) | ||||
|  | ||||
|  | ||||
| class TestSecurityGroupRule(TestComputeAPIv2): | ||||
|  | ||||
|     FAKE_SECURITY_GROUP_RULE_RESP = { | ||||
|         'id': '1', | ||||
|         'name': 'sgr1', | ||||
|         'tenant_id': 'proj-1', | ||||
|         'ip_protocol': 'TCP', | ||||
|         'from_port': 1, | ||||
|         'to_port': 22, | ||||
|         'group': {}, | ||||
|         # 'ip_range': , | ||||
|         # 'cidr': , | ||||
|         # 'parent_group_id': , | ||||
|     } | ||||
|  | ||||
|     def test_security_group_create_no_options(self): | ||||
|         self.requests_mock.register_uri( | ||||
|             'POST', | ||||
|             FAKE_URL + '/os-security-group-rules', | ||||
|             json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, | ||||
|             status_code=200, | ||||
|         ) | ||||
|         ret = self.api.security_group_rule_create( | ||||
|             security_group_id='1', | ||||
|             ip_protocol='tcp', | ||||
|         ) | ||||
|         self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) | ||||
|  | ||||
|     def test_security_group_create_options(self): | ||||
|         self.requests_mock.register_uri( | ||||
|             'POST', | ||||
|             FAKE_URL + '/os-security-group-rules', | ||||
|             json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, | ||||
|             status_code=200, | ||||
|         ) | ||||
|         ret = self.api.security_group_rule_create( | ||||
|             security_group_id='1', | ||||
|             ip_protocol='tcp', | ||||
|             from_port=22, | ||||
|             to_port=22, | ||||
|             remote_ip='1.2.3.4/24', | ||||
|         ) | ||||
|         self.assertEqual(self.FAKE_SECURITY_GROUP_RULE_RESP, ret) | ||||
|  | ||||
|     def test_security_group_create_port_errors(self): | ||||
|         self.requests_mock.register_uri( | ||||
|             'POST', | ||||
|             FAKE_URL + '/os-security-group-rules', | ||||
|             json={'security_group_rule': self.FAKE_SECURITY_GROUP_RULE_RESP}, | ||||
|             status_code=200, | ||||
|         ) | ||||
|         self.assertRaises( | ||||
|             compute.InvalidValue, | ||||
|             self.api.security_group_rule_create, | ||||
|             security_group_id='1', | ||||
|             ip_protocol='tcp', | ||||
|             from_port='', | ||||
|             to_port=22, | ||||
|             remote_ip='1.2.3.4/24', | ||||
|         ) | ||||
|         self.assertRaises( | ||||
|             compute.InvalidValue, | ||||
|             self.api.security_group_rule_create, | ||||
|             security_group_id='1', | ||||
|             ip_protocol='tcp', | ||||
|             from_port=0, | ||||
|             to_port=[], | ||||
|             remote_ip='1.2.3.4/24', | ||||
|         ) | ||||
|  | ||||
|     def test_security_group_rule_delete(self): | ||||
|         self.requests_mock.register_uri( | ||||
|             'DELETE', | ||||
|             FAKE_URL + '/os-security-group-rules/1', | ||||
|             status_code=202, | ||||
|         ) | ||||
|         ret = self.api.security_group_rule_delete('1') | ||||
|         self.assertEqual(202, ret.status_code) | ||||
|         self.assertEqual("", ret.text) | ||||
|   | ||||
| @@ -556,10 +556,7 @@ class FakeSecurityGroupRule(object): | ||||
|         # Overwrite default attributes. | ||||
|         security_group_rule_attrs.update(attrs) | ||||
|  | ||||
|         security_group_rule = fakes.FakeResource( | ||||
|             info=copy.deepcopy(security_group_rule_attrs), | ||||
|             loaded=True) | ||||
|         return security_group_rule | ||||
|         return security_group_rule_attrs | ||||
|  | ||||
|     @staticmethod | ||||
|     def create_security_group_rules(attrs=None, count=2): | ||||
|   | ||||
| @@ -356,7 +356,7 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute): | ||||
|     # The security group to be shown. | ||||
|     _security_group = \ | ||||
|         compute_fakes.FakeSecurityGroup.create_one_security_group( | ||||
|             attrs={'rules': [_security_group_rule._info]} | ||||
|             attrs={'rules': [_security_group_rule]} | ||||
|         ) | ||||
|  | ||||
|     columns = ( | ||||
| @@ -373,7 +373,7 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute): | ||||
|         _security_group['name'], | ||||
|         _security_group['tenant_id'], | ||||
|         security_group._format_compute_security_group_rules( | ||||
|             [_security_group_rule._info]), | ||||
|             [_security_group_rule]), | ||||
|     ) | ||||
|  | ||||
|     def setUp(self): | ||||
|   | ||||
| @@ -32,6 +32,9 @@ class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): | ||||
|         self.compute = self.app.client_manager.compute | ||||
|  | ||||
|  | ||||
| @mock.patch( | ||||
|     'openstackclient.api.compute_v2.APIv2.security_group_rule_create' | ||||
| ) | ||||
| class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|     project = identity_fakes.FakeProject.create_one_project() | ||||
| @@ -48,11 +51,9 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self._security_group_rule = \ | ||||
|             compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( | ||||
|                 attrs) | ||||
|         self.compute.security_group_rules.create.return_value = \ | ||||
|             self._security_group_rule | ||||
|         expected_columns, expected_data = \ | ||||
|             security_group_rule._format_security_group_rule_show( | ||||
|                 self._security_group_rule._info) | ||||
|                 self._security_group_rule) | ||||
|         return expected_columns, expected_data | ||||
|  | ||||
|     def setUp(self): | ||||
| @@ -67,11 +68,11 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         # Get the command object to test | ||||
|         self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) | ||||
|  | ||||
|     def test_security_group_rule_create_no_options(self): | ||||
|     def test_security_group_rule_create_no_options(self, sgr_mock): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, [], []) | ||||
|  | ||||
|     def test_security_group_rule_create_all_source_options(self): | ||||
|     def test_security_group_rule_create_all_source_options(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             '--src-ip', '10.10.0.0/24', | ||||
|             '--src-group', self._security_group['id'], | ||||
| @@ -80,7 +81,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, arglist, []) | ||||
|  | ||||
|     def test_security_group_rule_create_all_remote_options(self): | ||||
|     def test_security_group_rule_create_all_remote_options(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             '--remote-ip', '10.10.0.0/24', | ||||
|             '--remote-group', self._security_group['id'], | ||||
| @@ -89,7 +90,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, arglist, []) | ||||
|  | ||||
|     def test_security_group_rule_create_bad_protocol(self): | ||||
|     def test_security_group_rule_create_bad_protocol(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             '--protocol', 'foo', | ||||
|             self._security_group['id'], | ||||
| @@ -97,7 +98,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, arglist, []) | ||||
|  | ||||
|     def test_security_group_rule_create_all_protocol_options(self): | ||||
|     def test_security_group_rule_create_all_protocol_options(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             '--protocol', 'tcp', | ||||
|             '--proto', 'tcp', | ||||
| @@ -106,7 +107,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, arglist, []) | ||||
|  | ||||
|     def test_security_group_rule_create_network_options(self): | ||||
|     def test_security_group_rule_create_network_options(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             '--ingress', | ||||
|             '--ethertype', 'IPv4', | ||||
| @@ -119,17 +120,18 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.assertRaises(tests_utils.ParserException, | ||||
|                           self.check_parser, self.cmd, arglist, []) | ||||
|  | ||||
|     def test_security_group_rule_create_default_rule(self): | ||||
|     def test_security_group_rule_create_default_rule(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule() | ||||
|         dst_port = str(self._security_group_rule.from_port) + ':' + \ | ||||
|             str(self._security_group_rule.to_port) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         dst_port = str(self._security_group_rule['from_port']) + ':' + \ | ||||
|             str(self._security_group_rule['to_port']) | ||||
|         arglist = [ | ||||
|             '--dst-port', dst_port, | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('dst_port', (self._security_group_rule.from_port, | ||||
|                           self._security_group_rule.to_port)), | ||||
|             ('dst_port', (self._security_group_rule['from_port'], | ||||
|                           self._security_group_rule['to_port'])), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
| @@ -138,31 +140,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             None, | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=None, | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|     def test_security_group_rule_create_source_group(self): | ||||
|     def test_security_group_rule_create_source_group(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule({ | ||||
|             'from_port': 22, | ||||
|             'to_port': 22, | ||||
|             'group': {'name': self._security_group['name']}, | ||||
|         }) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         arglist = [ | ||||
|             '--dst-port', str(self._security_group_rule.from_port), | ||||
|             '--dst-port', str(self._security_group_rule['from_port']), | ||||
|             '--src-group', self._security_group['name'], | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('dst_port', (self._security_group_rule.from_port, | ||||
|                           self._security_group_rule.to_port)), | ||||
|             ('dst_port', (self._security_group_rule['from_port'], | ||||
|                           self._security_group_rule['to_port'])), | ||||
|             ('src_group', self._security_group['name']), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
| @@ -172,31 +175,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             self._security_group['id'], | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=self._security_group['id'], | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|     def test_security_group_rule_create_remote_group(self): | ||||
|     def test_security_group_rule_create_remote_group(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule({ | ||||
|             'from_port': 22, | ||||
|             'to_port': 22, | ||||
|             'group': {'name': self._security_group['name']}, | ||||
|         }) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         arglist = [ | ||||
|             '--dst-port', str(self._security_group_rule.from_port), | ||||
|             '--dst-port', str(self._security_group_rule['from_port']), | ||||
|             '--remote-group', self._security_group['name'], | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('dst_port', (self._security_group_rule.from_port, | ||||
|                           self._security_group_rule.to_port)), | ||||
|             ('dst_port', (self._security_group_rule['from_port'], | ||||
|                           self._security_group_rule['to_port'])), | ||||
|             ('remote_group', self._security_group['name']), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
| @@ -206,32 +210,33 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             self._security_group['id'], | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=self._security_group['id'], | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|     def test_security_group_rule_create_source_ip(self): | ||||
|     def test_security_group_rule_create_source_ip(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule({ | ||||
|             'ip_protocol': 'icmp', | ||||
|             'from_port': -1, | ||||
|             'to_port': -1, | ||||
|             'ip_range': {'cidr': '10.0.2.0/24'}, | ||||
|         }) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         arglist = [ | ||||
|             '--protocol', self._security_group_rule.ip_protocol, | ||||
|             '--src-ip', self._security_group_rule.ip_range['cidr'], | ||||
|             '--protocol', self._security_group_rule['ip_protocol'], | ||||
|             '--src-ip', self._security_group_rule['ip_range']['cidr'], | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('protocol', self._security_group_rule.ip_protocol), | ||||
|             ('src_ip', self._security_group_rule.ip_range['cidr']), | ||||
|             ('protocol', self._security_group_rule['ip_protocol']), | ||||
|             ('src_ip', self._security_group_rule['ip_range']['cidr']), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
| @@ -240,32 +245,33 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             None, | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=None, | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|     def test_security_group_rule_create_remote_ip(self): | ||||
|     def test_security_group_rule_create_remote_ip(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule({ | ||||
|             'ip_protocol': 'icmp', | ||||
|             'from_port': -1, | ||||
|             'to_port': -1, | ||||
|             'ip_range': {'cidr': '10.0.2.0/24'}, | ||||
|         }) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         arglist = [ | ||||
|             '--protocol', self._security_group_rule.ip_protocol, | ||||
|             '--remote-ip', self._security_group_rule.ip_range['cidr'], | ||||
|             '--protocol', self._security_group_rule['ip_protocol'], | ||||
|             '--remote-ip', self._security_group_rule['ip_range']['cidr'], | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('protocol', self._security_group_rule.ip_protocol), | ||||
|             ('remote_ip', self._security_group_rule.ip_range['cidr']), | ||||
|             ('protocol', self._security_group_rule['ip_protocol']), | ||||
|             ('remote_ip', self._security_group_rule['ip_range']['cidr']), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
| @@ -274,33 +280,34 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             None, | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=None, | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|     def test_security_group_rule_create_proto_option(self): | ||||
|     def test_security_group_rule_create_proto_option(self, sgr_mock): | ||||
|         expected_columns, expected_data = self._setup_security_group_rule({ | ||||
|             'ip_protocol': 'icmp', | ||||
|             'from_port': -1, | ||||
|             'to_port': -1, | ||||
|             'ip_range': {'cidr': '10.0.2.0/24'}, | ||||
|         }) | ||||
|         sgr_mock.return_value = self._security_group_rule | ||||
|         arglist = [ | ||||
|             '--proto', self._security_group_rule.ip_protocol, | ||||
|             '--src-ip', self._security_group_rule.ip_range['cidr'], | ||||
|             '--proto', self._security_group_rule['ip_protocol'], | ||||
|             '--src-ip', self._security_group_rule['ip_range']['cidr'], | ||||
|             self._security_group['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('proto', self._security_group_rule.ip_protocol), | ||||
|             ('proto', self._security_group_rule['ip_protocol']), | ||||
|             ('protocol', None), | ||||
|             ('src_ip', self._security_group_rule.ip_range['cidr']), | ||||
|             ('src_ip', self._security_group_rule['ip_range']['cidr']), | ||||
|             ('group', self._security_group['id']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
| @@ -309,18 +316,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         # TODO(dtroyer): save this for the security group rule changes | ||||
|         # self.compute.api.security_group_rule_create.assert_called_once_with( | ||||
|         self.compute.security_group_rules.create.assert_called_once_with( | ||||
|             self._security_group['id'], | ||||
|             self._security_group_rule.ip_protocol, | ||||
|             self._security_group_rule.from_port, | ||||
|             self._security_group_rule.to_port, | ||||
|             self._security_group_rule.ip_range['cidr'], | ||||
|             None, | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             security_group_id=self._security_group['id'], | ||||
|             ip_protocol=self._security_group_rule['ip_protocol'], | ||||
|             from_port=self._security_group_rule['from_port'], | ||||
|             to_port=self._security_group_rule['to_port'], | ||||
|             remote_ip=self._security_group_rule['ip_range']['cidr'], | ||||
|             remote_group=None, | ||||
|         ) | ||||
|         self.assertEqual(expected_columns, columns) | ||||
|         self.assertEqual(expected_data, data) | ||||
|  | ||||
|  | ||||
| @mock.patch( | ||||
|     'openstackclient.api.compute_v2.APIv2.security_group_rule_delete' | ||||
| ) | ||||
| class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|     # The security group rule to be deleted. | ||||
| @@ -336,27 +346,27 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         # Get the command object to test | ||||
|         self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) | ||||
|  | ||||
|     def test_security_group_rule_delete(self): | ||||
|     def test_security_group_rule_delete(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             self._security_group_rules[0].id, | ||||
|             self._security_group_rules[0]['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('rule', [self._security_group_rules[0].id]), | ||||
|             ('rule', [self._security_group_rules[0]['id']]), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
|  | ||||
|         result = self.cmd.take_action(parsed_args) | ||||
|  | ||||
|         self.compute.security_group_rules.delete.assert_called_once_with( | ||||
|             self._security_group_rules[0].id) | ||||
|         sgr_mock.assert_called_once_with( | ||||
|             self._security_group_rules[0]['id']) | ||||
|         self.assertIsNone(result) | ||||
|  | ||||
|     def test_security_group_rule_multi_delete(self): | ||||
|     def test_security_group_rule_delete_multi(self, sgr_mock): | ||||
|         arglist = [] | ||||
|         verifylist = [] | ||||
|  | ||||
|         for s in self._security_group_rules: | ||||
|             arglist.append(s.id) | ||||
|             arglist.append(s['id']) | ||||
|         verifylist = [ | ||||
|             ('rule', arglist), | ||||
|         ] | ||||
| @@ -366,25 +376,23 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|         calls = [] | ||||
|         for s in self._security_group_rules: | ||||
|             calls.append(call(s.id)) | ||||
|         self.compute.security_group_rules.delete.assert_has_calls(calls) | ||||
|             calls.append(call(s['id'])) | ||||
|         sgr_mock.assert_has_calls(calls) | ||||
|         self.assertIsNone(result) | ||||
|  | ||||
|     def test_security_group_rule_multi_delete_with_exception(self): | ||||
|     def test_security_group_rule_delete_multi_with_exception(self, sgr_mock): | ||||
|         arglist = [ | ||||
|             self._security_group_rules[0].id, | ||||
|             self._security_group_rules[0]['id'], | ||||
|             'unexist_rule', | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('rule', | ||||
|              [self._security_group_rules[0].id, 'unexist_rule']), | ||||
|              [self._security_group_rules[0]['id'], 'unexist_rule']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
|  | ||||
|         find_mock_result = [None, exceptions.CommandError] | ||||
|         self.compute.security_group_rules.delete = ( | ||||
|             mock.Mock(side_effect=find_mock_result) | ||||
|         ) | ||||
|         sgr_mock.side_effect = find_mock_result | ||||
|  | ||||
|         try: | ||||
|             self.cmd.take_action(parsed_args) | ||||
| @@ -392,9 +400,9 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         except exceptions.CommandError as e: | ||||
|             self.assertEqual('1 of 2 rules failed to delete.', str(e)) | ||||
|  | ||||
|         self.compute.security_group_rules.delete.assert_any_call( | ||||
|             self._security_group_rules[0].id) | ||||
|         self.compute.security_group_rules.delete.assert_any_call( | ||||
|         sgr_mock.assert_any_call( | ||||
|             self._security_group_rules[0]['id']) | ||||
|         sgr_mock.assert_any_call( | ||||
|             'unexist_rule') | ||||
|  | ||||
|  | ||||
| @@ -421,8 +429,8 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|             'group': {'name': _security_group['name']}, | ||||
|         }) | ||||
|     _security_group['rules'] = [ | ||||
|         _security_group_rule_tcp._info, | ||||
|         _security_group_rule_icmp._info, | ||||
|         _security_group_rule_tcp, | ||||
|         _security_group_rule_icmp, | ||||
|     ] | ||||
|  | ||||
|     expected_columns_with_group = ( | ||||
| @@ -535,7 +543,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|     columns, data = \ | ||||
|         security_group_rule._format_security_group_rule_show( | ||||
|             _security_group_rule._info) | ||||
|             _security_group_rule) | ||||
|  | ||||
|     def setUp(self): | ||||
|         super(TestShowSecurityGroupRuleCompute, self).setUp() | ||||
| @@ -543,7 +551,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|         self.app.client_manager.network_endpoint_enabled = False | ||||
|  | ||||
|         # Build a security group fake customized for this test. | ||||
|         security_group_rules = [self._security_group_rule._info] | ||||
|         security_group_rules = [self._security_group_rule] | ||||
|         security_group = {'rules': security_group_rules} | ||||
|         self.compute.api.security_group_list = mock.Mock( | ||||
|             return_value=[security_group], | ||||
| @@ -558,10 +566,10 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): | ||||
|  | ||||
|     def test_security_group_rule_show_all_options(self): | ||||
|         arglist = [ | ||||
|             self._security_group_rule.id, | ||||
|             self._security_group_rule['id'], | ||||
|         ] | ||||
|         verifylist = [ | ||||
|             ('rule', self._security_group_rule.id), | ||||
|             ('rule', self._security_group_rule['id']), | ||||
|         ] | ||||
|         parsed_args = self.check_parser(self.cmd, arglist, verifylist) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Dean Troyer
					Dean Troyer