Enable object system metadata on PUTs

This patch takes a first step towards support
for object system metadata by enabling headers
in the x-object-sysmeta- namespace to be
persisted when objects are PUT. This should be
useful for other pending patches such as on
demand migration and server side encryption
(https://review.openstack.org/#/c/64430/ and
https://review.openstack.org/#/c/76578/1).

The x-object-sysmeta- namespace is already
reserved/protected by the gatekeeper and
passed through the proxy. This patch modifies
the object server to persist these headers
alongside user metadata when an object is
PUT.

This patch will preserve existing object
system metadata and ignore any new system
metadata when handling object POSTs,
including POST-as-copy operations. Support
for modification of object system metadata
with a POST request requires further work
as discussed in the blueprint.

This patch will preserve existing object
system metadata and update it with new
system metadata when copying an object.

A new probe test is added which makes use of
the BrainSplitter class that has been moved
from test_container_merge_policy_index.py to
a new module brain.py.

blueprint object-system-metadata

Change-Id: If716bc15730b7322266ebff4ab8dd31e78e4b962
This commit is contained in:
anc 2014-03-10 11:46:58 +00:00 committed by Samuel Merritt
parent 3e78432cb1
commit 4286f36a60
11 changed files with 998 additions and 185 deletions

View File

@ -225,6 +225,21 @@ def remove_items(headers, condition):
return removed
def copy_header_subset(from_r, to_r, condition):
"""
Will copy desired subset of headers from from_r to to_r.
:param from_r: a swob Request or Response
:param to_r: a swob Request or Response
:param condition: a function that will be passed the header key as a
single argument and should return True if the header
is to be copied.
"""
for k, v in from_r.headers.items():
if condition(k):
to_r.headers[k] = v
def close_if_possible(maybe_closable):
close_method = getattr(maybe_closable, 'close', None)
if callable(close_method):

View File

@ -49,6 +49,7 @@ from eventlet import Timeout
from swift import gettext_ as _
from swift.common.constraints import check_mount
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
@ -1315,7 +1316,8 @@ class DiskFile(object):
self._metadata = self._failsafe_read_metadata(meta_file, meta_file)
sys_metadata = dict(
[(key, val) for key, val in datafile_metadata.iteritems()
if key.lower() in DATAFILE_SYSTEM_META])
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
self._metadata.update(sys_metadata)
else:
self._metadata = datafile_metadata

View File

@ -38,7 +38,8 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout
from swift.obj import ssync_receiver
from swift.common.http import is_success
from swift.common.request_helpers import get_name_and_placement, is_user_meta
from swift.common.request_helpers import get_name_and_placement, \
is_user_meta, is_sys_or_user_meta
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
@ -445,7 +446,7 @@ class ObjectController(object):
'Content-Length': str(upload_size),
}
metadata.update(val for val in request.headers.iteritems()
if is_user_meta('object', val[0]))
if is_sys_or_user_meta('object', val[0]))
for header_key in (
request.headers.get('X-Backend-Replication-Headers') or
self.allowed_headers):
@ -503,7 +504,7 @@ class ObjectController(object):
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
if is_user_meta('object', key) or \
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = metadata['ETag']
@ -549,7 +550,7 @@ class ObjectController(object):
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
if is_user_meta('object', key) or \
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = metadata['ETag']

View File

