Show tenant_id in *-list by admin

If API call is done by admin, neutron API returns all resources
from all tenants. We received several times a request that
tenant_id is useful in such case.

This patch displays tenant_id column when *-list is invoked
by a user with 'admin' role by default. If columns to be displayed
are specified by '-c' option, the specified columns are honored
and tenant_id columns is not added.

Add functional tests to cover this change. This feature depends
on keystoneauth1 and unit tests cannot cover this change.

Closes-Bug: #1526238
Change-Id: Id58a0b795f76b4fe24aaf89f1f6486c23f19a553
This commit is contained in:
Akihiro Motoki 2016-01-09 13:18:36 +09:00
parent a1f75e3abb
commit 9dbdced729
8 changed files with 97 additions and 11 deletions

@ -271,6 +271,9 @@ class HTTPClient(object):
'auth_user_id': self.auth_user_id,
'endpoint_url': self.endpoint_url}
def get_auth_ref(self):
return getattr(self, 'auth_ref', None)
class SessionClient(adapter.Adapter):
@ -341,6 +344,9 @@ class SessionClient(adapter.Adapter):
return auth_info
def get_auth_ref(self):
return self.session.auth.get_auth_ref(self.session)
# FIXME(bklei): Should refactor this to use kwargs and only
# explicitly list arguments that are not None.

@ -681,7 +681,8 @@ class ListCommand(NeutronCommand, lister.Lister):
# if no -c(s) by user and list_columns, we use columns in
# both list_columns and returned resource.
# Also Keep their order the same as in list_columns
_columns = [x for x in self.list_columns if x in _columns]
_columns = self._setup_columns_with_tenant_id(self.list_columns,
_columns)
formatters = self._formatters
if hasattr(self, '_formatters_csv') and parsed_args.formatter == 'csv':
@ -691,6 +692,32 @@ class ListCommand(NeutronCommand, lister.Lister):
s, _columns, formatters=formatters, )
for s in info), )
def _setup_columns_with_tenant_id(self, display_columns, avail_columns):
_columns = [x for x in display_columns if x in avail_columns]
if 'tenant_id' in display_columns:
return _columns
if 'tenant_id' not in avail_columns:
return _columns
if not self.is_admin_role():
return _columns
try:
pos_id = _columns.index('id')
except ValueError:
pos_id = 0
try:
pos_name = _columns.index('name')
except ValueError:
pos_name = 0
_columns.insert(max(pos_id, pos_name) + 1, 'tenant_id')
return _columns
def is_admin_role(self):
client = self.get_client()
auth_ref = client.httpclient.get_auth_ref()
if not auth_ref:
return False
return 'admin' in auth_ref.role_names
def take_action(self, parsed_args):
self.log.debug('run(%s)', parsed_args)
self.set_extra_attrs(parsed_args)

@ -47,19 +47,28 @@ class ClientTestBase(base.ClientTestBase):
"""
def _get_clients(self):
self.creds = credentials()
def _get_clients_from_os_cloud_config(self, cloud='devstack-admin'):
creds = credentials(cloud)
cli_dir = os.environ.get(
'OS_NEUTRONCLIENT_EXEC_DIR',
os.path.join(os.path.abspath('.'), '.tox/functional/bin'))
return base.CLIClient(
username=self.creds['username'],
password=self.creds['password'],
tenant_name=self.creds['project_name'],
uri=self.creds['auth_url'],
username=creds['username'],
password=creds['password'],
tenant_name=creds['project_name'],
uri=creds['auth_url'],
cli_dir=cli_dir)
def _get_clients(self):
return self._get_clients_from_os_cloud_config()
def neutron(self, *args, **kwargs):
return self.clients.neutron(*args,
**kwargs)
def neutron_non_admin(self, *args, **kwargs):
if not hasattr(self, '_non_admin_clients'):
self._non_admin_clients = self._get_clients_from_os_cloud_config(
cloud='devstack')
return self._non_admin_clients.neutron(*args, **kwargs)

@ -91,6 +91,13 @@ class LibraryTestCase(object):
with testtools.ExpectedException(exceptions.NetworkNotFoundClient):
self.client.show_network(net_id)
def test_get_auth_ref(self):
# Call some API call to ensure the client is authenticated.
self.client.list_networks()
auth_ref = self.client.httpclient.get_auth_ref()
self.assertIsNotNone(auth_ref)
self.assertIsNotNone(auth_ref.role_names)
class LibraryHTTPClientTest(LibraryTestCase, Libv2HTTPClientTestBase):
pass

@ -0,0 +1,33 @@
# Copyright 2016 NEC Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.tests.functional import base
class CLICommonFeatureTest(base.ClientTestBase):
def test_tenant_id_shown_in_list_by_admin(self):
nets = self.parser.table(self.neutron('net-list'))
self.assertIn('tenant_id', nets['headers'])
def test_tenant_id_not_shown_in_list_with_columns(self):
nets = self.parser.table(self.neutron('net-list -c id -c name'))
self.assertNotIn('tenant_id', nets['headers'])
self.assertListEqual(['id', 'name'], nets['headers'])
def test_tenant_id_not_shown_in_list_by_non_admin(self):
output = self.neutron_non_admin('net-list')
self.assertNotIn('tenant_id', self.parser.table(output)['headers'])
self.assertTableStruct(self.parser.listing(output),
['id', 'name'])

@ -271,9 +271,8 @@ class CLITestV20NetworkJSON(test_cli20.CLITestV20Base):
cmd = network.ListNetwork(test_cli20.MyApp(sys.stdout), None)
self.mox.StubOutWithMock(cmd, 'get_client')
self.mox.StubOutWithMock(self.client.httpclient, 'request')
cmd.get_client().AndReturn(self.client)
cmd.get_client().MultipleTimes().AndReturn(self.client)
setup_list_stub('networks', data, '')
cmd.get_client().AndReturn(self.client)
filters = ''
for n in data:
for s in n['subnets']:

@ -383,13 +383,12 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
test_cli20.MyApp(sys.stdout), None)
self.mox.StubOutWithMock(cmd, 'get_client')
self.mox.StubOutWithMock(self.client.httpclient, 'request')
cmd.get_client().AndReturn(self.client)
cmd.get_client().MultipleTimes().AndReturn(self.client)
query = ''
if query_fields:
query = '&'.join(['fields=' + f for f in query_fields])
setup_list_stub('security_group_rules', api_data, query)
if conv:
cmd.get_client().AndReturn(self.client)
sec_ids = set()
for n in api_data:
sec_ids.add(n['security_group_id'])

@ -0,0 +1,6 @@
---
features:
- Show tenant_id when ``*-list`` command is run by admin. In neutron
the list operations by admin retrieve all resources from all tenants.
It is not easy to distinguish resources without tenant_id.
This feature is useful for admin operations.