diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py new file mode 100644 index 0000000000..2fd441888f --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py @@ -0,0 +1,403 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network.v2 import security_group +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupCompute, self).setUp() + + # Get a shortcut to the compute client + self.compute = self.app.client_manager.compute + + +class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.tenant_id, + '', + ) + + def setUp(self): + super(TestCreateSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.create.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group.CreateSecurityGroup(self.app, None) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_network_options(self): + arglist = [ + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group.name, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_min_options(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('name', self._security_group.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.create.assert_called_once_with( + self._security_group.name, + self._security_group.name) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_all_options(self): + arglist = [ + '--description', self._security_group.description, + self._security_group.name, + ] + verifylist = [ + ('description', self._security_group.description), + ('name', self._security_group.name), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.create.assert_called_once_with( + self._security_group.name, + self._security_group.description) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): + + # The security groups to be deleted. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups() + + def setUp(self): + super(TestDeleteSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.delete = mock.Mock(return_value=None) + + self.compute.security_groups.get = ( + compute_fakes.FakeSecurityGroup.get_security_groups( + self._security_groups) + ) + + # Get the command object to test + self.cmd = security_group.DeleteSecurityGroup(self.app, None) + + def test_security_group_delete(self): + arglist = [ + self._security_groups[0].id, + ] + verifylist = [ + ('group', [self._security_groups[0].id]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.delete.assert_called_once_with( + self._security_groups[0].id) + self.assertIsNone(result) + + def test_multi_security_groups_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_groups: + arglist.append(s.id) + verifylist = [ + ('group', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_groups: + calls.append(call(s.id)) + self.compute.security_groups.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_groups_delete_with_exception(self): + arglist = [ + self._security_groups[0].id, + 'unexist_security_group', + ] + verifylist = [ + ('group', + [self._security_groups[0].id, 'unexist_security_group']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self._security_groups[0], exceptions.CommandError] + self.compute.security_groups.get = ( + mock.Mock(side_effect=find_mock_result) + ) + self.compute.security_groups.find.side_effect = ( + exceptions.NotFound(None)) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 groups failed to delete.', str(e)) + + self.compute.security_groups.get.assert_any_call( + self._security_groups[0].id) + self.compute.security_groups.get.assert_any_call( + 'unexist_security_group') + self.compute.security_groups.delete.assert_called_once_with( + self._security_groups[0].id + ) + + +class TestListSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be listed. + _security_groups = \ + compute_fakes.FakeSecurityGroup.create_security_groups(count=3) + + columns = ( + 'ID', + 'Name', + 'Description', + ) + columns_all_projects = ( + 'ID', + 'Name', + 'Description', + 'Project', + ) + + data = [] + for grp in _security_groups: + data.append(( + grp.id, + grp.name, + grp.description, + )) + data_all_projects = [] + for grp in _security_groups: + data_all_projects.append(( + grp.id, + grp.name, + grp.description, + grp.tenant_id, + )) + + def setUp(self): + super(TestListSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + self.compute.security_groups.list.return_value = self._security_groups + + # Get the command object to test + self.cmd = security_group.ListSecurityGroup(self.app, None) + + def test_security_group_list_no_options(self): + arglist = [] + verifylist = [ + ('all_projects', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': False}} + self.compute.security_groups.list.assert_called_once_with(**kwargs) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_security_group_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + kwargs = {'search_opts': {'all_tenants': True}} + self.compute.security_groups.list.assert_called_once_with(**kwargs) + self.assertEqual(self.columns_all_projects, columns) + self.assertEqual(self.data_all_projects, list(data)) + + +class TestSetSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group to be set. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def setUp(self): + super(TestSetSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.update = mock.Mock(return_value=None) + + self.compute.security_groups.get = mock.Mock( + return_value=self._security_group) + + # Get the command object to test + self.cmd = security_group.SetSecurityGroup(self.app, None) + + def test_set_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_set_no_updates(self): + arglist = [ + self._security_group.name, + ] + verifylist = [ + ('group', self._security_group.name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.update.assert_called_once_with( + self._security_group, + self._security_group.name, + self._security_group.description + ) + self.assertIsNone(result) + + def test_set_all_options(self): + new_name = 'new-' + self._security_group.name + new_description = 'new-' + self._security_group.description + arglist = [ + '--name', new_name, + '--description', new_description, + self._security_group.name, + ] + verifylist = [ + ('description', new_description), + ('group', self._security_group.name), + ('name', new_name), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.compute.security_groups.update.assert_called_once_with( + self._security_group, + new_name, + new_description + ) + self.assertIsNone(result) + + +class TestShowSecurityGroupCompute(TestSecurityGroupCompute): + + # The security group rule to be shown with the group. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + # The security group to be shown. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group( + attrs={'rules': [_security_group_rule._info]} + ) + + columns = ( + 'description', + 'id', + 'name', + 'project_id', + 'rules', + ) + + data = ( + _security_group.description, + _security_group.id, + _security_group.name, + _security_group.tenant_id, + security_group._format_compute_security_group_rules( + [_security_group_rule._info]), + ) + + def setUp(self): + super(TestShowSecurityGroupCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group.ShowSecurityGroup(self.app, None) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group.id, + ] + verifylist = [ + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.get.assert_called_once_with( + self._security_group.id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group.py b/openstackclient/tests/unit/network/v2/test_security_group_network.py similarity index 53% rename from openstackclient/tests/unit/network/v2/test_security_group.py rename to openstackclient/tests/unit/network/v2/test_security_group_network.py index 66d357f98a..35b7e366d6 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_network.py @@ -17,7 +17,6 @@ from mock import call from osc_lib import exceptions from openstackclient.network.v2 import security_group -from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit import utils as tests_utils @@ -36,15 +35,6 @@ class TestSecurityGroupNetwork(network_fakes.TestNetworkV2): self.domains_mock = self.app.client_manager.identity.domains -class TestSecurityGroupCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupCompute, self).setUp() - - # Get a shortcut to the compute client - self.compute = self.app.client_manager.compute - - class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): project = identity_fakes.FakeProject.create_one_project() @@ -129,90 +119,6 @@ class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertEqual(self.data, data) -class TestCreateSecurityGroupCompute(TestSecurityGroupCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - '', - ) - - def setUp(self): - super(TestCreateSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.create.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.CreateSecurityGroup(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_network_options(self): - arglist = [ - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.name, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_min_options(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.name) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - def test_create_all_options(self): - arglist = [ - '--description', self._security_group.description, - self._security_group.name, - ] - verifylist = [ - ('description', self._security_group.description), - ('name', self._security_group.name), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.create.assert_called_once_with( - self._security_group.name, - self._security_group.description) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) - - class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): # The security groups to be deleted. @@ -297,94 +203,6 @@ class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork): ) -class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute): - - # The security groups to be deleted. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups() - - def setUp(self): - super(TestDeleteSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.delete = mock.Mock(return_value=None) - - self.compute.security_groups.get = ( - compute_fakes.FakeSecurityGroup.get_security_groups( - self._security_groups) - ) - - # Get the command object to test - self.cmd = security_group.DeleteSecurityGroup(self.app, None) - - def test_security_group_delete(self): - arglist = [ - self._security_groups[0].id, - ] - verifylist = [ - ('group', [self._security_groups[0].id]), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id) - self.assertIsNone(result) - - def test_multi_security_groups_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_groups: - arglist.append(s.id) - verifylist = [ - ('group', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_groups: - calls.append(call(s.id)) - self.compute.security_groups.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_groups_delete_with_exception(self): - arglist = [ - self._security_groups[0].id, - 'unexist_security_group', - ] - verifylist = [ - ('group', - [self._security_groups[0].id, 'unexist_security_group']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [self._security_groups[0], exceptions.CommandError] - self.compute.security_groups.get = ( - mock.Mock(side_effect=find_mock_result) - ) - self.compute.security_groups.find.side_effect = ( - exceptions.NotFound(None)) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 groups failed to delete.', str(e)) - - self.compute.security_groups.get.assert_any_call( - self._security_groups[0].id) - self.compute.security_groups.get.assert_any_call( - 'unexist_security_group') - self.compute.security_groups.delete.assert_called_once_with( - self._security_groups[0].id - ) - - class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group to be listed. @@ -483,80 +301,6 @@ class TestListSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertEqual(self.data, list(data)) -class TestListSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be listed. - _security_groups = \ - compute_fakes.FakeSecurityGroup.create_security_groups(count=3) - - columns = ( - 'ID', - 'Name', - 'Description', - ) - columns_all_projects = ( - 'ID', - 'Name', - 'Description', - 'Project', - ) - - data = [] - for grp in _security_groups: - data.append(( - grp.id, - grp.name, - grp.description, - )) - data_all_projects = [] - for grp in _security_groups: - data_all_projects.append(( - grp.id, - grp.name, - grp.description, - grp.tenant_id, - )) - - def setUp(self): - super(TestListSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - self.compute.security_groups.list.return_value = self._security_groups - - # Get the command object to test - self.cmd = security_group.ListSecurityGroup(self.app, None) - - def test_security_group_list_no_options(self): - arglist = [] - verifylist = [ - ('all_projects', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': False}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, list(data)) - - def test_security_group_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - kwargs = {'search_opts': {'all_tenants': True}} - self.compute.security_groups.list.assert_called_once_with(**kwargs) - self.assertEqual(self.columns_all_projects, columns) - self.assertEqual(self.data_all_projects, list(data)) - - class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group to be set. @@ -623,72 +367,6 @@ class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork): self.assertIsNone(result) -class TestSetSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group to be set. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def setUp(self): - super(TestSetSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.update = mock.Mock(return_value=None) - - self.compute.security_groups.get = mock.Mock( - return_value=self._security_group) - - # Get the command object to test - self.cmd = security_group.SetSecurityGroup(self.app, None) - - def test_set_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_set_no_updates(self): - arglist = [ - self._security_group.name, - ] - verifylist = [ - ('group', self._security_group.name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - self._security_group.name, - self._security_group.description - ) - self.assertIsNone(result) - - def test_set_all_options(self): - new_name = 'new-' + self._security_group.name - new_description = 'new-' + self._security_group.description - arglist = [ - '--name', new_name, - '--description', new_description, - self._security_group.name, - ] - verifylist = [ - ('description', new_description), - ('group', self._security_group.name), - ('name', new_name), - ] - - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - result = self.cmd.take_action(parsed_args) - - self.compute.security_groups.update.assert_called_once_with( - self._security_group, - new_name, - new_description - ) - self.assertIsNone(result) - - class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): # The security group rule to be shown with the group. @@ -746,63 +424,3 @@ class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork): self._security_group.id, ignore_missing=False) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) - - -class TestShowSecurityGroupCompute(TestSecurityGroupCompute): - - # The security group rule to be shown with the group. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - # The security group to be shown. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group( - attrs={'rules': [_security_group_rule._info]} - ) - - columns = ( - 'description', - 'id', - 'name', - 'project_id', - 'rules', - ) - - data = ( - _security_group.description, - _security_group.id, - _security_group.name, - _security_group.tenant_id, - security_group._format_compute_security_group_rules( - [_security_group_rule._info]), - ) - - def setUp(self): - super(TestShowSecurityGroupCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group.ShowSecurityGroup(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id) - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py new file mode 100644 index 0000000000..7833c0d932 --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py @@ -0,0 +1,556 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import copy +import mock +from mock import call + +from osc_lib import exceptions + +from openstackclient.network import utils as network_utils +from openstackclient.network.v2 import security_group_rule +from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes +from openstackclient.tests.unit import fakes +from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): + + def setUp(self): + super(TestSecurityGroupRuleCompute, self).setUp() + + # Get a shortcut to the network client + self.compute = self.app.client_manager.compute + + +class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + project = identity_fakes.FakeProject.create_one_project() + domain = identity_fakes.FakeDomain.create_one_domain() + # The security group rule to be created. + _security_group_rule = None + + # The security group that will contain the rule created. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + def _setup_security_group_rule(self, attrs=None): + self._security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( + attrs) + self.compute.security_group_rules.create.return_value = \ + self._security_group_rule + expected_columns, expected_data = \ + security_group_rule._format_security_group_rule_show( + self._security_group_rule._info) + return expected_columns, expected_data + + def setUp(self): + super(TestCreateSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = self._security_group + + # Get the command object to test + self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) + + def test_create_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_create_all_source_options(self): + arglist = [ + '--src-ip', '10.10.0.0/24', + '--src-group', self._security_group.id, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_all_remote_options(self): + arglist = [ + '--remote-ip', '10.10.0.0/24', + '--remote-group', self._security_group.id, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_bad_protocol(self): + arglist = [ + '--protocol', 'foo', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_all_protocol_options(self): + arglist = [ + '--protocol', 'tcp', + '--proto', 'tcp', + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_network_options(self): + arglist = [ + '--ingress', + '--ethertype', 'IPv4', + '--icmp-type', '3', + '--icmp-code', '11', + '--project', self.project.name, + '--project-domain', self.domain.name, + self._security_group.id, + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, []) + + def test_create_default_rule(self): + expected_columns, expected_data = self._setup_security_group_rule() + dst_port = str(self._security_group_rule.from_port) + ':' + \ + str(self._security_group_rule.to_port) + arglist = [ + '--dst-port', dst_port, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_source_group(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'from_port': 22, + 'to_port': 22, + 'group': {'name': self._security_group.name}, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.from_port), + '--src-group', self._security_group.name, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('src_group', self._security_group.name), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_remote_group(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'from_port': 22, + 'to_port': 22, + 'group': {'name': self._security_group.name}, + }) + arglist = [ + '--dst-port', str(self._security_group_rule.from_port), + '--remote-group', self._security_group.name, + self._security_group.id, + ] + verifylist = [ + ('dst_port', (self._security_group_rule.from_port, + self._security_group_rule.to_port)), + ('remote_group', self._security_group.name), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_source_ip(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_remote_ip(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--protocol', self._security_group_rule.ip_protocol, + '--remote-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('protocol', self._security_group_rule.ip_protocol), + ('remote_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + def test_create_proto_option(self): + expected_columns, expected_data = self._setup_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + }) + arglist = [ + '--proto', self._security_group_rule.ip_protocol, + '--src-ip', self._security_group_rule.ip_range['cidr'], + self._security_group.id, + ] + verifylist = [ + ('proto', self._security_group_rule.ip_protocol), + ('protocol', None), + ('src_ip', self._security_group_rule.ip_range['cidr']), + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.create.assert_called_once_with( + self._security_group.id, + self._security_group_rule.ip_protocol, + self._security_group_rule.from_port, + self._security_group_rule.to_port, + self._security_group_rule.ip_range['cidr'], + None, + ) + self.assertEqual(expected_columns, columns) + self.assertEqual(expected_data, data) + + +class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be deleted. + _security_group_rules = \ + compute_fakes.FakeSecurityGroupRule.create_security_group_rules( + count=2) + + def setUp(self): + super(TestDeleteSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Get the command object to test + self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) + + def test_security_group_rule_delete(self): + arglist = [ + self._security_group_rules[0].id, + ] + verifylist = [ + ('rule', [self._security_group_rules[0].id]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.compute.security_group_rules.delete.assert_called_once_with( + self._security_group_rules[0].id) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete(self): + arglist = [] + verifylist = [] + + for s in self._security_group_rules: + arglist.append(s.id) + verifylist = [ + ('rule', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._security_group_rules: + calls.append(call(s.id)) + self.compute.security_group_rules.delete.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_security_group_rules_delete_with_exception(self): + arglist = [ + self._security_group_rules[0].id, + 'unexist_rule', + ] + verifylist = [ + ('rule', + [self._security_group_rules[0].id, 'unexist_rule']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [None, exceptions.CommandError] + self.compute.security_group_rules.delete = ( + mock.Mock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 rules failed to delete.', str(e)) + + self.compute.security_group_rules.delete.assert_any_call( + self._security_group_rules[0].id) + self.compute.security_group_rules.delete.assert_any_call( + 'unexist_rule') + + +class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group to hold the rules. + _security_group = \ + compute_fakes.FakeSecurityGroup.create_one_security_group() + + # The security group rule to be listed. + _security_group_rule_tcp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'tcp', + 'from_port': 80, + 'to_port': 80, + 'group': {'name': _security_group.name}, + }) + _security_group_rule_icmp = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ + 'ip_protocol': 'icmp', + 'from_port': -1, + 'to_port': -1, + 'ip_range': {'cidr': '10.0.2.0/24'}, + 'group': {'name': _security_group.name}, + }) + _security_group.rules = [_security_group_rule_tcp._info, + _security_group_rule_icmp._info] + + expected_columns_with_group = ( + 'ID', + 'IP Protocol', + 'IP Range', + 'Port Range', + 'Remote Security Group', + ) + expected_columns_no_group = \ + expected_columns_with_group + ('Security Group',) + + expected_data_with_group = [] + expected_data_no_group = [] + for _security_group_rule in _security_group.rules: + rule = network_utils.transform_compute_security_group_rule( + _security_group_rule + ) + expected_rule_with_group = ( + rule['id'], + rule['ip_protocol'], + rule['ip_range'], + rule['port_range'], + rule['remote_security_group'], + ) + expected_rule_no_group = expected_rule_with_group + \ + (_security_group_rule['parent_group_id'],) + expected_data_with_group.append(expected_rule_with_group) + expected_data_no_group.append(expected_rule_no_group) + + def setUp(self): + super(TestListSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + self.compute.security_groups.get.return_value = \ + self._security_group + self.compute.security_groups.list.return_value = \ + [self._security_group] + + # Get the command object to test + self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) + + def test_list_default(self): + parsed_args = self.check_parser(self.cmd, [], []) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_list_with_group(self): + arglist = [ + self._security_group.id, + ] + verifylist = [ + ('group', self._security_group.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.get.assert_called_once_with( + self._security_group.id + ) + self.assertEqual(self.expected_columns_with_group, columns) + self.assertEqual(self.expected_data_with_group, list(data)) + + def test_list_all_projects(self): + arglist = [ + '--all-projects', + ] + verifylist = [ + ('all_projects', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': True} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + def test_list_with_ignored_options(self): + arglist = [ + '--long', + ] + verifylist = [ + ('long', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.compute.security_groups.list.assert_called_once_with( + search_opts={'all_tenants': False} + ) + self.assertEqual(self.expected_columns_no_group, columns) + self.assertEqual(self.expected_data_no_group, list(data)) + + +class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): + + # The security group rule to be shown. + _security_group_rule = \ + compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() + + columns, data = \ + security_group_rule._format_security_group_rule_show( + _security_group_rule._info) + + def setUp(self): + super(TestShowSecurityGroupRuleCompute, self).setUp() + + self.app.client_manager.network_endpoint_enabled = False + + # Build a security group fake customized for this test. + security_group_rules = [self._security_group_rule._info] + security_group = fakes.FakeResource( + info=copy.deepcopy({'rules': security_group_rules}), + loaded=True) + security_group.rules = security_group_rules + self.compute.security_groups.list.return_value = [security_group] + + # Get the command object to test + self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) + + def test_show_no_options(self): + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, [], []) + + def test_show_all_options(self): + arglist = [ + self._security_group_rule.id, + ] + verifylist = [ + ('rule', self._security_group_rule.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.compute.security_groups.list.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py similarity index 62% rename from openstackclient/tests/unit/network/v2/test_security_group_rule.py rename to openstackclient/tests/unit/network/v2/test_security_group_rule_network.py index e3538d5f4a..5d9d03e9c7 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py @@ -11,16 +11,12 @@ # under the License. # -import copy import mock from mock import call from osc_lib import exceptions -from openstackclient.network import utils as network_utils from openstackclient.network.v2 import security_group_rule -from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes -from openstackclient.tests.unit import fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit import utils as tests_utils @@ -39,15 +35,6 @@ class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2): self.domains_mock = self.app.client_manager.identity.domains -class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2): - - def setUp(self): - super(TestSecurityGroupRuleCompute, self).setUp() - - # Get a shortcut to the network client - self.compute = self.app.client_manager.compute - - class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): project = identity_fakes.FakeProject.create_one_project() @@ -548,280 +535,6 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_data, data) -class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - project = identity_fakes.FakeProject.create_one_project() - domain = identity_fakes.FakeDomain.create_one_domain() - # The security group rule to be created. - _security_group_rule = None - - # The security group that will contain the rule created. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - def _setup_security_group_rule(self, attrs=None): - self._security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule( - attrs) - self.compute.security_group_rules.create.return_value = \ - self._security_group_rule - expected_columns, expected_data = \ - security_group_rule._format_security_group_rule_show( - self._security_group_rule._info) - return expected_columns, expected_data - - def setUp(self): - super(TestCreateSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = self._security_group - - # Get the command object to test - self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None) - - def test_create_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_create_all_source_options(self): - arglist = [ - '--src-ip', '10.10.0.0/24', - '--src-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_remote_options(self): - arglist = [ - '--remote-ip', '10.10.0.0/24', - '--remote-group', self._security_group.id, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_bad_protocol(self): - arglist = [ - '--protocol', 'foo', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_all_protocol_options(self): - arglist = [ - '--protocol', 'tcp', - '--proto', 'tcp', - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_network_options(self): - arglist = [ - '--ingress', - '--ethertype', 'IPv4', - '--icmp-type', '3', - '--icmp-code', '11', - '--project', self.project.name, - '--project-domain', self.domain.name, - self._security_group.id, - ] - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, arglist, []) - - def test_create_default_rule(self): - expected_columns, expected_data = self._setup_security_group_rule() - dst_port = str(self._security_group_rule.from_port) + ':' + \ - str(self._security_group_rule.to_port) - arglist = [ - '--dst-port', dst_port, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--src-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('src_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_remote_group(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'from_port': 22, - 'to_port': 22, - 'group': {'name': self._security_group.name}, - }) - arglist = [ - '--dst-port', str(self._security_group_rule.from_port), - '--remote-group', self._security_group.name, - self._security_group.id, - ] - verifylist = [ - ('dst_port', (self._security_group_rule.from_port, - self._security_group_rule.to_port)), - ('remote_group', self._security_group.name), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_source_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_remote_ip(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--protocol', self._security_group_rule.ip_protocol, - '--remote-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('protocol', self._security_group_rule.ip_protocol), - ('remote_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - def test_create_proto_option(self): - expected_columns, expected_data = self._setup_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - }) - arglist = [ - '--proto', self._security_group_rule.ip_protocol, - '--src-ip', self._security_group_rule.ip_range['cidr'], - self._security_group.id, - ] - verifylist = [ - ('proto', self._security_group_rule.ip_protocol), - ('protocol', None), - ('src_ip', self._security_group_rule.ip_range['cidr']), - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.create.assert_called_once_with( - self._security_group.id, - self._security_group_rule.ip_protocol, - self._security_group_rule.from_port, - self._security_group_rule.to_port, - self._security_group_rule.ip_range['cidr'], - None, - ) - self.assertEqual(expected_columns, columns) - self.assertEqual(expected_data, data) - - class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group rules to be deleted. @@ -909,83 +622,6 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): ) -class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be deleted. - _security_group_rules = \ - compute_fakes.FakeSecurityGroupRule.create_security_group_rules( - count=2) - - def setUp(self): - super(TestDeleteSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Get the command object to test - self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None) - - def test_security_group_rule_delete(self): - arglist = [ - self._security_group_rules[0].id, - ] - verifylist = [ - ('rule', [self._security_group_rules[0].id]), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - self.compute.security_group_rules.delete.assert_called_once_with( - self._security_group_rules[0].id) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete(self): - arglist = [] - verifylist = [] - - for s in self._security_group_rules: - arglist.append(s.id) - verifylist = [ - ('rule', arglist), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - result = self.cmd.take_action(parsed_args) - - calls = [] - for s in self._security_group_rules: - calls.append(call(s.id)) - self.compute.security_group_rules.delete.assert_has_calls(calls) - self.assertIsNone(result) - - def test_multi_security_group_rules_delete_with_exception(self): - arglist = [ - self._security_group_rules[0].id, - 'unexist_rule', - ] - verifylist = [ - ('rule', - [self._security_group_rules[0].id, 'unexist_rule']), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - find_mock_result = [None, exceptions.CommandError] - self.compute.security_group_rules.delete = ( - mock.Mock(side_effect=find_mock_result) - ) - - try: - self.cmd.take_action(parsed_args) - self.fail('CommandError should be raised.') - except exceptions.CommandError as e: - self.assertEqual('1 of 2 rules failed to delete.', str(e)) - - self.compute.security_group_rules.delete.assert_any_call( - self._security_group_rules[0].id) - self.compute.security_group_rules.delete.assert_any_call( - 'unexist_rule') - - class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group to hold the rules. @@ -1165,131 +801,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self.assertEqual(self.expected_data_no_group, list(data)) -class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group to hold the rules. - _security_group = \ - compute_fakes.FakeSecurityGroup.create_one_security_group() - - # The security group rule to be listed. - _security_group_rule_tcp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'tcp', - 'from_port': 80, - 'to_port': 80, - 'group': {'name': _security_group.name}, - }) - _security_group_rule_icmp = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({ - 'ip_protocol': 'icmp', - 'from_port': -1, - 'to_port': -1, - 'ip_range': {'cidr': '10.0.2.0/24'}, - 'group': {'name': _security_group.name}, - }) - _security_group.rules = [_security_group_rule_tcp._info, - _security_group_rule_icmp._info] - - expected_columns_with_group = ( - 'ID', - 'IP Protocol', - 'IP Range', - 'Port Range', - 'Remote Security Group', - ) - expected_columns_no_group = \ - expected_columns_with_group + ('Security Group',) - - expected_data_with_group = [] - expected_data_no_group = [] - for _security_group_rule in _security_group.rules: - rule = network_utils.transform_compute_security_group_rule( - _security_group_rule - ) - expected_rule_with_group = ( - rule['id'], - rule['ip_protocol'], - rule['ip_range'], - rule['port_range'], - rule['remote_security_group'], - ) - expected_rule_no_group = expected_rule_with_group + \ - (_security_group_rule['parent_group_id'],) - expected_data_with_group.append(expected_rule_with_group) - expected_data_no_group.append(expected_rule_no_group) - - def setUp(self): - super(TestListSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - self.compute.security_groups.get.return_value = \ - self._security_group - self.compute.security_groups.list.return_value = \ - [self._security_group] - - # Get the command object to test - self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None) - - def test_list_default(self): - parsed_args = self.check_parser(self.cmd, [], []) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_group(self): - arglist = [ - self._security_group.id, - ] - verifylist = [ - ('group', self._security_group.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.get.assert_called_once_with( - self._security_group.id - ) - self.assertEqual(self.expected_columns_with_group, columns) - self.assertEqual(self.expected_data_with_group, list(data)) - - def test_list_all_projects(self): - arglist = [ - '--all-projects', - ] - verifylist = [ - ('all_projects', True), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': True} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - def test_list_with_ignored_options(self): - arglist = [ - '--long', - ] - verifylist = [ - ('long', False), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - self.compute.security_groups.list.assert_called_once_with( - search_opts={'all_tenants': False} - ) - self.assertEqual(self.expected_columns_no_group, columns) - self.assertEqual(self.expected_data_no_group, list(data)) - - class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): # The security group rule to be shown. @@ -1353,49 +864,3 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): self._security_group_rule.id, ignore_missing=False) self.assertEqual(self.columns, columns) self.assertEqual(self.data, data) - - -class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): - - # The security group rule to be shown. - _security_group_rule = \ - compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule() - - columns, data = \ - security_group_rule._format_security_group_rule_show( - _security_group_rule._info) - - def setUp(self): - super(TestShowSecurityGroupRuleCompute, self).setUp() - - self.app.client_manager.network_endpoint_enabled = False - - # Build a security group fake customized for this test. - security_group_rules = [self._security_group_rule._info] - security_group = fakes.FakeResource( - info=copy.deepcopy({'rules': security_group_rules}), - loaded=True) - security_group.rules = security_group_rules - self.compute.security_groups.list.return_value = [security_group] - - # Get the command object to test - self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None) - - def test_show_no_options(self): - self.assertRaises(tests_utils.ParserException, - self.check_parser, self.cmd, [], []) - - def test_show_all_options(self): - arglist = [ - self._security_group_rule.id, - ] - verifylist = [ - ('rule', self._security_group_rule.id), - ] - parsed_args = self.check_parser(self.cmd, arglist, verifylist) - - columns, data = self.cmd.take_action(parsed_args) - - self.compute.security_groups.list.assert_called_once_with() - self.assertEqual(self.columns, columns) - self.assertEqual(self.data, data)