
Fix an anomaly where object metadata for an empty object has no encrypted etag, but if the encrypter received a container update override etag in footers or headers then it would encrypt that, so we'd have encrypted metadata in the container listing but not in the object metadata. (Empty object etags are not encrypted because the object content is revealed by its size anyway.) This patch changes the override handling to not encrypt override etags that correspond to an empty object, with one exception: if for some reason the received override etag value is that of an empty string but there *was* an object body, then we'll encrypt the override etag, because its value is not obvious from the object size. Change-Id: I8d7da34d6d98f351f59174883bc4d5ed0416c101
884 lines
39 KiB
Python
884 lines
39 KiB
Python
# Copyright (c) 2015-2016 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import unittest
|
|
import urllib
|
|
|
|
import mock
|
|
|
|
from swift.common.middleware.crypto import encrypter
|
|
from swift.common.middleware.crypto.crypto_utils import (
|
|
CRYPTO_KEY_CALLBACK, Crypto)
|
|
from swift.common.swob import (
|
|
Request, HTTPException, HTTPCreated, HTTPAccepted, HTTPOk, HTTPBadRequest)
|
|
from swift.common.utils import FileLikeIter
|
|
|
|
from test.unit import FakeLogger, EMPTY_ETAG
|
|
from test.unit.common.middleware.crypto.crypto_helpers import (
|
|
fetch_crypto_keys, md5hex, FAKE_IV, encrypt)
|
|
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts
|
|
|
|
|
|
@mock.patch('swift.common.middleware.crypto.crypto_utils.Crypto.create_iv',
|
|
lambda *args: FAKE_IV)
|
|
class TestEncrypter(unittest.TestCase):
|
|
def setUp(self):
    # every test gets a fresh fake backend wrapped by an encrypter whose
    # log output is captured by a FakeLogger
    backend = FakeSwift()
    self.app = backend
    middleware = encrypter.Encrypter(backend, {})
    self.encrypter = middleware
    middleware.logger = FakeLogger()
|
|
|
|
def _verify_user_metadata(self, req_hdrs, name, value, key):
    # Assert that user metadata item `name` is absent in plaintext form
    # from `req_hdrs` and present in encrypted form: the stored header
    # must be the base64 encoding of `value` encrypted with `key` and the
    # IV recorded in the crypto meta appended to the header.
    meta_tag = 'swift_meta='
    common_hdr = 'X-Object-Transient-Sysmeta-Crypto-Meta'
    self.assertNotIn('X-Object-Meta-' + name, req_hdrs)
    item_hdr = common_hdr + '-' + name
    self.assertIn(item_hdr, req_hdrs)

    # header value has the form '<b64 ciphertext>; swift_meta=<quoted json>'
    stored_val, raw_param = req_hdrs[item_hdr].split(';')
    raw_param = raw_param.strip()
    self.assertTrue(raw_param.startswith(meta_tag))
    item_meta = json.loads(urllib.unquote_plus(raw_param[len(meta_tag):]))
    self.assertEqual(Crypto.cipher, item_meta['cipher'])
    item_iv = base64.b64decode(item_meta['iv'])
    self.assertEqual(FAKE_IV, item_iv)
    expected_val = base64.b64encode(encrypt(value, key, item_iv))
    self.assertEqual(expected_val, stored_val)

    # if there is any encrypted user metadata then the common crypto meta
    # header should exist too
    self.assertIn(common_hdr, req_hdrs)
    shared_meta = json.loads(urllib.unquote_plus(req_hdrs[common_hdr]))
    expected_shared = {
        'cipher': Crypto.cipher,
        'key_id': {'v': 'fake', 'path': '/a/c/fake'},
    }
    self.assertDictEqual(expected_shared, shared_meta)
