crypto combine crypto.py with crypto_utils.py

...and move crypto_utils.py to swift/common/middleware

Also delete unused method and remove some unnecessary
mocking from test_decrypter.py

Change-Id: Ia4a2699db53eb4753c7f73db18fc86c84535b344
This commit is contained in:
Alistair Coles 2016-06-01 16:12:44 +01:00
parent e24838afc7
commit 79d401033d
12 changed files with 397 additions and 438 deletions

View File

@ -1,171 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
from hashlib import md5
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from swift.common.exceptions import EncryptionException
from swift.common.utils import get_logger
# AES will accept several key sizes - we are using 256 bits i.e. 32 bytes
KEY_LENGTH = 32
class Crypto(object):
"""
Used by middleware: Calls crypto alg
"""
def __init__(self, conf=None):
conf = {} if conf is None else conf
self.logger = get_logger(conf, log_route="crypto")
def create_encryption_ctxt(self, key, iv):
"""
Creates a crypto context for encrypting
:param key: 256-bit key
:param iv: 128-bit iv or nonce used for encryption
:raises: ValueError on invalid key or iv
:returns: an instance of :class:`CryptoContext`
"""
self.check_key(key)
engine = Cipher(algorithms.AES(key), modes.CTR(iv),
backend=default_backend())
enc = engine.encryptor()
return CryptoContext(enc, iv, 0)
def create_decryption_ctxt(self, key, iv, offset):
"""
Creates a crypto context for decrypting
:param key: 256-bit key
:param iv: 128-bit iv or nonce used for decryption
:param offset: offset into the message; used for range reads
:returns: an instance of :class:`CryptoContext`
"""
self.check_key(key)
if offset < 0:
raise ValueError('Offset must not be negative')
if offset > 0:
# Adjust IV so that it is correct for decryption at offset.
# ( 1<< (16 *8)) is to make 'ivl' big enough so that the following
# bytearray.fromhex() can be successful in all conditions.
ivl = long(binascii.hexlify(iv), 16)
ivl += int(offset / 16) + (1 << (16 * 8))
ivstr = format(ivl, 'x')
iv = str(bytearray.fromhex(ivstr[(len(ivstr) - 2 * 16):]))
engine = Cipher(algorithms.AES(key), modes.CTR(iv),
backend=default_backend())
dec = engine.decryptor()
# Adjust decryption boundary to AES block size of 16 bytes
dec.update('*' * (offset % 16))
return CryptoContext(dec, iv, offset)
def get_required_iv_length(self):
return algorithms.AES.block_size / 8
def _get_derived_iv(self, base):
target_length = self.get_required_iv_length()
if len(base) < target_length:
return base.zfill(target_length)
elif len(base) > target_length:
hash = md5()
hash.update(base)
return hash.hexdigest()[-target_length:]
else:
return base
def _get_random_iv(self):
# this method is separated out here so that tests can mock it
return os.urandom(self.get_required_iv_length())
def create_iv(self, iv_base=None):
if iv_base:
return self._get_derived_iv(iv_base)
return self._get_random_iv()
def get_cipher(self):
return 'AES_CTR_256'
def create_crypto_meta(self, iv_base=None):
# create a set of parameters
return {'iv': self.create_iv(iv_base), 'cipher': self.get_cipher()}
def check_crypto_meta(self, meta):
"""
Check that crypto meta dict has valid items.
:param meta: a dict
:raises EncryptionException: if an error is found in the crypto meta
"""
try:
if meta['cipher'] != self.get_cipher():
raise EncryptionException('Bad crypto meta: Cipher must be %s'
% self.get_cipher())
if len(meta['iv']) != self.get_required_iv_length():
raise EncryptionException(
'Bad crypto meta: IV must be length %s bytes'
% self.get_required_iv_length())
except KeyError as err:
raise EncryptionException(
'Bad crypto meta: Missing %s' % err)
def create_random_key(self):
# helper method to create random key of correct length
return os.urandom(KEY_LENGTH)
def wrap_key(self, wrapping_key, key_to_wrap):
# we don't use an RFC 3394 key wrap algorithm such as cryptography's
# aes_wrap_key because it's slower and we have iv material readily
# available so don't need a deterministic algorithm
iv = self._get_random_iv()
encryptor = Cipher(algorithms.AES(wrapping_key), modes.CTR(iv),
backend=default_backend()).encryptor()
return {'key': encryptor.update(key_to_wrap), 'iv': iv}
def unwrap_key(self, wrapping_key, context):
# unwrap a key from dict of form returned by wrap_key
# check the key length early - unwrapping won't change the length
self.check_key(context['key'])
decryptor = Cipher(algorithms.AES(wrapping_key),
modes.CTR(context['iv']),
backend=default_backend()).decryptor()
return decryptor.update(context['key'])
def check_key(self, key):
if len(key) != KEY_LENGTH:
raise ValueError("Key must be length %s bytes" % KEY_LENGTH)
class CryptoContext(object):
"""
Crypto context used in encryption middleware. Created by calling
:func:`create_encryption_ctxt` or :func:`create_decryption_ctxt`.
"""
def __init__(self, engine, iv, offset):
self.engine = engine
self.iv = iv
self.offset = offset
def update(self, chunk):
return self.engine.update(chunk)
def get_iv(self):
return self.iv

