Refactor security group list to use SDK

Refactored the 'os security group list' command to use the SDK
when neutron is enabled, but continue to use the nova client
when nova network is enabled.

This refactor also removes the logic for displaying project names
instead of project IDs when the --all-projects option is specified.
This logic was removed because it is inconsistent with the other
network commands.

Since neutron will always display security groups across all
projects for an admin, the --all-projects option is now hidden
when neutron is enabled and the Project column is always
displayed.

Change-Id: I934a1f5084ef3c5f929d0ffd38ebf5064d799941
Partial-Bug: #1519511
Related-to: blueprint neutron-client
This commit is contained in:
Richard Theis 2016-02-09 07:21:01 -06:00
parent 5310cfb8b7
commit 842882f3cb
8 changed files with 267 additions and 126 deletions

View File

@ -54,6 +54,9 @@ List security groups
Display information from all projects (admin only)
*Network version 2 ignores this option and will always display information*
*for all projects.*
security group set
------------------

View File

@ -18,8 +18,6 @@
import six
from keystoneauth1 import exceptions as ks_exc
try:
from novaclient.v2 import security_group_rules
except ImportError:
@ -169,58 +167,6 @@ class CreateSecurityGroupRule(command.ShowOne):
return zip(*sorted(six.iteritems(info)))
class ListSecurityGroup(command.Lister):
"""List security groups"""
def get_parser(self, prog_name):
parser = super(ListSecurityGroup, self).get_parser(prog_name)
parser.add_argument(
'--all-projects',
action='store_true',
default=False,
help='Display information from all projects (admin only)',
)
return parser
def take_action(self, parsed_args):
def _get_project(project_id):
try:
return getattr(project_hash[project_id], 'name', project_id)
except KeyError:
return project_id
compute_client = self.app.client_manager.compute
columns = (
"ID",
"Name",
"Description",
)
column_headers = columns
if parsed_args.all_projects:
# TODO(dtroyer): Translate Project_ID to Project (name)
columns = columns + ('Tenant ID',)
column_headers = column_headers + ('Project',)
search = {'all_tenants': parsed_args.all_projects}
data = compute_client.security_groups.list(search_opts=search)
project_hash = {}
try:
projects = self.app.client_manager.identity.projects.list()
except ks_exc.ClientException:
# This fails when the user is not an admin, just move along
pass
else:
for project in projects:
project_hash[project.id] = project
return (column_headers,
(utils.get_item_properties(
s, columns,
formatters={'Tenant ID': _get_project},
) for s in data))
class ListSecurityGroupRule(command.Lister):
"""List security group rules"""

View File

@ -13,6 +13,8 @@
"""Security Group action implementations"""
import argparse
from openstackclient.common import utils
from openstackclient.network import common
@ -38,3 +40,51 @@ class DeleteSecurityGroup(common.NetworkAndComputeCommand):
parsed_args.group,
)
client.security_groups.delete(data.id)
class ListSecurityGroup(common.NetworkAndComputeLister):
"""List security groups"""
def update_parser_network(self, parser):
# Maintain and hide the argument for backwards compatibility.
# Network will always return all projects for an admin.
parser.add_argument(
'--all-projects',
action='store_true',
default=False,
help=argparse.SUPPRESS,
)
return parser
def update_parser_compute(self, parser):
parser.add_argument(
'--all-projects',
action='store_true',
default=False,
help='Display information from all projects (admin only)',
)
return parser
def _get_return_data(self, data, include_project=True):
columns = (
"ID",
"Name",
"Description",
)
column_headers = columns
if include_project:
columns = columns + ('Tenant ID',)
column_headers = column_headers + ('Project',)
return (column_headers,
(utils.get_item_properties(
s, columns,
) for s in data))
def take_action_network(self, client, parsed_args):
return self._get_return_data(client.security_groups())
def take_action_compute(self, client, parsed_args):
search = {'all_tenants': parsed_args.all_projects}
data = client.security_groups.list(search_opts=search)
return self._get_return_data(data,
include_project=parsed_args.all_projects)