|
|
|
|
def test_PUT_req(self):
    # PUT a non-empty object and check what is sent to the backend: body
    # crypto meta and wrapped body key, ciphertext etag, encrypted
    # plaintext etag and its MAC, encrypted container update override
    # etag, encrypted user metadata and untouched sysmeta/content-type
    override_hdr = 'X-Object-Sysmeta-Container-Update-Override-Etag'
    meta_tag = 'swift_meta='
    body_key = os.urandom(32)
    object_key = fetch_crypto_keys()['object']
    plaintext = 'FAKE APP'
    plaintext_etag = md5hex(plaintext)
    ciphertext = encrypt(plaintext, body_key, FAKE_IV)
    ciphertext_etag = md5hex(ciphertext)

    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body=plaintext,
        headers={'etag': plaintext_etag,
                 'content-type': 'text/plain',
                 'content-length': str(len(plaintext)),
                 'x-object-meta-etag': 'not to be confused with the Etag!',
                 'x-object-meta-test': 'encrypt me',
                 'x-object-sysmeta-test': 'do not encrypt me'})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
    # pin the generated body key so that the ciphertext is predictable
    fixed_body_key = mock.patch(
        'swift.common.middleware.crypto.crypto_utils.'
        'Crypto.create_random_key',
        return_value=body_key)
    with fixed_body_key:
        resp = put_req.get_response(self.encrypter)
    self.assertEqual('201 Created', resp.status)
    self.assertEqual(plaintext_etag, resp.headers['Etag'])

    # verify metadata items
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual('PUT', self.app.calls[0][0])
    sent = self.app.headers[0]

    # verify body crypto meta
    body_meta = json.loads(
        urllib.unquote_plus(sent['X-Object-Sysmeta-Crypto-Body-Meta']))
    self.assertEqual(Crypto().cipher, body_meta['cipher'])
    self.assertEqual(FAKE_IV, base64.b64decode(body_meta['iv']))

    # verify wrapped body key
    wrapped_key = encrypt(body_key, object_key, FAKE_IV)
    self.assertEqual(wrapped_key,
                     base64.b64decode(body_meta['body_key']['key']))
    self.assertEqual(FAKE_IV,
                     base64.b64decode(body_meta['body_key']['iv']))
    self.assertEqual(fetch_crypto_keys()['id'], body_meta['key_id'])

    # verify etag sent to the backend is that of the ciphertext
    self.assertEqual(ciphertext_etag, sent['Etag'])

    split_etag = sent['X-Object-Sysmeta-Crypto-Etag'].partition(
        '; swift_meta=')
    stored_etag, quoted_etag_meta = split_etag[0], split_etag[2]
    # verify crypto_meta was appended to this etag
    self.assertTrue(quoted_etag_meta)
    etag_meta = json.loads(urllib.unquote_plus(quoted_etag_meta))
    self.assertEqual(Crypto().cipher, etag_meta['cipher'])

    # verify encrypted version of plaintext etag
    stored_etag = base64.b64decode(stored_etag)
    etag_iv = base64.b64decode(etag_meta['iv'])
    self.assertEqual(encrypt(plaintext_etag, object_key, etag_iv),
                     stored_etag)

    # verify etag MAC for conditional requests
    stored_mac = base64.b64decode(sent['X-Object-Sysmeta-Crypto-Etag-Mac'])
    expected_mac = hmac.new(
        object_key, plaintext_etag, hashlib.sha256).digest()
    self.assertEqual(stored_mac, expected_mac)

    # verify encrypted etag for container update
    self.assertIn(override_hdr, sent)
    override_parts = sent[override_hdr].rsplit(';', 1)
    self.assertEqual(2, len(override_parts))

    # extract crypto_meta from end of etag for container update
    override_param = override_parts[1].strip()
    self.assertTrue(override_param.startswith(meta_tag), override_param)
    override_meta = json.loads(
        urllib.unquote_plus(override_param[len(meta_tag):]))
    self.assertEqual(Crypto().cipher, override_meta['cipher'])
    self.assertEqual(fetch_crypto_keys()['id'], override_meta['key_id'])

    container_key = fetch_crypto_keys()['container']
    override_iv = base64.b64decode(override_meta['iv'])
    self.assertEqual(FAKE_IV, override_iv)
    self.assertEqual(encrypt(plaintext_etag, container_key, override_iv),
                     base64.b64decode(override_parts[0]))

    # content-type is not encrypted
    self.assertEqual('text/plain', sent['Content-Type'])

    # user meta is encrypted
    for meta_name, meta_value in (
            ('Test', 'encrypt me'),
            ('Etag', 'not to be confused with the Etag!')):
        self._verify_user_metadata(sent, meta_name, meta_value, object_key)

    # sysmeta is not encrypted
    self.assertEqual('do not encrypt me', sent['X-Object-Sysmeta-Test'])

    # verify object is encrypted by getting direct from the app
    get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
    get_resp = get_req.get_response(self.app)
    self.assertEqual(ciphertext, get_resp.body)
    self.assertEqual(ciphertext_etag, get_resp.headers['Etag'])
