Merge "Split security group tests"
This commit is contained in:
commit
54723427b6
openstackclient/tests/unit/network/v2
@ -0,0 +1,403 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from mock import call
|
||||||
|
|
||||||
|
from osc_lib import exceptions
|
||||||
|
|
||||||
|
from openstackclient.network.v2 import security_group
|
||||||
|
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
||||||
|
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
||||||
|
from openstackclient.tests.unit import utils as tests_utils
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityGroupCompute(compute_fakes.TestComputev2):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
# Get a shortcut to the compute client
|
||||||
|
self.compute = self.app.client_manager.compute
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
|
domain = identity_fakes.FakeDomain.create_one_domain()
|
||||||
|
# The security group to be shown.
|
||||||
|
_security_group = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
|
|
||||||
|
columns = (
|
||||||
|
'description',
|
||||||
|
'id',
|
||||||
|
'name',
|
||||||
|
'project_id',
|
||||||
|
'rules',
|
||||||
|
)
|
||||||
|
|
||||||
|
data = (
|
||||||
|
_security_group.description,
|
||||||
|
_security_group.id,
|
||||||
|
_security_group.name,
|
||||||
|
_security_group.tenant_id,
|
||||||
|
'',
|
||||||
|
)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestCreateSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.create.return_value = self._security_group
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group.CreateSecurityGroup(self.app, None)
|
||||||
|
|
||||||
|
def test_create_no_options(self):
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
|
def test_create_network_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--project', self.project.name,
|
||||||
|
'--project-domain', self.domain.name,
|
||||||
|
self._security_group.name,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_min_options(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group.name,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('name', self._security_group.name),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.create.assert_called_once_with(
|
||||||
|
self._security_group.name,
|
||||||
|
self._security_group.name)
|
||||||
|
self.assertEqual(self.columns, columns)
|
||||||
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
def test_create_all_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--description', self._security_group.description,
|
||||||
|
self._security_group.name,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('description', self._security_group.description),
|
||||||
|
('name', self._security_group.name),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.create.assert_called_once_with(
|
||||||
|
self._security_group.name,
|
||||||
|
self._security_group.description)
|
||||||
|
self.assertEqual(self.columns, columns)
|
||||||
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
|
# The security groups to be deleted.
|
||||||
|
_security_groups = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_security_groups()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestDeleteSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.delete = mock.Mock(return_value=None)
|
||||||
|
|
||||||
|
self.compute.security_groups.get = (
|
||||||
|
compute_fakes.FakeSecurityGroup.get_security_groups(
|
||||||
|
self._security_groups)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
|
||||||
|
|
||||||
|
def test_security_group_delete(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_groups[0].id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('group', [self._security_groups[0].id]),
|
||||||
|
]
|
||||||
|
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.delete.assert_called_once_with(
|
||||||
|
self._security_groups[0].id)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_multi_security_groups_delete(self):
|
||||||
|
arglist = []
|
||||||
|
verifylist = []
|
||||||
|
|
||||||
|
for s in self._security_groups:
|
||||||
|
arglist.append(s.id)
|
||||||
|
verifylist = [
|
||||||
|
('group', arglist),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
for s in self._security_groups:
|
||||||
|
calls.append(call(s.id))
|
||||||
|
self.compute.security_groups.delete.assert_has_calls(calls)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_multi_security_groups_delete_with_exception(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_groups[0].id,
|
||||||
|
'unexist_security_group',
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('group',
|
||||||
|
[self._security_groups[0].id, 'unexist_security_group']),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
find_mock_result = [self._security_groups[0], exceptions.CommandError]
|
||||||
|
self.compute.security_groups.get = (
|
||||||
|
mock.Mock(side_effect=find_mock_result)
|
||||||
|
)
|
||||||
|
self.compute.security_groups.find.side_effect = (
|
||||||
|
exceptions.NotFound(None))
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.cmd.take_action(parsed_args)
|
||||||
|
self.fail('CommandError should be raised.')
|
||||||
|
except exceptions.CommandError as e:
|
||||||
|
self.assertEqual('1 of 2 groups failed to delete.', str(e))
|
||||||
|
|
||||||
|
self.compute.security_groups.get.assert_any_call(
|
||||||
|
self._security_groups[0].id)
|
||||||
|
self.compute.security_groups.get.assert_any_call(
|
||||||
|
'unexist_security_group')
|
||||||
|
self.compute.security_groups.delete.assert_called_once_with(
|
||||||
|
self._security_groups[0].id
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
|
# The security group to be listed.
|
||||||
|
_security_groups = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
|
||||||
|
|
||||||
|
columns = (
|
||||||
|
'ID',
|
||||||
|
'Name',
|
||||||
|
'Description',
|
||||||
|
)
|
||||||
|
columns_all_projects = (
|
||||||
|
'ID',
|
||||||
|
'Name',
|
||||||
|
'Description',
|
||||||
|
'Project',
|
||||||
|
)
|
||||||
|
|
||||||
|
data = []
|
||||||
|
for grp in _security_groups:
|
||||||
|
data.append((
|
||||||
|
grp.id,
|
||||||
|
grp.name,
|
||||||
|
grp.description,
|
||||||
|
))
|
||||||
|
data_all_projects = []
|
||||||
|
for grp in _security_groups:
|
||||||
|
data_all_projects.append((
|
||||||
|
grp.id,
|
||||||
|
grp.name,
|
||||||
|
grp.description,
|
||||||
|
grp.tenant_id,
|
||||||
|
))
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestListSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
self.compute.security_groups.list.return_value = self._security_groups
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group.ListSecurityGroup(self.app, None)
|
||||||
|
|
||||||
|
def test_security_group_list_no_options(self):
|
||||||
|
arglist = []
|
||||||
|
verifylist = [
|
||||||
|
('all_projects', False),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
kwargs = {'search_opts': {'all_tenants': False}}
|
||||||
|
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
||||||
|
self.assertEqual(self.columns, columns)
|
||||||
|
self.assertEqual(self.data, list(data))
|
||||||
|
|
||||||
|
def test_security_group_list_all_projects(self):
|
||||||
|
arglist = [
|
||||||
|
'--all-projects',
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('all_projects', True),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
kwargs = {'search_opts': {'all_tenants': True}}
|
||||||
|
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
||||||
|
self.assertEqual(self.columns_all_projects, columns)
|
||||||
|
self.assertEqual(self.data_all_projects, list(data))
|
||||||
|
|
||||||
|
|
||||||
|
class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
|
# The security group to be set.
|
||||||
|
_security_group = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestSetSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.update = mock.Mock(return_value=None)
|
||||||
|
|
||||||
|
self.compute.security_groups.get = mock.Mock(
|
||||||
|
return_value=self._security_group)
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group.SetSecurityGroup(self.app, None)
|
||||||
|
|
||||||
|
def test_set_no_options(self):
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
|
def test_set_no_updates(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group.name,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('group', self._security_group.name),
|
||||||
|
]
|
||||||
|
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.update.assert_called_once_with(
|
||||||
|
self._security_group,
|
||||||
|
self._security_group.name,
|
||||||
|
self._security_group.description
|
||||||
|
)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_set_all_options(self):
|
||||||
|
new_name = 'new-' + self._security_group.name
|
||||||
|
new_description = 'new-' + self._security_group.description
|
||||||
|
arglist = [
|
||||||
|
'--name', new_name,
|
||||||
|
'--description', new_description,
|
||||||
|
self._security_group.name,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('description', new_description),
|
||||||
|
('group', self._security_group.name),
|
||||||
|
('name', new_name),
|
||||||
|
]
|
||||||
|
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.update.assert_called_once_with(
|
||||||
|
self._security_group,
|
||||||
|
new_name,
|
||||||
|
new_description
|
||||||
|
)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
|
||||||
|
class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
|
# The security group rule to be shown with the group.
|
||||||
|
_security_group_rule = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
|
||||||
|
|
||||||
|
# The security group to be shown.
|
||||||
|
_security_group = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_one_security_group(
|
||||||
|
attrs={'rules': [_security_group_rule._info]}
|
||||||
|
)
|
||||||
|
|
||||||
|
columns = (
|
||||||
|
'description',
|
||||||
|
'id',
|
||||||
|
'name',
|
||||||
|
'project_id',
|
||||||
|
'rules',
|
||||||
|
)
|
||||||
|
|
||||||
|
data = (
|
||||||
|
_security_group.description,
|
||||||
|
_security_group.id,
|
||||||
|
_security_group.name,
|
||||||
|
_security_group.tenant_id,
|
||||||
|
security_group._format_compute_security_group_rules(
|
||||||
|
[_security_group_rule._info]),
|
||||||
|
)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestShowSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.get.return_value = self._security_group
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group.ShowSecurityGroup(self.app, None)
|
||||||
|
|
||||||
|
def test_show_no_options(self):
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
|
def test_show_all_options(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.get.assert_called_once_with(
|
||||||
|
self._security_group.id)
|
||||||
|
self.assertEqual(self.columns, columns)
|
||||||
|
self.assertEqual(self.data, data)
|
@ -17,7 +17,6 @@ from mock import call
|
|||||||
from osc_lib import exceptions
|
from osc_lib import exceptions
|
||||||
|
|
||||||
from openstackclient.network.v2 import security_group
|
from openstackclient.network.v2 import security_group
|
||||||
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
|
||||||
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
||||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||||
from openstackclient.tests.unit import utils as tests_utils
|
from openstackclient.tests.unit import utils as tests_utils
|
||||||
@ -36,15 +35,6 @@ class TestSecurityGroupNetwork(network_fakes.TestNetworkV2):
|
|||||||
self.domains_mock = self.app.client_manager.identity.domains
|
self.domains_mock = self.app.client_manager.identity.domains
|
||||||
|
|
||||||
|
|
||||||
class TestSecurityGroupCompute(compute_fakes.TestComputev2):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
# Get a shortcut to the compute client
|
|
||||||
self.compute = self.app.client_manager.compute
|
|
||||||
|
|
||||||
|
|
||||||
class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
|
class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
|
||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
@ -129,90 +119,6 @@ class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
|
|||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
|
||||||
class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
|
||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
|
||||||
domain = identity_fakes.FakeDomain.create_one_domain()
|
|
||||||
# The security group to be shown.
|
|
||||||
_security_group = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
|
||||||
|
|
||||||
columns = (
|
|
||||||
'description',
|
|
||||||
'id',
|
|
||||||
'name',
|
|
||||||
'project_id',
|
|
||||||
'rules',
|
|
||||||
)
|
|
||||||
|
|
||||||
data = (
|
|
||||||
_security_group.description,
|
|
||||||
_security_group.id,
|
|
||||||
_security_group.name,
|
|
||||||
_security_group.tenant_id,
|
|
||||||
'',
|
|
||||||
)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestCreateSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.create.return_value = self._security_group
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group.CreateSecurityGroup(self.app, None)
|
|
||||||
|
|
||||||
def test_create_no_options(self):
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, [], [])
|
|
||||||
|
|
||||||
def test_create_network_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--project', self.project.name,
|
|
||||||
'--project-domain', self.domain.name,
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_min_options(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('name', self._security_group.name),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.create.assert_called_once_with(
|
|
||||||
self._security_group.name,
|
|
||||||
self._security_group.name)
|
|
||||||
self.assertEqual(self.columns, columns)
|
|
||||||
self.assertEqual(self.data, data)
|
|
||||||
|
|
||||||
def test_create_all_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--description', self._security_group.description,
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('description', self._security_group.description),
|
|
||||||
('name', self._security_group.name),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.create.assert_called_once_with(
|
|
||||||
self._security_group.name,
|
|
||||||
self._security_group.description)
|
|
||||||
self.assertEqual(self.columns, columns)
|
|
||||||
self.assertEqual(self.data, data)
|
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
|
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
|
||||||
|
|
||||||
# The security groups to be deleted.
|
# The security groups to be deleted.
|
||||||
@ -297,94 +203,6 @@ class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
|
||||||
|
|
||||||
# The security groups to be deleted.
|
|
||||||
_security_groups = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_security_groups()
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestDeleteSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.delete = mock.Mock(return_value=None)
|
|
||||||
|
|
||||||
self.compute.security_groups.get = (
|
|
||||||
compute_fakes.FakeSecurityGroup.get_security_groups(
|
|
||||||
self._security_groups)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
|
|
||||||
|
|
||||||
def test_security_group_delete(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_groups[0].id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('group', [self._security_groups[0].id]),
|
|
||||||
]
|
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.delete.assert_called_once_with(
|
|
||||||
self._security_groups[0].id)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
def test_multi_security_groups_delete(self):
|
|
||||||
arglist = []
|
|
||||||
verifylist = []
|
|
||||||
|
|
||||||
for s in self._security_groups:
|
|
||||||
arglist.append(s.id)
|
|
||||||
verifylist = [
|
|
||||||
('group', arglist),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
calls = []
|
|
||||||
for s in self._security_groups:
|
|
||||||
calls.append(call(s.id))
|
|
||||||
self.compute.security_groups.delete.assert_has_calls(calls)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
def test_multi_security_groups_delete_with_exception(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_groups[0].id,
|
|
||||||
'unexist_security_group',
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('group',
|
|
||||||
[self._security_groups[0].id, 'unexist_security_group']),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
find_mock_result = [self._security_groups[0], exceptions.CommandError]
|
|
||||||
self.compute.security_groups.get = (
|
|
||||||
mock.Mock(side_effect=find_mock_result)
|
|
||||||
)
|
|
||||||
self.compute.security_groups.find.side_effect = (
|
|
||||||
exceptions.NotFound(None))
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.cmd.take_action(parsed_args)
|
|
||||||
self.fail('CommandError should be raised.')
|
|
||||||
except exceptions.CommandError as e:
|
|
||||||
self.assertEqual('1 of 2 groups failed to delete.', str(e))
|
|
||||||
|
|
||||||
self.compute.security_groups.get.assert_any_call(
|
|
||||||
self._security_groups[0].id)
|
|
||||||
self.compute.security_groups.get.assert_any_call(
|
|
||||||
'unexist_security_group')
|
|
||||||
self.compute.security_groups.delete.assert_called_once_with(
|
|
||||||
self._security_groups[0].id
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
|
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
|
||||||
|
|
||||||
# The security group to be listed.
|
# The security group to be listed.
|
||||||
@ -483,80 +301,6 @@ class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
|
|||||||
self.assertEqual(self.data, list(data))
|
self.assertEqual(self.data, list(data))
|
||||||
|
|
||||||
|
|
||||||
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
|
||||||
|
|
||||||
# The security group to be listed.
|
|
||||||
_security_groups = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
|
|
||||||
|
|
||||||
columns = (
|
|
||||||
'ID',
|
|
||||||
'Name',
|
|
||||||
'Description',
|
|
||||||
)
|
|
||||||
columns_all_projects = (
|
|
||||||
'ID',
|
|
||||||
'Name',
|
|
||||||
'Description',
|
|
||||||
'Project',
|
|
||||||
)
|
|
||||||
|
|
||||||
data = []
|
|
||||||
for grp in _security_groups:
|
|
||||||
data.append((
|
|
||||||
grp.id,
|
|
||||||
grp.name,
|
|
||||||
grp.description,
|
|
||||||
))
|
|
||||||
data_all_projects = []
|
|
||||||
for grp in _security_groups:
|
|
||||||
data_all_projects.append((
|
|
||||||
grp.id,
|
|
||||||
grp.name,
|
|
||||||
grp.description,
|
|
||||||
grp.tenant_id,
|
|
||||||
))
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestListSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
self.compute.security_groups.list.return_value = self._security_groups
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group.ListSecurityGroup(self.app, None)
|
|
||||||
|
|
||||||
def test_security_group_list_no_options(self):
|
|
||||||
arglist = []
|
|
||||||
verifylist = [
|
|
||||||
('all_projects', False),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
kwargs = {'search_opts': {'all_tenants': False}}
|
|
||||||
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
|
||||||
self.assertEqual(self.columns, columns)
|
|
||||||
self.assertEqual(self.data, list(data))
|
|
||||||
|
|
||||||
def test_security_group_list_all_projects(self):
|
|
||||||
arglist = [
|
|
||||||
'--all-projects',
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('all_projects', True),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
kwargs = {'search_opts': {'all_tenants': True}}
|
|
||||||
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
|
||||||
self.assertEqual(self.columns_all_projects, columns)
|
|
||||||
self.assertEqual(self.data_all_projects, list(data))
|
|
||||||
|
|
||||||
|
|
||||||
class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
|
class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
|
||||||
|
|
||||||
# The security group to be set.
|
# The security group to be set.
|
||||||
@ -623,72 +367,6 @@ class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
|
|||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
|
||||||
class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
|
||||||
|
|
||||||
# The security group to be set.
|
|
||||||
_security_group = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestSetSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.update = mock.Mock(return_value=None)
|
|
||||||
|
|
||||||
self.compute.security_groups.get = mock.Mock(
|
|
||||||
return_value=self._security_group)
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group.SetSecurityGroup(self.app, None)
|
|
||||||
|
|
||||||
def test_set_no_options(self):
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, [], [])
|
|
||||||
|
|
||||||
def test_set_no_updates(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('group', self._security_group.name),
|
|
||||||
]
|
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.update.assert_called_once_with(
|
|
||||||
self._security_group,
|
|
||||||
self._security_group.name,
|
|
||||||
self._security_group.description
|
|
||||||
)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
def test_set_all_options(self):
|
|
||||||
new_name = 'new-' + self._security_group.name
|
|
||||||
new_description = 'new-' + self._security_group.description
|
|
||||||
arglist = [
|
|
||||||
'--name', new_name,
|
|
||||||
'--description', new_description,
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('description', new_description),
|
|
||||||
('group', self._security_group.name),
|
|
||||||
('name', new_name),
|
|
||||||
]
|
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.update.assert_called_once_with(
|
|
||||||
self._security_group,
|
|
||||||
new_name,
|
|
||||||
new_description
|
|
||||||
)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
|
|
||||||
class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
|
class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
|
||||||
|
|
||||||
# The security group rule to be shown with the group.
|
# The security group rule to be shown with the group.
|
||||||
@ -746,63 +424,3 @@ class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
|
|||||||
self._security_group.id, ignore_missing=False)
|
self._security_group.id, ignore_missing=False)
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
|
||||||
class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
|
||||||
|
|
||||||
# The security group rule to be shown with the group.
|
|
||||||
_security_group_rule = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
|
|
||||||
|
|
||||||
# The security group to be shown.
|
|
||||||
_security_group = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group(
|
|
||||||
attrs={'rules': [_security_group_rule._info]}
|
|
||||||
)
|
|
||||||
|
|
||||||
columns = (
|
|
||||||
'description',
|
|
||||||
'id',
|
|
||||||
'name',
|
|
||||||
'project_id',
|
|
||||||
'rules',
|
|
||||||
)
|
|
||||||
|
|
||||||
data = (
|
|
||||||
_security_group.description,
|
|
||||||
_security_group.id,
|
|
||||||
_security_group.name,
|
|
||||||
_security_group.tenant_id,
|
|
||||||
security_group._format_compute_security_group_rules(
|
|
||||||
[_security_group_rule._info]),
|
|
||||||
)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestShowSecurityGroupCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = self._security_group
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group.ShowSecurityGroup(self.app, None)
|
|
||||||
|
|
||||||
def test_show_no_options(self):
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, [], [])
|
|
||||||
|
|
||||||
def test_show_all_options(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.get.assert_called_once_with(
|
|
||||||
self._security_group.id)
|
|
||||||
self.assertEqual(self.columns, columns)
|
|
||||||
self.assertEqual(self.data, data)
|
|
@ -0,0 +1,556 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import mock
|
||||||
|
from mock import call
|
||||||
|
|
||||||
|
from osc_lib import exceptions
|
||||||
|
|
||||||
|
from openstackclient.network import utils as network_utils
|
||||||
|
from openstackclient.network.v2 import security_group_rule
|
||||||
|
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
||||||
|
from openstackclient.tests.unit import fakes
|
||||||
|
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
||||||
|
from openstackclient.tests.unit import utils as tests_utils
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestSecurityGroupRuleCompute, self).setUp()
|
||||||
|
|
||||||
|
# Get a shortcut to the network client
|
||||||
|
self.compute = self.app.client_manager.compute
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||||
|
|
||||||
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
|
domain = identity_fakes.FakeDomain.create_one_domain()
|
||||||
|
# The security group rule to be created.
|
||||||
|
_security_group_rule = None
|
||||||
|
|
||||||
|
# The security group that will contain the rule created.
|
||||||
|
_security_group = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
|
|
||||||
|
def _setup_security_group_rule(self, attrs=None):
|
||||||
|
self._security_group_rule = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
|
||||||
|
attrs)
|
||||||
|
self.compute.security_group_rules.create.return_value = \
|
||||||
|
self._security_group_rule
|
||||||
|
expected_columns, expected_data = \
|
||||||
|
security_group_rule._format_security_group_rule_show(
|
||||||
|
self._security_group_rule._info)
|
||||||
|
return expected_columns, expected_data
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestCreateSecurityGroupRuleCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.get.return_value = self._security_group
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
|
def test_create_no_options(self):
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
|
def test_create_all_source_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--src-ip', '10.10.0.0/24',
|
||||||
|
'--src-group', self._security_group.id,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_all_remote_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--remote-ip', '10.10.0.0/24',
|
||||||
|
'--remote-group', self._security_group.id,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_bad_protocol(self):
|
||||||
|
arglist = [
|
||||||
|
'--protocol', 'foo',
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_all_protocol_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--protocol', 'tcp',
|
||||||
|
'--proto', 'tcp',
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_network_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--ingress',
|
||||||
|
'--ethertype', 'IPv4',
|
||||||
|
'--icmp-type', '3',
|
||||||
|
'--icmp-code', '11',
|
||||||
|
'--project', self.project.name,
|
||||||
|
'--project-domain', self.domain.name,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
|
def test_create_default_rule(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule()
|
||||||
|
dst_port = str(self._security_group_rule.from_port) + ':' + \
|
||||||
|
str(self._security_group_rule.to_port)
|
||||||
|
arglist = [
|
||||||
|
'--dst-port', dst_port,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port)),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
def test_create_source_group(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
|
'from_port': 22,
|
||||||
|
'to_port': 22,
|
||||||
|
'group': {'name': self._security_group.name},
|
||||||
|
})
|
||||||
|
arglist = [
|
||||||
|
'--dst-port', str(self._security_group_rule.from_port),
|
||||||
|
'--src-group', self._security_group.name,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port)),
|
||||||
|
('src_group', self._security_group.name),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
self._security_group.id,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
def test_create_remote_group(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
|
'from_port': 22,
|
||||||
|
'to_port': 22,
|
||||||
|
'group': {'name': self._security_group.name},
|
||||||
|
})
|
||||||
|
arglist = [
|
||||||
|
'--dst-port', str(self._security_group_rule.from_port),
|
||||||
|
'--remote-group', self._security_group.name,
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port)),
|
||||||
|
('remote_group', self._security_group.name),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
self._security_group.id,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
def test_create_source_ip(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
|
'ip_protocol': 'icmp',
|
||||||
|
'from_port': -1,
|
||||||
|
'to_port': -1,
|
||||||
|
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||||
|
})
|
||||||
|
arglist = [
|
||||||
|
'--protocol', self._security_group_rule.ip_protocol,
|
||||||
|
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('protocol', self._security_group_rule.ip_protocol),
|
||||||
|
('src_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
def test_create_remote_ip(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
|
'ip_protocol': 'icmp',
|
||||||
|
'from_port': -1,
|
||||||
|
'to_port': -1,
|
||||||
|
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||||
|
})
|
||||||
|
arglist = [
|
||||||
|
'--protocol', self._security_group_rule.ip_protocol,
|
||||||
|
'--remote-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('protocol', self._security_group_rule.ip_protocol),
|
||||||
|
('remote_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
def test_create_proto_option(self):
|
||||||
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
|
'ip_protocol': 'icmp',
|
||||||
|
'from_port': -1,
|
||||||
|
'to_port': -1,
|
||||||
|
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||||
|
})
|
||||||
|
arglist = [
|
||||||
|
'--proto', self._security_group_rule.ip_protocol,
|
||||||
|
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('proto', self._security_group_rule.ip_protocol),
|
||||||
|
('protocol', None),
|
||||||
|
('src_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
|
self._security_group.id,
|
||||||
|
self._security_group_rule.ip_protocol,
|
||||||
|
self._security_group_rule.from_port,
|
||||||
|
self._security_group_rule.to_port,
|
||||||
|
self._security_group_rule.ip_range['cidr'],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
self.assertEqual(expected_columns, columns)
|
||||||
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||||
|
|
||||||
|
# The security group rule to be deleted.
|
||||||
|
_security_group_rules = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
|
||||||
|
count=2)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestDeleteSecurityGroupRuleCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
|
def test_security_group_rule_delete(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group_rules[0].id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('rule', [self._security_group_rules[0].id]),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_group_rules.delete.assert_called_once_with(
|
||||||
|
self._security_group_rules[0].id)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_multi_security_group_rules_delete(self):
|
||||||
|
arglist = []
|
||||||
|
verifylist = []
|
||||||
|
|
||||||
|
for s in self._security_group_rules:
|
||||||
|
arglist.append(s.id)
|
||||||
|
verifylist = [
|
||||||
|
('rule', arglist),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
for s in self._security_group_rules:
|
||||||
|
calls.append(call(s.id))
|
||||||
|
self.compute.security_group_rules.delete.assert_has_calls(calls)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_multi_security_group_rules_delete_with_exception(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group_rules[0].id,
|
||||||
|
'unexist_rule',
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('rule',
|
||||||
|
[self._security_group_rules[0].id, 'unexist_rule']),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
find_mock_result = [None, exceptions.CommandError]
|
||||||
|
self.compute.security_group_rules.delete = (
|
||||||
|
mock.Mock(side_effect=find_mock_result)
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.cmd.take_action(parsed_args)
|
||||||
|
self.fail('CommandError should be raised.')
|
||||||
|
except exceptions.CommandError as e:
|
||||||
|
self.assertEqual('1 of 2 rules failed to delete.', str(e))
|
||||||
|
|
||||||
|
self.compute.security_group_rules.delete.assert_any_call(
|
||||||
|
self._security_group_rules[0].id)
|
||||||
|
self.compute.security_group_rules.delete.assert_any_call(
|
||||||
|
'unexist_rule')
|
||||||
|
|
||||||
|
|
||||||
|
class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||||
|
|
||||||
|
# The security group to hold the rules.
|
||||||
|
_security_group = \
|
||||||
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
|
|
||||||
|
# The security group rule to be listed.
|
||||||
|
_security_group_rule_tcp = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
||||||
|
'ip_protocol': 'tcp',
|
||||||
|
'from_port': 80,
|
||||||
|
'to_port': 80,
|
||||||
|
'group': {'name': _security_group.name},
|
||||||
|
})
|
||||||
|
_security_group_rule_icmp = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
||||||
|
'ip_protocol': 'icmp',
|
||||||
|
'from_port': -1,
|
||||||
|
'to_port': -1,
|
||||||
|
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||||
|
'group': {'name': _security_group.name},
|
||||||
|
})
|
||||||
|
_security_group.rules = [_security_group_rule_tcp._info,
|
||||||
|
_security_group_rule_icmp._info]
|
||||||
|
|
||||||
|
expected_columns_with_group = (
|
||||||
|
'ID',
|
||||||
|
'IP Protocol',
|
||||||
|
'IP Range',
|
||||||
|
'Port Range',
|
||||||
|
'Remote Security Group',
|
||||||
|
)
|
||||||
|
expected_columns_no_group = \
|
||||||
|
expected_columns_with_group + ('Security Group',)
|
||||||
|
|
||||||
|
expected_data_with_group = []
|
||||||
|
expected_data_no_group = []
|
||||||
|
for _security_group_rule in _security_group.rules:
|
||||||
|
rule = network_utils.transform_compute_security_group_rule(
|
||||||
|
_security_group_rule
|
||||||
|
)
|
||||||
|
expected_rule_with_group = (
|
||||||
|
rule['id'],
|
||||||
|
rule['ip_protocol'],
|
||||||
|
rule['ip_range'],
|
||||||
|
rule['port_range'],
|
||||||
|
rule['remote_security_group'],
|
||||||
|
)
|
||||||
|
expected_rule_no_group = expected_rule_with_group + \
|
||||||
|
(_security_group_rule['parent_group_id'],)
|
||||||
|
expected_data_with_group.append(expected_rule_with_group)
|
||||||
|
expected_data_no_group.append(expected_rule_no_group)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestListSecurityGroupRuleCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
self.compute.security_groups.get.return_value = \
|
||||||
|
self._security_group
|
||||||
|
self.compute.security_groups.list.return_value = \
|
||||||
|
[self._security_group]
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
|
def test_list_default(self):
|
||||||
|
parsed_args = self.check_parser(self.cmd, [], [])
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
self.compute.security_groups.list.assert_called_once_with(
|
||||||
|
search_opts={'all_tenants': False}
|
||||||
|
)
|
||||||
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
|
def test_list_with_group(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('group', self._security_group.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
self.compute.security_groups.get.assert_called_once_with(
|
||||||
|
self._security_group.id
|
||||||
|
)
|
||||||
|
self.assertEqual(self.expected_columns_with_group, columns)
|
||||||
|
self.assertEqual(self.expected_data_with_group, list(data))
|
||||||
|
|
||||||
|
def test_list_all_projects(self):
|
||||||
|
arglist = [
|
||||||
|
'--all-projects',
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('all_projects', True),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
self.compute.security_groups.list.assert_called_once_with(
|
||||||
|
search_opts={'all_tenants': True}
|
||||||
|
)
|
||||||
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
|
def test_list_with_ignored_options(self):
|
||||||
|
arglist = [
|
||||||
|
'--long',
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('long', False),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
self.compute.security_groups.list.assert_called_once_with(
|
||||||
|
search_opts={'all_tenants': False}
|
||||||
|
)
|
||||||
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
|
|
||||||
|
class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
||||||
|
|
||||||
|
# The security group rule to be shown.
|
||||||
|
_security_group_rule = \
|
||||||
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
|
||||||
|
|
||||||
|
columns, data = \
|
||||||
|
security_group_rule._format_security_group_rule_show(
|
||||||
|
_security_group_rule._info)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestShowSecurityGroupRuleCompute, self).setUp()
|
||||||
|
|
||||||
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
|
# Build a security group fake customized for this test.
|
||||||
|
security_group_rules = [self._security_group_rule._info]
|
||||||
|
security_group = fakes.FakeResource(
|
||||||
|
info=copy.deepcopy({'rules': security_group_rules}),
|
||||||
|
loaded=True)
|
||||||
|
security_group.rules = security_group_rules
|
||||||
|
self.compute.security_groups.list.return_value = [security_group]
|
||||||
|
|
||||||
|
# Get the command object to test
|
||||||
|
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
|
def test_show_no_options(self):
|
||||||
|
self.assertRaises(tests_utils.ParserException,
|
||||||
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
|
def test_show_all_options(self):
|
||||||
|
arglist = [
|
||||||
|
self._security_group_rule.id,
|
||||||
|
]
|
||||||
|
verifylist = [
|
||||||
|
('rule', self._security_group_rule.id),
|
||||||
|
]
|
||||||
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
self.compute.security_groups.list.assert_called_once_with()
|
||||||
|
self.assertEqual(self.columns, columns)
|
||||||
|
self.assertEqual(self.data, data)
|
@ -11,16 +11,12 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
import copy
|
|
||||||
import mock
|
import mock
|
||||||
from mock import call
|
from mock import call
|
||||||
|
|
||||||
from osc_lib import exceptions
|
from osc_lib import exceptions
|
||||||
|
|
||||||
from openstackclient.network import utils as network_utils
|
|
||||||
from openstackclient.network.v2 import security_group_rule
|
from openstackclient.network.v2 import security_group_rule
|
||||||
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
|
||||||
from openstackclient.tests.unit import fakes
|
|
||||||
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
||||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||||
from openstackclient.tests.unit import utils as tests_utils
|
from openstackclient.tests.unit import utils as tests_utils
|
||||||
@ -39,15 +35,6 @@ class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
|
|||||||
self.domains_mock = self.app.client_manager.identity.domains
|
self.domains_mock = self.app.client_manager.identity.domains
|
||||||
|
|
||||||
|
|
||||||
class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestSecurityGroupRuleCompute, self).setUp()
|
|
||||||
|
|
||||||
# Get a shortcut to the network client
|
|
||||||
self.compute = self.app.client_manager.compute
|
|
||||||
|
|
||||||
|
|
||||||
class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
@ -548,280 +535,6 @@ class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
|||||||
self.assertEqual(self.expected_data, data)
|
self.assertEqual(self.expected_data, data)
|
||||||
|
|
||||||
|
|
||||||
class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|
||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
|
||||||
domain = identity_fakes.FakeDomain.create_one_domain()
|
|
||||||
# The security group rule to be created.
|
|
||||||
_security_group_rule = None
|
|
||||||
|
|
||||||
# The security group that will contain the rule created.
|
|
||||||
_security_group = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
|
||||||
|
|
||||||
def _setup_security_group_rule(self, attrs=None):
|
|
||||||
self._security_group_rule = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
|
|
||||||
attrs)
|
|
||||||
self.compute.security_group_rules.create.return_value = \
|
|
||||||
self._security_group_rule
|
|
||||||
expected_columns, expected_data = \
|
|
||||||
security_group_rule._format_security_group_rule_show(
|
|
||||||
self._security_group_rule._info)
|
|
||||||
return expected_columns, expected_data
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestCreateSecurityGroupRuleCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = self._security_group
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
|
|
||||||
|
|
||||||
def test_create_no_options(self):
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, [], [])
|
|
||||||
|
|
||||||
def test_create_all_source_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--src-ip', '10.10.0.0/24',
|
|
||||||
'--src-group', self._security_group.id,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_all_remote_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--remote-ip', '10.10.0.0/24',
|
|
||||||
'--remote-group', self._security_group.id,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_bad_protocol(self):
|
|
||||||
arglist = [
|
|
||||||
'--protocol', 'foo',
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_all_protocol_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--protocol', 'tcp',
|
|
||||||
'--proto', 'tcp',
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_network_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--ingress',
|
|
||||||
'--ethertype', 'IPv4',
|
|
||||||
'--icmp-type', '3',
|
|
||||||
'--icmp-code', '11',
|
|
||||||
'--project', self.project.name,
|
|
||||||
'--project-domain', self.domain.name,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_default_rule(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule()
|
|
||||||
dst_port = str(self._security_group_rule.from_port) + ':' + \
|
|
||||||
str(self._security_group_rule.to_port)
|
|
||||||
arglist = [
|
|
||||||
'--dst-port', dst_port,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port)),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
def test_create_source_group(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
|
||||||
'from_port': 22,
|
|
||||||
'to_port': 22,
|
|
||||||
'group': {'name': self._security_group.name},
|
|
||||||
})
|
|
||||||
arglist = [
|
|
||||||
'--dst-port', str(self._security_group_rule.from_port),
|
|
||||||
'--src-group', self._security_group.name,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port)),
|
|
||||||
('src_group', self._security_group.name),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
self._security_group.id,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
def test_create_remote_group(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
|
||||||
'from_port': 22,
|
|
||||||
'to_port': 22,
|
|
||||||
'group': {'name': self._security_group.name},
|
|
||||||
})
|
|
||||||
arglist = [
|
|
||||||
'--dst-port', str(self._security_group_rule.from_port),
|
|
||||||
'--remote-group', self._security_group.name,
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port)),
|
|
||||||
('remote_group', self._security_group.name),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
self._security_group.id,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
def test_create_source_ip(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
|
||||||
'ip_protocol': 'icmp',
|
|
||||||
'from_port': -1,
|
|
||||||
'to_port': -1,
|
|
||||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
|
||||||
})
|
|
||||||
arglist = [
|
|
||||||
'--protocol', self._security_group_rule.ip_protocol,
|
|
||||||
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('protocol', self._security_group_rule.ip_protocol),
|
|
||||||
('src_ip', self._security_group_rule.ip_range['cidr']),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
def test_create_remote_ip(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
|
||||||
'ip_protocol': 'icmp',
|
|
||||||
'from_port': -1,
|
|
||||||
'to_port': -1,
|
|
||||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
|
||||||
})
|
|
||||||
arglist = [
|
|
||||||
'--protocol', self._security_group_rule.ip_protocol,
|
|
||||||
'--remote-ip', self._security_group_rule.ip_range['cidr'],
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('protocol', self._security_group_rule.ip_protocol),
|
|
||||||
('remote_ip', self._security_group_rule.ip_range['cidr']),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
def test_create_proto_option(self):
|
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
|
||||||
'ip_protocol': 'icmp',
|
|
||||||
'from_port': -1,
|
|
||||||
'to_port': -1,
|
|
||||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
|
||||||
})
|
|
||||||
arglist = [
|
|
||||||
'--proto', self._security_group_rule.ip_protocol,
|
|
||||||
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('proto', self._security_group_rule.ip_protocol),
|
|
||||||
('protocol', None),
|
|
||||||
('src_ip', self._security_group_rule.ip_range['cidr']),
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
|
||||||
self._security_group.id,
|
|
||||||
self._security_group_rule.ip_protocol,
|
|
||||||
self._security_group_rule.from_port,
|
|
||||||
self._security_group_rule.to_port,
|
|
||||||
self._security_group_rule.ip_range['cidr'],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_columns, columns)
|
|
||||||
self.assertEqual(expected_data, data)
|
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||||
|
|
||||||
# The security group rules to be deleted.
|
# The security group rules to be deleted.
|
||||||
@ -909,83 +622,6 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|
||||||
|
|
||||||
# The security group rule to be deleted.
|
|
||||||
_security_group_rules = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
|
|
||||||
count=2)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestDeleteSecurityGroupRuleCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
|
|
||||||
|
|
||||||
def test_security_group_rule_delete(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group_rules[0].id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('rule', [self._security_group_rules[0].id]),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_group_rules.delete.assert_called_once_with(
|
|
||||||
self._security_group_rules[0].id)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
def test_multi_security_group_rules_delete(self):
|
|
||||||
arglist = []
|
|
||||||
verifylist = []
|
|
||||||
|
|
||||||
for s in self._security_group_rules:
|
|
||||||
arglist.append(s.id)
|
|
||||||
verifylist = [
|
|
||||||
('rule', arglist),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
result = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
calls = []
|
|
||||||
for s in self._security_group_rules:
|
|
||||||
calls.append(call(s.id))
|
|
||||||
self.compute.security_group_rules.delete.assert_has_calls(calls)
|
|
||||||
self.assertIsNone(result)
|
|
||||||
|
|
||||||
def test_multi_security_group_rules_delete_with_exception(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group_rules[0].id,
|
|
||||||
'unexist_rule',
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('rule',
|
|
||||||
[self._security_group_rules[0].id, 'unexist_rule']),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
find_mock_result = [None, exceptions.CommandError]
|
|
||||||
self.compute.security_group_rules.delete = (
|
|
||||||
mock.Mock(side_effect=find_mock_result)
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.cmd.take_action(parsed_args)
|
|
||||||
self.fail('CommandError should be raised.')
|
|
||||||
except exceptions.CommandError as e:
|
|
||||||
self.assertEqual('1 of 2 rules failed to delete.', str(e))
|
|
||||||
|
|
||||||
self.compute.security_group_rules.delete.assert_any_call(
|
|
||||||
self._security_group_rules[0].id)
|
|
||||||
self.compute.security_group_rules.delete.assert_any_call(
|
|
||||||
'unexist_rule')
|
|
||||||
|
|
||||||
|
|
||||||
class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||||
|
|
||||||
# The security group to hold the rules.
|
# The security group to hold the rules.
|
||||||
@ -1165,131 +801,6 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
|||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
|
|
||||||
class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|
||||||
|
|
||||||
# The security group to hold the rules.
|
|
||||||
_security_group = \
|
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
|
||||||
|
|
||||||
# The security group rule to be listed.
|
|
||||||
_security_group_rule_tcp = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
|
||||||
'ip_protocol': 'tcp',
|
|
||||||
'from_port': 80,
|
|
||||||
'to_port': 80,
|
|
||||||
'group': {'name': _security_group.name},
|
|
||||||
})
|
|
||||||
_security_group_rule_icmp = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
|
||||||
'ip_protocol': 'icmp',
|
|
||||||
'from_port': -1,
|
|
||||||
'to_port': -1,
|
|
||||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
|
||||||
'group': {'name': _security_group.name},
|
|
||||||
})
|
|
||||||
_security_group.rules = [_security_group_rule_tcp._info,
|
|
||||||
_security_group_rule_icmp._info]
|
|
||||||
|
|
||||||
expected_columns_with_group = (
|
|
||||||
'ID',
|
|
||||||
'IP Protocol',
|
|
||||||
'IP Range',
|
|
||||||
'Port Range',
|
|
||||||
'Remote Security Group',
|
|
||||||
)
|
|
||||||
expected_columns_no_group = \
|
|
||||||
expected_columns_with_group + ('Security Group',)
|
|
||||||
|
|
||||||
expected_data_with_group = []
|
|
||||||
expected_data_no_group = []
|
|
||||||
for _security_group_rule in _security_group.rules:
|
|
||||||
rule = network_utils.transform_compute_security_group_rule(
|
|
||||||
_security_group_rule
|
|
||||||
)
|
|
||||||
expected_rule_with_group = (
|
|
||||||
rule['id'],
|
|
||||||
rule['ip_protocol'],
|
|
||||||
rule['ip_range'],
|
|
||||||
rule['port_range'],
|
|
||||||
rule['remote_security_group'],
|
|
||||||
)
|
|
||||||
expected_rule_no_group = expected_rule_with_group + \
|
|
||||||
(_security_group_rule['parent_group_id'],)
|
|
||||||
expected_data_with_group.append(expected_rule_with_group)
|
|
||||||
expected_data_no_group.append(expected_rule_no_group)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestListSecurityGroupRuleCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = \
|
|
||||||
self._security_group
|
|
||||||
self.compute.security_groups.list.return_value = \
|
|
||||||
[self._security_group]
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
|
|
||||||
|
|
||||||
def test_list_default(self):
|
|
||||||
parsed_args = self.check_parser(self.cmd, [], [])
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
|
||||||
search_opts={'all_tenants': False}
|
|
||||||
)
|
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
|
||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
|
||||||
|
|
||||||
def test_list_with_group(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('group', self._security_group.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
self.compute.security_groups.get.assert_called_once_with(
|
|
||||||
self._security_group.id
|
|
||||||
)
|
|
||||||
self.assertEqual(self.expected_columns_with_group, columns)
|
|
||||||
self.assertEqual(self.expected_data_with_group, list(data))
|
|
||||||
|
|
||||||
def test_list_all_projects(self):
|
|
||||||
arglist = [
|
|
||||||
'--all-projects',
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('all_projects', True),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
|
||||||
search_opts={'all_tenants': True}
|
|
||||||
)
|
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
|
||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
|
||||||
|
|
||||||
def test_list_with_ignored_options(self):
|
|
||||||
arglist = [
|
|
||||||
'--long',
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('long', False),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
|
||||||
search_opts={'all_tenants': False}
|
|
||||||
)
|
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
|
||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
|
||||||
|
|
||||||
|
|
||||||
class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
||||||
|
|
||||||
# The security group rule to be shown.
|
# The security group rule to be shown.
|
||||||
@ -1353,49 +864,3 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
|
|||||||
self._security_group_rule.id, ignore_missing=False)
|
self._security_group_rule.id, ignore_missing=False)
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
|
||||||
class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|
||||||
|
|
||||||
# The security group rule to be shown.
|
|
||||||
_security_group_rule = \
|
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
|
|
||||||
|
|
||||||
columns, data = \
|
|
||||||
security_group_rule._format_security_group_rule_show(
|
|
||||||
_security_group_rule._info)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestShowSecurityGroupRuleCompute, self).setUp()
|
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
|
||||||
|
|
||||||
# Build a security group fake customized for this test.
|
|
||||||
security_group_rules = [self._security_group_rule._info]
|
|
||||||
security_group = fakes.FakeResource(
|
|
||||||
info=copy.deepcopy({'rules': security_group_rules}),
|
|
||||||
loaded=True)
|
|
||||||
security_group.rules = security_group_rules
|
|
||||||
self.compute.security_groups.list.return_value = [security_group]
|
|
||||||
|
|
||||||
# Get the command object to test
|
|
||||||
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
|
|
||||||
|
|
||||||
def test_show_no_options(self):
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, [], [])
|
|
||||||
|
|
||||||
def test_show_all_options(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group_rule.id,
|
|
||||||
]
|
|
||||||
verifylist = [
|
|
||||||
('rule', self._security_group_rule.id),
|
|
||||||
]
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
|
||||||
|
|
||||||
self.compute.security_groups.list.assert_called_once_with()
|
|
||||||
self.assertEqual(self.columns, columns)
|
|
||||||
self.assertEqual(self.data, data)
|
|
Loading…
x
Reference in New Issue
Block a user