Stop using cgi.parse_header
With py311 we started seeing warnings like:

    DeprecationWarning: 'cgi' is deprecated and slated for removal in Python 3.13

The recommended replacement isn't quite up to snuff (it doesn't handle multiple parameters), but we already have a reasonably close replacement.

Add a "loose" mode to parse_content_type to make it suitable when there may be a slash in a parameter token.

Add a new utils.parse_header function that takes advantage of the parse_content_type "loose" mode.

Closes-Bug: #2084472
Change-Id: Ie281ff90796f2d68840952c95669f264480b1b4c
This commit is contained in:

committed by
Alistair Coles

parent
fc88b0e816
commit
c390c637d1
@@ -23,9 +23,8 @@ import urllib.parse
|
||||
|
||||
from swift.common.exceptions import EncryptionException, UnknownSecretIdError
|
||||
from swift.common.swob import HTTPInternalServerError
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, parse_header
|
||||
from swift.common.wsgi import WSGIContext
|
||||
from cgi import parse_header
|
||||
|
||||
CRYPTO_KEY_CALLBACK = 'swift.callback.fetch_crypto_keys'
|
||||
|
||||
|
@@ -141,7 +141,6 @@ https://github.com/swiftstack/s3compat in detail.
|
||||
|
||||
"""
|
||||
|
||||
from cgi import parse_header
|
||||
import json
|
||||
from paste.deploy import loadwsgi
|
||||
from urllib.parse import parse_qs
|
||||
@@ -159,7 +158,8 @@ from swift.common.middleware.s3api.s3request import get_request_class
|
||||
from swift.common.middleware.s3api.s3response import ErrorResponse, \
|
||||
InternalError, MethodNotAllowed, S3ResponseBase, S3NotImplemented
|
||||
from swift.common.utils import get_logger, config_true_value, \
|
||||
config_positive_int_value, split_path, closing_if_possible, list_from_csv
|
||||
config_positive_int_value, split_path, closing_if_possible, \
|
||||
list_from_csv, parse_header
|
||||
from swift.common.middleware.s3api.utils import Config
|
||||
from swift.common.middleware.s3api.acl_handlers import get_acl_handler
|
||||
from swift.common.registry import register_swift_info, \
|
||||
|
@@ -339,7 +339,6 @@ metadata which can be used for stats and billing purposes.
|
||||
"""
|
||||
|
||||
import base64
|
||||
from cgi import parse_header
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
import json
|
||||
@@ -362,7 +361,7 @@ from swift.common.utils import get_logger, config_true_value, \
|
||||
override_bytes_from_content_type, split_path, \
|
||||
RateLimitedIterator, quote, closing_if_possible, \
|
||||
LRUCache, StreamingPile, strict_b64decode, Timestamp, friendly_close, \
|
||||
md5
|
||||
md5, parse_header
|
||||
from swift.common.registry import register_swift_info
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
|
||||
|
@@ -199,11 +199,10 @@ configuration steps are required:
|
||||
|
||||
import json
|
||||
import os
|
||||
from cgi import parse_header
|
||||
|
||||
from swift.common.utils import get_logger, split_path, \
|
||||
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
|
||||
config_true_value, drain_and_close
|
||||
config_true_value, drain_and_close, parse_header
|
||||
from swift.common.registry import register_swift_info
|
||||
from swift.common.constraints import check_account_format
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest, \
|
||||
@@ -289,7 +288,7 @@ def _validate_and_prep_request_headers(req):
|
||||
request=req, content_type='text/plain')
|
||||
etag = normalize_etag(req.headers.get(TGT_ETAG_SYMLINK_HDR, None))
|
||||
if etag and any(c in etag for c in ';"\\'):
|
||||
# See cgi.parse_header for why the above chars are problematic
|
||||
# See utils.parse_header for why the above chars are problematic
|
||||
raise HTTPBadRequest(
|
||||
body='Bad %s format' % TGT_ETAG_SYMLINK_HDR.title(),
|
||||
request=req, content_type='text/plain')
|
||||
|
@@ -147,7 +147,6 @@ import itertools
|
||||
import json
|
||||
import time
|
||||
|
||||
from cgi import parse_header
|
||||
from urllib.parse import unquote
|
||||
|
||||
from swift.common.constraints import MAX_FILE_SIZE, valid_api_version, \
|
||||
@@ -169,7 +168,7 @@ from swift.common.swob import HTTPPreconditionFailed, HTTPServiceUnavailable, \
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import get_logger, Timestamp, drain_and_close, \
|
||||
config_true_value, close_if_possible, closing_if_possible, \
|
||||
FileLikeIter, split_path, parse_content_type, RESERVED_STR
|
||||
FileLikeIter, split_path, parse_content_type, parse_header, RESERVED_STR
|
||||
from swift.common.wsgi import WSGIContext, make_pre_authed_request
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
|
||||
|
@@ -2840,6 +2840,11 @@ _rfc_extension_pattern = re.compile(
|
||||
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
|
||||
r'|"(?:[^"\\]|\\.)*"))?)')
|
||||
|
||||
_loose_token = r'[^()<>@,;:\"\[\]?={}\x00-\x20\x7f]+' # nosec B105
|
||||
_loose_extension_pattern = re.compile(
|
||||
r'(?:\s*;\s*(' + _loose_token + r")\s*(?:=\s*(" + _loose_token +
|
||||
r'|"(?:[^"\\]|\\.)*"))?)')
|
||||
|
||||
_content_range_pattern = re.compile(r'^bytes (\d+)-(\d+)/(\d+)$')
|
||||
|
||||
|
||||
@@ -2861,7 +2866,7 @@ def parse_content_range(content_range):
|
||||
return tuple(int(x) for x in found.groups())
|
||||
|
||||
|
||||
def parse_content_type(content_type):
|
||||
def parse_content_type(content_type, strict=True):
|
||||
"""
|
||||
Parse a content-type and its parameters into values.
|
||||
RFC 2616 sec 14.17 and 3.7 are pertinent.
|
||||
@@ -2873,19 +2878,48 @@ def parse_content_type(content_type):
|
||||
('text/plain', [('charset', 'UTF-8'), ('level', '1')])
|
||||
|
||||
:param content_type: content_type to parse
|
||||
:param strict: ignore ``/`` and any following characters in parameter
|
||||
tokens. If ``strict`` is True a parameter such as ``x=a/b`` will be
|
||||
parsed as ``x=a``. If ``strict`` is False a parameter such as ``x=a/b``
|
||||
will be parsed as ``x=a/b``. The default is True.
|
||||
:returns: a tuple containing (content type, list of k, v parameter tuples)
|
||||
"""
|
||||
parm_list = []
|
||||
if ';' in content_type:
|
||||
content_type, parms = content_type.split(';', 1)
|
||||
parms = ';' + parms
|
||||
for m in _rfc_extension_pattern.findall(parms):
|
||||
pat = _rfc_extension_pattern if strict else _loose_extension_pattern
|
||||
for m in pat.findall(parms):
|
||||
key = m[0].strip()
|
||||
value = m[1].strip()
|
||||
parm_list.append((key, value))
|
||||
return content_type, parm_list
|
||||
|
||||
|
||||
def parse_header(value):
|
||||
"""
|
||||
Parse a header value to extract the first part and a dict of any
|
||||
following parameters.
|
||||
|
||||
The ``value`` to parse should be of the form:
|
||||
|
||||
``<first part>[;<key>=<value>][; <key>=<value>]...``
|
||||
|
||||
``<first part>`` should be of the form ``<token>[/<token>]``, ``<key>``
|
||||
should be a ``token``, and ``<value>`` should be either a ``token`` or
|
||||
``quoted-string``, where ``token`` and ``quoted-string`` are defined by RFC
|
||||
2616 section 2.2.
|
||||
|
||||
:param value: the header value to parse.
|
||||
:return: a tuple (first part, dict(params)).
|
||||
"""
|
||||
# note: this does not behave *exactly* like cgi.parse_header (which this
|
||||
# replaces) w.r.t. parsing non-token characters in param values (e.g. the
|
||||
# null character), but it's sufficient for our use cases.
|
||||
token, params = parse_content_type(value, strict=False)
|
||||
return token, dict(params)
|
||||
|
||||
|
||||
def extract_swift_bytes(content_type):
|
||||
"""
|
||||
Parse a content-type and return a tuple containing:
|
||||
|
@@ -1853,6 +1853,77 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
self.assertEqual(
|
||||
utils.parse_content_type(r'text/plain; x="\""; a'),
|
||||
('text/plain', [('x', r'"\""'), ('a', '')]))
|
||||
self.assertEqual(
|
||||
utils.parse_content_type(r'text/plain; x=a/b; y'),
|
||||
('text/plain', [('x', 'a'), ('y', '')]))
|
||||
|
||||
self.assertEqual(
|
||||
utils.parse_content_type(r'text/plain; x=a/b; y', strict=True),
|
||||
('text/plain', [('x', 'a'), ('y', '')]))
|
||||
self.assertEqual(
|
||||
utils.parse_content_type(r'text/plain; x=a/b; y', strict=False),
|
||||
('text/plain', [('x', 'a/b'), ('y', '')]))
|
||||
|
||||
def test_parse_header(self):
|
||||
self.assertEqual(
|
||||
utils.parse_header('text/plain'), ('text/plain', {}))
|
||||
self.assertEqual(
|
||||
utils.parse_header('text/plain;'), ('text/plain', {}))
|
||||
self.assertEqual(
|
||||
utils.parse_header(r'text/plain; x=a/b; y = z'),
|
||||
('text/plain', {'x': 'a/b', 'y': 'z'}))
|
||||
self.assertEqual(
|
||||
utils.parse_header(r'text/plain; x=a/b; y'),
|
||||
('text/plain', {'x': 'a/b', 'y': ''}))
|
||||
self.assertEqual(
|
||||
utils.parse_header('etag; x=a/b; y'),
|
||||
('etag', {'x': 'a/b', 'y': ''}))
|
||||
|
||||
def test_parse_headers_chars_in_params(self):
|
||||
def do_test(val):
|
||||
self.assertEqual(
|
||||
utils.parse_header('text/plain; x=a%sb' % val),
|
||||
('text/plain', {'x': 'a%sb' % val}))
|
||||
|
||||
do_test('\N{SNOWMAN}')
|
||||
do_test('\\')
|
||||
do_test('%')
|
||||
do_test('-')
|
||||
do_test('-')
|
||||
do_test('&')
|
||||
# wsgi_quote'd null character is ok...
|
||||
do_test('%00')
|
||||
|
||||
def test_parse_header_non_token_chars_in_params(self):
|
||||
def do_test(val):
|
||||
# character terminates individual param parsing...
|
||||
self.assertEqual(
|
||||
utils.parse_header('text/plain; x=a%sb; y=z' % val),
|
||||
('text/plain', {'x': 'a', 'y': 'z'}),
|
||||
'val=%s' % val
|
||||
)
|
||||
|
||||
non_token_chars = '()<>@,:[]?={}\x00"'
|
||||
|
||||
for ch in non_token_chars:
|
||||
do_test(ch)
|
||||
|
||||
do_test(' space oddity ')
|
||||
|
||||
def test_parse_header_quoted_string_in_params(self):
|
||||
def do_test(val):
|
||||
self.assertEqual(
|
||||
utils.parse_header('text/plain; x="%s"; y=z' % val),
|
||||
('text/plain', {'x': '"%s"' % val, 'y': 'z'}),
|
||||
'val=%s' % val
|
||||
)
|
||||
|
||||
non_token_chars = '()<>@,:[]?={}\x00'
|
||||
|
||||
for ch in non_token_chars:
|
||||
do_test(ch)
|
||||
|
||||
do_test(' space oddity ')
|
||||
|
||||
def test_override_bytes_from_content_type(self):
|
||||
listing_dict = {
|
||||
|
Reference in New Issue
Block a user