|
|
|
|
def test_PUT_zero_size_object(self):
    # object body encryption should be skipped for zero sized object body:
    # no body crypto meta, no encrypted etag and no container update
    # override etag are expected, but user metadata is still encrypted
    object_key = fetch_crypto_keys()['object']
    plaintext_etag = EMPTY_ETAG

    env = {'REQUEST_METHOD': 'PUT',
           CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
    hdrs = {'etag': EMPTY_ETAG,
            'content-type': 'text/plain',
            'content-length': '0',
            'x-object-meta-etag': 'not to be confused with the Etag!',
            'x-object-meta-test': 'encrypt me',
            'x-object-sysmeta-test': 'do not encrypt me'}
    req = Request.blank(
        '/v1/a/c/o', environ=env, body='', headers=hdrs)
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

    resp = req.get_response(self.encrypter)

    self.assertEqual('201 Created', resp.status)
    self.assertEqual(plaintext_etag, resp.headers['Etag'])
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual('PUT', self.app.calls[0][0])
    req_hdrs = self.app.headers[0]

    # verify that there is no body crypto meta. The header that carries
    # body crypto meta is X-Object-Sysmeta-Crypto-Body-Meta (see
    # test_PUT_req); the shorter name is also checked, as it always was.
    self.assertNotIn('X-Object-Sysmeta-Crypto-Body-Meta', req_hdrs)
    self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', req_hdrs)
    # verify etag is md5 of plaintext
    self.assertEqual(EMPTY_ETAG, req_hdrs['Etag'])
    # verify there is no etag crypto meta
    self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', req_hdrs)
    # verify there is no container update override for etag
    self.assertNotIn(
        'X-Object-Sysmeta-Container-Update-Override-Etag', req_hdrs)

    # user meta is still encrypted
    self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', object_key)
    self._verify_user_metadata(
        req_hdrs, 'Etag', 'not to be confused with the Etag!', object_key)

    # sysmeta is not encrypted
    self.assertEqual('do not encrypt me',
                     req_hdrs['X-Object-Sysmeta-Test'])

    # verify object is empty by getting direct from the app
    get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
    resp = get_req.get_response(self.app)
    self.assertEqual('', resp.body)
    self.assertEqual(EMPTY_ETAG, resp.headers['Etag'])
|
|
|
|
def _test_PUT_with_other_footers(self, override_etag):
    # verify handling of another middleware's footer callback.
    #
    # `override_etag` is the container update override etag value that the
    # other middleware supplies in its footers; the test asserts that it
    # reaches the backend encrypted with the container key, that the
    # footer Etag is used as the plaintext etag, and that the remaining
    # footers are passed through unchanged.
    override_hdr = 'X-Object-Sysmeta-Container-Update-Override-Etag'
    crypto_meta_tag = 'swift_meta='
    cont_key = fetch_crypto_keys()['container']
    body_key = os.urandom(32)
    object_key = fetch_crypto_keys()['object']
    plaintext = 'FAKE APP'
    plaintext_etag = md5hex(plaintext)
    ciphertext = encrypt(plaintext, body_key, FAKE_IV)
    ciphertext_etag = md5hex(ciphertext)
    other_footers = {
        'Etag': plaintext_etag,
        'X-Object-Sysmeta-Other': 'other sysmeta',
        'X-Object-Sysmeta-Container-Update-Override-Size':
            'other override',
        override_hdr: override_etag}

    env = {'REQUEST_METHOD': 'PUT',
           CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
           'swift.callback.update_footers':
               lambda footers: footers.update(other_footers)}
    hdrs = {'content-type': 'text/plain',
            'content-length': str(len(plaintext)),
            'Etag': 'correct etag is in footers'}
    req = Request.blank(
        '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

    # pin the generated body key so that the ciphertext is predictable
    with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
            lambda *args: body_key):
        resp = req.get_response(self.encrypter)

    self.assertEqual('201 Created', resp.status)
    self.assertEqual(plaintext_etag, resp.headers['Etag'])

    # verify metadata items
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual('PUT', self.app.calls[0][0])
    req_hdrs = self.app.headers[0]

    # verify that other middleware's footers made it to app, including any
    # container update overrides but nothing Etag-related
    for k, v in other_footers.items():
        if k in ('Etag', override_hdr):
            continue
        self.assertEqual(v, req_hdrs[k])

    # verify encryption footers are ok
    encrypted_etag, _junk, etag_meta = \
        req_hdrs['X-Object-Sysmeta-Crypto-Etag'].partition('; swift_meta=')
    self.assertTrue(etag_meta)
    actual_meta = json.loads(urllib.unquote_plus(etag_meta))
    self.assertEqual(Crypto().cipher, actual_meta['cipher'])

    self.assertEqual(ciphertext_etag, req_hdrs['Etag'])
    actual = base64.b64decode(encrypted_etag)
    etag_iv = base64.b64decode(actual_meta['iv'])
    self.assertEqual(encrypt(plaintext_etag, object_key, etag_iv), actual)

    # verify encrypted etag for container update
    self.assertIn(override_hdr, req_hdrs)
    parts = req_hdrs[override_hdr].rsplit(';', 1)
    self.assertEqual(2, len(parts))

    # extract crypto_meta from end of etag for container update
    param = parts[1].strip()
    self.assertTrue(param.startswith(crypto_meta_tag), param)
    actual_meta = json.loads(
        urllib.unquote_plus(param[len(crypto_meta_tag):]))
    self.assertEqual(Crypto().cipher, actual_meta['cipher'])
    # same key_id check as the other override etag tests in this class
    self.assertEqual(fetch_crypto_keys()['id'], actual_meta['key_id'])

    cont_etag_iv = base64.b64decode(actual_meta['iv'])
    self.assertEqual(FAKE_IV, cont_etag_iv)
    self.assertEqual(encrypt(override_etag, cont_key, cont_etag_iv),
                     base64.b64decode(parts[0]))

    # verify body crypto meta
    actual = req_hdrs['X-Object-Sysmeta-Crypto-Body-Meta']
    actual = json.loads(urllib.unquote_plus(actual))
    self.assertEqual(Crypto().cipher, actual['cipher'])
    self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))

    # verify wrapped body key
    expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
    self.assertEqual(expected_wrapped_key,
                     base64.b64decode(actual['body_key']['key']))
    self.assertEqual(FAKE_IV,
                     base64.b64decode(actual['body_key']['iv']))
    self.assertEqual(fetch_crypto_keys()['id'], actual['key_id'])
