columns and datalist has been set in each test case in compute, which is not necessary. This patch abstract it out and remove all redundant code. Change-Id: I5e8423722416ea31fdced4c932ed141de90028ab Closes-Bug: #1531816
		
			
				
	
	
		
			485 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			485 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#   not use this file except in compliance with the License. You may obtain
 | 
						|
#   a copy of the License at
 | 
						|
#
 | 
						|
#        http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#   Unless required by applicable law or agreed to in writing, software
 | 
						|
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#   License for the specific language governing permissions and limitations
 | 
						|
#   under the License.
 | 
						|
#
 | 
						|
 | 
						|
import copy
 | 
						|
import mock
 | 
						|
 | 
						|
from openstackclient.compute.v2 import security_group
 | 
						|
from openstackclient.tests.compute.v2 import fakes as compute_fakes
 | 
						|
from openstackclient.tests import fakes
 | 
						|
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
 | 
						|
from openstackclient.tests import utils
 | 
						|
 | 
						|
 | 
						|
security_group_id = '11'
 | 
						|
security_group_name = 'wide-open'
 | 
						|
security_group_description = 'nothing but net'
 | 
						|
 | 
						|
security_group_rule_id = '1'
 | 
						|
security_group_rule_cidr = '0.0.0.0/0'
 | 
						|
 | 
						|
SECURITY_GROUP_RULE = {
 | 
						|
    'id': security_group_rule_id,
 | 
						|
    'group': {},
 | 
						|
    'ip_protocol': 'tcp',
 | 
						|
    'ip_range': {'cidr': security_group_rule_cidr},
 | 
						|
    'parent_group_id': security_group_id,
 | 
						|
    'from_port': 0,
 | 
						|
    'to_port': 0,
 | 
						|
}
 | 
						|
 | 
						|
SECURITY_GROUP_RULE_ICMP = {
 | 
						|
    'id': security_group_rule_id,
 | 
						|
    'group': {},
 | 
						|
    'ip_protocol': 'icmp',
 | 
						|
    'ip_range': {'cidr': security_group_rule_cidr},
 | 
						|
    'parent_group_id': security_group_id,
 | 
						|
    'from_port': -1,
 | 
						|
    'to_port': -1,
 | 
						|
}
 | 
						|
 | 
						|
SECURITY_GROUP_RULE_REMOTE_GROUP = {
 | 
						|
    'id': security_group_rule_id,
 | 
						|
    'group': {"tenant_id": "14", "name": "default"},
 | 
						|
    'ip_protocol': 'tcp',
 | 
						|
    'ip_range': {},
 | 
						|
    'parent_group_id': security_group_id,
 | 
						|
    'from_port': 80,
 | 
						|
    'to_port': 80,
 | 
						|
}
 | 
						|
 | 
						|
SECURITY_GROUP = {
 | 
						|
    'id': security_group_id,
 | 
						|
    'name': security_group_name,
 | 
						|
    'description': security_group_description,
 | 
						|
    'tenant_id': identity_fakes.project_id,
 | 
						|
    'rules': [SECURITY_GROUP_RULE,
 | 
						|
              SECURITY_GROUP_RULE_ICMP,
 | 
						|
              SECURITY_GROUP_RULE_REMOTE_GROUP],
 | 
						|
}
 | 
						|
 | 
						|
security_group_2_id = '12'
 | 
						|
security_group_2_name = 'he-shoots'
 | 
						|
security_group_2_description = 'he scores'
 | 
						|
 | 
						|
SECURITY_GROUP_2_RULE = {
 | 
						|
    'id': '2',
 | 
						|
    'group': {},
 | 
						|
    'ip_protocol': 'tcp',
 | 
						|
    'ip_range': {},
 | 
						|
    'parent_group_id': security_group_2_id,
 | 
						|
    'from_port': 80,
 | 
						|
    'to_port': 80,
 | 
						|
}
 | 
						|
 | 
						|
SECURITY_GROUP_2 = {
 | 
						|
    'id': security_group_2_id,
 | 
						|
    'name': security_group_2_name,
 | 
						|
    'description': security_group_2_description,
 | 
						|
    'tenant_id': identity_fakes.project_id,
 | 
						|
    'rules': [SECURITY_GROUP_2_RULE],
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
class FakeSecurityGroupRuleResource(fakes.FakeResource):
 | 
						|
 | 
						|
    def get_keys(self):
 | 
						|
        return {'property': 'value'}
 | 
						|
 | 
						|
 | 
						|
class TestSecurityGroupRule(compute_fakes.TestComputev2):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestSecurityGroupRule, self).setUp()
 | 
						|
 | 
						|
        self.secgroups_mock = mock.Mock()
 | 
						|
        self.secgroups_mock.resource_class = fakes.FakeResource(None, {})
 | 
						|
        self.app.client_manager.compute.security_groups = self.secgroups_mock
 | 
						|
        self.secgroups_mock.reset_mock()
 | 
						|
 | 
						|
        self.sg_rules_mock = mock.Mock()
 | 
						|
        self.sg_rules_mock.resource_class = fakes.FakeResource(None, {})
 | 
						|
        self.app.client_manager.compute.security_group_rules = \
 | 
						|
            self.sg_rules_mock
 | 
						|
        self.sg_rules_mock.reset_mock()
 | 
						|
 | 
						|
 | 
						|
