Enable use of service user token with admin context

When the [service_user] section is configured in nova.conf, nova will
have the ability to send a service user token alongside the user's
token. The service user token is sent when nova calls other services'
REST APIs to authenticate as a service, and service calls can sometimes
have elevated privileges.

Currently, nova does not however have the ability to send a service user
token with an admin context. This means that when nova makes REST API
calls to other services with an anonymous admin RequestContext (such as
in nova-manage or periodic tasks), it will not be authenticated as a
service.

This adds a keyword argument to service_auth.get_auth_plugin() to
enable callers to provide a user_auth object instead of attempting to
extract the user_auth from the RequestContext.

The cinder and neutron client modules are also adjusted to make use of
the new user_auth keyword argument so that nova calls made with
anonymous admin request contexts can authenticate as a service when
configured.

Related-Bug: #2004555

Change-Id: I14df2d55f4b2f0be58f1a6ad3f19e48f7a6bfcb4
(cherry picked from commit 41c64b94b0)
This commit is contained in:
melanie witt 2023-05-09 03:11:25 +00:00
parent efb01985db
commit 1f781423ee
6 changed files with 51 additions and 8 deletions

View File

@ -222,13 +222,15 @@ def _get_auth_plugin(context, admin=False):
# support some services (metadata API) where an admin context is used
# without an auth token.
global _ADMIN_AUTH
user_auth = None
if admin or (context.is_admin and not context.auth_token):
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
return _ADMIN_AUTH
user_auth = _ADMIN_AUTH
if context.auth_token:
return service_auth.get_auth_plugin(context)
if context.auth_token or user_auth:
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
# We did not get a user token and we should not be using
# an admin token so log an error

View File

@ -30,8 +30,10 @@ def reset_globals():
_SERVICE_AUTH = None
def get_auth_plugin(context):
user_auth = context.get_auth_plugin()
def get_auth_plugin(context, user_auth=None):
# user_auth may be passed in when the RequestContext is anonymous, such as
# when get_admin_context() is used for API calls by nova-manage.
user_auth = user_auth or context.get_auth_plugin()
if CONF.service_user.send_service_user_token:
global _SERVICE_AUTH

View File

@ -142,6 +142,22 @@ class TestNeutronClient(test.NoDBTestCase):
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.network.neutron._ADMIN_AUTH')
@mock.patch.object(ks_loading, 'load_auth_from_conf_options')
def test_admin_with_service_token(
self, mock_load, mock_admin_auth, mock_service_auth
):
self.flags(send_service_user_token=True, group='service_user')
admin_context = context.get_admin_context()
cl = neutronapi.get_client(admin_context)
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
self.assertEqual(mock_admin_auth, cl.httpclient.auth.user_auth)
self.assertEqual(mock_service_auth, cl.httpclient.auth.service_auth)
@mock.patch.object(client.Client, "list_networks",
side_effect=exceptions.Unauthorized())
def test_Unauthorized_user(self, mock_list_networks):

View File

@ -56,3 +56,13 @@ class ServiceAuthTestCase(test.NoDBTestCase):
result = service_auth.get_auth_plugin(self.ctx)
self.assertEqual(1, mock_load.call_count)
self.assertNotIsInstance(result, service_token.ServiceTokenAuthWrapper)
@mock.patch.object(ks_loading, 'load_auth_from_conf_options',
new=mock.Mock())
def test_get_auth_plugin_user_auth(self):
self.flags(send_service_user_token=True, group='service_user')
user_auth = mock.Mock()
result = service_auth.get_auth_plugin(self.ctx, user_auth=user_auth)
self.assertEqual(user_auth, result.user_auth)

View File

@ -1276,3 +1276,14 @@ class CinderClientTestCase(test.NoDBTestCase):
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(params[0], mock_admin_auth)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.volume.cinder._ADMIN_AUTH')
def test_admin_context_without_user_token_but_with_service_token(
self, mock_admin_auth, mock_service_auth
):
self.flags(send_service_user_token=True, group='service_user')
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(mock_admin_auth, params[0].user_auth)
self.assertEqual(mock_service_auth, params[0].service_auth)

View File

@ -91,12 +91,14 @@ def _get_auth(context):
# from them generated from 'context.get_admin_context'
# which only set is_admin=True but is without token.
# So add load_auth_plugin when this condition appear.
user_auth = None
if context.is_admin and not context.auth_token:
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
return _ADMIN_AUTH
else:
return service_auth.get_auth_plugin(context)
user_auth = _ADMIN_AUTH
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
# NOTE(efried): Bug #1752152