|
|
|
|
def test_PUT_with_other_footers(self):
|
|
self._test_PUT_with_other_footers('override etag')
|
|
|
|
def test_PUT_with_other_footers_and_empty_etag(self):
    # an override etag equal to EMPTY_ETAG must still be encrypted when
    # the object body was not empty (the shared scenario PUTs a non-empty
    # body)
    self._test_PUT_with_other_footers(override_etag=EMPTY_ETAG)
|
|
|
|
def _test_PUT_with_etag_override_in_headers(self, override_etag):
    # verify handling of another middleware's
    # container-update-override-etag in headers: `override_etag` is sent
    # as a request header and must reach the backend encrypted with the
    # container key
    override_hdr = 'X-Object-Sysmeta-Container-Update-Override-Etag'
    meta_tag = 'swift_meta='
    plaintext = 'FAKE APP'
    plaintext_etag = md5hex(plaintext)

    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body=plaintext,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(plaintext)),
                 'Etag': plaintext_etag,
                 override_hdr: override_etag})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
    resp = put_req.get_response(self.encrypter)

    self.assertEqual('201 Created', resp.status)
    self.assertEqual(plaintext_etag, resp.headers['Etag'])

    # verify metadata items
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
    sent = self.app.headers[0]

    # verify encrypted etag for container update
    self.assertIn(override_hdr, sent)
    pieces = sent[override_hdr].rsplit(';', 1)
    self.assertEqual(2, len(pieces))
    container_key = fetch_crypto_keys()['container']

    # extract crypto_meta from end of etag for container update
    quoted_meta = pieces[1].strip()
    self.assertTrue(quoted_meta.startswith(meta_tag), quoted_meta)
    override_meta = json.loads(
        urllib.unquote_plus(quoted_meta[len(meta_tag):]))
    self.assertEqual(Crypto().cipher, override_meta['cipher'])
    self.assertEqual(fetch_crypto_keys()['id'], override_meta['key_id'])

    override_iv = base64.b64decode(override_meta['iv'])
    self.assertEqual(FAKE_IV, override_iv)
    expected_ciphertext = encrypt(override_etag, container_key, override_iv)
    self.assertEqual(expected_ciphertext, base64.b64decode(pieces[0]))
|
|
|
|
def test_PUT_with_etag_override_in_headers(self):
|
|
self._test_PUT_with_etag_override_in_headers('override_etag')
|
|
|
|
def test_PUT_with_etag_override_in_headers_and_empty_etag(self):
    # an override etag equal to EMPTY_ETAG must still be encrypted when
    # the object body was not empty (the shared scenario PUTs a non-empty
    # body)
    self._test_PUT_with_etag_override_in_headers(override_etag=EMPTY_ETAG)
|
|
|
|
def test_PUT_with_bad_etag_in_other_footers(self):
    # verify that etag supplied in footers from other middleware overrides
    # header etag when validating inbound plaintext etags: the header
    # etag is correct, the footer etag is not, and the PUT must fail
    plaintext = 'FAKE APP'
    good_etag = md5hex(plaintext)
    upstream_footers = {
        'Etag': 'bad etag',
        'X-Object-Sysmeta-Other': 'other sysmeta',
        'X-Object-Sysmeta-Container-Update-Override-Etag':
            'other override'}

    def add_upstream_footers(footers):
        return footers.update(upstream_footers)

    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
                 'swift.callback.update_footers': add_upstream_footers},
        body=plaintext,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(plaintext)),
                 'Etag': good_etag})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
    resp = put_req.get_response(self.encrypter)
    self.assertEqual('422 Unprocessable Entity', resp.status)
    self.assertNotIn('Etag', resp.headers)
|
|
|
|
def test_PUT_with_bad_etag_in_headers_and_other_footers(self):
    # verify that etag supplied in headers from other middleware is used if
    # none is supplied in footers when validating inbound plaintext etags:
    # the footers carry no Etag, the header etag is wrong, and the PUT
    # must fail
    plaintext = 'FAKE APP'
    upstream_footers = {
        'X-Object-Sysmeta-Other': 'other sysmeta',
        'X-Object-Sysmeta-Container-Update-Override-Etag':
            'other override'}

    def add_upstream_footers(footers):
        return footers.update(upstream_footers)

    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
                 'swift.callback.update_footers': add_upstream_footers},
        body=plaintext,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(plaintext)),
                 'Etag': 'bad etag'})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
    resp = put_req.get_response(self.encrypter)
    self.assertEqual('422 Unprocessable Entity', resp.status)
    self.assertNotIn('Etag', resp.headers)
