oci: Enable embedded authentication passing

For the OCI artifact retrieval case, to enable authentication to be
passed from the conductor (in the form of a bearer token), we need to
be able to handle the case where this data is present, and then
initiate the connection with the appropriate token.

Change-Id: I380b32671cbc3a640bc5012ac241a7244750d117
This commit is contained in:
Julia Kreger
2025-02-11 13:43:47 -08:00
parent a6d1921056
commit a132e167f4
2 changed files with 74 additions and 1 deletions

View File

@@ -72,6 +72,36 @@ def _verify_basic_auth_creds(user, password, image_id):
)
class SuppliedAuth(requests.auth.HTTPBasicAuth):
def __init__(self, authorization):
self.authorization = authorization
def __call__(self, r):
r.headers["Authorization"] = self.authorization
return r
def __eq__(self, other):
return all(
[
self.authorization == getattr(other, "authorization", None)
]
)
def __ne__(self, other):
return not self == other
def _load_supplied_authorization(image_info):
req_auth = image_info.get('image_request_authorization')
if req_auth:
req_auth = base64.standard_b64decode(req_auth).decode()
return SuppliedAuth(req_auth)
else:
return None
def _gen_auth_from_image_info_user_pass(image_info, image_id):
"""This function is used to pass the credentials to the chosen
@@ -166,7 +196,9 @@ def _download_with_proxy(image_info, url, image_id):
}
# NOTE(Adam) `image_info` is prioritized over `oslo.conf` for credential
# collection and auth strategy selection
auth_object = _gen_auth_from_image_info_user_pass(image_info, image_id)
auth_object = _load_supplied_authorization(image_info)
if auth_object is None:
auth_object = _gen_auth_from_image_info_user_pass(image_info, image_id)
if auth_object is None:
auth_object = _gen_auth_from_oslo_conf_user_pass(image_id)
if auth_object is not None:

View File

@@ -257,6 +257,17 @@ class TestStandbyExtension(base.IronicAgentTest):
standby._gen_auth_from_oslo_conf_user_pass(image_info['id'])
self.assertIsNone(return_auth)
def test_load_auth_header_from_image_info(self):
image_info = _build_fake_image_info()
image_info['image_request_authorization'] = b'QmVhcmVyIGYwMA=='
return_auth = standby._load_supplied_authorization(image_info)
self.assertEqual('Bearer f00', return_auth.authorization)
def test_load_auth_header_from_image_info_none(self):
image_info = _build_fake_image_info()
return_auth = standby._load_supplied_authorization(image_info)
self.assertIsNone(return_auth)
def test_verify_basic_auth_creds_empty_user(self):
image_info = _build_fake_image_info()
self.assertRaises(errors.ImageDownloadError,
@@ -694,6 +705,36 @@ class TestStandbyExtension(base.IronicAgentTest):
write.assert_any_call('content')
self.assertEqual(2, write.call_count)
@mock.patch('hashlib.new', autospec=True)
@mock.patch('builtins.open', autospec=True)
@mock.patch('requests.get', autospec=True)
def test_download_image_conductor_auth(self,
requests_mock,
open_mock,
hash_mock):
image_info = _build_fake_image_info()
image_info['image_request_authorization'] = b'QmVhcmVyIGYwMA=='
correct_auth = standby.SuppliedAuth('Bearer f00')
response = requests_mock.return_value
response.status_code = 200
response.iter_content.return_value = ['some', 'content']
file_mock = mock.Mock()
open_mock.return_value.__enter__.return_value = file_mock
file_mock.read.return_value = None
hexdigest_mock = hash_mock.return_value.hexdigest
hexdigest_mock.return_value = image_info['os_hash_value']
standby._download_image(image_info)
requests_mock.assert_called_once_with(image_info['urls'][0],
cert=None, verify=True,
stream=True, proxies={},
timeout=60, auth=correct_auth)
self.assertEqual('Bearer f00', correct_auth.authorization)
write = file_mock.write
write.assert_any_call('some')
write.assert_any_call('content')
self.assertEqual(2, write.call_count)
def test_download_image_bad_basic_auth_conf_credential(self):
self.config(image_download_connection_retry_interval=0)
image_info = _build_fake_image_info()