Fix SSRF vulnerabilities in image import API

Fixed Server-Side Request Forgery (SSRF) vulnerabilities in Glance's image
import functionality that could allow attackers to bypass URL validation
and access internal resources.

The fix includes:
- IP address validation using Python's ipaddress module to reject encoded
  IP formats (decimal, hexadecimal, octal) that could bypass blacklist checks
- HTTP redirect validation for web-download, glance-download, and OVF
  processing to prevent redirect-based SSRF attacks
- URI validation for OVF processing which previously had no protection

The implementation uses Python's built-in ipaddress module which inherently
rejects all non-standard IP encodings and only accepts standard formats,
providing robust protection against IP encoding bypass attacks.

Depends-On: https://review.opendev.org/c/openstack/tempest/+/981329

Assisted-by: Used Cursor (Auto) for unit tests.

Closes-Bug: #2138602
Closes-Bug: #2138672
Closes-Bug: #2138675
SecurityImpact

Conflicts:
    glance/tests/functional/v2/test_images.py
    glance/tests/unit/common/test_utils.py

Change-Id: Ib8d337dc68411d18c70d5712cc4f0986ef6205f4
Signed-off-by: Abhishek Kekane <akekane@redhat.com>
(cherry picked from commit 38040d086bf6c159f45d201f361a71e7a32e62af)
(cherry picked from commit e9cdb9ff2bc44677e028a370cb5606343227beb6)
Signed-off-by: Brian Rosmaita <rosmaita.fossdev@gmail.com>
This commit is contained in:
Abhishek Kekane
2026-01-20 19:02:08 +00:00
committed by Brian Rosmaita
parent d88f696b76
commit bb0d59fff7
13 changed files with 887 additions and 18 deletions

View File