|
|
|
|
def test_PUT_nothing_read(self):
    # simulate an artificial scenario of a downstream filter/app not
    # actually reading the input stream from encrypter.
    override_hdr = 'X-Object-Sysmeta-Container-Update-Override-Etag'
    meta_tag = 'swift_meta='
    captured = []

    class NonReadingApp(object):
        def __call__(self, env, start_response):
            # note: no read from wsgi.input
            req = Request(env)
            env['swift.callback.update_footers'](req.headers)
            captured.append(req.headers)
            resp = HTTPCreated(req=req, headers={'Etag': 'response etag'})
            return resp(env, start_response)

    env = {'REQUEST_METHOD': 'PUT',
           CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
    hdrs = {'content-type': 'text/plain',
            'content-length': 0,
            'etag': 'etag from client'}

    def put_empty_body():
        # send an empty PUT through a fresh encrypter and return the
        # headers seen by the non-reading app
        req = Request.blank('/v1/a/c/o', environ=env, body='', headers=hdrs)
        del captured[:]
        resp = req.get_response(encrypter.Encrypter(NonReadingApp(), {}))
        self.assertEqual('201 Created', resp.status)
        self.assertEqual('response etag', resp.headers['Etag'])
        self.assertEqual(1, len(captured))
        return captured[0]

    def assert_no_encryption_footers(headers):
        for name in headers:
            self.assertFalse(
                name.lower().startswith('x-object-sysmeta-crypto-'))

    # no upstream footer callback
    seen = put_empty_body()
    self.assertEqual('etag from client', seen['etag'])
    # verify no encryption footers
    assert_no_encryption_footers(seen)

    # check that an upstream footer callback gets called
    other_footers = {
        'Etag': EMPTY_ETAG,
        'X-Object-Sysmeta-Other': 'other sysmeta',
        override_hdr: 'other override'}
    env.update({'swift.callback.update_footers':
                lambda footers: footers.update(other_footers)})
    seen = put_empty_body()

    # verify encrypted override etag for container update.
    self.assertIn(override_hdr, seen)
    pieces = seen[override_hdr].rsplit(';', 1)
    self.assertEqual(2, len(pieces))
    container_key = fetch_crypto_keys()['container']

    quoted_meta = pieces[1].strip()
    self.assertTrue(quoted_meta.startswith(meta_tag), quoted_meta)
    override_meta = json.loads(
        urllib.unquote_plus(quoted_meta[len(meta_tag):]))
    self.assertEqual(Crypto().cipher, override_meta['cipher'])
    self.assertEqual(fetch_crypto_keys()['id'], override_meta['key_id'])

    override_iv = base64.b64decode(override_meta['iv'])
    self.assertEqual(FAKE_IV, override_iv)
    self.assertEqual(encrypt('other override', container_key, override_iv),
                     base64.b64decode(pieces[0]))

    # verify that other middleware's footers made it to app
    other_footers.pop(override_hdr)
    for name, expected in other_footers.items():
        self.assertEqual(expected, seen[name])
    # verify no encryption footers
    assert_no_encryption_footers(seen)

    # if upstream footer override etag is for an empty body then check that
    # it is not encrypted
    other_footers = {
        'Etag': EMPTY_ETAG,
        override_hdr: EMPTY_ETAG}
    env.update({'swift.callback.update_footers':
                lambda footers: footers.update(other_footers)})
    seen = put_empty_body()

    # verify that other middleware's footers made it to app
    for name, expected in other_footers.items():
        self.assertEqual(expected, seen[name])
    # verify no encryption footers
    assert_no_encryption_footers(seen)
|
|
|
|
def test_POST_req(self):
    # POST with user metadata and sysmeta: user metadata must reach the
    # backend encrypted, sysmeta must be passed through untouched
    post_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'POST',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body='FAKE APP',
        headers={'x-object-meta-test': 'encrypt me',
                 'x-object-sysmeta-test': 'do not encrypt me'})
    object_key = fetch_crypto_keys()['object']
    self.app.register('POST', '/v1/a/c/o', HTTPAccepted, {})
    resp = post_req.get_response(self.encrypter)
    self.assertEqual('202 Accepted', resp.status)
    self.assertNotIn('Etag', resp.headers)

    # verify metadata items
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual('POST', self.app.calls[0][0])
    sent = self.app.headers[0]

    # user meta is encrypted
    self._verify_user_metadata(sent, 'Test', 'encrypt me', object_key)

    # sysmeta is not encrypted
    self.assertEqual('do not encrypt me', sent['X-Object-Sysmeta-Test'])
|
|
|
|
def _test_no_user_metadata(self, method):
    # verify that x-object-transient-sysmeta-crypto-meta is not set when
    # there is no user metadata in a request made with `method`
    bare_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': method,
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body='body')
    self.app.register(method, '/v1/a/c/o', HTTPAccepted, {})
    resp = bare_req.get_response(self.encrypter)
    self.assertEqual('202 Accepted', resp.status)
    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual(method, self.app.calls[0][0])
    sent = self.app.headers[0]
    self.assertNotIn('x-object-transient-sysmeta-crypto-meta', sent)
