Properly handle port arguments for ICMP

The Compute API requires 'from_port' and 'to_port' to be -1 for
ICMP security group rules.  It happily accepts them empty or None
but the resulting rules do not work.  So we force the values for
ICMP rules.

Closes-bug: #1477629
Change-Id: Iba57211014caca16be7c9a28d15d4db2a6c51b8d
This commit is contained in:
Dean Troyer 2015-07-23 15:08:52 -05:00
parent 9210cac86a
commit e6706f2526
2 changed files with 345 additions and 4 deletions

View File

@ -50,10 +50,10 @@ def _xform_security_group_rule(sgroup):
info['ip_range'] = info['ip_range']['cidr']
else:
info['ip_range'] = ''
if info['ip_protocol'] == 'icmp':
info['port_range'] = ''
elif info['ip_protocol'] is None:
if info['ip_protocol'] is None:
info['ip_protocol'] = ''
elif info['ip_protocol'].lower() == 'icmp':
info['port_range'] = ''
return info
@ -307,7 +307,10 @@ class CreateSecurityGroupRule(show.ShowOne):
compute_client.security_groups,
parsed_args.group,
)
from_port, to_port = parsed_args.dst_port
if parsed_args.proto.lower() == 'icmp':
from_port, to_port = -1, -1
else:
from_port, to_port = parsed_args.dst_port
data = compute_client.security_group_rules.create(
group.id,
parsed_args.proto,

View File

@ -0,0 +1,338 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
from openstackclient.compute.v2 import security_group
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests import fakes
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
security_group_id = '11'
security_group_name = 'wide-open'
security_group_description = 'nothing but net'
security_group_rule_id = '1'
SECURITY_GROUP = {
'id': security_group_id,
'name': security_group_name,
'description': security_group_description,
'tenant_id': identity_fakes.project_id,
}
SECURITY_GROUP_RULE = {
'id': security_group_rule_id,
'group': {},
'ip_protocol': 'tcp',
'ip_range': '0.0.0.0/0',
'parent_group_id': security_group_id,
'from_port': 0,
'to_port': 0,
}
SECURITY_GROUP_RULE_ICMP = {
'id': security_group_rule_id,
'group': {},
'ip_protocol': 'icmp',
'ip_range': '0.0.0.0/0',
'parent_group_id': security_group_id,
'from_port': -1,
'to_port': -1,
}
class FakeSecurityGroupRuleResource(fakes.FakeResource):
def get_keys(self):
return {'property': 'value'}
class TestSecurityGroupRule(compute_fakes.TestComputev2):
def setUp(self):
super(TestSecurityGroupRule, self).setUp()
self.secgroups_mock = mock.Mock()
self.secgroups_mock.resource_class = fakes.FakeResource(None, {})
self.app.client_manager.compute.security_groups = self.secgroups_mock
self.secgroups_mock.reset_mock()
self.sg_rules_mock = mock.Mock()
self.sg_rules_mock.resource_class = fakes.FakeResource(None, {})
self.app.client_manager.compute.security_group_rules = \
self.sg_rules_mock
self.sg_rules_mock.reset_mock()
class TestSecurityGroupRuleCreate(TestSecurityGroupRule):
def setUp(self):
super(TestSecurityGroupRuleCreate, self).setUp()
self.secgroups_mock.get.return_value = FakeSecurityGroupRuleResource(
None,
copy.deepcopy(SECURITY_GROUP),
loaded=True,
)
# Get the command object to test
self.cmd = security_group.CreateSecurityGroupRule(self.app, None)
def test_security_group_rule_create_no_options(self):
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
None,
copy.deepcopy(SECURITY_GROUP_RULE),
loaded=True,
)
arglist = [
security_group_name,
]
verifylist = [
('group', security_group_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# SecurityGroupManager.create(name, description)
self.sg_rules_mock.create.assert_called_with(
security_group_id,
'tcp',
0,
0,
'0.0.0.0/0',
)
collist = (
'group',
'id',
'ip_protocol',
'ip_range',
'parent_group_id',
'port_range',
)
self.assertEqual(collist, columns)
datalist = (
{},
security_group_rule_id,
'tcp',
'',
security_group_id,
'0:0',
)
self.assertEqual(datalist, data)
def test_security_group_rule_create_ftp(self):
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
sg_rule['from_port'] = 20
sg_rule['to_port'] = 21
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
None,
sg_rule,
loaded=True,
)
arglist = [
security_group_name,
'--dst-port', '20:21',
]
verifylist = [
('group', security_group_name),
('dst_port', (20, 21)),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# SecurityGroupManager.create(name, description)
self.sg_rules_mock.create.assert_called_with(
security_group_id,
'tcp',
20,
21,
'0.0.0.0/0',
)
collist = (
'group',
'id',
'ip_protocol',
'ip_range',
'parent_group_id',
'port_range',
)
self.assertEqual(collist, columns)
datalist = (
{},
security_group_rule_id,
'tcp',
'',
security_group_id,
'20:21',
)
self.assertEqual(datalist, data)
def test_security_group_rule_create_ssh(self):
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
sg_rule['from_port'] = 22
sg_rule['to_port'] = 22
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
None,
sg_rule,
loaded=True,
)
arglist = [
security_group_name,
'--dst-port', '22',
]
verifylist = [
('group', security_group_name),
('dst_port', (22, 22)),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# SecurityGroupManager.create(name, description)
self.sg_rules_mock.create.assert_called_with(
security_group_id,
'tcp',
22,
22,
'0.0.0.0/0',
)
collist = (
'group',
'id',
'ip_protocol',
'ip_range',
'parent_group_id',
'port_range',
)
self.assertEqual(collist, columns)
datalist = (
{},
security_group_rule_id,
'tcp',
'',
security_group_id,
'22:22',
)
self.assertEqual(datalist, data)
def test_security_group_rule_create_udp(self):
sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
sg_rule['ip_protocol'] = 'udp'
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
None,
sg_rule,
loaded=True,
)
arglist = [
security_group_name,
'--proto', 'udp',
]
verifylist = [
('group', security_group_name),
('proto', 'udp'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# SecurityGroupManager.create(name, description)
self.sg_rules_mock.create.assert_called_with(
security_group_id,
'udp',
0,
0,
'0.0.0.0/0',
)
collist = (
'group',
'id',
'ip_protocol',
'ip_range',
'parent_group_id',
'port_range',
)
self.assertEqual(collist, columns)
datalist = (
{},
security_group_rule_id,
'udp',
'',
security_group_id,
'0:0',
)
self.assertEqual(datalist, data)
def test_security_group_rule_create_icmp(self):
self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
None,
copy.deepcopy(SECURITY_GROUP_RULE_ICMP),
loaded=True,
)
arglist = [
security_group_name,
'--proto', 'ICMP',
]
verifylist = [
('group', security_group_name),
('proto', 'ICMP'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# SecurityGroupManager.create(name, description)
self.sg_rules_mock.create.assert_called_with(
security_group_id,
'ICMP',
-1,
-1,
'0.0.0.0/0',
)
collist = (
'group',
'id',
'ip_protocol',
'ip_range',
'parent_group_id',
'port_range',
)
self.assertEqual(collist, columns)
datalist = (
{},
security_group_rule_id,
'icmp',
'',
security_group_id,
'',
)
self.assertEqual(datalist, data)