View File

@ -298,6 +298,70 @@ class FakehypervisorStats(object):
return hypervisors
class FakeSecurityGroup(object):
"""Fake one or more security groups."""
@staticmethod
def create_one_security_group(attrs=None, methods=None):
"""Create a fake security group.
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:return:
A FakeResource object, with id, name, etc.
"""
if attrs is None:
attrs = {}
if methods is None:
methods = {}
# Set default attributes.
security_group_attrs = {
'id': 'security-group-id-' + uuid.uuid4().hex,
'name': 'security-group-name-' + uuid.uuid4().hex,
'description': 'security-group-description-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
'rules': [],
}
# Overwrite default attributes.
security_group_attrs.update(attrs)
# Set default methods.
security_group_methods = {}
# Overwrite default methods.
security_group_methods.update(methods)
security_group = fakes.FakeResource(
info=copy.deepcopy(security_group_attrs),
methods=copy.deepcopy(security_group_methods),
loaded=True)
return security_group
@staticmethod
def create_security_groups(attrs=None, methods=None, count=2):
"""Create multiple fake security groups.
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:param int count:
The number of security groups to fake
:return:
A list of FakeResource objects faking the security groups
"""
security_groups = []
for i in range(0, count):
security_groups.append(
FakeSecurityGroup.create_one_security_group(attrs, methods))
return security_groups
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""

View File