|
|
|
|
def test_PUT_no_user_metadata(self):
|
|
self._test_no_user_metadata('PUT')
|
|
|
|
def test_POST_no_user_metadata(self):
|
|
self._test_no_user_metadata('POST')
|
|
|
|
def _test_if_match(self, method, match_header_name):
    # Send `method` requests carrying various etag lists in the
    # `match_header_name` header and verify that the backend sees each
    # plaintext etag plus its masked (HMAC) counterpart, and is told where
    # to find the etag MAC.
    def check(plain_etags, expected_plain_etags=None):
        header_value = ', '.join(plain_etags)
        req = Request.blank(
            '/v1/a/c/o',
            environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
            method=method,
            headers={match_header_name: header_value})
        backend = FakeSwift()
        backend.register(method, '/v1/a/c/o', HTTPOk, {})
        resp = req.get_response(encrypter.Encrypter(backend, {}))
        self.assertEqual('200 OK', resp.status)

        self.assertEqual(1, len(backend.calls), backend.calls)
        self.assertEqual(method, backend.calls[0][0])
        sent = backend.headers[0]

        # verify the alternate etag location has been specified
        if header_value not in ('', '*'):
            self.assertIn('X-Backend-Etag-Is-At', sent)
            self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                             sent['X-Backend-Etag-Is-At'])

        # verify etags have been supplemented with masked values
        self.assertIn(match_header_name, sent)
        sent_etags = set(sent[match_header_name].split(', '))
        object_key = fetch_crypto_keys()['object']
        masked = []
        for etag in plain_etags:
            if etag in ('*', ''):
                continue
            mac = hmac.new(
                object_key, etag.strip('"'), hashlib.sha256).digest()
            masked.append('"%s"' % base64.b64encode(mac))
        unmasked = expected_plain_etags or plain_etags
        self.assertEqual(set(unmasked + masked), sent_etags)
        # check that the request environ was returned to original state
        restored = set(req.headers[match_header_name].split(', '))
        self.assertEqual(set(plain_etags), restored)

    cases = (
        ([''], None),
        (['"an etag"'], None),
        (['"an etag"', '"another_etag"'], None),
        (['*'], None),
        # rfc2616 does not allow wildcard *and* etag but test it anyway
        (['*', '"an etag"'], None),
        # etags should be quoted but check we can cope if they are not
        (['*', 'an etag', 'another_etag'],
         ['*', '"an etag"', '"another_etag"']),
    )
    for plain, expected in cases:
        check(plain, expected_plain_etags=expected)
|
|
|
|
def test_GET_if_match(self):
|
|
self._test_if_match('GET', 'If-Match')
|
|
|
|
def test_HEAD_if_match(self):
|
|
self._test_if_match('HEAD', 'If-Match')
|
|
|
|
def test_GET_if_none_match(self):
|
|
self._test_if_match('GET', 'If-None-Match')
|
|
|
|
def test_HEAD_if_none_match(self):
|
|
self._test_if_match('HEAD', 'If-None-Match')
|
|
|
|
def _test_existing_etag_is_at_header(self, method, match_header_name):
    # if another middleware has already set X-Backend-Etag-Is-At then
    # encrypter should not override that value; the expected result is the
    # existing value followed by the crypto etag MAC header name
    etag_is_at = 'X-Backend-Etag-Is-At'
    cond_req = Request.blank(
        '/v1/a/c/o',
        environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        method=method,
        headers={match_header_name: "an etag",
                 etag_is_at: 'X-Object-Sysmeta-Other-Etag'})
    self.app.register(method, '/v1/a/c/o', HTTPOk, {})
    resp = cond_req.get_response(self.encrypter)
    self.assertEqual('200 OK', resp.status)

    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual(method, self.app.calls[0][0])
    sent = self.app.headers[0]
    self.assertIn(etag_is_at, sent)
    self.assertEqual(
        'X-Object-Sysmeta-Other-Etag,X-Object-Sysmeta-Crypto-Etag-Mac',
        sent[etag_is_at])
    sent_etags = set(sent[match_header_name].split(', '))
    self.assertIn('"an etag"', sent_etags)
|
|
|
|
def test_GET_if_match_with_existing_etag_is_at_header(self):
|
|
self._test_existing_etag_is_at_header('GET', 'If-Match')
|
|
|
|
def test_HEAD_if_match_with_existing_etag_is_at_header(self):
|
|
self._test_existing_etag_is_at_header('HEAD', 'If-Match')
|
|
|
|
def test_GET_if_none_match_with_existing_etag_is_at_header(self):
|
|
self._test_existing_etag_is_at_header('GET', 'If-None-Match')
|
|
|
|
def test_HEAD_if_none_match_with_existing_etag_is_at_header(self):
|
|
self._test_existing_etag_is_at_header('HEAD', 'If-None-Match')
|
|
|
|
def _test_etag_is_at_not_duplicated(self, method):
    # verify only one occurrence of X-Object-Sysmeta-Crypto-Etag-Mac in
    # X-Backend-Etag-Is-At when a request made with `method` carries both
    # If-Match and If-None-Match
    def masked(plain_etag):
        # quoted, base64 encoded HMAC of an etag under the object key
        mac = hmac.new(object_key, plain_etag, hashlib.sha256).digest()
        return '"%s"' % base64.b64encode(mac)

    object_key = fetch_crypto_keys()['object']
    cond_req = Request.blank(
        '/v1/a/c/o',
        environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        method=method,
        headers={'If-Match': '"an etag"',
                 'If-None-Match': '"another etag"'})
    self.app.register(method, '/v1/a/c/o', HTTPOk, {})
    resp = cond_req.get_response(self.encrypter)
    self.assertEqual('200 OK', resp.status)

    self.assertEqual(1, len(self.app.calls), self.app.calls)
    self.assertEqual(method, self.app.calls[0][0])
    sent = self.app.headers[0]
    self.assertIn('X-Backend-Etag-Is-At', sent)
    self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                     sent['X-Backend-Etag-Is-At'])

    self.assertIn(masked('an etag'), sent['If-Match'])
    self.assertIn('"another etag"', sent['If-None-Match'])
    self.assertIn(masked('another etag'), sent['If-None-Match'])
|
|
|
|
def test_GET_etag_is_at_not_duplicated(self):
|
|
self._test_etag_is_at_not_duplicated('GET')
|
|
|
|
def test_HEAD_etag_is_at_not_duplicated(self):
|
|
self._test_etag_is_at_not_duplicated('HEAD')
|
|
|
|
def test_PUT_response_inconsistent_etag_is_not_replaced(self):
    # if response is success but etag does not match the ciphertext md5
    # then verify that we do *not* replace it with the plaintext etag
    payload = 'FAKE APP'
    backend_etag = 'not the ciphertext etag'
    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body=payload,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(payload))})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated,
                      {'Etag': backend_etag})
    resp = put_req.get_response(self.encrypter)
    self.assertEqual('201 Created', resp.status)
    self.assertEqual(backend_etag, resp.headers['Etag'])
|
|
|
|
def test_PUT_multiseg_no_client_etag(self):
    # PUT a body delivered in several chunks, with no client supplied
    # etag, and check that the stored object is the encryption of the
    # whole body
    body_key = os.urandom(32)
    pieces = ['some', 'chunks', 'of data']
    whole_body = ''.join(pieces)
    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
                 'wsgi.input': FileLikeIter(pieces)},
        headers={'content-type': 'text/plain',
                 'content-length': str(len(whole_body))})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

    # pin the generated body key so that the ciphertext is predictable
    fixed_body_key = mock.patch(
        'swift.common.middleware.crypto.crypto_utils.'
        'Crypto.create_random_key',
        lambda *args: body_key)
    with fixed_body_key:
        resp = put_req.get_response(self.encrypter)

    self.assertEqual('201 Created', resp.status)
    # verify object is encrypted by getting direct from the app
    get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
    expected_ciphertext = encrypt(whole_body, body_key, FAKE_IV)
    self.assertEqual(expected_ciphertext,
                     get_req.get_response(self.app).body)
|
|
|
|
def test_PUT_multiseg_good_client_etag(self):
    # PUT a body delivered in several chunks, with a client supplied etag
    # that matches the plaintext, and check that the stored object is the
    # encryption of the whole body
    body_key = os.urandom(32)
    pieces = ['some', 'chunks', 'of data']
    whole_body = ''.join(pieces)
    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
                 'wsgi.input': FileLikeIter(pieces)},
        headers={'content-type': 'text/plain',
                 'content-length': str(len(whole_body)),
                 'Etag': md5hex(whole_body)})
    self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

    # pin the generated body key so that the ciphertext is predictable
    fixed_body_key = mock.patch(
        'swift.common.middleware.crypto.crypto_utils.'
        'Crypto.create_random_key',
        lambda *args: body_key)
    with fixed_body_key:
        resp = put_req.get_response(self.encrypter)

    self.assertEqual('201 Created', resp.status)
    # verify object is encrypted by getting direct from the app
    get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
    expected_ciphertext = encrypt(whole_body, body_key, FAKE_IV)
    self.assertEqual(expected_ciphertext,
                     get_req.get_response(self.app).body)
|
|
|
|
def test_PUT_multiseg_bad_client_etag(self):
    # A chunked PUT carrying the Etag header value 'badclientetag' is
    # expected to be answered with 422.
    segments = ['some', 'chunks', 'of data']
    total_length = len(''.join(segments))
    path = '/v1/a/c/o'
    put_req = Request.blank(
        path,
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
                 'wsgi.input': FileLikeIter(segments)},
        headers={'content-type': 'text/plain',
                 'content-length': str(total_length),
                 'Etag': 'badclientetag'})
    self.app.register('PUT', path, HTTPCreated, {})
    put_resp = put_req.get_response(self.encrypter)
    self.assertEqual('422 Unprocessable Entity', put_resp.status)
|
|
|
|
def test_PUT_missing_key_callback(self):
    # The request environ deliberately has no CRYPTO_KEY_CALLBACK entry;
    # the encrypter is expected to answer 500 and log an error.
    payload = 'FAKE APP'
    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT'},
        body=payload,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(payload))})
    put_resp = put_req.get_response(self.encrypter)
    self.assertEqual('500 Internal Error', put_resp.status)
    # the first error-level log line should mention the missing callback
    first_error = self.encrypter.logger.get_lines_for_level('error')[0]
    self.assertIn('missing callback', first_error)
    self.assertEqual('Unable to retrieve encryption keys.', put_resp.body)
