Merge "Add base64decode function to common/utils"
This commit is contained in:
commit
f9fdb17c18
@ -12,17 +12,13 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import string
|
||||
|
||||
import six
|
||||
|
||||
from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK
|
||||
from swift.common.swob import Request, HTTPException
|
||||
from swift.common.utils import readconf
|
||||
from swift.common.utils import readconf, base64decode
|
||||
from swift.common.wsgi import WSGIContext
|
||||
|
||||
|
||||
@ -141,17 +137,12 @@ class KeyMaster(object):
|
||||
conf = readconf(self.keymaster_config_path, 'keymaster')
|
||||
b64_root_secret = conf.get('encryption_root_secret')
|
||||
try:
|
||||
# b64decode will silently discard bad characters, but we should
|
||||
# treat them as an error
|
||||
if not isinstance(b64_root_secret, six.string_types) or any(
|
||||
c not in string.digits + string.ascii_letters + '/+\r\n'
|
||||
for c in b64_root_secret.strip('\r\n=')):
|
||||
raise ValueError
|
||||
binary_root_secret = base64.b64decode(b64_root_secret)
|
||||
binary_root_secret = base64decode(b64_root_secret,
|
||||
allow_line_breaks=True)
|
||||
if len(binary_root_secret) < 32:
|
||||
raise ValueError
|
||||
return binary_root_secret
|
||||
except (TypeError, ValueError):
|
||||
except ValueError:
|
||||
raise ValueError(
|
||||
'encryption_root_secret option in %s must be a base64 '
|
||||
'encoding of at least 32 raw bytes' % (
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import errno
|
||||
import fcntl
|
||||
@ -28,6 +29,7 @@ import operator
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
import string
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
@ -4341,6 +4343,36 @@ def safe_json_loads(value):
|
||||
return None
|
||||
|
||||
|
||||
def base64decode(value, allow_line_breaks=False):
|
||||
'''
|
||||
Validate and decode Base64-encoded data.
|
||||
|
||||
The stdlib base64 module silently discards bad characters, but we often
|
||||
want to treat them as an error.
|
||||
|
||||
:param value: some base64-encoded data
|
||||
:param allow_line_breaks: if True, ignore carriage returns and newlines
|
||||
:returns: the decoded data
|
||||
:raises ValueError: if ``value`` is not a string, contains invalid
|
||||
characters, or has insufficient padding
|
||||
'''
|
||||
if not isinstance(value, six.string_types):
|
||||
raise ValueError
|
||||
# b64decode will silently discard bad characters, but we want to
|
||||
# treat them as an error
|
||||
valid_chars = string.digits + string.ascii_letters + '/+'
|
||||
strip_chars = '='
|
||||
if allow_line_breaks:
|
||||
valid_chars += '\r\n'
|
||||
strip_chars += '\r\n'
|
||||
if any(c not in valid_chars for c in value.strip(strip_chars)):
|
||||
raise ValueError
|
||||
try:
|
||||
return base64.b64decode(value)
|
||||
except (TypeError, binascii.Error): # (py2 error, py3 error)
|
||||
raise ValueError
|
||||
|
||||
|
||||
MD5_BLOCK_READ_BYTES = 4096
|
||||
|
||||
|
||||
|
@ -3895,6 +3895,50 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
self.fail('Invalid results from pure function:\n%s' %
|
||||
'\n'.join(failures))
|
||||
|
||||
def test_base64decode(self):
|
||||
expectations = {
|
||||
None: ValueError,
|
||||
0: ValueError,
|
||||
b'': b'',
|
||||
u'': b'',
|
||||
b'A': ValueError,
|
||||
b'AA': ValueError,
|
||||
b'AAA': ValueError,
|
||||
b'AAAA': b'\x00\x00\x00',
|
||||
u'AAAA': b'\x00\x00\x00',
|
||||
b'////': b'\xff\xff\xff',
|
||||
u'////': b'\xff\xff\xff',
|
||||
b'A===': ValueError,
|
||||
b'AA==': b'\x00',
|
||||
b'AAA=': b'\x00\x00',
|
||||
b' AAAA': ValueError,
|
||||
b'AAAA ': ValueError,
|
||||
b'AAAA============': b'\x00\x00\x00',
|
||||
b'AA&AA==': ValueError,
|
||||
b'====': b'',
|
||||
}
|
||||
|
||||
failures = []
|
||||
for value, expected in expectations.items():
|
||||
if expected is ValueError:
|
||||
try:
|
||||
result = utils.base64decode(value)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
failures.append('%r => %r (expected to raise ValueError)' %
|
||||
(value, result))
|
||||
else:
|
||||
try:
|
||||
result = utils.base64decode(value)
|
||||
self.assertEqual(expected, result)
|
||||
except AssertionError:
|
||||
failures.append('%r => %r (expected %r)' % (
|
||||
value, result, expected))
|
||||
if failures:
|
||||
self.fail('Invalid results from pure function:\n%s' %
|
||||
'\n'.join(failures))
|
||||
|
||||
def test_replace_partition_in_path(self):
|
||||
# Check for new part = part * 2
|
||||
old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'
|
||||
|
Loading…
x
Reference in New Issue
Block a user