Fix memcache encryption middleware

This fixes lp1175367 and lp1175368 by redesigning the memcache crypt
middleware to not do dangerous things. It is forward compatible, but
will invalidate any existing ephemeral encrypted or signed memcache
entries.

Change-Id: Ice8724949a48bfad3b8b7c41b5f50a18a9ad9f42
Signed-off-by: Bryan D. Payne <bdpayne@acm.org>
This commit is contained in:
Bryan D. Payne
2013-06-07 09:34:25 -07:00
committed by Thierry Carrez
parent 1e3cf4bb2f
commit eeefb784f2
5 changed files with 273 additions and 269 deletions

View File

@@ -1,5 +1,5 @@
.. ..
Copyright 2011-2012 OpenStack, LLC Copyright 2011-2013 OpenStack, LLC
All Rights Reserved. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -188,7 +188,8 @@ Configuration Options
the timeout when validating token by http). the timeout when validating token by http).
* ``auth_port``: (optional, default `35357`) the port used to validate tokens * ``auth_port``: (optional, default `35357`) the port used to validate tokens
* ``auth_protocol``: (optional, default `https`) * ``auth_protocol``: (optional, default `https`)
* ``auth_uri``: (optional, defaults to `auth_protocol`://`auth_host`:`auth_port`) * ``auth_uri``: (optional, defaults to
`auth_protocol`://`auth_host`:`auth_port`)
* ``certfile``: (required, if Keystone server requires client cert) * ``certfile``: (required, if Keystone server requires client cert)
* ``keyfile``: (required, if Keystone server requires client cert) This can be * ``keyfile``: (required, if Keystone server requires client cert) This can be
the same as the certfile if the certfile includes the private key. the same as the certfile if the certfile includes the private key.
@@ -232,22 +233,24 @@ Memcache Protection
=================== ===================
When using memcached, we are storing user tokens and token validation When using memcached, we are storing user tokens and token validation
information into the cache as raw data. Which means anyone who have access information into the cache as raw data. Which means that anyone who
to the memcache servers can read and modify data stored there. To mitigate has access to the memcache servers can read and modify data stored
this risk, ``auth_token`` middleware provides an option to either encrypt there. To mitigate this risk, ``auth_token`` middleware provides an
or authenticate the token data stored in the cache. option to authenticate and optionally encrypt the token data stored in
the cache.
* ``memcache_security_strategy``: (optional) if defined, indicate whether token * ``memcache_security_strategy``: (optional) if defined, indicate
data should be encrypted or authenticated. Acceptable values are ``ENCRYPT`` whether token data should be authenticated or authenticated and
or ``MAC``. If ``ENCRYPT``, token data is encrypted in the cache. If encrypted. Acceptable values are ``MAC`` or ``ENCRYPT``. If ``MAC``,
``MAC``, token data is authenticated (with HMAC) in the cache. If its value token data is authenticated (with HMAC) in the cache. If
is neither ``MAC`` nor ``ENCRYPT``, ``auth_token`` will raise an exception ``ENCRYPT``, token data is encrypted and authenticated in the
on initialization. cache. If the value is not one of these options or empty,
``auth_token`` will raise an exception on initialization.
* ``memcache_secret_key``: (optional, mandatory if * ``memcache_secret_key``: (optional, mandatory if
``memcache_security_strategy`` is defined) if defined, ``memcache_security_strategy`` is defined) this string is used for
a random string to be used for key derivation. If key derivation. If ``memcache_security_strategy`` is defined and
``memcache_security_strategy`` is defined and ``memcache_secret_key`` is ``memcache_secret_key`` is absent, ``auth_token`` will raise an
absent, ``auth_token`` will raise an exception on initialization. exception on initialization.
Exchanging User Information Exchanging User Information
=========================== ===========================

View File

@@ -222,6 +222,7 @@ opts = [
CONF.register_opts(opts, group='keystone_authtoken') CONF.register_opts(opts, group='keystone_authtoken')
LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0'] LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0']
CACHE_KEY_TEMPLATE = 'tokens/%s'
def will_expire_soon(expiry): def will_expire_soon(expiry):
@@ -847,91 +848,81 @@ class AuthProtocol(object):
env_key = self._header_to_env_var(key) env_key = self._header_to_env_var(key)
return env.get(env_key, default) return env.get(env_key, default)
def _protect_cache_value(self, token, data): def _cache_get(self, token, ignore_expires=False):
""" Encrypt or sign data if necessary. """
try:
if self._memcache_security_strategy == 'ENCRYPT':
return memcache_crypt.encrypt_data(token,
self._memcache_secret_key,
data)
elif self._memcache_security_strategy == 'MAC':
return memcache_crypt.sign_data(token, data)
else:
return data
except:
msg = 'Failed to encrypt/sign cache data.'
self.LOG.exception(msg)
return data
def _unprotect_cache_value(self, token, data):
""" Decrypt or verify signed data if necessary. """
if data is None:
return data
try:
if self._memcache_security_strategy == 'ENCRYPT':
return memcache_crypt.decrypt_data(token,
self._memcache_secret_key,
data)
elif self._memcache_security_strategy == 'MAC':
return memcache_crypt.verify_signed_data(token, data)
else:
return data
except:
msg = 'Failed to decrypt/verify cache data.'
self.LOG.exception(msg)
# this should have the same effect as data not found in cache
return None
def _get_cache_key(self, token):
""" Return the cache key.
Do not use clear token as key if memcache protection is on.
"""
htoken = token
if self._memcache_security_strategy in ('ENCRYPT', 'MAC'):
derv_token = token + self._memcache_secret_key
htoken = memcache_crypt.hash_data(derv_token)
return 'tokens/%s' % htoken
def _cache_get(self, token):
"""Return token information from cache. """Return token information from cache.
If token is invalid raise InvalidUserToken If token is invalid raise InvalidUserToken
return token only if fresh (not expired). return token only if fresh (not expired).
""" """
if self._cache and token: if self._cache and token:
key = self._get_cache_key(token) if self._memcache_security_strategy is None:
cached = self._cache.get(key) key = CACHE_KEY_TEMPLATE % token
cached = self._unprotect_cache_value(token, cached) serialized = self._cache.get(key)
else:
keys = memcache_crypt.derive_keys(
token,
self._memcache_secret_key,
self._memcache_security_strategy)
cache_key = CACHE_KEY_TEMPLATE % (
memcache_crypt.get_cache_key(keys))
raw_cached = self._cache.get(cache_key)
try:
# unprotect_data will return None if raw_cached is None
serialized = memcache_crypt.unprotect_data(keys,
raw_cached)
except Exception:
msg = 'Failed to decrypt/verify cache data'
self.LOG.exception(msg)
# this should have the same effect as data not
# found in cache
serialized = None
if serialized is None:
return None
# Note that 'invalid' and (data, expires) are the only
# valid types of serialized cache entries, so there is not
# a collision with json.loads(serialized) == None.
cached = json.loads(serialized)
if cached == 'invalid': if cached == 'invalid':
self.LOG.debug('Cached Token %s is marked unauthorized', token) self.LOG.debug('Cached Token %s is marked unauthorized', token)
raise InvalidUserToken('Token authorization failed') raise InvalidUserToken('Token authorization failed')
if cached:
data, expires = cached
if time.time() < float(expires):
self.LOG.debug('Returning cached token %s', token)
return data
else:
self.LOG.debug('Cached Token %s seems expired', token)
def _cache_store(self, token, data, expires=None): data, expires = cached
""" Store value into memcache. """ if ignore_expires or time.time() < float(expires):
key = self._get_cache_key(token) self.LOG.debug('Returning cached token %s', token)
data = self._protect_cache_value(token, data) return data
data_to_store = data else:
if expires: self.LOG.debug('Cached Token %s seems expired', token)
data_to_store = (data, expires)
def _cache_store(self, token, data):
""" Store value into memcache.
data may be the string 'invalid' or a tuple like (data, expires)
"""
serialized_data = json.dumps(data)
if self._memcache_security_strategy is None:
cache_key = CACHE_KEY_TEMPLATE % token
data_to_store = serialized_data
else:
keys = memcache_crypt.derive_keys(
token,
self._memcache_secret_key,
self._memcache_security_strategy)
cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys)
data_to_store = memcache_crypt.protect_data(keys, serialized_data)
# we need to special-case set() because of the incompatibility between # we need to special-case set() because of the incompatibility between
# Swift MemcacheRing and python-memcached. See # Swift MemcacheRing and python-memcached. See
# https://bugs.launchpad.net/swift/+bug/1095730 # https://bugs.launchpad.net/swift/+bug/1095730
if self._use_keystone_cache: if self._use_keystone_cache:
self._cache.set(key, self._cache.set(cache_key,
data_to_store, data_to_store,
time=self.token_cache_time) time=self.token_cache_time)
else: else:
self._cache.set(key, self._cache.set(cache_key,
data_to_store, data_to_store,
timeout=self.token_cache_time) timeout=self.token_cache_time)
@@ -959,7 +950,7 @@ class AuthProtocol(object):
""" """
if self._cache: if self._cache:
self.LOG.debug('Storing %s token in memcache', token) self.LOG.debug('Storing %s token in memcache', token)
self._cache_store(token, data, expires) self._cache_store(token, (data, expires))
def _cache_store_invalid(self, token): def _cache_store_invalid(self, token):
"""Store invalid token in cache.""" """Store invalid token in cache."""