|
|
|
|
def test_PUT_error_in_key_callback(self):
    # The key callback placed in the environ raises; the encrypter is
    # expected to answer 500 and log the exception message.
    def raise_exc():
        raise Exception('Testing')

    payload = 'FAKE APP'
    put_req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': 'PUT',
                 CRYPTO_KEY_CALLBACK: raise_exc},
        body=payload,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(payload))})
    put_resp = put_req.get_response(self.encrypter)
    self.assertEqual('500 Internal Error', put_resp.status)
    # the first error-level log line should carry the callback's message
    first_error = self.encrypter.logger.get_lines_for_level('error')[0]
    self.assertIn('from callback: Testing', first_error)
    self.assertEqual('Unable to retrieve encryption keys.', put_resp.body)
|
|
|
|
def test_PUT_encryption_override(self):
    # With 'swift.crypto.override' set in the environ the body should be
    # stored unencrypted, and footers supplied by another middleware's
    # callback should reach the app unchanged.
    other_footers = {
        'Etag': 'other etag',
        'X-Object-Sysmeta-Other': 'other sysmeta',
        'X-Object-Sysmeta-Container-Update-Override-Etag':
            'other override'}

    def add_other_footers(footers):
        # stands in for a footers callback installed by other middleware
        return footers.update(other_footers)

    payload = 'FAKE APP'
    path = '/v1/a/c/o'
    put_req = Request.blank(
        path,
        environ={'REQUEST_METHOD': 'PUT',
                 'swift.crypto.override': True,
                 'swift.callback.update_footers': add_other_footers},
        body=payload,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(payload))})
    self.app.register('PUT', path, HTTPCreated, {})
    put_resp = put_req.get_response(self.encrypter)
    self.assertEqual('201 Created', put_resp.status)

    # every footer from the other middleware must appear, unmodified, in
    # the headers of the first request recorded by the app
    received = self.app.headers[0]
    for name in other_footers:
        self.assertEqual(other_footers[name], received[name])

    # fetch directly from the app: the body must be the plaintext
    raw_req = Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
    self.assertEqual(payload, raw_req.get_response(self.app).body)
|
|
|
|
def _test_constraints_checking(self, method):
    # Shared helper: send a request using the given HTTP method and verify
    # that the encrypter module's check_metadata is called exactly once,
    # and that the error response it yields is what the client receives.
    payload = 'FAKE APP'
    req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': method,
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        body=payload,
        headers={'content-type': 'text/plain',
                 'content-length': str(len(payload))})
    target = 'swift.common.middleware.crypto.encrypter.check_metadata'
    with mock.patch(target) as checker:
        # the single patched call yields a 400 response
        checker.side_effect = [HTTPBadRequest('testing')]
        resp = req.get_response(self.encrypter)
        self.assertEqual('400 Bad Request', resp.status)
        self.assertEqual(1, checker.call_count)
        checker.assert_called_once_with(mock.ANY, 'object')
        # first positional argument of the first (only) recorded call
        first_call_args = checker.call_args_list[0][0]
        checked_req = first_call_args[0]
        self.assertEqual(req.headers, checked_req.headers)
|
|
|
|
def test_PUT_constraints_checking(self):
    # run the shared constraints-checking scenario for the PUT method
    self._test_constraints_checking(method='PUT')
|
|
|
|
def test_POST_constraints_checking(self):
    # run the shared constraints-checking scenario for the POST method
    self._test_constraints_checking(method='POST')
|
|
|
|
def test_config_true_value_on_disable_encryption(self):
    # disable_encryption is False when the option is absent from the conf,
    # and True for each of the truthy config strings listed below.
    app = FakeSwift()
    self.assertFalse(encrypter.Encrypter(app, {}).disable_encryption)
    for val in ('true', '1', 'yes', 'on', 't', 'y'):
        # Build each Encrypter around the same FakeSwift app. Rebinding
        # ``app`` here would make every iteration wrap the Encrypter from
        # the previous one, producing an unintended nested chain.
        enc = encrypter.Encrypter(app, {'disable_encryption': val})
        self.assertTrue(
            enc.disable_encryption,
            'disable_encryption=%r was not treated as true' % val)
|
|
|
|
def test_PUT_app_exception(self):
    # An HTTPException raised by the wrapped app should come out of the
    # encrypter with its body equal to FakeAppThatExcepts.MESSAGE.
    failing_app = FakeAppThatExcepts(HTTPException)
    middleware = encrypter.Encrypter(failing_app, {})
    put_req = Request.blank('/', environ={'REQUEST_METHOD': 'PUT'})
    with self.assertRaises(HTTPException) as catcher:
        put_req.get_response(middleware)
    raised = catcher.exception
    self.assertEqual(FakeAppThatExcepts.MESSAGE, raised.body)
|
|
|
|
|
|
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|