Enable use of service user token with admin context

When the [service_user] section is configured in nova.conf, nova will
have the ability to send a service user token alongside the user's
token. The service user token is sent when nova calls other services'
REST APIs to authenticate as a service, and service calls can sometimes
have elevated privileges.

Currently, nova does not however have the ability to send a service user
token with an admin context. This means that when nova makes REST API
calls to other services with an anonymous admin RequestContext (such as
in nova-manage or periodic tasks), it will not be authenticated as a
service.

This adds a keyword argument to service_auth.get_auth_plugin() to
enable callers to provide a user_auth object instead of attempting to
extract the user_auth from the RequestContext.

The cinder and neutron client modules are also adjusted to make use of
the new user_auth keyword argument so that nova calls made with
anonymous admin request contexts can authenticate as a service when
configured.

Related-Bug: #2004555

Change-Id: I14df2d55f4b2f0be58f1a6ad3f19e48f7a6bfcb4
(cherry picked from commit 41c64b94b0)
(cherry picked from commit 1f781423ee)
(cherry picked from commit 0d6dd6c67f)
(cherry picked from commit 98c3e3707c)
(cherry picked from commit 6cc4e7fb9a)
This commit is contained in:
melanie witt 2023-05-09 03:11:25 +00:00
parent 5b4cb92aa8
commit 48150a6fba
6 changed files with 51 additions and 8 deletions

View File

@ -208,13 +208,15 @@ def _get_auth_plugin(context, admin=False):
# support some services (metadata API) where an admin context is used
# without an auth token.
global _ADMIN_AUTH
user_auth = None
if admin or (context.is_admin and not context.auth_token):
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
return _ADMIN_AUTH
user_auth = _ADMIN_AUTH
if context.auth_token:
return service_auth.get_auth_plugin(context)
if context.auth_token or user_auth:
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
# We did not get a user token and we should not be using
# an admin token so log an error

View File

@ -30,8 +30,10 @@ def reset_globals():
_SERVICE_AUTH = None
def get_auth_plugin(context):
user_auth = context.get_auth_plugin()
def get_auth_plugin(context, user_auth=None):
# user_auth may be passed in when the RequestContext is anonymous, such as
# when get_admin_context() is used for API calls by nova-manage.
user_auth = user_auth or context.get_auth_plugin()
if CONF.service_user.send_service_user_token:
global _SERVICE_AUTH

View File

@ -143,6 +143,22 @@ class TestNeutronClient(test.NoDBTestCase):
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.network.neutron._ADMIN_AUTH')
@mock.patch.object(ks_loading, 'load_auth_from_conf_options')
def test_admin_with_service_token(
self, mock_load, mock_admin_auth, mock_service_auth
):
self.flags(send_service_user_token=True, group='service_user')
admin_context = context.get_admin_context()
cl = neutronapi.get_client(admin_context)
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
self.assertEqual(mock_admin_auth, cl.httpclient.auth.user_auth)
self.assertEqual(mock_service_auth, cl.httpclient.auth.service_auth)
@mock.patch.object(client.Client, "list_networks",
side_effect=exceptions.Unauthorized())
def test_Unauthorized_user(self, mock_list_networks):

View File

@ -55,3 +55,13 @@ class ServiceAuthTestCase(test.NoDBTestCase):
result = service_auth.get_auth_plugin(self.ctx)
self.assertEqual(1, mock_load.call_count)
self.assertNotIsInstance(result, service_token.ServiceTokenAuthWrapper)
@mock.patch.object(ks_loading, 'load_auth_from_conf_options',
new=mock.Mock())
def test_get_auth_plugin_user_auth(self):
self.flags(send_service_user_token=True, group='service_user')
user_auth = mock.Mock()
result = service_auth.get_auth_plugin(self.ctx, user_auth=user_auth)
self.assertEqual(user_auth, result.user_auth)

View File

@ -1264,3 +1264,14 @@ class CinderClientTestCase(test.NoDBTestCase):
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(params[0], mock_admin_auth)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.volume.cinder._ADMIN_AUTH')
def test_admin_context_without_user_token_but_with_service_token(
self, mock_admin_auth, mock_service_auth
):
self.flags(send_service_user_token=True, group='service_user')
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(mock_admin_auth, params[0].user_auth)
self.assertEqual(mock_service_auth, params[0].service_auth)

View File

@ -91,12 +91,14 @@ def _get_auth(context):
# from them generated from 'context.get_admin_context'
# which only set is_admin=True but is without token.
# So add load_auth_plugin when this condition appear.
user_auth = None
if context.is_admin and not context.auth_token:
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
return _ADMIN_AUTH
else:
return service_auth.get_auth_plugin(context)
user_auth = _ADMIN_AUTH
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
# NOTE(efried): Bug #1752152