@@ -24,6 +24,7 @@ from taskflow.patterns import linear_flow as lf
from glance.async_.flows._internal_plugins import base_download
from glance.async_ import utils
from glance.common import exception
from glance.common.scripts import utils as script_utils
from glance.common import utils as common_utils
from glance.i18n import _, _LI, _LE
@@ -66,7 +67,9 @@ class _DownloadGlanceImage(base_download.BaseDownload):
token = self.context.auth_token
request = urllib.request.Request(image_download_url,
headers={'X-Auth-Token': token})
data = urllib.request.urlopen(request)
opener = urllib.request.build_opener(
script_utils.SafeRedirectHandler)
data = opener.open(request)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(

View File

@@ -815,7 +815,9 @@ class _ImportMetadata(task.Task):
token = self.context.auth_token
request = urllib.request.Request(image_download_metadata_url,
headers={'X-Auth-Token': token})
with urllib.request.urlopen(request) as payload:
opener = urllib.request.build_opener(
script_utils.SafeRedirectHandler)
with opener.open(request) as payload:
data = json.loads(payload.read().decode('utf-8'))
if data.get('status') != 'active':

View File

@@ -18,6 +18,7 @@ import re
import shutil
import tarfile
import urllib
import urllib.request
from defusedxml import ElementTree as etree
@@ -27,6 +28,9 @@ from oslo_serialization import jsonutils as json
from taskflow.patterns import linear_flow as lf
from taskflow import task
from glance.common import exception
from glance.common.scripts import utils as script_utils
from glance.common import utils as common_utils
from glance.i18n import _, _LW
LOG = logging.getLogger(__name__)
@@ -75,7 +79,13 @@ class _OVF_Process(task.Task):
uri = uri.split("file://")[-1]
return open(uri, "rb")
return urllib.request.urlopen(uri)
if not common_utils.validate_import_uri(uri):
msg = (_("URI for OVF processing does not pass filtering: %s") %
uri)
raise exception.ImportTaskError(msg)
opener = urllib.request.build_opener(script_utils.SafeRedirectHandler)
return opener.open(uri)
def execute(self, image_id, file_path):
"""

View File

@@ -19,14 +19,18 @@ __all__ = [
'set_base_image_properties',
'validate_location_uri',
'get_image_data_iter',
'SafeRedirectHandler',
]
import urllib
import urllib.error
import urllib.request
from oslo_log import log as logging
from oslo_utils import timeutils
from glance.common import exception
from glance.common import utils as common_utils
from glance.i18n import _, _LE
LOG = logging.getLogger(__name__)
@@ -125,6 +129,15 @@ def validate_location_uri(location):
raise urllib.error.URLError(msg)
class SafeRedirectHandler(urllib.request.HTTPRedirectHandler):
"""HTTP redirect handler that validates redirect destinations."""
def redirect_request(self, req, fp, code, msg, headers, newurl):
if not common_utils.validate_import_uri(newurl):
msg = (_("Redirect to disallowed URL: %s") % newurl)
raise exception.ImportTaskError(msg)
return super().redirect_request(req, fp, code, msg, headers, newurl)
def get_image_data_iter(uri):
"""Returns iterable object either for local file or uri
@@ -148,7 +161,8 @@ def get_image_data_iter(uri):
# into memory. Some images may be quite heavy.
return open(uri, "rb")
return urllib.request.urlopen(uri)
opener = urllib.request.build_opener(SafeRedirectHandler)
return opener.open(uri)
class CallbackIterator(object):

View File

@@ -21,6 +21,7 @@ System-level utilities and helper functions.
"""
import errno
import ipaddress
from eventlet.green import socket
@@ -126,6 +127,65 @@ CONF.import_group('import_filtering_opts',
'glance.async_.flows._internal_plugins')
def normalize_hostname(host):
"""Normalize IP address to standard format or return hostname.
Uses ipaddress module to validate and normalize IP addresses, rejecting
encoded formats. For hostnames, requires DNS resolution to ensure they
are valid and not encoded IP attempts.
:param host: hostname or IP address
:returns: normalized IP address, hostname unchanged, or None
"""
if not host:
return host
# NOTE(abhishekk): Try to parse as IPv4. ipaddress module only accepts
# standard format like 127.0.0.1. It rejects encoded formats like
# decimal (2130706433), hex (0x7f000001), or octal (017700000001).
try:
return str(ipaddress.IPv4Address(host))
except ValueError:
pass
# NOTE(abhishekk): Try to parse as IPv6. ipaddress module only accepts
# standard IPv6 format and rejects encoded formats.
try:
return str(ipaddress.IPv6Address(host))
except ValueError:
pass
# NOTE(abhishekk): Not valid IP address, check as hostname. Reject pure
# numeric strings like "2130706433" (decimal encoded IP). ipaddress module
# rejected it, but OS might still resolve using inet_aton() if not blocked.
if host.isdigit():
return None
# NOTE(abhishekk): Reject all numeric strings with dots like "127.1" or
# "10.1". These are shorthand IP addresses. ipaddress module rejects them
# because they need 4 octets, but OS may still resolve them. We block to
# prevent SSRF bypass attacks.
if all(c.isdigit() or c == '.' for c in host):
return None
# NOTE(abhishekk): Add trailing dot to force DNS lookup instead of numeric
# parsing. This blocks encoded IP formats like 0x7f000001 or 127.0x0.0.1
# because they fail DNS lookup. Only real hostnames that resolve via DNS
# are allowed.
testhost = host
if not testhost.endswith('.'):
testhost += '.'
try:
socket.getaddrinfo(testhost, 80)
except socket.gaierror:
# NOTE(abhishekk): DNS resolution failed, reject the hostname
return None
# NOTE(abhishekk): Valid and resolvable hostname, return unchanged
return host
def validate_import_uri(uri):
"""Validate requested uri for Image Import web-download.
@@ -163,9 +223,14 @@ def validate_import_uri(uri):
if not scheme or ((wl_schemes and scheme not in wl_schemes) or
parsed_uri.scheme in bl_schemes):
return False
if not host or ((wl_hosts and host not in wl_hosts) or
host in bl_hosts):
normalized_host = normalize_hostname(host)
if not normalized_host or (
(wl_hosts and normalized_host not in wl_hosts) or
normalized_host in bl_hosts):
return False
if port and ((wl_ports and port not in wl_ports) or
port in bl_ports):
return False

View File

@@ -15,9 +15,11 @@
import hashlib
import http.client as http
import http.server as http_server
import os
import subprocess
import tempfile
import threading
import time
import urllib
import uuid
@@ -389,6 +391,164 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_web_download_redirect_validation(self):
"""Test that redirect destinations are validated."""
self.config(allowed_ports=[80], group='import_filtering_opts')
self.config(disallowed_hosts=['127.0.0.1'],
group='import_filtering_opts')
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json'})
data = jsonutils.dumps({
'name': 'redirect-test', 'type': 'kernel',
'disk_format': 'aki', 'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.CREATED, response.status_code)
image = jsonutils.loads(response.text)
image_id = image['id']
# Try to import with redirect to disallowed host
# The redirect destination should be validated and rejected
# Create a local redirect server
def _get_redirect_handler_class():
class RedirectHTTPRequestHandler(
http_server.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(http.FOUND)
self.send_header('Location', 'http://127.0.0.1:80/')
self.end_headers()
return
def log_message(self, *args, **kwargs):
return
server_address = ('127.0.0.1', 0)
handler_class = _get_redirect_handler_class()
redirect_httpd = http_server.HTTPServer(server_address, handler_class)
redirect_port = redirect_httpd.socket.getsockname()[1]
redirect_thread = threading.Thread(target=redirect_httpd.serve_forever)
redirect_thread.daemon = True
redirect_thread.start()
redirect_uri = 'http://127.0.0.1:%s/' % redirect_port
path = self._url('/v2/images/%s/import' % image_id)
headers = self._headers({
'content-type': 'application/json',
'X-Roles': 'admin',
})
data = jsonutils.dumps({'method': {
'name': 'web-download',
'uri': redirect_uri
}})
# Import request may be accepted (202) but should fail during
# processing when redirect destination is validated
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.ACCEPTED, response.status_code)
# Clean up redirect server
redirect_httpd.shutdown()
redirect_httpd.server_close()
# Wait for the task to process and verify it failed
# The redirect validation in get_image_data_iter should prevent
# access to disallowed host (127.0.0.1)
time.sleep(5) # Give task time to process
# Verify the task failed due to redirect validation
path = self._url('/v2/images/%s/tasks' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(http.OK, response.status_code)
tasks = jsonutils.loads(response.text)['tasks']
tasks = sorted(tasks, key=lambda t: t['updated_at'])
self.assertGreater(len(tasks), 0)
task = tasks[-1]
self.assertEqual('failure', task['status'])
# Verify the image status is still queued (not active)
# since the import failed - this proves the SSRF was prevented
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(http.OK, response.status_code)
image = jsonutils.loads(response.text)
self.assertEqual('queued', image['status'])
# Image should not have checksum or size since import failed
# If checksum/size exist, it means data was downloaded (SSRF succeeded)
self.assertIsNone(image.get('checksum'))
# Image should not have size since import failed
self.assertIsNone(image.get('size'))
# Clean up
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(http.NO_CONTENT, response.status_code)
self.stop_servers()
def test_web_download_ip_normalization(self):
"""Test that encoded IP addresses are normalized and blocked."""
self.config(allowed_ports=[80], group='import_filtering_opts')
self.config(disallowed_hosts=['127.0.0.1'],
group='import_filtering_opts')
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json'})
data = jsonutils.dumps({
'name': 'ip-encoding-test', 'type': 'kernel',
'disk_format': 'aki', 'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(http.CREATED, response.status_code)
image = jsonutils.loads(response.text)
image_id = image['id']
# Test that encoded IP (2130706433 = 127.0.0.1) is blocked
# after normalization
encoded_ip_uri = 'http://2130706433:80/'
path = self._url('/v2/images/%s/import' % image_id)
headers = self._headers({
'content-type': 'application/json',
'X-Roles': 'admin',
})
data = jsonutils.dumps({'method': {
'name': 'web-download',
'uri': encoded_ip_uri
}})
# Should be rejected because encoded IP normalizes to 127.0.0.1
# which is in disallowed_hosts
# Validation happens at API level, so should return 400
response = requests.post(path, headers=headers, data=data)
# Should be rejected with 400 Bad Request
self.assertEqual(400, response.status_code)
# Test octal integer encoded IP (017700000001 = 127.0.0.1)
octal_int_uri = 'http://017700000001:80/'
data = jsonutils.dumps({'method': {
'name': 'web-download',
'uri': octal_int_uri
}})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(400, response.status_code)
# Test octal dotted-decimal encoded IP (0177.0.0.01 = 127.0.0.1)
octal_dotted_uri = 'http://0177.0.0.01:80/'
data = jsonutils.dumps({'method': {
'name': 'web-download',
'uri': octal_dotted_uri
}})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(400, response.status_code)
# Clean up
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(http.NO_CONTENT, response.status_code)
self.stop_servers()
def test_image_lifecycle(self):
# Image list should be empty
self.api_server.show_multiple_locations = True

View File

@@ -25,6 +25,7 @@ import taskflow
import glance.async_.flows.api_image_import as import_flow
from glance.common import exception
from glance.common.scripts.image_import import main as image_import
from glance.common.scripts import utils as script_utils
from glance import context
from glance.domain import ExtraProperties
from glance import gateway
@@ -1176,6 +1177,11 @@ class TestImportMetadata(test_utils.BaseTestCase):
'extra_metadata': 'hello',
'size': '12345'
}
mock_opener = mock.MagicMock()
mock_payload = mock.MagicMock()
mock_payload.read.return_value = b'{"status": "active"}'
mock_opener.open.return_value.__enter__.return_value = mock_payload
mock_request.build_opener.return_value = mock_opener
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
@@ -1184,6 +1190,7 @@ class TestImportMetadata(test_utils.BaseTestCase):
'https://other.cloud.foo/image/v2/images/%s' % (
IMAGE_ID1),
headers={'X-Auth-Token': self.context.auth_token})
mock_request.build_opener.assert_called_once()
mock_gge.assert_called_once_with(self.context, 'RegionTwo', 'public')
action.set_image_attribute.assert_called_once_with(
disk_format='qcow2',
@@ -1212,8 +1219,11 @@ class TestImportMetadata(test_utils.BaseTestCase):
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_execute_fail_remote_glance_unreachable(self, mock_gge, mock_r):
action = self.wrapper.__enter__.return_value
mock_r.urlopen.side_effect = urllib.error.HTTPError(
mock_gge.return_value = 'https://other.cloud.foo/image'
mock_opener = mock.MagicMock()
mock_opener.open.side_effect = urllib.error.HTTPError(
'/file', 400, 'Test Fail', {}, None)
mock_r.build_opener.return_value = mock_opener
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
@@ -1231,6 +1241,11 @@ class TestImportMetadata(test_utils.BaseTestCase):
mock_json.loads.return_value = {
'status': 'queued',
}
mock_opener = mock.MagicMock()
mock_payload = mock.MagicMock()
mock_payload.read.return_value = b'{"status": "queued"}'
mock_opener.open.return_value.__enter__.return_value = mock_payload
mock_request.build_opener.return_value = mock_opener
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
@@ -1254,6 +1269,11 @@ class TestImportMetadata(test_utils.BaseTestCase):
'os_hash': 'hash',
'extra_metadata': 'hello',
}
mock_opener = mock.MagicMock()
mock_payload = mock.MagicMock()
mock_payload.read.return_value = b'{"status": "active"}'
mock_opener.open.return_value.__enter__.return_value = mock_payload
mock_request.build_opener.return_value = mock_opener
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
@@ -1262,6 +1282,7 @@ class TestImportMetadata(test_utils.BaseTestCase):
'https://other.cloud.foo/image/v2/images/%s' % (
IMAGE_ID1),
headers={'X-Auth-Token': self.context.auth_token})
mock_request.build_opener.assert_called_once()
mock_gge.assert_called_once_with(self.context, 'RegionTwo', 'public')
action.set_image_attribute.assert_called_once_with(
disk_format='qcow2',
@@ -1271,6 +1292,71 @@ class TestImportMetadata(test_utils.BaseTestCase):
'os_hash': 'hash'
})
@mock.patch('urllib.request')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_import_metadata_redirect_validation(self, mock_gge,
mock_request):
"""Test redirect destinations are validated during metadata fetch."""
mock_gge.return_value = 'https://other.cloud.foo/image'
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
mock_opener = mock.MagicMock()
# Simulate redirect to disallowed URL
mock_opener.open.side_effect = exception.ImportTaskError(
"Redirect to disallowed URL: http://127.0.0.1:5000/")
mock_request.build_opener.return_value = mock_opener
self.assertRaises(exception.ImportTaskError, task.execute)
# Verify SafeRedirectHandler is used
mock_request.build_opener.assert_called_once()
# Verify the handler passed is SafeRedirectHandler
call_args = mock_request.build_opener.call_args
# Check if SafeRedirectHandler class or instance is in args
found_handler = (
any(isinstance(arg, script_utils.SafeRedirectHandler)
for arg in call_args.args) or
script_utils.SafeRedirectHandler in call_args.args)
self.assertTrue(
found_handler,
"SafeRedirectHandler should be used for redirect validation")
@mock.patch('urllib.request')
@mock.patch('glance.async_.flows.api_image_import.json')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_import_metadata_uses_safe_redirect_handler(self, mock_gge,
mock_json,
mock_request):
"""Test that SafeRedirectHandler is used and allows valid redirects."""
mock_gge.return_value = 'https://other.cloud.foo/image'
mock_json.loads.return_value = {
'status': 'active',
'disk_format': 'qcow2',
'container_format': 'bare',
'size': '12345'
}
mock_opener = mock.MagicMock()
mock_payload = mock.MagicMock()
mock_payload.read.return_value = b'{"status": "active"}'
mock_opener.open.return_value.__enter__.return_value = mock_payload
mock_request.build_opener.return_value = mock_opener
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,
self.context, self.wrapper,
self.import_req)
# Execute should succeed with valid redirect
result = task.execute()
# Verify build_opener was called with SafeRedirectHandler
mock_request.build_opener.assert_called_once()
call_args = mock_request.build_opener.call_args
found_handler = (
any(isinstance(arg, script_utils.SafeRedirectHandler)
for arg in call_args.args) or
script_utils.SafeRedirectHandler in call_args.args)
self.assertTrue(
found_handler,
"SafeRedirectHandler should be passed to build_opener")
# Verify execution succeeded (handler allows valid redirects)
self.assertEqual(12345, result)
def test_revert_rollback_metadata_value(self):
action = self.wrapper.__enter__.return_value
task = import_flow._ImportMetadata(TASK_ID1, TASK_TYPE,

View File

@@ -22,7 +22,8 @@ from oslo_utils.fixture import uuidsentinel
from glance.async_.flows._internal_plugins import glance_download
from glance.async_.flows import api_image_import
import glance.common.exception
from glance.common import exception
from glance.common.scripts import utils as script_utils
import glance.context
from glance import domain
import glance.tests.utils as test_utils
@@ -72,37 +73,49 @@ class TestGlanceDownloadTask(test_utils.BaseTestCase):
self.image_repo.get.return_value = mock.MagicMock(
extra_properties={'os_glance_import_task': self.task_id})
@mock.patch('glance.common.utils.socket.getaddrinfo')
@mock.patch.object(filesystem.Store, 'add')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_glance_download(self, mock_gge, mock_add):
def test_glance_download(self, mock_gge, mock_add, mock_getaddrinfo):
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
mock_gge.return_value = 'https://other.cloud.foo/image'
glance_download_task = glance_download._DownloadGlanceImage(
self.context, self.task.task_id, self.task_type,
self.action_wrapper, ['foo'],
'RegionTwo', uuidsentinel.remote_image, 'public')
with mock.patch('urllib.request') as mock_request:
mock_opener = mock.MagicMock()
mock_response = mock.MagicMock()
mock_opener.open.return_value = mock_response
mock_request.build_opener.return_value = mock_opener
mock_add.return_value = ["path", 12345]
self.assertEqual(glance_download_task.execute(12345), "path")
mock_add.assert_called_once_with(
self.image_id,
mock_request.urlopen.return_value, 0)
mock_response, 0)
mock_request.Request.assert_called_once_with(
'https://other.cloud.foo/image/v2/images/%s/file' % (
uuidsentinel.remote_image),
headers={'X-Auth-Token': self.context.auth_token})
mock_request.build_opener.assert_called_once()
mock_gge.assert_called_once_with(self.context, 'RegionTwo', 'public')
@mock.patch('glance.common.utils.socket.getaddrinfo')
@mock.patch.object(filesystem.Store, 'add')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_glance_download_failed(self, mock_gge, mock_add):
def test_glance_download_failed(self, mock_gge, mock_add,
mock_getaddrinfo):
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
mock_gge.return_value = 'https://other.cloud.foo/image'
glance_download_task = glance_download._DownloadGlanceImage(
self.context, self.task.task_id, self.task_type,
self.action_wrapper, ['foo'],
'RegionTwo', uuidsentinel.remote_image, 'public')
with mock.patch('urllib.request') as mock_request:
mock_request.urlopen.side_effect = urllib.error.HTTPError(
mock_opener = mock.MagicMock()
mock_opener.open.side_effect = urllib.error.HTTPError(
'/file', 400, 'Test Fail', {}, None)
mock_request.build_opener.return_value = mock_opener
self.assertRaises(urllib.error.HTTPError,
glance_download_task.execute,
12345)
@@ -127,21 +140,28 @@ class TestGlanceDownloadTask(test_utils.BaseTestCase):
glance_download_task.execute, 12345)
mock_request.assert_not_called()
@mock.patch('glance.common.utils.socket.getaddrinfo')
@mock.patch.object(filesystem.Store, 'add')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_glance_download_size_mismatch(self, mock_gge, mock_add):
def test_glance_download_size_mismatch(self, mock_gge, mock_add,
mock_getaddrinfo):
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
mock_gge.return_value = 'https://other.cloud.foo/image'
glance_download_task = glance_download._DownloadGlanceImage(
self.context, self.task.task_id, self.task_type,
self.action_wrapper, ['foo'],
'RegionTwo', uuidsentinel.remote_image, 'public')
with mock.patch('urllib.request') as mock_request:
mock_opener = mock.MagicMock()
mock_response = mock.MagicMock()
mock_opener.open.return_value = mock_response
mock_request.build_opener.return_value = mock_opener
mock_add.return_value = ["path", 1]
self.assertRaises(glance.common.exception.ImportTaskError,
glance_download_task.execute, 12345)
mock_add.assert_called_once_with(
self.image_id,
mock_request.urlopen.return_value, 0)
mock_response, 0)
mock_request.Request.assert_called_once_with(
'https://other.cloud.foo/image/v2/images/%s/file' % (
uuidsentinel.remote_image),
@@ -165,3 +185,70 @@ class TestGlanceDownloadTask(test_utils.BaseTestCase):
mock_validate.assert_called_once_with(
'https://other.cloud.foo/image/v2/images/%s/file' % (
uuidsentinel.remote_image))
@mock.patch('glance.common.utils.socket.getaddrinfo')
@mock.patch('urllib.request')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_glance_download_redirect_validation(self, mock_gge,
mock_request,
mock_getaddrinfo):
"""Test redirect destinations are validated during image download."""
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
mock_gge.return_value = 'https://other.cloud.foo/image'
glance_download_task = glance_download._DownloadGlanceImage(
self.context, self.task.task_id, self.task_type,
self.action_wrapper, ['foo'],
'RegionTwo', uuidsentinel.remote_image, 'public')
mock_opener = mock.MagicMock()
# Simulate redirect to disallowed URL
mock_opener.open.side_effect = exception.ImportTaskError(
"Redirect to disallowed URL: http://127.0.0.1:5000/")
mock_request.build_opener.return_value = mock_opener
self.assertRaises(exception.ImportTaskError,
glance_download_task.execute, 12345)
# Verify SafeRedirectHandler is used
mock_request.build_opener.assert_called_once()
# Verify the handler passed is SafeRedirectHandler
call_args = mock_request.build_opener.call_args
# Check if SafeRedirectHandler class or instance is in args
found_handler = (
any(isinstance(arg, script_utils.SafeRedirectHandler)
for arg in call_args.args) or
script_utils.SafeRedirectHandler in call_args.args)
self.assertTrue(
found_handler,
"SafeRedirectHandler should be used for redirect validation")
@mock.patch('glance.common.utils.socket.getaddrinfo')
@mock.patch.object(filesystem.Store, 'add')
@mock.patch('urllib.request')
@mock.patch('glance.async_.utils.get_glance_endpoint')
def test_glance_download_uses_safe_redirect_handler(
self, mock_gge, mock_request, mock_add, mock_getaddrinfo):
"""Test that SafeRedirectHandler is used and allows valid execution."""
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
mock_gge.return_value = 'https://other.cloud.foo/image'
glance_download_task = glance_download._DownloadGlanceImage(
self.context, self.task.task_id, self.task_type,
self.action_wrapper, ['foo'],
'RegionTwo', uuidsentinel.remote_image, 'public')
mock_opener = mock.MagicMock()
mock_response = mock.MagicMock()
mock_opener.open.return_value = mock_response
mock_request.build_opener.return_value = mock_opener
mock_add.return_value = ["path", 12345]
result = glance_download_task.execute(12345)
# Verify build_opener was called with SafeRedirectHandler
mock_request.build_opener.assert_called_once()
# Verify SafeRedirectHandler was passed
call_args = mock_request.build_opener.call_args
# Check if SafeRedirectHandler class or instance is in args
found_handler = (
any(isinstance(arg, script_utils.SafeRedirectHandler)
for arg in call_args.args) or
script_utils.SafeRedirectHandler in call_args.args)
self.assertTrue(
found_handler,
"SafeRedirectHandler should be passed to build_opener")
# Verify execution succeeded (handler allows valid execution)
self.assertEqual("path", result)

View File

@@ -17,7 +17,6 @@ import io
import json
import os
from unittest import mock
import urllib
import glance_store
from oslo_concurrency import processutils as putils
@@ -371,9 +370,15 @@ class TestImportTask(test_utils.BaseTestCase):
self.img_repo.get.return_value = self.image
img_factory.new_image.side_effect = create_image
with mock.patch.object(urllib.request, 'urlopen') as umock:
content = b"TEST_IMAGE"
umock.return_value = io.BytesIO(content)
# Mock get_image_data_iter to avoid actual network calls
# and to work with our SafeRedirectHandler changes
content = b"TEST_IMAGE"
mock_response = io.BytesIO(content)
mock_response.headers = {}
with mock.patch(
'glance.common.scripts.utils.get_image_data_iter') as umock:
umock.return_value = mock_response
with mock.patch.object(import_flow, "_get_import_flows") as imock:
imock.return_value = (x for x in [])

View File

@@ -18,10 +18,12 @@ import shutil
import tarfile
import tempfile
from unittest import mock
import urllib.error
from defusedxml.ElementTree import ParseError
from glance.async_.flows import ovf_process
from glance.common import exception
import glance.tests.utils as test_utils
from oslo_config import cfg
@@ -164,3 +166,56 @@ class TestOvfProcessTask(test_utils.BaseTestCase):
iextractor = ovf_process.OVAImageExtractor()
with open(ova_file_path, 'rb') as ova_file:
self.assertRaises(ParseError, iextractor._parse_OVF, ova_file)
@mock.patch('glance.common.utils.validate_import_uri')
def test_get_ova_iter_objects_uri_validation_fails(self, mock_validate):
"""Test that disallowed URIs raise ImportTaskError"""
mock_validate.return_value = False
oprocess = ovf_process._OVF_Process('task_id', 'ovf_proc',
self.img_repo)
self.assertRaises(exception.ImportTaskError,
oprocess._get_ova_iter_objects,
'http://127.0.0.1:5000/package.ova')
mock_validate.assert_called_once_with(
'http://127.0.0.1:5000/package.ova')
@mock.patch('urllib.request')
@mock.patch('glance.common.utils.validate_import_uri')
def test_get_ova_iter_objects_uri_validation_passes(self, mock_validate,
mock_request):
"""Test that allowed URIs use SafeRedirectHandler"""
mock_validate.return_value = True
mock_opener = mock.MagicMock()
mock_response = mock.MagicMock()
mock_opener.open.return_value = mock_response
mock_request.build_opener.return_value = mock_opener
oprocess = ovf_process._OVF_Process('task_id', 'ovf_proc',
self.img_repo)
result = oprocess._get_ova_iter_objects(
'http://example.com/package.ova')
self.assertEqual(mock_response, result)
mock_validate.assert_called_once_with(
'http://example.com/package.ova')
mock_request.build_opener.assert_called_once()
@mock.patch('urllib.request')
@mock.patch('glance.common.utils.validate_import_uri')
def test_get_ova_iter_objects_redirect_validation(self, mock_validate,
mock_request):
"""Test that redirects to disallowed URLs are blocked"""
# First call (initial URL) passes validation
# Second call (redirect destination) fails validation
mock_validate.side_effect = [True, False]
mock_opener = mock.MagicMock()
# Simulate redirect to disallowed URL
mock_opener.open.side_effect = urllib.error.URLError(
"Redirect to disallowed URL: http://127.0.0.1:5000/package.ova")
mock_request.build_opener.return_value = mock_opener
oprocess = ovf_process._OVF_Process('task_id', 'ovf_proc',
self.img_repo)
self.assertRaises(urllib.error.URLError,
oprocess._get_ova_iter_objects,
'http://example.com/package.ova')
mock_validate.assert_called_once_with(
'http://example.com/package.ova')
mock_request.build_opener.assert_called_once()

View File

@@ -15,6 +15,8 @@
from unittest import mock
import urllib
import urllib.error
import urllib.request
from glance.common import exception
from glance.common.scripts import utils as script_utils
@@ -230,3 +232,110 @@ class TestCallbackIterator(test_utils.BaseTestCase):
# call the callback with that.
callback.assert_has_calls([mock.call(2, 2),
mock.call(1, 3)])
class TestSafeRedirectHandler(test_utils.BaseTestCase):
"""Test SafeRedirectHandler for redirect validation."""
def setUp(self):
super(TestSafeRedirectHandler, self).setUp()
@mock.patch('glance.common.utils.validate_import_uri')
def test_redirect_to_allowed_url(self, mock_validate):
"""Test redirect to allowed URL is accepted."""
mock_validate.return_value = True
handler = script_utils.SafeRedirectHandler()
req = mock.Mock()
req.full_url = 'http://example.com/redirect'
fp = mock.Mock()
headers = mock.Mock()
# Redirect to allowed URL
# redirect_request should call super().redirect_request
# which returns a request
with mock.patch.object(urllib.request.HTTPRedirectHandler,
'redirect_request') as mock_super:
mock_super.return_value = mock.Mock()
result = handler.redirect_request(
req, fp, 302, 'Found', headers, 'http://allowed.com/target'
)
mock_validate.assert_called_once_with('http://allowed.com/target')
# Should return a request object (not None)
self.assertIsNotNone(result)
@mock.patch('glance.common.utils.validate_import_uri')
def test_redirect_to_disallowed_url(self, mock_validate):
"""Test redirect to disallowed URL raises error."""
mock_validate.return_value = False
handler = script_utils.SafeRedirectHandler()
req = mock.Mock()
req.full_url = 'http://example.com/redirect'
fp = mock.Mock()
headers = mock.Mock()
# Redirect to disallowed URL should raise ImportTaskError
self.assertRaises(
exception.ImportTaskError,
handler.redirect_request,
req, fp, 302, 'Found', headers, 'http://127.0.0.1:5000/'
)
mock_validate.assert_called_once_with('http://127.0.0.1:5000/')
class TestGetImageDataIter(test_utils.BaseTestCase):
"""Test get_image_data_iter with redirect validation."""
def setUp(self):
super(TestGetImageDataIter, self).setUp()
@mock.patch('builtins.open', create=True)
def test_get_image_data_iter_file_uri(self, mock_open):
"""Test file:// URI handling."""
mock_file = mock.Mock()
mock_open.return_value = mock_file
result = script_utils.get_image_data_iter("file:///tmp/test.img")
mock_open.assert_called_once_with("/tmp/test.img", "rb")
self.assertEqual(result, mock_file)
@mock.patch('urllib.request.build_opener')
def test_get_image_data_iter_http_uri(self, mock_build_opener):
"""Test HTTP URI handling with redirect validation."""
mock_opener = mock.Mock()
mock_response = mock.Mock()
mock_opener.open.return_value = mock_response
mock_build_opener.return_value = mock_opener
result = script_utils.get_image_data_iter("http://example.com/image")
# Should use build_opener with SafeRedirectHandler
mock_build_opener.assert_called_once()
# Check that SafeRedirectHandler was passed as an argument
call_args = mock_build_opener.call_args
# build_opener can be called with *args or keyword args
# Check both positional and keyword arguments
found_handler = False
if call_args.args:
found_handler = any(
isinstance(arg, script_utils.SafeRedirectHandler)
for arg in call_args.args)
if not found_handler and call_args.kwargs:
found_handler = any(
isinstance(v, script_utils.SafeRedirectHandler)
for v in call_args.kwargs.values())
# Also check if it's passed as a handler class (not instance)
if not found_handler:
found_handler = (
script_utils.SafeRedirectHandler in call_args.args)
self.assertTrue(
found_handler,
"SafeRedirectHandler should be passed to build_opener")
mock_opener.open.assert_called_once_with("http://example.com/image")
self.assertEqual(result, mock_response)

View File

@@ -15,6 +15,8 @@
# under the License.
import io
import ipaddress
import socket
import tempfile
from unittest import mock
@@ -1013,3 +1015,179 @@ class ImportURITestCase(test_utils.BaseTestCase):
group='import_filtering_opts')
self.assertTrue(utils.validate_import_uri("ftp://foo.com:8484"))
mock_run.assert_called_once()
def test_validate_import_uri_ip_rejection(self):
"""Test that encoded IP addresses are rejected (not normalized)."""
# Test that standard IP is blocked when in blacklist
self.config(disallowed_hosts=['127.0.0.1'],
group='import_filtering_opts')
self.config(allowed_ports=[80],
group='import_filtering_opts')
self.assertFalse(utils.validate_import_uri("http://127.0.0.1:80/"))
# Test that encoded IP (decimal) is rejected
result = utils.validate_import_uri("http://2130706433:80/")
self.assertFalse(result)
# Test that shorthand IP addresses are rejected
self.assertFalse(utils.validate_import_uri("http://127.1:80/"))
self.assertFalse(utils.validate_import_uri("http://10.1:80/"))
self.assertFalse(utils.validate_import_uri("http://192.168.1:80/"))
# Test with allowed host - encoded IP should still be rejected
self.config(disallowed_hosts=[],
group='import_filtering_opts')
self.config(allowed_hosts=['127.0.0.1'],
group='import_filtering_opts')
self.assertTrue(utils.validate_import_uri("http://127.0.0.1:80/"))
self.assertFalse(utils.validate_import_uri("http://2130706433:80/"))
@mock.patch('glance.common.utils.socket.getaddrinfo')
def test_normalize_hostname(self, mock_getaddrinfo):
"""Test the normalize_hostname function."""
# Test standard IPv4 - should return normalized
result = utils.normalize_hostname("127.0.0.1")
self.assertEqual(result, "127.0.0.1")
mock_getaddrinfo.assert_not_called()
# Test standard IPv4 with different format - should normalize
result = utils.normalize_hostname("192.168.1.1")
self.assertEqual(result, "192.168.1.1")
mock_getaddrinfo.assert_not_called()
# Test encoded IP (decimal) - should be rejected (fails isdigit check)
result = utils.normalize_hostname("2130706433")
self.assertIsNone(result)
mock_getaddrinfo.assert_not_called()
# Test hex encoded IP - should be rejected (DNS resolution fails)
mock_getaddrinfo.reset_mock()
mock_getaddrinfo.side_effect = socket.gaierror(
"Name or service not known")
result = utils.normalize_hostname("0x7f000001")
self.assertIsNone(result)
mock_getaddrinfo.assert_called_once_with("0x7f000001.", 80)
# Test octal integer encoded IP - should be rejected
# (fails isdigit check)
mock_getaddrinfo.reset_mock()
result = utils.normalize_hostname("017700000001")
self.assertIsNone(result)
mock_getaddrinfo.assert_not_called()
# Test octal dotted-decimal encoded IP - should be rejected
# (all digits/dots)
mock_getaddrinfo.reset_mock()
result = utils.normalize_hostname("0177.0.0.01")
self.assertIsNone(result)
mock_getaddrinfo.assert_not_called()
# Test mixed octal/decimal dotted-decimal - should be rejected
# (all digits/dots)
mock_getaddrinfo.reset_mock()
result = utils.normalize_hostname("0177.0.0.1")
self.assertIsNone(result)
mock_getaddrinfo.assert_not_called()
# Test IPv6 address - should normalize to standard format
mock_getaddrinfo.reset_mock()
result = utils.normalize_hostname("::1")
self.assertEqual(result, "::1")
mock_getaddrinfo.assert_not_called()
result = utils.normalize_hostname("2001:db8::1")
self.assertEqual(result, "2001:db8::1")
mock_getaddrinfo.assert_not_called()
# Test IPv6-mapped IPv4 - should normalize
result = utils.normalize_hostname("::ffff:127.0.0.1")
ipv6 = ipaddress.IPv6Address(result)
expected = ipaddress.IPv6Address("::ffff:127.0.0.1")
self.assertEqual(ipv6, expected)
mock_getaddrinfo.assert_not_called()
# Test shorthand IP addresses - should be rejected
mock_getaddrinfo.reset_mock()
result = utils.normalize_hostname("127.1")
self.assertIsNone(result)
mock_getaddrinfo.assert_not_called()
result = utils.normalize_hostname("10.1")
self.assertIsNone(result)
result = utils.normalize_hostname("192.168.1")
self.assertIsNone(result)
# Test valid hostname - should return unchanged
mock_getaddrinfo.reset_mock()
mock_getaddrinfo.side_effect = None
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
result = utils.normalize_hostname("example.com")
self.assertEqual(result, "example.com")
mock_getaddrinfo.assert_called_once_with("example.com.", 80)
# Test valid domain starting with digit (3m.com)
mock_getaddrinfo.reset_mock()
mock_getaddrinfo.side_effect = None
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
result = utils.normalize_hostname("3m.com")
self.assertEqual(result, "3m.com")
mock_getaddrinfo.assert_called_once_with("3m.com.", 80)
# Test valid domain starting with 0x (0xdeadbeef.com)
mock_getaddrinfo.reset_mock()
mock_getaddrinfo.side_effect = None
mock_getaddrinfo.return_value = [('', '', '', '', ('', 80))]
result = utils.normalize_hostname("0xdeadbeef.com")
self.assertEqual(result, "0xdeadbeef.com")
mock_getaddrinfo.assert_called_once_with(
"0xdeadbeef.com.", 80)
# Test invalid/unresolvable hostname - should be rejected
mock_getaddrinfo.reset_mock()
mock_getaddrinfo.side_effect = socket.gaierror(
"Name or service not known")
result = utils.normalize_hostname("invalid-hostname-12345")
self.assertIsNone(result)
mock_getaddrinfo.assert_called_once_with(
"invalid-hostname-12345.", 80)
def test_validate_import_uri_ipv6_validation(self):
"""Test IPv6 addresses are properly validated against blacklist."""
# Test that IPv6 localhost is blocked when in blacklist
self.config(disallowed_hosts=['::1'],
group='import_filtering_opts')
self.config(allowed_ports=[80],
group='import_filtering_opts')
# IPv6 addresses in URLs are in brackets, but urlparse removes them
# So we test with the hostname directly
self.assertFalse(
utils.validate_import_uri("http://[::1]:80/"))
# Test that IPv6 address not in blacklist is allowed
self.config(disallowed_hosts=[],
group='import_filtering_opts')
self.config(allowed_hosts=['2001:db8::1'],
group='import_filtering_opts')
self.assertTrue(utils.validate_import_uri("http://[2001:db8::1]:80/"))
# Test that IPv6 localhost can be blocked separately from IPv4
# This ensures IPv6 addresses are properly normalized and can be
# blacklisted
self.config(disallowed_hosts=['127.0.0.1'],
group='import_filtering_opts')
self.config(allowed_hosts=[], # Explicitly clear whitelist
group='import_filtering_opts')
self.config(allowed_ports=[80],
group='import_filtering_opts')
# IPv6 localhost should pass if not in blacklist (no whitelist)
# The fix ensures IPv6 is normalized and can be blacklisted separately
result = utils.validate_import_uri("http://[::1]:80/")
# If ::1 is not in blacklist and no whitelist, it will pass
# Administrators should add both IPv4 and IPv6 to blacklist if needed
self.assertTrue(result)
# Test that IPv6 can be blacklisted separately
self.config(disallowed_hosts=['127.0.0.1', '::1'],
group='import_filtering_opts')
self.assertFalse(utils.validate_import_uri("http://[::1]:80/"))

View File

@@ -0,0 +1,95 @@
---
security:
- |
Fixed multiple Server-Side Request Forgery (SSRF) vulnerabilities in
Glance's image import functionality. These vulnerabilities could allow
attackers to bypass URL validation and access internal resources.
**web-download Import Method SSRF:**
The web-download import method had two SSRF vulnerabilities:
*HTTP Redirect Bypass:*
The web-download import method did not validate redirect destinations when
following HTTP redirects. An attacker could provide an initial URL that
passed validation, but redirect to an internal or disallowed resource that
would bypass the security checks. This has been fixed by implementing
``SafeRedirectHandler`` that validates redirect destinations before
following them using the same ``validate_import_uri()`` checks as the
initial URL.
*IP Address Encoding Bypass:*
The web-download import method's URL validation could be bypassed by
encoding IP addresses in alternative formats (decimal integer,
hexadecimal, octal). For example, ``127.0.0.1`` could be encoded as
``2130706433`` (decimal) or ``0x7f000001`` (hexadecimal) to bypass
blacklist checks. This has been fixed by implementing
``normalize_hostname()`` function that uses Python's ``ipaddress`` module
to validate IP addresses. The ``ipaddress`` module only accepts standard
dotted-decimal notation for IPv4 and standard format for IPv6, automatically
rejecting all encoded formats (decimal, hexadecimal, octal). Any attempt to
use encoded IP formats is rejected, preventing SSRF bypass attacks.
**glance-download Import Method SSRF:**
The glance-download import method had redirect validation bypass
vulnerabilities in two steps of the import flow:
*Image Data Download:*
When downloading image data from a remote Glance endpoint, redirects were
not validated, allowing attackers to redirect to internal services.
*Metadata Fetch:*
When fetching image metadata from a remote Glance endpoint, redirects were
not validated, allowing attackers to redirect to internal services.
Both steps have been fixed by using ``SafeRedirectHandler`` to validate
redirect destinations before following them.
**OVF Processing SSRF:**
The OVF processing functionality had a critical SSRF vulnerability with
zero protection - no URI validation, no redirect validation, and no IP
normalization. The code directly called ``urllib.request.urlopen(uri)``
without any validation checks. This has been fixed by adding URI
validation using ``validate_import_uri()`` and redirect validation using
``SafeRedirectHandler``.
**Affected Components:**
- ``glance.common.scripts.utils.get_image_data_iter()``
- ``glance.common.utils.validate_import_uri()``
- ``glance.async_.flows._internal_plugins.glance_download._DownloadGlanceImage.execute()``
- ``glance.async_.flows.api_image_import._ImportMetadata.execute()``
- ``glance.async_.flows.ovf_process._OVF_Process._get_ova_iter_objects()``
**Impact:**
- Severity: High (web-download, glance-download), Critical (OVF processing)
- Affected Versions: All versions prior to this fix
- Workaround: Administrators can temporarily disable affected import
methods by removing them from the ``enabled_import_methods``
configuration option
Bugs `#2138602 <https://bugs.launchpad.net/glance/+bug/2138602>`_,
`#2138672 <https://bugs.launchpad.net/glance/+bug/2138672>`_,
`#2138675 <https://bugs.launchpad.net/glance/+bug/2138675>`_
fixes:
- |
`Bug 2138602 <https://bugs.launchpad.net/glance/+bug/2138602>`__: Fixed SSRF
vulnerability in web-download import method via HTTP redirect bypass and
IP address encoding bypass. Added redirect validation using
``SafeRedirectHandler`` and IP address validation using Python's
``ipaddress`` module to reject encoded IP formats and prevent bypass
attacks.
`Bug 2138672 <https://bugs.launchpad.net/glance/+bug/2138672>`__: Fixed SSRF
vulnerability in glance-download import method via HTTP redirect bypass.
Added redirect validation for both image data download and metadata fetch
operations.
`Bug 2138675 <https://bugs.launchpad.net/glance/+bug/2138675>`__: Fixed SSRF
vulnerability in OVF processing functionality which lacked URI validation.
Added URI validation and redirect validation to prevent SSRF attacks when
processing OVA files.