@ -56,7 +56,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \
HTTPClientDisconnect, HTTPNotImplemented
from swift.common.request_helpers import is_user_meta
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
remove_items, copy_header_subset
def copy_headers_into(from_r, to_r):
@ -67,7 +68,7 @@ def copy_headers_into(from_r, to_r):
"""
pass_headers = ['x-delete-at']
for k, v in from_r.headers.items():
if is_user_meta('object', k) or k.lower() in pass_headers:
if is_sys_or_user_meta('object', k) or k.lower() in pass_headers:
to_r.headers[k] = v
@ -624,8 +625,14 @@ class ObjectController(Controller):
if not content_type_manually_set:
sink_req.headers['Content-Type'] = \
source_resp.headers['Content-Type']
if not config_true_value(
if config_true_value(
sink_req.headers.get('x-fresh-metadata', 'false')):
# post-as-copy: ignore new sysmeta, copy existing sysmeta
condition = lambda k: is_sys_meta('object', k)
remove_items(sink_req.headers, condition)
copy_header_subset(source_resp, sink_req, condition)
else:
# copy/update existing sysmeta and user meta
copy_headers_into(source_resp, sink_req)
copy_headers_into(req, sink_req)
# copy over x-static-large-object for POSTs and manifest copies

206
test/probe/brain.py Normal file
View File

@ -0,0 +1,206 @@
#!/usr/bin/python -u
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import itertools
import uuid
from optparse import OptionParser
from urlparse import urlparse
import random
from swift.common.manager import Manager
from swift.common import utils, ring
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from swiftclient import client, get_auth, ClientException
TIMEOUT = 60
def meta_command(name, bases, attrs):
"""
Look for attrs with a truthy attribute __command__ and add them to an
attribute __commands__ on the type that maps names to decorated methods.
The decorated methods' doc strings also get mapped in __docs__.
Also adds a method run(command_name, *args, **kwargs) that will
execute the method mapped to the name in __commands__.
"""
commands = {}
docs = {}
for attr, value in attrs.items():
if getattr(value, '__command__', False):
commands[attr] = value
# methods have always have a __doc__ attribute, sometimes empty
docs[attr] = (getattr(value, '__doc__', None) or
'perform the %s command' % attr).strip()
attrs['__commands__'] = commands
attrs['__docs__'] = docs
def run(self, command, *args, **kwargs):
return self.__commands__[command](self, *args, **kwargs)
attrs.setdefault('run', run)
return type(name, bases, attrs)
def command(f):
f.__command__ = True
return f
class BrainSplitter(object):
__metaclass__ = meta_command
def __init__(self, url, token, container_name='test', object_name='test',
server_type='container'):
self.url = url
self.token = token
self.account = utils.split_path(urlparse(url).path, 2, 2)[1]
self.container_name = container_name
self.object_name = object_name
server_list = ['%s-server' % server_type] if server_type else ['all']
self.servers = Manager(server_list)
policies = list(POLICIES)
random.shuffle(policies)
self.policies = itertools.cycle(policies)
o = object_name if server_type == 'object' else None
c = container_name if server_type in ('object', 'container') else None
part, nodes = ring.Ring(
'/etc/swift/%s.ring.gz' % server_type).get_nodes(
self.account, c, o)
node_ids = [n['id'] for n in nodes]
if all(n_id in node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
self.handoff_numbers = (3, 4)
else:
self.primary_numbers = (3, 4)
self.handoff_numbers = (1, 2)
@command
def start_primary_half(self):
"""
start servers 1 & 2
"""
tuple(self.servers.start(number=n) for n in self.primary_numbers)
@command
def stop_primary_half(self):
"""
stop servers 1 & 2
"""
tuple(self.servers.stop(number=n) for n in self.primary_numbers)
@command
def start_handoff_half(self):
"""
start servers 3 & 4
"""
tuple(self.servers.start(number=n) for n in self.handoff_numbers)
@command
def stop_handoff_half(self):
"""
stop servers 3 & 4
"""
tuple(self.servers.stop(number=n) for n in self.handoff_numbers)
@command
def put_container(self, policy_index=None):
"""
put container with next storage policy
"""
policy = self.policies.next()
if policy_index is not None:
policy = POLICIES.get_by_index(int(policy_index))
if not policy:
raise ValueError('Unknown policy with index %s' % policy)
headers = {'X-Storage-Policy': policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
@command
def delete_container(self):
"""
delete container
"""
client.delete_container(self.url, self.token, self.container_name)
@command
def put_object(self, headers=None):
"""
issue put for zero byte test object
"""
client.put_object(self.url, self.token, self.container_name,
self.object_name, headers=headers)
@command
def delete_object(self):
"""
issue delete for test object
"""
try:
client.delete_object(self.url, self.token, self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
parser = OptionParser('%prog [options] '
'<command>[:<args>[,<args>...]] [<command>...]')
parser.usage += '\n\nCommands:\n\t' + \
'\n\t'.join("%s - %s" % (name, doc) for name, doc in
BrainSplitter.__docs__.items())
parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(),
help='set container name')
parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(),
help='set object name')
parser.add_option('-s', '--server_type', default='container',
help='set server type')
def main():
options, commands = parser.parse_args()
if not commands:
parser.print_help()
return 'ERROR: must specify at least one command'
for cmd_args in commands:
cmd = cmd_args.split(':', 1)[0]
if cmd not in BrainSplitter.__commands__:
parser.print_help()
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
brain = BrainSplitter(url, token, options.container, options.object,
options.server_type)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
if len(parts) > 1:
args = utils.list_from_csv(parts[1])
else:
args = ()
try:
brain.run(command, *args)
except ClientException as e:
print '**WARNING**: %s raised %s' % (command, e)
print 'STATUS'.join(['*' * 25] * 2)
brain.servers.status()
sys.exit()
if __name__ == "__main__":
sys.exit(main())

View File

@ -13,166 +13,26 @@
# limitations under the License.
from hashlib import md5
import sys
import itertools
import time
import unittest
import uuid
from optparse import OptionParser
from urlparse import urlparse
import random
from nose import SkipTest
from swift.common.manager import Manager
from swift.common.internal_client import InternalClient
from swift.common import utils, direct_client, ring
from swift.common import utils, direct_client
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from test.probe.brain import BrainSplitter
from test.probe.common import reset_environment, get_to_final_state
from swiftclient import client, get_auth, ClientException
from swiftclient import client, ClientException
TIMEOUT = 60
def meta_command(name, bases, attrs):
"""
Look for attrs with a truthy attribute __command__ and add them to an
attribute __commands__ on the type that maps names to decorated methods.
The decorated methods' doc strings also get mapped in __docs__.
Also adds a method run(command_name, *args, **kwargs) that will
execute the method mapped to the name in __commands__.
"""
commands = {}
docs = {}
for attr, value in attrs.items():
if getattr(value, '__command__', False):
commands[attr] = value
# methods have always have a __doc__ attribute, sometimes empty
docs[attr] = (getattr(value, '__doc__', None) or
'perform the %s command' % attr).strip()
attrs['__commands__'] = commands
attrs['__docs__'] = docs
def run(self, command, *args, **kwargs):
return self.__commands__[command](self, *args, **kwargs)
attrs.setdefault('run', run)
return type(name, bases, attrs)
def command(f):
f.__command__ = True
return f
class BrainSplitter(object):
__metaclass__ = meta_command
def __init__(self, url, token, container_name='test', object_name='test'):
self.url = url
self.token = token
self.account = utils.split_path(urlparse(url).path, 2, 2)[1]
self.container_name = container_name
self.object_name = object_name
self.servers = Manager(['container-server'])
policies = list(POLICIES)
random.shuffle(policies)
self.policies = itertools.cycle(policies)
container_part, container_nodes = ring.Ring(
'/etc/swift/container.ring.gz').get_nodes(
self.account, self.container_name)
container_node_ids = [n['id'] for n in container_nodes]
if all(n_id in container_node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
self.handoff_numbers = (3, 4)
else:
self.primary_numbers = (3, 4)
self.handoff_numbers = (1, 2)
@command
def start_primary_half(self):
"""
start container servers 1 & 2
"""
tuple(self.servers.start(number=n) for n in self.primary_numbers)
@command
def stop_primary_half(self):
"""
stop container servers 1 & 2
"""
tuple(self.servers.stop(number=n) for n in self.primary_numbers)
@command
def start_handoff_half(self):
"""
start container servers 3 & 4
"""
tuple(self.servers.start(number=n) for n in self.handoff_numbers)
@command
def stop_handoff_half(self):
"""
stop container servers 3 & 4
"""
tuple(self.servers.stop(number=n) for n in self.handoff_numbers)
@command
def put_container(self, policy_index=None):
"""
put container with next storage policy
"""
policy = self.policies.next()
if policy_index is not None:
policy = POLICIES.get_by_index(int(policy_index))
if not policy:
raise ValueError('Unknown policy with index %s' % policy)
headers = {'X-Storage-Policy': policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
@command
def delete_container(self):
"""
delete container
"""
client.delete_container(self.url, self.token, self.container_name)
@command
def put_object(self, headers=None):
"""
issue put for zero byte test object
"""
client.put_object(self.url, self.token, self.container_name,
self.object_name, headers=headers)
@command
def delete_object(self):
"""
issue delete for test object
"""
try:
client.delete_object(self.url, self.token, self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
parser = OptionParser('%prog split-brain [options] '
'<command>[:<args>[,<args>...]] [<command>...]')
parser.usage += '\n\nCommands:\n\t' + \
'\n\t'.join("%s - %s" % (name, doc) for name, doc in
BrainSplitter.__docs__.items())
parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(),
help='set container name')
parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(),
help='set object name')
class TestContainerMergePolicyIndex(unittest.TestCase):
def setUp(self):
@ -184,7 +44,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name)
self.object_name, 'container')
def test_merge_storage_policy_index(self):
# generic split brain
@ -594,37 +454,5 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
self.fail('Found unexpected object %r in the queue' % obj)
def main():
options, commands = parser.parse_args()
commands.remove('split-brain')
if not commands:
parser.print_help()
return 'ERROR: must specify at least one command'
for cmd_args in commands:
cmd = cmd_args.split(':', 1)[0]
if cmd not in BrainSplitter.__commands__:
parser.print_help()
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
brain = BrainSplitter(url, token, options.container, options.object)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
if len(parts) > 1:
args = utils.list_from_csv(parts[1])
else:
args = ()
try:
brain.run(command, *args)
except ClientException as e:
print '**WARNING**: %s raised %s' % (command, e)
print 'STATUS'.join(['*' * 25] * 2)
brain.servers.status()
sys.exit()
if __name__ == "__main__":
if any('split-brain' in arg for arg in sys.argv):
sys.exit(main())
unittest.main()

View File

@ -0,0 +1,186 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
import os
import shutil
import unittest
import uuid
from swift.common import internal_client
from test.probe.brain import BrainSplitter
from test.probe.common import kill_servers, reset_environment, \
get_to_final_state
class Test(unittest.TestCase):
def setUp(self):
"""
Reset all environment and start all servers.
"""
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.policy, self.url, self.token,
self.account, self.configs) = reset_environment()
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name, 'object')
self.tempdir = mkdtemp()
conf_path = os.path.join(self.tempdir, 'internal_client.conf')
conf_body = """
[DEFAULT]
swift_dir = /etc/swift
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
object_post_as_copy = false
[filter:cache]
use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
"""
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
self.int_client = internal_client.InternalClient(conf_path, 'test', 1)
def tearDown(self):
"""
Stop all servers.
"""
kill_servers(self.port2server, self.pids)
shutil.rmtree(self.tempdir)
def _put_object(self, headers=None):
headers = headers or {}
self.int_client.upload_object(StringIO(u'stuff'), self.account,
self.container_name,
self.object_name, headers)
def _post_object(self, headers):
self.int_client.set_object_metadata(self.account, self.container_name,
self.object_name, headers)
def _get_object_metadata(self):
return self.int_client.get_object_metadata(self.account,
self.container_name,
self.object_name)
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
self.brain.put_container(policy_index=0)
# put object
self._put_object()
# put newer object with sysmeta to first server subset
self.brain.stop_primary_half()
self._put_object(headers=sysmeta)
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
self.brain.start_primary_half()
# post some user meta to second server subset
self.brain.stop_handoff_half()
self._post_object(usermeta)
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
for key in sysmeta:
self.assertFalse(key in metadata)
self.brain.start_handoff_half()
# run replicator
get_to_final_state()
# check user metadata has been replicated to first server subset
# and sysmeta is unchanged
self.brain.stop_primary_half()
metadata = self._get_object_metadata()
expected = dict(sysmeta)
expected.update(usermeta)
for key in expected.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key])
self.brain.start_primary_half()
# check user metadata and sysmeta both on second server subset
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
for key in expected.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key])
def test_sysmeta_after_replication_with_prior_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
self.brain.put_container(policy_index=0)
# put object
self._put_object()
# put user meta to first server subset
self.brain.stop_handoff_half()
self._post_object(headers=usermeta)
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
self.brain.start_handoff_half()
# put newer object with sysmeta to second server subset
self.brain.stop_primary_half()
self._put_object(headers=sysmeta)
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
self.brain.start_primary_half()
# run replicator
get_to_final_state()
# check stale user metadata is not replicated to first server subset
# and sysmeta is unchanged
self.brain.stop_primary_half()
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta:
self.assertFalse(key in metadata)
self.brain.start_primary_half()
# check stale user metadata is removed from second server subset
# and sysmeta is replicated
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta:
self.assertFalse(key in metadata)
if __name__ == "__main__":
unittest.main()

View File

@ -16,9 +16,10 @@
"""Tests for swift.common.request_helpers"""
import unittest
from swift.common.swob import Request
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
remove_items
remove_items, copy_header_subset
server_types = ['account', 'container', 'object']
@ -68,3 +69,15 @@ class TestRequestHelpers(unittest.TestCase):
rem = remove_items(src, test)
self.assertEquals(src, {'c': 'd'})
self.assertEquals(rem, {'a': 'b'})
def test_copy_header_subset(self):
src = {'a': 'b',
'c': 'd'}
from_req = Request.blank('/path', environ={}, headers=src)
to_req = Request.blank('/path', {})
test = lambda x: x.lower() == 'a'
copy_header_subset(from_req, to_req, test)
self.assertTrue('A' in to_req.headers)
self.assertEqual(to_req.headers['A'], 'b')
self.assertFalse('c' in to_req.headers)
self.assertFalse('C' in to_req.headers)

View File

@ -1080,6 +1080,25 @@ class TestDiskFile(unittest.TestCase):
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
def test_disk_file_preserves_sysmeta(self):
# build an object with some meta (ts 41)
orig_metadata = {'X-Object-Sysmeta-Key1': 'Value1',
'Content-Type': 'text/garbage'}
df = self._get_open_disk_file(ts=41, extra_metadata=orig_metadata)
with df.open():
self.assertEquals('1024', df._metadata['Content-Length'])
# write some new metadata (fast POST, don't send orig meta, ts 42)
df = self._simple_get_diskfile()
df.write_metadata({'X-Timestamp': Timestamp(42).internal,
'X-Object-Sysmeta-Key1': 'Value2',
'X-Object-Meta-Key3': 'Value3'})
df = self._simple_get_diskfile()
with df.open():
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original sysmeta keys are preserved
self.assertEquals('Value1', df._metadata['X-Object-Sysmeta-Key1'])
def test_disk_file_reader_iter(self):
df = self._create_test_file('1234567890')
quarantine_msgs = []

View File

@ -730,6 +730,181 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 408)
def test_PUT_system_metadata(self):
# check that sysmeta is stored in diskfile
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA')
self.assertEquals(diskfile.read_metadata(objfile),
{'X-Timestamp': timestamp,
'Content-Length': '14',
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'name': '/a/c/o',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
def test_POST_system_metadata(self):
# check that diskfile sysmeta is not changed by a POST
timestamp1 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp1,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
timestamp2 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp2,
'X-Object-Meta-1': 'Not One',
'X-Object-Sysmeta-1': 'Not One',
'X-Object-Sysmeta-Two': 'Not Two'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
# original .data file metadata should be unchanged
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp1 + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA')
self.assertEquals(diskfile.read_metadata(objfile),
{'X-Timestamp': timestamp1,
'Content-Length': '14',
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'name': '/a/c/o',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
# .meta file metadata should have only user meta items
metafile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp2 + '.meta')
self.assert_(os.path.isfile(metafile))
self.assertEquals(diskfile.read_metadata(metafile),
{'X-Timestamp': timestamp2,
'name': '/a/c/o',
'X-Object-Meta-1': 'Not One'})
def test_PUT_then_fetch_system_metadata(self):
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
def check_response(resp):
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 14)
self.assertEquals(resp.content_type, 'text/plain')
self.assertEquals(resp.headers['content-type'], 'text/plain')
self.assertEquals(
resp.headers['last-modified'],
strftime('%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(timestamp)))))
self.assertEquals(resp.headers['etag'],
'"1000d172764c9dbc3a5798a67ec5bb76"')
self.assertEquals(resp.headers['x-object-meta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two')
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
check_response(resp)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
check_response(resp)
def test_PUT_then_POST_then_fetch_system_metadata(self):
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
timestamp2 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp2,
'X-Object-Meta-1': 'Not One',
'X-Object-Sysmeta-1': 'Not One',
'X-Object-Sysmeta-Two': 'Not Two'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
def check_response(resp):
# user meta should be updated but not sysmeta
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 14)
self.assertEquals(resp.content_type, 'text/plain')
self.assertEquals(resp.headers['content-type'], 'text/plain')
self.assertEquals(
resp.headers['last-modified'],
strftime('%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(timestamp2)))))
self.assertEquals(resp.headers['etag'],
'"1000d172764c9dbc3a5798a67ec5bb76"')
self.assertEquals(resp.headers['x-object-meta-1'], 'Not One')
self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two')
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
check_response(resp)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
check_response(resp)
def test_PUT_container_connection(self):
def mock_http_connect(response, with_exc=False):

View File

@ -0,0 +1,361 @@
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from tempfile import mkdtemp
from urllib import quote
from swift.common.storage_policy import StoragePolicy
from swift.common.swob import Request
from swift.common.utils import mkdirs, split_path
from swift.common.wsgi import monkey_patch_mimetools, WSGIContext
from swift.obj import server as object_server
from swift.proxy import server as proxy
import swift.proxy.controllers
from test.unit import FakeMemcache, debug_logger, FakeRing, \
fake_http_connect, patch_policies
class FakeServerConnection(WSGIContext):
'''Fakes an HTTPConnection to a server instance.'''
def __init__(self, app):
super(FakeServerConnection, self).__init__(app)
self.data = ''
def getheaders(self):
return self._response_headers
def read(self, amt=None):
try:
result = self.resp_iter.next()
return result
except StopIteration:
return ''
def getheader(self, name, default=None):
result = self._response_header_value(name)
return result if result else default
def getresponse(self):
environ = {'REQUEST_METHOD': self.method}
req = Request.blank(self.path, environ, headers=self.req_headers,
body=self.data)
self.resp = self._app_call(req.environ)
self.resp_iter = iter(self.resp)
if self._response_headers is None:
self._response_headers = []
status_parts = self._response_status.split(' ', 1)
self.status = int(status_parts[0])
self.reason = status_parts[1] if len(status_parts) == 2 else ''
return self
def getexpect(self):
class ContinueResponse(object):
status = 100
return ContinueResponse()
def send(self, data):
self.data = data
def __call__(self, ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
self.path = quote('/' + device + '/' + str(partition) + path)
self.method = method
self.req_headers = headers
return self
def get_http_connect(account_func, container_func, object_func):
'''Returns a http_connect function that delegates to
entity-specific http_connect methods based on request path.
'''
def http_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
a, c, o = split_path(path, 1, 3, True)
if o:
func = object_func
elif c:
func = container_func
else:
func = account_func
resp = func(ipaddr, port, device, partition, method, path,
headers=headers, query_string=query_string)
return resp
return http_connect
@patch_policies([StoragePolicy(0, 'zero', True,
object_ring=FakeRing(replicas=1))])
class TestObjectSysmeta(unittest.TestCase):
'''Tests object sysmeta is correctly handled by combination
of proxy server and object server.
'''
def _assertStatus(self, resp, expected):
self.assertEqual(resp.status_int, expected,
'Expected %d, got %s'
% (expected, resp.status))
def _assertInHeaders(self, resp, expected):
for key, val in expected.iteritems():
self.assertTrue(key in resp.headers,
'Header %s missing from %s' % (key, resp.headers))
self.assertEqual(val, resp.headers[key],
'Expected header %s:%s, got %s:%s'
% (key, val, key, resp.headers[key]))
def _assertNotInHeaders(self, resp, unexpected):
for key, val in unexpected.iteritems():
self.assertFalse(key in resp.headers,
'Header %s not expected in %s'
% (key, resp.headers))
def setUp(self):
self.app = proxy.Application(None, FakeMemcache(),
logger=debug_logger('proxy-ut'),
account_ring=FakeRing(replicas=1),
container_ring=FakeRing(replicas=1))
monkey_patch_mimetools()
self.testdir = \
os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.obj_ctlr = object_server.ObjectController(
conf, logger=debug_logger('obj-ut'))
http_connect = get_http_connect(fake_http_connect(200),
fake_http_connect(200),
FakeServerConnection(self.obj_ctlr))
swift.proxy.controllers.base.http_connect = http_connect
swift.proxy.controllers.obj.http_connect = http_connect
original_sysmeta_headers_1 = {'x-object-sysmeta-test0': 'val0',
'x-object-sysmeta-test1': 'val1'}
original_sysmeta_headers_2 = {'x-object-sysmeta-test2': 'val2'}
changed_sysmeta_headers = {'x-object-sysmeta-test0': '',
'x-object-sysmeta-test1': 'val1 changed'}
new_sysmeta_headers = {'x-object-sysmeta-test3': 'val3'}
original_meta_headers_1 = {'x-object-meta-test0': 'meta0',
'x-object-meta-test1': 'meta1'}
original_meta_headers_2 = {'x-object-meta-test2': 'meta2'}
changed_meta_headers = {'x-object-meta-test0': '',
'x-object-meta-test1': 'meta1 changed'}
new_meta_headers = {'x-object-meta-test3': 'meta3'}
bad_headers = {'x-account-sysmeta-test1': 'bad1'}
def test_PUT_sysmeta_then_GET(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertInHeaders(resp, self.original_meta_headers_1)
self._assertNotInHeaders(resp, self.bad_headers)
def test_PUT_sysmeta_then_HEAD(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'HEAD'}
req = Request.blank(path, environ=env)
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertInHeaders(resp, self.original_meta_headers_1)
self._assertNotInHeaders(resp, self.bad_headers)
def test_sysmeta_replaced_by_PUT(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertNotInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertNotInHeaders(resp, self.original_meta_headers_2)
def _test_sysmeta_not_updated_by_POST(self):
# check sysmeta is not changed by a POST but user meta is replaced
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'POST'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs)
resp = req.get_response(self.app)
self._assertStatus(resp, 202)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertNotInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertNotInHeaders(resp, self.bad_headers)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertNotInHeaders(resp, self.original_sysmeta_headers_2)
def test_sysmeta_not_updated_by_POST(self):
self.app.object_post_as_copy = False
self._test_sysmeta_not_updated_by_POST()
def test_sysmeta_not_updated_by_POST_as_copy(self):
self.app.object_post_as_copy = True
self._test_sysmeta_not_updated_by_POST()
def test_sysmeta_updated_by_COPY(self):
# check sysmeta is updated by a COPY in same way as user meta
path = '/v1/a/c/o'
dest = '/c/o2'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'COPY'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
hdrs.update({'Destination': dest})
req = Request.blank(path, environ=env, headers=hdrs)
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
req = Request.blank('/v1/a/c/o2', environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
def test_sysmeta_updated_by_COPY_from(self):
# check sysmeta is updated by a COPY in same way as user meta
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
hdrs.update({'X-Copy-From': '/c/o'})
req = Request.blank('/v1/a/c/o2', environ=env, headers=hdrs, body='')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
req = Request.blank('/v1/a/c/o2', environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)