Fix Neutron Authentication for Metadata Service
A recent change to the way Nova creates a Neutron client 51e5f52e4cb60e266ccde71f205c91eb8c97b48b changed the conditions under which it re-authenticates using the neutron admin credentials from "if admin" to "if admin or context.is_admin". However this meant that any user with admin role in Nova interacted with Neutron as a different tenant, preventing them from non-admin actions such as allocating floating IPS. This was then addressed by 1c1371c78b990447aeaa4377b512f8887e6ff3ce which only reauthenticated as admin on the explicit use of "admin=True" parameter to neutonv2.get_client(). However the metadata service does not explicitly create the neutron client so it has to pass in a context created by context.get_admin_context() which has is_admin=True but no auth_token. This change therefore accepts this combination of context values as a valid reason to re-authenticate as admin, while still allowing users with the admin role to use their own token. As a tidy up two tests for getting an admin client are moved from TestNeutonClient to join the other admin client tests in TestNeutronClientForAdminScenarios Change-Id: Ic2a6f383a85f6bb606d8d65feaefa0d23329adb9 Closes-Bug: 1255577
This commit is contained in:
parent
271e21868b
commit
652620d12f
@ -47,14 +47,19 @@ def _get_client(token=None):
|
||||
|
||||
|
||||
def get_client(context, admin=False):
|
||||
# NOTE(dims): We need to use admin token, let us cache a
|
||||
# thread local copy for re-using this client
|
||||
# multiple times and to avoid excessive calls
|
||||
# to neutron to fetch tokens. Some of the hackiness in this code
|
||||
# will go away once BP auth-plugins is implemented.
|
||||
# That blue print will ensure that tokens can be shared
|
||||
# across clients as well
|
||||
if admin:
|
||||
# NOTE(dprince): In the case where no auth_token is present
|
||||
# we allow use of neutron admin tenant credentials if
|
||||
# it is an admin context.
|
||||
# This is to support some services (metadata API) where
|
||||
# an admin context is used without an auth token.
|
||||
if admin or (context.is_admin and not context.auth_token):
|
||||
# NOTE(dims): We need to use admin token, let us cache a
|
||||
# thread local copy for re-using this client
|
||||
# multiple times and to avoid excessive calls
|
||||
# to neutron to fetch tokens. Some of the hackiness in this code
|
||||
# will go away once BP auth-plugins is implemented.
|
||||
# That blue print will ensure that tokens can be shared
|
||||
# across clients as well
|
||||
if not hasattr(local.strong_store, 'neutron_client'):
|
||||
local.strong_store.neutron_client = _get_client(token=None)
|
||||
return local.strong_store.neutron_client
|
||||
|
@ -136,16 +136,6 @@ class TestNeutronClient(test.TestCase):
|
||||
# our own token
|
||||
neutronv2.get_client(my_context)
|
||||
|
||||
def test_withouttoken_context_is_admin(self):
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
is_admin=True)
|
||||
# Note that although we have admin set in the context we
|
||||
# are not asking for an admin client, and so we auth with
|
||||
# our own token - which is null, and so this is an error
|
||||
self.assertRaises(exceptions.Unauthorized,
|
||||
neutronv2.get_client,
|
||||
my_context)
|
||||
|
||||
def test_withouttoken_keystone_connection_error(self):
|
||||
self.flags(neutron_auth_strategy='keystone')
|
||||
self.flags(neutron_url='http://anyhost/')
|
||||
@ -154,30 +144,6 @@ class TestNeutronClient(test.TestCase):
|
||||
neutronv2.get_client,
|
||||
my_context)
|
||||
|
||||
def test_admin(self):
|
||||
self.flags(neutron_auth_strategy=None)
|
||||
self.flags(neutron_url='http://anyhost/')
|
||||
self.flags(neutron_url_timeout=30)
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
auth_token='token')
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
client.Client.__init__(
|
||||
auth_url=CONF.neutron_admin_auth_url,
|
||||
password=CONF.neutron_admin_password,
|
||||
tenant_name=CONF.neutron_admin_tenant_name,
|
||||
username=CONF.neutron_admin_username,
|
||||
endpoint_url=CONF.neutron_url,
|
||||
auth_strategy=None,
|
||||
timeout=CONF.neutron_url_timeout,
|
||||
insecure=False,
|
||||
ca_cert=None).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
# Note that the context is not elevated, but the True is passed in
|
||||
# which will force an elevation to admin credentials even though
|
||||
# the context has an auth_token
|
||||
neutronv2.get_client(my_context, True)
|
||||
|
||||
|
||||
class TestNeutronv2Base(test.TestCase):
|
||||
|
||||
@ -1909,3 +1875,60 @@ class TestNeutronClientForAdminScenarios(test.TestCase):
|
||||
self.assertRaises(exceptions.Unauthorized,
|
||||
neutronv2.get_client,
|
||||
my_context)
|
||||
|
||||
def test_get_client_for_admin(self):
|
||||
|
||||
self.flags(neutron_auth_strategy=None)
|
||||
self.flags(neutron_url='http://anyhost/')
|
||||
self.flags(neutron_url_timeout=30)
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
auth_token='token')
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
client.Client.__init__(
|
||||
auth_url=CONF.neutron_admin_auth_url,
|
||||
password=CONF.neutron_admin_password,
|
||||
tenant_name=CONF.neutron_admin_tenant_name,
|
||||
username=CONF.neutron_admin_username,
|
||||
endpoint_url=CONF.neutron_url,
|
||||
auth_strategy=None,
|
||||
timeout=CONF.neutron_url_timeout,
|
||||
insecure=False,
|
||||
ca_cert=None).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
# clear the cache
|
||||
if hasattr(local.strong_store, 'neutron_client'):
|
||||
delattr(local.strong_store, 'neutron_client')
|
||||
|
||||
# Note that the context is not elevated, but the True is passed in
|
||||
# which will force an elevation to admin credentials even though
|
||||
# the context has an auth_token.
|
||||
neutronv2.get_client(my_context, True)
|
||||
|
||||
def test_get_client_for_admin_context(self):
|
||||
|
||||
self.flags(neutron_auth_strategy=None)
|
||||
self.flags(neutron_url='http://anyhost/')
|
||||
self.flags(neutron_url_timeout=30)
|
||||
my_context = context.get_admin_context()
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
client.Client.__init__(
|
||||
auth_url=CONF.neutron_admin_auth_url,
|
||||
password=CONF.neutron_admin_password,
|
||||
tenant_name=CONF.neutron_admin_tenant_name,
|
||||
username=CONF.neutron_admin_username,
|
||||
endpoint_url=CONF.neutron_url,
|
||||
auth_strategy=None,
|
||||
timeout=CONF.neutron_url_timeout,
|
||||
insecure=False,
|
||||
ca_cert=None).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
# clear the cache
|
||||
if hasattr(local.strong_store, 'neutron_client'):
|
||||
delattr(local.strong_store, 'neutron_client')
|
||||
|
||||
# Note that the context does not contain a token but is
|
||||
# an admin context which will force an elevation to admin
|
||||
# credentials.
|
||||
neutronv2.get_client(my_context)
|
||||
|
Loading…
x
Reference in New Issue
Block a user