View File

@@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2012 OpenStack LLC # Copyright 2010-2013 OpenStack LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -18,33 +18,34 @@
""" """
Utilities for memcache encryption and integrity check. Utilities for memcache encryption and integrity check.
Data is serialized before been encrypted or MACed. Encryption have a Data should be serialized before entering these functions. Encryption
dependency on the pycrypto. If pycrypto is not available, has a dependency on the pycrypto. If pycrypto is not available,
CryptoUnabailableError will be raised. CryptoUnavailableError will be raised.
Encrypted data stored in memcache are prefixed with '{ENCRYPT:AES256}'. This module will not be called unless signing or encryption is enabled
in the config. It will always validate signatures, and will decrypt
MACed data stored in memcache are prefixed with '{MAC:SHA1}'. data if encryption is enabled. It is not valid to mix protection
modes.
""" """
import base64 import base64
import functools import functools
import hashlib import hashlib
import json import hmac
import math
import os import os
# make sure pycrypt is available # make sure pycrypto is available
try: try:
from Crypto.Cipher import AES from Crypto.Cipher import AES
except ImportError: except ImportError:
AES = None AES = None
HASH_FUNCTION = hashlib.sha384
# prefix marker indicating data is HMACed (signed by a secret key) DIGEST_LENGTH = HASH_FUNCTION().digest_size
MAC_MARKER = '{MAC:SHA1}' DIGEST_SPLIT = DIGEST_LENGTH // 3
# prefix marker indicating data is encrypted DIGEST_LENGTH_B64 = 4 * int(math.ceil(DIGEST_LENGTH / 3.0))
ENCRYPT_MARKER = '{ENCRYPT:AES256}'
class InvalidMacError(Exception): class InvalidMacError(Exception):
@@ -81,77 +82,121 @@ def assert_crypto_availability(f):
return wrapper return wrapper
def generate_aes_key(token, secret): def constant_time_compare(first, second):
""" Generates and returns a 256 bit AES key, based on sha256 hash. """ """ Returns True if both string inputs are equal, otherwise False
return hashlib.sha256(token + secret).digest()
This function should take a constant amount of time regardless of
how many characters in the strings match.
"""
if len(first) != len(second):
return False
result = 0
for x, y in zip(first, second):
result |= ord(x) ^ ord(y)
return result == 0
def compute_mac(token, serialized_data): def derive_keys(token, secret, strategy):
""" Computes and returns the base64 encoded MAC. """ """ Derives keys for MAC and ENCRYPTION from the user-provided
return hash_data(serialized_data + token) secret. The resulting keys should be passed to the protect and
unprotect functions.
As suggested by NIST Special Publication 800-108, this uses the
first 128 bits from the sha384 KDF for the obscured cache key
value, the second 128 bits for the message authentication key and
the remaining 128 bits for the encryption key.
This approach is faster than computing a separate hmac as the KDF
for each desired key.
"""
digest = hmac.new(secret, token + strategy, HASH_FUNCTION).digest()
return {'CACHE_KEY': digest[:DIGEST_SPLIT],
'MAC': digest[DIGEST_SPLIT: 2 * DIGEST_SPLIT],
'ENCRYPTION': digest[2 * DIGEST_SPLIT:],
'strategy': strategy}
def hash_data(data): def sign_data(key, data):
""" Return the base64 encoded SHA1 hash of the data. """ """ Sign the data using the defined function and the derived key"""
return base64.b64encode(hashlib.sha1(data).digest()) mac = hmac.new(key, data, HASH_FUNCTION).digest()
return base64.b64encode(mac)
def sign_data(token, data):
""" MAC the data using SHA1. """
mac_data = {}
mac_data['serialized_data'] = json.dumps(data)
mac = compute_mac(token, mac_data['serialized_data'])
mac_data['mac'] = mac
md = MAC_MARKER + base64.b64encode(json.dumps(mac_data))
return md
def verify_signed_data(token, data):
""" Verify data integrity by ensuring MAC is valid. """
if data.startswith(MAC_MARKER):
try:
data = data[len(MAC_MARKER):]
mac_data = json.loads(base64.b64decode(data))
mac = compute_mac(token, mac_data['serialized_data'])
if mac != mac_data['mac']:
raise InvalidMacError('invalid MAC; expect=%s, actual=%s' %
(mac_data['mac'], mac))
return json.loads(mac_data['serialized_data'])
except:
raise InvalidMacError('invalid MAC; data appeared to be corrupted')
else:
# doesn't appear to be MACed data
return data
@assert_crypto_availability @assert_crypto_availability
def encrypt_data(token, secret, data): def encrypt_data(key, data):
""" Encryptes the data with the given secret key. """ """ Encrypt the data with the given secret key.
Padding is n bytes of the value n, where 1 <= n <= blocksize.
"""
iv = os.urandom(16) iv = os.urandom(16)
aes_key = generate_aes_key(token, secret) cipher = AES.new(key, AES.MODE_CBC, iv)
cipher = AES.new(aes_key, AES.MODE_CFB, iv) padding = 16 - len(data) % 16
data = json.dumps(data) return iv + cipher.encrypt(data + chr(padding) * padding)
encoded_data = base64.b64encode(iv + cipher.encrypt(data))
encoded_data = ENCRYPT_MARKER + encoded_data
return encoded_data
@assert_crypto_availability @assert_crypto_availability
def decrypt_data(token, secret, data): def decrypt_data(key, data):
""" Decrypt the data with the given secret key. """ """ Decrypt the data with the given secret key. """
if data.startswith(ENCRYPT_MARKER): iv = data[:16]
try: cipher = AES.new(key, AES.MODE_CBC, iv)
# encrypted data try:
encoded_data = data[len(ENCRYPT_MARKER):] result = cipher.decrypt(data[16:])
aes_key = generate_aes_key(token, secret) except Exception:
decoded_data = base64.b64decode(encoded_data) raise DecryptError('Encrypted data appears to be corrupted.')
iv = decoded_data[:16]
encrypted_data = decoded_data[16:] # Strip the last n padding bytes where n is the last value in
cipher = AES.new(aes_key, AES.MODE_CFB, iv) # the plaintext
decrypted_data = cipher.decrypt(encrypted_data) padding = ord(result[-1])
return json.loads(decrypted_data) return result[:-1 * padding]
except:
raise DecryptError('data appeared to be corrupted')
else: def protect_data(keys, data):
# doesn't appear to be encrypted data """ Given keys and serialized data, returns an appropriately
return data protected string suitable for storage in the cache.
"""
if keys['strategy'] == 'ENCRYPT':
data = encrypt_data(keys['ENCRYPTION'], data)
encoded_data = base64.b64encode(data)
signature = sign_data(keys['MAC'], encoded_data)
return signature + encoded_data
def unprotect_data(keys, signed_data):
""" Given keys and cached string data, verifies the signature,
decrypts if necessary, and returns the original serialized data.
"""
# cache backends return None when no data is found. We don't mind
# that this particular special value is unsigned.
if signed_data is None:
return None
# First we calculate the signature
provided_mac = signed_data[:DIGEST_LENGTH_B64]
calculated_mac = sign_data(
keys['MAC'],
signed_data[DIGEST_LENGTH_B64:])
# Then verify that it matches the provided value
if not constant_time_compare(provided_mac, calculated_mac):
raise InvalidMacError('Invalid MAC; data appears to be corrupted.')
data = base64.b64decode(signed_data[DIGEST_LENGTH_B64:])
# then if necessary decrypt the data
if keys['strategy'] == 'ENCRYPT':
data = decrypt_data(keys['ENCRYPTION'], data)
return data
def get_cache_key(keys):
""" Given keys generated by derive_keys(), returns a base64
encoded value suitable for use as a cache key in memcached.
"""
return base64.b64encode(keys['CACHE_KEY'])

View File

@@ -28,7 +28,6 @@ import webob
from keystoneclient.common import cms from keystoneclient.common import cms
from keystoneclient import utils from keystoneclient import utils
from keystoneclient.middleware import auth_token from keystoneclient.middleware import auth_token
from keystoneclient.middleware import memcache_crypt
from keystoneclient.openstack.common import memorycache from keystoneclient.openstack.common import memorycache
from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils from keystoneclient.openstack.common import timeutils
@@ -1013,9 +1012,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def _get_cached_token(self, token): def _get_cached_token(self, token):
token_id = cms.cms_hash_token(token) token_id = cms.cms_hash_token(token)
# NOTE(vish): example tokens are expired so skip the expiration check. # NOTE(vish): example tokens are expired so skip the expiration check.
key = self.middleware._get_cache_key(token_id) return self.middleware._cache_get(token_id, ignore_expires=True)
cached = self.middleware._cache.get(key)
return self.middleware._unprotect_cache_value(token, cached)
def test_memcache(self): def test_memcache(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
@@ -1036,7 +1033,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
token = 'invalid-token' token = 'invalid-token'
req.headers['X-Auth-Token'] = token req.headers['X-Auth-Token'] = token
self.middleware(req.environ, self.start_fake_response) self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self._get_cached_token(token), "invalid") self.assertRaises(auth_token.InvalidUserToken,
self._get_cached_token, token)
def test_memcache_set_expired(self): def test_memcache_set_expired(self):
token_cache_time = 10 token_cache_time = 10
@@ -1096,18 +1094,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'memcache_secret_key': 'mysecret' 'memcache_secret_key': 'mysecret'
} }
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
encrypted_data = self.middleware._protect_cache_value( token = 'my_token'
'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) data = ('this_data', 10e100)
self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16]) self.middleware._init_cache({})
self.assertEqual( self.middleware._cache_store(token, data)
TOKEN_RESPONSES[self.token_dict['uuid_token_default']], self.assertEqual(self.middleware._cache_get(token), data[0])
self.middleware._unprotect_cache_value('token', encrypted_data))
# should return None if unable to decrypt
self.assertIsNone(
self.middleware._unprotect_cache_value(
'token', '{ENCRYPT:AES256}corrupted'))
self.assertIsNone(
self.middleware._unprotect_cache_value('mykey', encrypted_data))
def test_sign_cache_data(self): def test_sign_cache_data(self):
conf = { conf = {
@@ -1119,19 +1110,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'memcache_secret_key': 'mysecret' 'memcache_secret_key': 'mysecret'
} }
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
signed_data = self.middleware._protect_cache_value( token = 'my_token'
'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) data = ('this_data', 10e100)
expected = '{MAC:SHA1}' self.middleware._init_cache({})
self.assertEqual( self.middleware._cache_store(token, data)
signed_data[:10], self.assertEqual(self.middleware._cache_get(token), data[0])
expected)
self.assertEqual(
TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
self.middleware._unprotect_cache_value('mykey', signed_data))
# should return None on corrupted data
self.assertIsNone(
self.middleware._unprotect_cache_value('mykey',
'{MAC:SHA1}corrupted'))
def test_no_memcache_protection(self): def test_no_memcache_protection(self):
conf = { conf = {
@@ -1142,47 +1125,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'memcache_secret_key': 'mysecret' 'memcache_secret_key': 'mysecret'
} }
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
data = self.middleware._protect_cache_value('mykey', token = 'my_token'
'This is a test!') data = ('this_data', 10e100)
self.assertEqual(data, 'This is a test!') self.middleware._init_cache({})
self.assertEqual( self.middleware._cache_store(token, data)
'This is a test!', self.assertEqual(self.middleware._cache_get(token), data[0])
self.middleware._unprotect_cache_value('mykey', data))
def test_get_cache_key(self):
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'memcache_servers': ['localhost:11211'],
'memcache_secret_key': 'mysecret'
}
self.set_middleware(conf=conf)
self.assertEqual(
'tokens/mytoken',
self.middleware._get_cache_key('mytoken'))
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'memcache_servers': ['localhost:11211'],
'memcache_security_strategy': 'mac',
'memcache_secret_key': 'mysecret'
}
self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'memcache_servers': ['localhost:11211'],
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': 'abc!'
}
self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
def test_assert_valid_memcache_protection_config(self): def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key # test missing memcache_secret_key

View File

@@ -4,48 +4,66 @@ from keystoneclient.middleware import memcache_crypt
class MemcacheCryptPositiveTests(testtools.TestCase): class MemcacheCryptPositiveTests(testtools.TestCase):
def test_generate_aes_key(self): def _setup_keys(self, strategy):
self.assertEqual( return memcache_crypt.derive_keys('token', 'secret', strategy)
len(memcache_crypt.generate_aes_key('Gimme Da Key', 'hush')), 32)
def test_compute_mac(self): def test_constant_time_compare(self):
self.assertEqual( # make sure it works as a compare, the "constant time" aspect
memcache_crypt.compute_mac('mykey', 'This is a test!'), # isn't appropriate to test in unittests
'tREu41yR5tEgeBWIuv9ag4AeKA8=') ctc = memcache_crypt.constant_time_compare
self.assertTrue(ctc('abcd', 'abcd'))
self.assertTrue(ctc('', ''))
self.assertFalse(ctc('abcd', 'efgh'))
self.assertFalse(ctc('abc', 'abcd'))
self.assertFalse(ctc('abc', 'abc\x00'))
self.assertFalse(ctc('', 'abc'))
def test_derive_keys(self):
keys = memcache_crypt.derive_keys('token', 'secret', 'strategy')
self.assertEqual(len(keys['ENCRYPTION']),
len(keys['CACHE_KEY']))
self.assertEqual(len(keys['CACHE_KEY']),
len(keys['MAC']))
self.assertNotEqual(keys['ENCRYPTION'],
keys['MAC'])
self.assertIn('strategy', keys.keys())
def test_key_strategy_diff(self):
k1 = self._setup_keys('MAC')
k2 = self._setup_keys('ENCRYPT')
self.assertNotEqual(k1, k2)
def test_sign_data(self): def test_sign_data(self):
expected = '{MAC:SHA1}eyJtYWMiOiAiM0FrQmdPZHRybGo1RFFESHA1eUxqcDVq' +\ keys = self._setup_keys('MAC')
'Si9BPSIsICJzZXJpYWxpemVkX2RhdGEiOiAiXCJUaGlzIGlzIGEgdG' +\ sig = memcache_crypt.sign_data(keys['MAC'], 'data')
'VzdCFcIiJ9' self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
self.assertEqual(
memcache_crypt.sign_data('mykey', 'This is a test!'),
expected)
def test_verify_signed_data(self): def test_encryption(self):
signed = memcache_crypt.sign_data('mykey', 'Testz') keys = self._setup_keys('ENCRYPT')
self.assertEqual( # what you put in is what you get out
memcache_crypt.verify_signed_data('mykey', signed), for data in ['data', '1234567890123456', '\x00\xFF' * 13
'Testz') ] + [chr(x % 256) * x for x in range(768)]:
self.assertEqual( crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
memcache_crypt.verify_signed_data('aasSFWE13WER', 'not MACed'), decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
'not MACed') self.assertEqual(data, decrypt)
self.assertRaises(memcache_crypt.DecryptError,
memcache_crypt.decrypt_data,
keys['ENCRYPTION'], crypt[:-1])
def test_encrypt_data(self): def test_protect_wrappers(self):
expected = '{ENCRYPT:AES256}' data = 'My Pretty Little Data'
self.assertEqual( for strategy in ['MAC', 'ENCRYPT']:
memcache_crypt.encrypt_data('mykey', 'mysecret', keys = self._setup_keys(strategy)
'This is a test!')[:16], protected = memcache_crypt.protect_data(keys, data)
expected) self.assertNotEqual(protected, data)
if strategy == 'ENCRYPT':
def test_decrypt_data(self): self.assertNotIn(data, protected)
encrypted = memcache_crypt.encrypt_data('mykey', 'mysecret', 'Testz') unprotected = memcache_crypt.unprotect_data(keys, protected)
self.assertEqual( self.assertEqual(data, unprotected)
memcache_crypt.decrypt_data('mykey', 'mysecret', encrypted), self.assertRaises(memcache_crypt.InvalidMacError,
'Testz') memcache_crypt.unprotect_data,
self.assertEqual( keys, protected[:-1])
memcache_crypt.decrypt_data('mykey', 'mysecret', self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
'Not Encrypted!'),
'Not Encrypted!')
def test_no_pycrypt(self): def test_no_pycrypt(self):
aes = memcache_crypt.AES aes = memcache_crypt.AES