class TestSecurityGroupRuleCreate(TestSecurityGroupRule):
 | 
						|
 | 
						|
    columns = (
 | 
						|
        'id',
 | 
						|
        'ip_protocol',
 | 
						|
        'ip_range',
 | 
						|
        'parent_group_id',
 | 
						|
        'port_range',
 | 
						|
        'remote_security_group',
 | 
						|
    )
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestSecurityGroupRuleCreate, self).setUp()
 | 
						|
 | 
						|
        self.secgroups_mock.get.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            copy.deepcopy(SECURITY_GROUP),
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        # Get the command object to test
 | 
						|
        self.cmd = security_group.CreateSecurityGroupRule(self.app, None)
 | 
						|
 | 
						|
    def test_security_group_rule_create_no_options(self):
 | 
						|
        self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            copy.deepcopy(SECURITY_GROUP_RULE),
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
        ]
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        # SecurityGroupManager.create(name, description)
 | 
						|
        self.sg_rules_mock.create.assert_called_with(
 | 
						|
            security_group_id,
 | 
						|
            'tcp',
 | 
						|
            0,
 | 
						|
            0,
 | 
						|
            security_group_rule_cidr,
 | 
						|
            None,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(self.columns, columns)
 | 
						|
        datalist = (
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            security_group_id,
 | 
						|
            '0:0',
 | 
						|
            '',
 | 
						|
        )
 | 
						|
        self.assertEqual(datalist, data)
 | 
						|
 | 
						|
    def test_security_group_rule_create_ftp(self):
 | 
						|
        sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
 | 
						|
        sg_rule['from_port'] = 20
 | 
						|
        sg_rule['to_port'] = 21
 | 
						|
        self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            sg_rule,
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
            '--dst-port', '20:21',
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
            ('dst_port', (20, 21)),
 | 
						|
        ]
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        # SecurityGroupManager.create(name, description)
 | 
						|
        self.sg_rules_mock.create.assert_called_with(
 | 
						|
            security_group_id,
 | 
						|
            'tcp',
 | 
						|
            20,
 | 
						|
            21,
 | 
						|
            security_group_rule_cidr,
 | 
						|
            None,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(self.columns, columns)
 | 
						|
        datalist = (
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            security_group_id,
 | 
						|
            '20:21',
 | 
						|
            '',
 | 
						|
        )
 | 
						|
        self.assertEqual(datalist, data)
 | 
						|
 | 
						|
    def test_security_group_rule_create_ssh(self):
 | 
						|
        sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
 | 
						|
        sg_rule['from_port'] = 22
 | 
						|
        sg_rule['to_port'] = 22
 | 
						|
        sg_rule['ip_range'] = {}
 | 
						|
        sg_rule['group'] = {'name': security_group_name}
 | 
						|
        self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            sg_rule,
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
            '--dst-port', '22',
 | 
						|
            '--src-group', security_group_id,
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
            ('dst_port', (22, 22)),
 | 
						|
            ('src_group', security_group_id),
 | 
						|
        ]
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        # SecurityGroupManager.create(name, description)
 | 
						|
        self.sg_rules_mock.create.assert_called_with(
 | 
						|
            security_group_id,
 | 
						|
            'tcp',
 | 
						|
            22,
 | 
						|
            22,
 | 
						|
            security_group_rule_cidr,
 | 
						|
            security_group_id,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(self.columns, columns)
 | 
						|
        datalist = (
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            '',
 | 
						|
            security_group_id,
 | 
						|
            '22:22',
 | 
						|
            security_group_name,
 | 
						|
        )
 | 
						|
        self.assertEqual(datalist, data)
 | 
						|
 | 
						|
    def test_security_group_rule_create_udp(self):
 | 
						|
        sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
 | 
						|
        sg_rule['ip_protocol'] = 'udp'
 | 
						|
        self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            sg_rule,
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
            '--proto', 'udp',
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
            ('proto', 'udp'),
 | 
						|
        ]
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        # SecurityGroupManager.create(name, description)
 | 
						|
        self.sg_rules_mock.create.assert_called_with(
 | 
						|
            security_group_id,
 | 
						|
            'udp',
 | 
						|
            0,
 | 
						|
            0,
 | 
						|
            security_group_rule_cidr,
 | 
						|
            None,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(self.columns, columns)
 | 
						|
        datalist = (
 | 
						|
            security_group_rule_id,
 | 
						|
            'udp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            security_group_id,
 | 
						|
            '0:0',
 | 
						|
            '',
 | 
						|
        )
 | 
						|
        self.assertEqual(datalist, data)
 | 
						|
 | 
						|
    def test_security_group_rule_create_icmp(self):
 | 
						|
        sg_rule_cidr = '10.0.2.0/24'
 | 
						|
        sg_rule = copy.deepcopy(SECURITY_GROUP_RULE_ICMP)
 | 
						|
        sg_rule['ip_range'] = {'cidr': sg_rule_cidr}
 | 
						|
        self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            sg_rule,
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
            '--proto', 'ICMP',
 | 
						|
            '--src-ip', sg_rule_cidr,
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
            ('proto', 'ICMP'),
 | 
						|
            ('src_ip', sg_rule_cidr)
 | 
						|
        ]
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        # SecurityGroupManager.create(name, description)
 | 
						|
        self.sg_rules_mock.create.assert_called_with(
 | 
						|
            security_group_id,
 | 
						|
            'ICMP',
 | 
						|
            -1,
 | 
						|
            -1,
 | 
						|
            sg_rule_cidr,
 | 
						|
            None,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(self.columns, columns)
 | 
						|
        datalist = (
 | 
						|
            security_group_rule_id,
 | 
						|
            'icmp',
 | 
						|
            sg_rule_cidr,
 | 
						|
            security_group_id,
 | 
						|
            '',
 | 
						|
            '',
 | 
						|
        )
 | 
						|
        self.assertEqual(datalist, data)
 | 
						|
 | 
						|
    def test_security_group_rule_create_src_invalid(self):
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
            '--proto', 'ICMP',
 | 
						|
            '--src-ip', security_group_rule_cidr,
 | 
						|
            '--src-group', security_group_id,
 | 
						|
        ]
 | 
						|
 | 
						|
        self.assertRaises(utils.ParserException,
 | 
						|
                          self.check_parser, self.cmd, arglist, [])
 | 
						|
 | 
						|
 | 
						|
class TestSecurityGroupRuleList(TestSecurityGroupRule):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(TestSecurityGroupRuleList, self).setUp()
 | 
						|
 | 
						|
        security_group_mock = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            copy.deepcopy(SECURITY_GROUP),
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        security_group_2_mock = FakeSecurityGroupRuleResource(
 | 
						|
            None,
 | 
						|
            copy.deepcopy(SECURITY_GROUP_2),
 | 
						|
            loaded=True,
 | 
						|
        )
 | 
						|
 | 
						|
        self.secgroups_mock.get.return_value = security_group_mock
 | 
						|
        self.secgroups_mock.list.return_value = [security_group_mock,
 | 
						|
                                                 security_group_2_mock]
 | 
						|
 | 
						|
        # Get the command object to test
 | 
						|
        self.cmd = security_group.ListSecurityGroupRule(self.app, None)
 | 
						|
 | 
						|
    def test_security_group_rule_list(self):
 | 
						|
 | 
						|
        arglist = [
 | 
						|
            security_group_name,
 | 
						|
        ]
 | 
						|
        verifylist = [
 | 
						|
            ('group', security_group_name),
 | 
						|
        ]
 | 
						|
 | 
						|
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        collist = (
 | 
						|
            'ID',
 | 
						|
            'IP Protocol',
 | 
						|
            'IP Range',
 | 
						|
            'Port Range',
 | 
						|
            'Remote Security Group',
 | 
						|
        )
 | 
						|
        self.assertEqual(collist, columns)
 | 
						|
        datalist = ((
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            '0:0',
 | 
						|
            '',
 | 
						|
        ), (
 | 
						|
            security_group_rule_id,
 | 
						|
            'icmp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            '',
 | 
						|
            '',
 | 
						|
        ), (
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            '',
 | 
						|
            '80:80',
 | 
						|
            'default',
 | 
						|
        ),)
 | 
						|
        self.assertEqual(datalist, tuple(data))
 | 
						|
 | 
						|
    def test_security_group_rule_list_no_group(self):
 | 
						|
 | 
						|
        parsed_args = self.check_parser(self.cmd, [], [])
 | 
						|
 | 
						|
        # DisplayCommandBase.take_action() returns two tuples
 | 
						|
        columns, data = self.cmd.take_action(parsed_args)
 | 
						|
 | 
						|
        collist = (
 | 
						|
            'ID',
 | 
						|
            'IP Protocol',
 | 
						|
            'IP Range',
 | 
						|
            'Port Range',
 | 
						|
            'Remote Security Group',
 | 
						|
            'Security Group',
 | 
						|
        )
 | 
						|
        self.assertEqual(collist, columns)
 | 
						|
        datalist = ((
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            '0:0',
 | 
						|
            '',
 | 
						|
            security_group_id,
 | 
						|
        ), (
 | 
						|
            security_group_rule_id,
 | 
						|
            'icmp',
 | 
						|
            security_group_rule_cidr,
 | 
						|
            '',
 | 
						|
            '',
 | 
						|
            security_group_id,
 | 
						|
        ), (
 | 
						|
            security_group_rule_id,
 | 
						|
            'tcp',
 | 
						|
            '',
 | 
						|
            '80:80',
 | 
						|
            'default',
 | 
						|
            security_group_id,
 | 
						|
        ), (
 | 
						|
            '2',
 | 
						|
            'tcp',
 | 
						|
            '',
 | 
						|
            '80:80',
 | 
						|
            '',
 | 
						|
            security_group_2_id,
 | 
						|
        ),)
 | 
						|
        self.assertEqual(datalist, tuple(data))
 |