@ -125,65 +125,3 @@ class TestSecurityGroupCreate(TestSecurityGroup):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
class TestSecurityGroupList(TestSecurityGroup):
def setUp(self):
super(TestSecurityGroupList, self).setUp()
self.secgroups_mock.list.return_value = [
FakeSecurityGroupResource(
None,
copy.deepcopy(SECURITY_GROUP),
loaded=True,
),
]
# Get the command object to test
self.cmd = security_group.ListSecurityGroup(self.app, None)
def test_security_group_list_no_options(self):
self.projects_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
),
]
arglist = []
verifylist = [
('all_projects', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# In base command class Lister in cliff, abstract method take_action()
# returns a tuple containing the column names and an iterable
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'search_opts': {
'all_tenants': False,
},
}
self.secgroups_mock.list.assert_called_with(
**kwargs
)
collist = (
'ID',
'Name',
'Description',
)
self.assertEqual(collist, columns)
datalist = ((
security_group_id,
security_group_name,
security_group_description,
), )
self.assertEqual(datalist, tuple(data))

View File

@ -14,24 +14,29 @@
import mock
from openstackclient.network.v2 import security_group
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests.network.v2 import fakes as network_fakes
class TestSecurityGroup(network_fakes.TestNetworkV2):
class TestSecurityGroupNetwork(network_fakes.TestNetworkV2):
def setUp(self):
super(TestSecurityGroup, self).setUp()
super(TestSecurityGroupNetwork, self).setUp()
# Get a shortcut to the network client
self.network = self.app.client_manager.network
# Create compute client mocks.
self.app.client_manager.compute = mock.Mock()
class TestSecurityGroupCompute(compute_fakes.TestComputev2):
def setUp(self):
super(TestSecurityGroupCompute, self).setUp()
# Get a shortcut to the compute client
self.compute = self.app.client_manager.compute
self.compute.security_groups = mock.Mock()
class TestDeleteSecurityGroupNetwork(TestSecurityGroup):
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group to be deleted.
_security_group = \
@ -64,11 +69,11 @@ class TestDeleteSecurityGroupNetwork(TestSecurityGroup):
self.assertIsNone(result)
class TestDeleteSecurityGroupCompute(TestSecurityGroup):
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
# The security group to be deleted.
_security_group = \
network_fakes.FakeSecurityGroup.create_one_security_group()
compute_fakes.FakeSecurityGroup.create_one_security_group()
def setUp(self):
super(TestDeleteSecurityGroupCompute, self).setUp()
@ -81,7 +86,7 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroup):
return_value=self._security_group)
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace)
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
def test_security_group_delete(self):
arglist = [
@ -97,3 +102,131 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroup):
self.compute.security_groups.delete.assert_called_with(
self._security_group.id)
self.assertIsNone(result)
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
# The security group to be listed.
_security_group = \
network_fakes.FakeSecurityGroup.create_one_security_group()
expected_columns = (
'ID',
'Name',
'Description',
'Project',
)
expected_data = ((
_security_group.id,
_security_group.name,
_security_group.description,
_security_group.tenant_id,
),)
def setUp(self):
super(TestListSecurityGroupNetwork, self).setUp()
self.network.security_groups = mock.Mock(
return_value=[self._security_group])
# Get the command object to test
self.cmd = security_group.ListSecurityGroup(self.app, self.namespace)
def test_security_group_list_no_options(self):
arglist = []
verifylist = [
('all_projects', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.security_groups.assert_called_with()
self.assertEqual(self.expected_columns, columns)
self.assertEqual(self.expected_data, tuple(data))
def test_security_group_list_all_projects(self):
arglist = [
'--all-projects',
]
verifylist = [
('all_projects', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.security_groups.assert_called_with()
self.assertEqual(self.expected_columns, columns)
self.assertEqual(self.expected_data, tuple(data))
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
# The security group to be listed.
_security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
expected_columns = (
'ID',
'Name',
'Description',
)
expected_columns_all_projects = (
'ID',
'Name',
'Description',
'Project',
)
expected_data = ((
_security_group.id,
_security_group.name,
_security_group.description,
),)
expected_data_all_projects = ((
_security_group.id,
_security_group.name,
_security_group.description,
_security_group.tenant_id,
),)
def setUp(self):
super(TestListSecurityGroupCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
self.compute.security_groups.list.return_value = [self._security_group]
# Get the command object to test
self.cmd = security_group.ListSecurityGroup(self.app, None)
def test_security_group_list_no_options(self):
arglist = []
verifylist = [
('all_projects', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': False}}
self.compute.security_groups.list.assert_called_with(**kwargs)
self.assertEqual(self.expected_columns, columns)
self.assertEqual(self.expected_data, tuple(data))
def test_security_group_list_all_projects(self):
arglist = [
'--all-projects',
]
verifylist = [
('all_projects', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': True}}
self.compute.security_groups.list.assert_called_with(**kwargs)
self.assertEqual(self.expected_columns_all_projects, columns)
self.assertEqual(self.expected_data_all_projects, tuple(data))

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Ignore the ``security group list`` command ``--all-projects`` option
for Network v2 since security groups will be displayed for all projects
by default (admin only).
[Bug `1519511 <https://bugs.launchpad.net/bugs/1519511>`_]

View File

@ -100,7 +100,6 @@ openstack.compute.v2 =
keypair_show = openstackclient.compute.v2.keypair:ShowKeypair
security_group_create = openstackclient.compute.v2.security_group:CreateSecurityGroup
security_group_list = openstackclient.compute.v2.security_group:ListSecurityGroup
security_group_set = openstackclient.compute.v2.security_group:SetSecurityGroup
security_group_show = openstackclient.compute.v2.security_group:ShowSecurityGroup
security_group_rule_create = openstackclient.compute.v2.security_group:CreateSecurityGroupRule
@ -340,6 +339,7 @@ openstack.network.v2 =
router_set = openstackclient.network.v2.router:SetRouter
router_show = openstackclient.network.v2.router:ShowRouter
security_group_delete = openstackclient.network.v2.security_group:DeleteSecurityGroup
security_group_list = openstackclient.network.v2.security_group:ListSecurityGroup
security_group_rule_delete = openstackclient.network.v2.security_group_rule:DeleteSecurityGroupRule
security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule
subnet_list = openstackclient.network.v2.subnet:ListSubnet