374 lines
16 KiB
Python
374 lines
16 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Tests for swift.common.request_helpers"""
|
|
|
|
import unittest
|
|
from swift.common.swob import Request, HTTPException, HeaderKeyDict
|
|
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
|
|
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
|
|
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
|
|
remove_items, copy_header_subset, get_name_and_placement, \
|
|
http_response_to_document_iters, is_object_transient_sysmeta, \
|
|
update_etag_is_at_header, resolve_etag_is_at_header, \
|
|
strip_object_transient_sysmeta_prefix
|
|
|
|
from test.unit import patch_policies
|
|
from test.unit.common.test_utils import FakeResponse
|
|
|
|
|
|
server_types = ['account', 'container', 'object']
|
|
|
|
|
|
class TestRequestHelpers(unittest.TestCase):
|
|
def test_is_user_meta(self):
|
|
m_type = 'meta'
|
|
for st in server_types:
|
|
self.assertTrue(is_user_meta(st, 'x-%s-%s-foo' % (st, m_type)))
|
|
self.assertFalse(is_user_meta(st, 'x-%s-%s-' % (st, m_type)))
|
|
self.assertFalse(is_user_meta(st, 'x-%s-%sfoo' % (st, m_type)))
|
|
|
|
def test_is_sys_meta(self):
|
|
m_type = 'sysmeta'
|
|
for st in server_types:
|
|
self.assertTrue(is_sys_meta(st, 'x-%s-%s-foo' % (st, m_type)))
|
|
self.assertFalse(is_sys_meta(st, 'x-%s-%s-' % (st, m_type)))
|
|
self.assertFalse(is_sys_meta(st, 'x-%s-%sfoo' % (st, m_type)))
|
|
|
|
def test_is_sys_or_user_meta(self):
|
|
m_types = ['sysmeta', 'meta']
|
|
for mt in m_types:
|
|
for st in server_types:
|
|
self.assertTrue(is_sys_or_user_meta(st, 'x-%s-%s-foo'
|
|
% (st, mt)))
|
|
self.assertFalse(is_sys_or_user_meta(st, 'x-%s-%s-'
|
|
% (st, mt)))
|
|
self.assertFalse(is_sys_or_user_meta(st, 'x-%s-%sfoo'
|
|
% (st, mt)))
|
|
|
|
def test_strip_sys_meta_prefix(self):
|
|
mt = 'sysmeta'
|
|
for st in server_types:
|
|
self.assertEqual(strip_sys_meta_prefix(st, 'x-%s-%s-a'
|
|
% (st, mt)), 'a')
|
|
mt = 'not-sysmeta'
|
|
for st in server_types:
|
|
with self.assertRaises(ValueError):
|
|
strip_sys_meta_prefix(st, 'x-%s-%s-a' % (st, mt))
|
|
|
|
def test_strip_user_meta_prefix(self):
|
|
mt = 'meta'
|
|
for st in server_types:
|
|
self.assertEqual(strip_user_meta_prefix(st, 'x-%s-%s-a'
|
|
% (st, mt)), 'a')
|
|
mt = 'not-meta'
|
|
for st in server_types:
|
|
with self.assertRaises(ValueError):
|
|
strip_sys_meta_prefix(st, 'x-%s-%s-a' % (st, mt))
|
|
|
|
def test_is_object_transient_sysmeta(self):
|
|
self.assertTrue(is_object_transient_sysmeta(
|
|
'x-object-transient-sysmeta-foo'))
|
|
self.assertFalse(is_object_transient_sysmeta(
|
|
'x-object-transient-sysmeta-'))
|
|
self.assertFalse(is_object_transient_sysmeta(
|
|
'x-object-meatmeta-foo'))
|
|
|
|
def test_strip_object_transient_sysmeta_prefix(self):
|
|
mt = 'object-transient-sysmeta'
|
|
self.assertEqual(strip_object_transient_sysmeta_prefix('x-%s-a' % mt),
|
|
'a')
|
|
|
|
mt = 'object-sysmeta-transient'
|
|
with self.assertRaises(ValueError):
|
|
strip_object_transient_sysmeta_prefix('x-%s-a' % mt)
|
|
|
|
def test_remove_items(self):
|
|
src = {'a': 'b',
|
|
'c': 'd'}
|
|
test = lambda x: x == 'a'
|
|
rem = remove_items(src, test)
|
|
self.assertEqual(src, {'c': 'd'})
|
|
self.assertEqual(rem, {'a': 'b'})
|
|
|
|
def test_copy_header_subset(self):
|
|
src = {'a': 'b',
|
|
'c': 'd'}
|
|
from_req = Request.blank('/path', environ={}, headers=src)
|
|
to_req = Request.blank('/path', {})
|
|
test = lambda x: x.lower() == 'a'
|
|
copy_header_subset(from_req, to_req, test)
|
|
self.assertTrue('A' in to_req.headers)
|
|
self.assertEqual(to_req.headers['A'], 'b')
|
|
self.assertFalse('c' in to_req.headers)
|
|
self.assertFalse('C' in to_req.headers)
|
|
|
|
@patch_policies(with_ec_default=True)
|
|
def test_get_name_and_placement_object_req(self):
|
|
path = '/device/part/account/container/object'
|
|
req = Request.blank(path, headers={
|
|
'X-Backend-Storage-Policy-Index': '0'})
|
|
device, part, account, container, obj, policy = \
|
|
get_name_and_placement(req, 5, 5, True)
|
|
self.assertEqual(device, 'device')
|
|
self.assertEqual(part, 'part')
|
|
self.assertEqual(account, 'account')
|
|
self.assertEqual(container, 'container')
|
|
self.assertEqual(obj, 'object')
|
|
self.assertEqual(policy, POLICIES[0])
|
|
self.assertEqual(policy.policy_type, EC_POLICY)
|
|
|
|
req.headers['X-Backend-Storage-Policy-Index'] = 1
|
|
device, part, account, container, obj, policy = \
|
|
get_name_and_placement(req, 5, 5, True)
|
|
self.assertEqual(device, 'device')
|
|
self.assertEqual(part, 'part')
|
|
self.assertEqual(account, 'account')
|
|
self.assertEqual(container, 'container')
|
|
self.assertEqual(obj, 'object')
|
|
self.assertEqual(policy, POLICIES[1])
|
|
self.assertEqual(policy.policy_type, REPL_POLICY)
|
|
|
|
req.headers['X-Backend-Storage-Policy-Index'] = 'foo'
|
|
try:
|
|
device, part, account, container, obj, policy = \
|
|
get_name_and_placement(req, 5, 5, True)
|
|
except HTTPException as e:
|
|
self.assertEqual(e.status_int, 503)
|
|
self.assertEqual(str(e), '503 Service Unavailable')
|
|
self.assertEqual(e.body, "No policy with index foo")
|
|
else:
|
|
self.fail('get_name_and_placement did not raise error '
|
|
'for invalid storage policy index')
|
|
|
|
@patch_policies(with_ec_default=True)
|
|
def test_get_name_and_placement_object_replication(self):
|
|
# yup, suffixes are sent '-'.joined in the path
|
|
path = '/device/part/012-345-678-9ab-cde'
|
|
req = Request.blank(path, headers={
|
|
'X-Backend-Storage-Policy-Index': '0'})
|
|
device, partition, suffix_parts, policy = \
|
|
get_name_and_placement(req, 2, 3, True)
|
|
self.assertEqual(device, 'device')
|
|
self.assertEqual(partition, 'part')
|
|
self.assertEqual(suffix_parts, '012-345-678-9ab-cde')
|
|
self.assertEqual(policy, POLICIES[0])
|
|
self.assertEqual(policy.policy_type, EC_POLICY)
|
|
|
|
path = '/device/part'
|
|
req = Request.blank(path, headers={
|
|
'X-Backend-Storage-Policy-Index': '1'})
|
|
device, partition, suffix_parts, policy = \
|
|
get_name_and_placement(req, 2, 3, True)
|
|
self.assertEqual(device, 'device')
|
|
self.assertEqual(partition, 'part')
|
|
self.assertIsNone(suffix_parts) # false-y
|
|
self.assertEqual(policy, POLICIES[1])
|
|
self.assertEqual(policy.policy_type, REPL_POLICY)
|
|
|
|
path = '/device/part/' # with a trailing slash
|
|
req = Request.blank(path, headers={
|
|
'X-Backend-Storage-Policy-Index': '1'})
|
|
device, partition, suffix_parts, policy = \
|
|
get_name_and_placement(req, 2, 3, True)
|
|
self.assertEqual(device, 'device')
|
|
self.assertEqual(partition, 'part')
|
|
self.assertEqual(suffix_parts, '') # still false-y
|
|
self.assertEqual(policy, POLICIES[1])
|
|
self.assertEqual(policy.policy_type, REPL_POLICY)
|
|
|
|
|
|
class TestHTTPResponseToDocumentIters(unittest.TestCase):
|
|
def test_200(self):
|
|
fr = FakeResponse(
|
|
200,
|
|
{'Content-Length': '10', 'Content-Type': 'application/lunch'},
|
|
'sandwiches')
|
|
|
|
doc_iters = http_response_to_document_iters(fr)
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 0)
|
|
self.assertEqual(last_byte, 9)
|
|
self.assertEqual(length, 10)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Content-Length'), '10')
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'sandwiches')
|
|
|
|
self.assertRaises(StopIteration, next, doc_iters)
|
|
|
|
fr = FakeResponse(
|
|
200,
|
|
{'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'application/lunch'},
|
|
'sandwiches')
|
|
|
|
doc_iters = http_response_to_document_iters(fr)
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 0)
|
|
self.assertIsNone(last_byte)
|
|
self.assertIsNone(length)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Transfer-Encoding'), 'chunked')
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'sandwiches')
|
|
|
|
self.assertRaises(StopIteration, next, doc_iters)
|
|
|
|
def test_206_single_range(self):
|
|
fr = FakeResponse(
|
|
206,
|
|
{'Content-Length': '8', 'Content-Type': 'application/lunch',
|
|
'Content-Range': 'bytes 1-8/10'},
|
|
'andwiche')
|
|
|
|
doc_iters = http_response_to_document_iters(fr)
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 1)
|
|
self.assertEqual(last_byte, 8)
|
|
self.assertEqual(length, 10)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Content-Length'), '8')
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'andwiche')
|
|
|
|
self.assertRaises(StopIteration, next, doc_iters)
|
|
|
|
# Chunked response should be treated in the same way as non-chunked one
|
|
fr = FakeResponse(
|
|
206,
|
|
{'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'application/lunch',
|
|
'Content-Range': 'bytes 1-8/10'},
|
|
'andwiche')
|
|
|
|
doc_iters = http_response_to_document_iters(fr)
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 1)
|
|
self.assertEqual(last_byte, 8)
|
|
self.assertEqual(length, 10)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'andwiche')
|
|
|
|
self.assertRaises(StopIteration, next, doc_iters)
|
|
|
|
def test_206_multiple_ranges(self):
|
|
fr = FakeResponse(
|
|
206,
|
|
{'Content-Type': 'multipart/byteranges; boundary=asdfasdfasdf'},
|
|
("--asdfasdfasdf\r\n"
|
|
"Content-Type: application/lunch\r\n"
|
|
"Content-Range: bytes 0-3/10\r\n"
|
|
"\r\n"
|
|
"sand\r\n"
|
|
"--asdfasdfasdf\r\n"
|
|
"Content-Type: application/lunch\r\n"
|
|
"Content-Range: bytes 6-9/10\r\n"
|
|
"\r\n"
|
|
"ches\r\n"
|
|
"--asdfasdfasdf--"))
|
|
|
|
doc_iters = http_response_to_document_iters(fr)
|
|
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 0)
|
|
self.assertEqual(last_byte, 3)
|
|
self.assertEqual(length, 10)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'sand')
|
|
|
|
first_byte, last_byte, length, headers, body = next(doc_iters)
|
|
self.assertEqual(first_byte, 6)
|
|
self.assertEqual(last_byte, 9)
|
|
self.assertEqual(length, 10)
|
|
header_dict = HeaderKeyDict(headers)
|
|
self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
|
|
self.assertEqual(body.read(), 'ches')
|
|
|
|
self.assertRaises(StopIteration, next, doc_iters)
|
|
|
|
def test_update_etag_is_at_header(self):
|
|
# start with no existing X-Backend-Etag-Is-At
|
|
req = Request.blank('/v/a/c/o')
|
|
update_etag_is_at_header(req, 'X-Object-Sysmeta-My-Etag')
|
|
self.assertEqual('X-Object-Sysmeta-My-Etag',
|
|
req.headers['X-Backend-Etag-Is-At'])
|
|
# add another alternate
|
|
update_etag_is_at_header(req, 'X-Object-Sysmeta-Ec-Etag')
|
|
self.assertEqual('X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag',
|
|
req.headers['X-Backend-Etag-Is-At'])
|
|
with self.assertRaises(ValueError) as cm:
|
|
update_etag_is_at_header(req, 'X-Object-Sysmeta-,-Bad')
|
|
self.assertEqual('Header name must not contain commas',
|
|
cm.exception.message)
|
|
|
|
def test_resolve_etag_is_at_header(self):
|
|
def do_test():
|
|
req = Request.blank('/v/a/c/o')
|
|
# ok to have no X-Backend-Etag-Is-At
|
|
self.assertIsNone(resolve_etag_is_at_header(req, metadata))
|
|
|
|
# ok to have no matching metadata
|
|
req.headers['X-Backend-Etag-Is-At'] = 'X-Not-There'
|
|
self.assertIsNone(resolve_etag_is_at_header(req, metadata))
|
|
|
|
# selects from metadata
|
|
req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-Ec-Etag'
|
|
self.assertEqual('an etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-My-Etag'
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
# first in list takes precedence
|
|
req.headers['X-Backend-Etag-Is-At'] = \
|
|
'X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag'
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
# non-existent alternates are passed over
|
|
req.headers['X-Backend-Etag-Is-At'] = \
|
|
'X-Bogus,X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag'
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
# spaces in list are ok
|
|
alts = 'X-Foo, X-Object-Sysmeta-My-Etag , X-Object-Sysmeta-Ec-Etag'
|
|
req.headers['X-Backend-Etag-Is-At'] = alts
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
# lower case in list is ok
|
|
alts = alts.lower()
|
|
req.headers['X-Backend-Etag-Is-At'] = alts
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
# upper case in list is ok
|
|
alts = alts.upper()
|
|
req.headers['X-Backend-Etag-Is-At'] = alts
|
|
self.assertEqual('another etag value',
|
|
resolve_etag_is_at_header(req, metadata))
|
|
|
|
metadata = {'X-Object-Sysmeta-Ec-Etag': 'an etag value',
|
|
'X-Object-Sysmeta-My-Etag': 'another etag value'}
|
|
do_test()
|
|
metadata = dict((k.lower(), v) for k, v in metadata.items())
|
|
do_test()
|
|
metadata = dict((k.upper(), v) for k, v in metadata.items())
|
|
do_test()
|