View File

@ -13,19 +13,171 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import binascii
import json
import os
import urllib
from hashlib import md5
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from swift import gettext_ as _
from swift.common.exceptions import EncryptionException
from swift.common.swob import HTTPInternalServerError
from swift.common.wsgi import WSGIContext
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_object_transient_sysmeta_prefix
from swift.common.swob import HTTPInternalServerError
from swift.common.utils import get_logger
from swift.common.wsgi import WSGIContext
CRYPTO_KEY_CALLBACK = 'swift.callback.fetch_crypto_keys'
# AES will accept several key sizes - we are using 256 bits i.e. 32 bytes
KEY_LENGTH = 32
class Crypto(object):
"""
Used by middleware: Calls crypto alg
"""
def __init__(self, conf=None):
conf = {} if conf is None else conf
self.logger = get_logger(conf, log_route="crypto")
def create_encryption_ctxt(self, key, iv):
"""
Creates a crypto context for encrypting
:param key: 256-bit key
:param iv: 128-bit iv or nonce used for encryption
:raises: ValueError on invalid key or iv
:returns: an instance of :class:`CryptoContext`
"""
self.check_key(key)
engine = Cipher(algorithms.AES(key), modes.CTR(iv),
backend=default_backend())
enc = engine.encryptor()
return CryptoContext(enc, iv, 0)
def create_decryption_ctxt(self, key, iv, offset):
"""
Creates a crypto context for decrypting
:param key: 256-bit key
:param iv: 128-bit iv or nonce used for decryption
:param offset: offset into the message; used for range reads
:returns: an instance of :class:`CryptoContext`
"""
self.check_key(key)
if offset < 0:
raise ValueError('Offset must not be negative')
if offset > 0:
# Adjust IV so that it is correct for decryption at offset.
# ( 1<< (16 *8)) is to make 'ivl' big enough so that the following
# bytearray.fromhex() can be successful in all conditions.
ivl = long(binascii.hexlify(iv), 16)
ivl += int(offset / 16) + (1 << (16 * 8))
ivstr = format(ivl, 'x')
iv = str(bytearray.fromhex(ivstr[(len(ivstr) - 2 * 16):]))
engine = Cipher(algorithms.AES(key), modes.CTR(iv),
backend=default_backend())
dec = engine.decryptor()
# Adjust decryption boundary to AES block size of 16 bytes
dec.update('*' * (offset % 16))
return CryptoContext(dec, iv, offset)
def get_required_iv_length(self):
return algorithms.AES.block_size / 8
def _get_derived_iv(self, base):
target_length = self.get_required_iv_length()
if len(base) < target_length:
return base.zfill(target_length)
elif len(base) > target_length:
hash = md5()
hash.update(base)
return hash.hexdigest()[-target_length:]
else:
return base
def _get_random_iv(self):
# this method is separated out here so that tests can mock it
return os.urandom(self.get_required_iv_length())
def create_iv(self, iv_base=None):
if iv_base:
return self._get_derived_iv(iv_base)
return self._get_random_iv()
def get_cipher(self):
return 'AES_CTR_256'
def create_crypto_meta(self, iv_base=None):
# create a set of parameters
return {'iv': self.create_iv(iv_base), 'cipher': self.get_cipher()}
def check_crypto_meta(self, meta):
"""
Check that crypto meta dict has valid items.
:param meta: a dict
:raises EncryptionException: if an error is found in the crypto meta
"""
try:
if meta['cipher'] != self.get_cipher():
raise EncryptionException('Bad crypto meta: Cipher must be %s'
% self.get_cipher())
if len(meta['iv']) != self.get_required_iv_length():
raise EncryptionException(
'Bad crypto meta: IV must be length %s bytes'
% self.get_required_iv_length())
except KeyError as err:
raise EncryptionException(
'Bad crypto meta: Missing %s' % err)
def create_random_key(self):
# helper method to create random key of correct length
return os.urandom(KEY_LENGTH)
def wrap_key(self, wrapping_key, key_to_wrap):
# we don't use an RFC 3394 key wrap algorithm such as cryptography's
# aes_wrap_key because it's slower and we have iv material readily
# available so don't need a deterministic algorithm
iv = self._get_random_iv()
encryptor = Cipher(algorithms.AES(wrapping_key), modes.CTR(iv),
backend=default_backend()).encryptor()
return {'key': encryptor.update(key_to_wrap), 'iv': iv}
def unwrap_key(self, wrapping_key, context):
# unwrap a key from dict of form returned by wrap_key
# check the key length early - unwrapping won't change the length
self.check_key(context['key'])
decryptor = Cipher(algorithms.AES(wrapping_key),
modes.CTR(context['iv']),
backend=default_backend()).decryptor()
return decryptor.update(context['key'])
def check_key(self, key):
if len(key) != KEY_LENGTH:
raise ValueError("Key must be length %s bytes" % KEY_LENGTH)
class CryptoContext(object):
"""
Crypto context used in encryption middleware. Created by calling
:func:`create_encryption_ctxt` or :func:`create_decryption_ctxt`.
"""
def __init__(self, engine, iv, offset):
self.engine = engine
self.iv = iv
self.offset = offset
def update(self, chunk):
return self.engine.update(chunk)
class CryptoWSGIContext(WSGIContext):
"""
Base class for contexts used by crypto middlewares.

View File

@ -21,10 +21,9 @@ except ImportError:
import xml.etree.ElementTree as ElementTree
from swift.common.http import is_success
from swift.common.crypto_utils import CryptoWSGIContext, load_crypto_meta, \
extract_crypto_meta
from swift.common.middleware.crypto_utils import CryptoWSGIContext, \
load_crypto_meta, extract_crypto_meta, Crypto
from swift.common.exceptions import EncryptionException
from swift.common.middleware.crypto import Crypto
from swift.common.request_helpers import strip_user_meta_prefix, is_user_meta,\
get_object_transient_sysmeta, get_listing_content_type
from swift.common.swob import Request, HTTPException, HTTPInternalServerError

View File

@ -12,20 +12,19 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from contextlib import contextmanager
from hashlib import md5
import base64
import os
from swift.common.crypto_utils import CryptoWSGIContext, dump_crypto_meta, \
append_crypto_meta
from swift.common.utils import get_logger, config_true_value
from swift.common.constraints import check_metadata
from swift.common.middleware.crypto_utils import CryptoWSGIContext, \
dump_crypto_meta, append_crypto_meta, Crypto
from swift.common.request_helpers import get_object_transient_sysmeta, \
strip_user_meta_prefix, is_user_meta, update_etag_is_at_header
from swift.common.swob import Request, Match, HTTPException, \
HTTPUnprocessableEntity
from swift.common.middleware.crypto import Crypto
from swift.common.constraints import check_metadata
from swift.common.utils import get_logger, config_true_value
def encrypt_header_val(crypto, value, key, iv_base=None):

View File

@ -28,11 +28,12 @@ import hashlib
import hmac
import os
from swift.common.utils import get_logger, split_path
from swift.common.crypto_utils import is_crypto_meta, CRYPTO_KEY_CALLBACK
from swift.common.middleware.crypto_utils import (
is_crypto_meta, CRYPTO_KEY_CALLBACK)
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.wsgi import WSGIContext
from swift.common.swob import Request, HTTPException, HTTPUnprocessableEntity
from swift.common.utils import get_logger, split_path
from swift.common.wsgi import WSGIContext
class KeyMasterContext(WSGIContext):

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
from swift.common.middleware.crypto import Crypto
from swift.common.middleware.crypto_utils import Crypto
def fetch_crypto_keys():

View File

@ -1,221 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from swift.common.exceptions import EncryptionException
from swift.common.middleware.crypto import Crypto
class TestCrypto(unittest.TestCase):
def setUp(self):
self.crypto = Crypto({})
def test_create_encryption_context(self):
value = 'encrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_encryption_ctxt(key, iv)
self.assertEqual(iv, ctxt.iv)
self.assertEqual(0, ctxt.offset)
expected = Cipher(
algorithms.AES(key), modes.CTR(iv),
backend=default_backend()).encryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, key, bad_iv)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, bad_key, iv)
def test_create_decryption_context(self):
# TODO: add tests here for non-zero offset
value = 'decrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
self.assertEqual(iv, ctxt.iv)
self.assertEqual(0, ctxt.offset)
expected = Cipher(
algorithms.AES(key), modes.CTR(iv),
backend=default_backend()).decryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, key, bad_iv, 0)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, bad_key, iv, 0)
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, key, iv, -1)
def test_enc_dec_small_chunks(self):
self.enc_dec_chunks(['encrypt me', 'because I', 'am sensitive'])
def test_enc_dec_large_chunks(self):
self.enc_dec_chunks([os.urandom(65536), os.urandom(65536)])
def enc_dec_chunks(self, chunks):
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
dec_val = [dec_ctxt.update(chunk) for chunk in enc_val]
self.assertEqual(''.join(chunks), ''.join(dec_val),
'Expected value {%s} but got {%s}' %
(''.join(chunks), ''.join(dec_val)))
def test_decrypt_range(self):
chunks = ['012345', '6789', 'abcdef']
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
# Simulate a ranged GET from byte 4 to 12 : '456789abc'
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 4)
ranged_chunks = [enc_val[0][4:], enc_val[1], enc_val[2][:3]]
dec_val = [dec_ctxt.update(chunk) for chunk in ranged_chunks]
self.assertEqual('456789abc', ''.join(dec_val),
'Expected value {%s} but got {%s}' %
('456789abc', ''.join(dec_val)))
def test_check_key(self):
for key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
with self.assertRaises(ValueError) as cm:
self.crypto.check_key(key)
self.assertEqual("Key must be length 32 bytes",
cm.exception.message)
def test_check_crypto_meta(self):
meta = {'cipher': 'AES_CTR_256'}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'iv'",
cm.exception.message)
for bad_iv in ('a little too long', 'too short'):
meta['iv'] = bad_iv
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: IV must be length 16 bytes",
cm.exception.message)
meta = {'iv': os.urandom(16)}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'cipher'",
cm.exception.message)
meta['cipher'] = 'Mystery cipher'
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Cipher must be AES_CTR_256",
cm.exception.message)
def test_create_iv(self):
self.assertEqual(16, len(self.crypto.create_iv()))
def test_shrink_iv_base(self):
base = 'base' * 5
target_length = self.crypto.get_required_iv_length()
self.assertGreater(len(base), target_length)
shrunk = self.crypto.create_iv(iv_base=base)
self.assertEqual(target_length, len(shrunk))
def test_pad_iv_base(self):
base = 'base'
target_length = self.crypto.get_required_iv_length()
self.assertLess(len(base), target_length)
padded = self.crypto.create_iv(iv_base=base)
self.assertEqual(target_length, len(padded))
def test_good_iv_base(self):
target_length = self.crypto.get_required_iv_length()
base = '1' * target_length
self.assertEqual(target_length, len(base))
same = self.crypto.create_iv(iv_base=base)
self.assertEqual(base, same)
def test_get_crypto_meta(self):
meta = self.crypto.create_crypto_meta()
self.assertIsInstance(meta, dict)
# this is deliberately brittle so that if new items are added then the
# test will need to be updated
self.assertEqual(2, len(meta))
self.assertIn('iv', meta)
self.assertEqual(16, len(meta['iv']))
self.assertIn('cipher', meta)
self.assertEqual('AES_CTR_256', meta['cipher'])
self.crypto.check_crypto_meta(meta) # sanity check
meta2 = self.crypto.create_crypto_meta()
self.assertNotEqual(meta['iv'], meta2['iv']) # crude sanity check
def test_create_random_key(self):
# crude check that we get unique keys on each call
keys = set()
for i in range(10):
key = self.crypto.create_random_key()
self.assertEqual(32, len(key))
keys.add(key)
self.assertEqual(10, len(keys))
def test_wrap_unwrap_key(self):
wrapping_key = os.urandom(32)
key_to_wrap = os.urandom(32)
iv = os.urandom(16)
with mock.patch('swift.common.middleware.crypto.Crypto._get_random_iv',
return_value=iv):
wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
cipher = Cipher(algorithms.AES(wrapping_key), modes.CTR(iv),
backend=default_backend())
expected = {'key': cipher.encryptor().update(key_to_wrap),
'iv': iv}
self.assertEqual(expected, wrapped)
unwrapped = self.crypto.unwrap_key(wrapping_key, wrapped)
self.assertEqual(key_to_wrap, unwrapped)
def test_unwrap_bad_key(self):
# verify that ValueError is raised if unwrapped key is invalid
wrapping_key = os.urandom(32)
for length in (0, 16, 24, 31, 33):
key_to_wrap = os.urandom(length)
wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
with self.assertRaises(ValueError) as cm:
self.crypto.unwrap_key(wrapping_key, wrapped)
self.assertEqual(
cm.exception.message, 'Key must be length 32 bytes')
if __name__ == '__main__':
unittest.main()

View File

@ -12,11 +12,17 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from swift.common import crypto_utils
from swift.common.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.middleware.crypto import Crypto
import mock
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from swift.common.exceptions import EncryptionException
from swift.common.middleware import crypto_utils
from swift.common.middleware.crypto_utils import (
CRYPTO_KEY_CALLBACK, Crypto, CryptoWSGIContext)
from swift.common.swob import HTTPException
from test.unit import FakeLogger
from test.unit.common.middleware.crypto_helpers import fetch_crypto_keys
@ -29,7 +35,7 @@ class TestCryptoWsgiContext(unittest.TestCase):
crypto = Crypto({})
self.fake_logger = FakeLogger()
self.crypto_context = crypto_utils.CryptoWSGIContext(
self.crypto_context = CryptoWSGIContext(
FakeFilter(), 'object', self.fake_logger)
def test_get_keys(self):
@ -215,3 +221,202 @@ class TestModuleMethods(unittest.TestCase):
actual = crypto_utils.extract_crypto_meta(
crypto_utils.append_crypto_meta(val, self.meta))
self.assertEqual((val, self.meta), actual)
class TestCrypto(unittest.TestCase):
def setUp(self):
self.crypto = Crypto({})
def test_create_encryption_context(self):
value = 'encrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_encryption_ctxt(key, iv)
self.assertEqual(iv, ctxt.iv)
self.assertEqual(0, ctxt.offset)
expected = Cipher(
algorithms.AES(key), modes.CTR(iv),
backend=default_backend()).encryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, key, bad_iv)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, bad_key, iv)
def test_create_decryption_context(self):
# TODO: add tests here for non-zero offset
value = 'decrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
self.assertEqual(iv, ctxt.iv)
self.assertEqual(0, ctxt.offset)
expected = Cipher(
algorithms.AES(key), modes.CTR(iv),
backend=default_backend()).decryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, key, bad_iv, 0)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, bad_key, iv, 0)
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, key, iv, -1)
def test_enc_dec_small_chunks(self):
self.enc_dec_chunks(['encrypt me', 'because I', 'am sensitive'])
def test_enc_dec_large_chunks(self):
self.enc_dec_chunks([os.urandom(65536), os.urandom(65536)])
def enc_dec_chunks(self, chunks):
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
dec_val = [dec_ctxt.update(chunk) for chunk in enc_val]
self.assertEqual(''.join(chunks), ''.join(dec_val),
'Expected value {%s} but got {%s}' %
(''.join(chunks), ''.join(dec_val)))
def test_decrypt_range(self):
chunks = ['012345', '6789', 'abcdef']
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
# Simulate a ranged GET from byte 4 to 12 : '456789abc'
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 4)
ranged_chunks = [enc_val[0][4:], enc_val[1], enc_val[2][:3]]
dec_val = [dec_ctxt.update(chunk) for chunk in ranged_chunks]
self.assertEqual('456789abc', ''.join(dec_val),
'Expected value {%s} but got {%s}' %
('456789abc', ''.join(dec_val)))
def test_check_key(self):
for key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
with self.assertRaises(ValueError) as cm:
self.crypto.check_key(key)
self.assertEqual("Key must be length 32 bytes",
cm.exception.message)
def test_check_crypto_meta(self):
meta = {'cipher': 'AES_CTR_256'}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'iv'",
cm.exception.message)
for bad_iv in ('a little too long', 'too short'):
meta['iv'] = bad_iv
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: IV must be length 16 bytes",
cm.exception.message)
meta = {'iv': os.urandom(16)}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'cipher'",
cm.exception.message)
meta['cipher'] = 'Mystery cipher'
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Cipher must be AES_CTR_256",
cm.exception.message)
def test_create_iv(self):
self.assertEqual(16, len(self.crypto.create_iv()))
def test_shrink_iv_base(self):
base = 'base' * 5
target_length = self.crypto.get_required_iv_length()
self.assertGreater(len(base), target_length)
shrunk = self.crypto.create_iv(iv_base=base)
self.assertEqual(target_length, len(shrunk))
def test_pad_iv_base(self):
base = 'base'
target_length = self.crypto.get_required_iv_length()
self.assertLess(len(base), target_length)
padded = self.crypto.create_iv(iv_base=base)
self.assertEqual(target_length, len(padded))
def test_good_iv_base(self):
target_length = self.crypto.get_required_iv_length()
base = '1' * target_length
self.assertEqual(target_length, len(base))
same = self.crypto.create_iv(iv_base=base)
self.assertEqual(base, same)
def test_get_crypto_meta(self):
meta = self.crypto.create_crypto_meta()
self.assertIsInstance(meta, dict)
# this is deliberately brittle so that if new items are added then the
# test will need to be updated
self.assertEqual(2, len(meta))
self.assertIn('iv', meta)
self.assertEqual(16, len(meta['iv']))
self.assertIn('cipher', meta)
self.assertEqual('AES_CTR_256', meta['cipher'])
self.crypto.check_crypto_meta(meta) # sanity check
meta2 = self.crypto.create_crypto_meta()
self.assertNotEqual(meta['iv'], meta2['iv']) # crude sanity check
def test_create_random_key(self):
# crude check that we get unique keys on each call
keys = set()
for i in range(10):
key = self.crypto.create_random_key()
self.assertEqual(32, len(key))
keys.add(key)
self.assertEqual(10, len(keys))
def test_wrap_unwrap_key(self):
wrapping_key = os.urandom(32)
key_to_wrap = os.urandom(32)
iv = os.urandom(16)
with mock.patch(
'swift.common.middleware.crypto_utils.Crypto._get_random_iv',
return_value=iv):
wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
cipher = Cipher(algorithms.AES(wrapping_key), modes.CTR(iv),
backend=default_backend())
expected = {'key': cipher.encryptor().update(key_to_wrap),
'iv': iv}
self.assertEqual(expected, wrapped)
unwrapped = self.crypto.unwrap_key(wrapping_key, wrapped)
self.assertEqual(key_to_wrap, unwrapped)
def test_unwrap_bad_key(self):
# verify that ValueError is raised if unwrapped key is invalid
wrapping_key = os.urandom(32)
for length in (0, 16, 24, 31, 33):
key_to_wrap = os.urandom(length)
wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
with self.assertRaises(ValueError) as cm:
self.crypto.unwrap_key(wrapping_key, wrapped)
self.assertEqual(
cm.exception.message, 'Key must be length 32 bytes')
if __name__ == '__main__':
unittest.main()

View File

@ -12,16 +12,17 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import unittest
from xml.dom import minidom
import mock
import base64
import json
from swift.common.middleware.crypto import Crypto
from swift.common.crypto_utils import CRYPTO_KEY_CALLBACK, dump_crypto_meta
import mock
from swift.common.middleware import decrypter
from swift.common.middleware.crypto_utils import CRYPTO_KEY_CALLBACK, \
dump_crypto_meta, Crypto
from swift.common.swob import Request, HTTPException, HTTPOk, \
HTTPPreconditionFailed, HTTPNotFound, HTTPPartialContent
@ -43,8 +44,6 @@ def encrypt_and_append_meta(value, key, crypto_meta=None):
get_crypto_meta_header(crypto_meta))
@mock.patch('swift.common.middleware.crypto.Crypto.create_iv',
lambda *args: FAKE_IV)
class TestDecrypterObjectRequests(unittest.TestCase):
def setUp(self):
self.app = FakeSwift()
@ -871,8 +870,6 @@ class TestDecrypterObjectRequests(unittest.TestCase):
resp.headers['x-object-sysmeta-test'])
@mock.patch('swift.common.middleware.crypto.Crypto.create_iv',
lambda *args: FAKE_IV)
class TestDecrypterContainerRequests(unittest.TestCase):
def setUp(self):
self.app = FakeSwift()

View File

@ -15,25 +15,24 @@
import base64
import json
import os
import unittest
import urllib
import unittest
import mock
from swift.common.middleware import encrypter
from swift.common.middleware.crypto_utils import CRYPTO_KEY_CALLBACK, Crypto
from swift.common.swob import (
Request, HTTPException, HTTPCreated, HTTPAccepted, HTTPOk, HTTPBadRequest)
from swift.common.utils import FileLikeIter
from swift.common.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.middleware.crypto import Crypto
from test.unit import FakeLogger, EMPTY_ETAG
from test.unit import FakeLogger, EMPTY_ETAG
from test.unit.common.middleware.crypto_helpers import fetch_crypto_keys, \
md5hex, FAKE_IV, encrypt
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts
@mock.patch('swift.common.middleware.crypto.Crypto._get_random_iv',
@mock.patch('swift.common.middleware.crypto_utils.Crypto._get_random_iv',
lambda *args: FAKE_IV)
class TestEncrypter(unittest.TestCase):
def setUp(self):
@ -61,7 +60,7 @@ class TestEncrypter(unittest.TestCase):
'/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.Crypto.create_random_key',
'swift.common.middleware.crypto_utils.Crypto.create_random_key',
return_value=body_key):
resp = req.get_response(self.encrypter)
self.assertEqual('201 Created', resp.status)
@ -240,7 +239,7 @@ class TestEncrypter(unittest.TestCase):
self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.Crypto.create_random_key',
'swift.common.middleware.crypto_utils.Crypto.create_random_key',
lambda *args: body_key):
resp = req.get_response(self.encrypter)
@ -597,7 +596,7 @@ class TestEncrypter(unittest.TestCase):
self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.Crypto.create_random_key',
'swift.common.middleware.crypto_utils.Crypto.create_random_key',
lambda *args: body_key):
resp = req.get_response(self.encrypter)
@ -622,7 +621,7 @@ class TestEncrypter(unittest.TestCase):
self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.Crypto.create_random_key',
'swift.common.middleware.crypto_utils.Crypto.create_random_key',
lambda *args: body_key):
resp = req.get_response(self.encrypter)

View File

@ -18,15 +18,15 @@ import unittest
import uuid
from swift.common import storage_policy
from swift.common.middleware import encrypter, decrypter, keymaster, crypto
from swift.common.middleware import encrypter, decrypter, keymaster
from swift.common.middleware.crypto_utils import load_crypto_meta, Crypto
from swift.common.ring import Ring
from swift.common.swob import Request
from swift.common.crypto_utils import load_crypto_meta
from swift.obj import diskfile
from test.unit import FakeLogger
from test.unit.common.middleware.crypto_helpers import md5hex, encrypt
from test.unit.helpers import setup_servers, teardown_servers
from swift.obj import diskfile
from test.unit import FakeLogger
class TestCryptoPipelineChanges(unittest.TestCase):
@ -338,7 +338,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
body_key_meta = load_crypto_meta(
metadata['x-object-sysmeta-crypto-meta'])['body_key']
obj_key = self.km.create_key('/a/%s/o' % self.container_name)
body_key = crypto.Crypto({}).unwrap_key(obj_key, body_key_meta)
body_key = Crypto().unwrap_key(obj_key, body_key_meta)
exp_enc_body = encrypt(self.plaintext, body_key, body_iv)
ondisk_data.append((node, contents))
# verify on disk user metadata

View File

@ -13,14 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import unittest
from swift.common.middleware import keymaster
from swift.common import swob
from swift.common.middleware import keymaster
from swift.common.middleware.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.swob import Request
from swift.common.crypto_utils import CRYPTO_KEY_CALLBACK
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts