Merge "New Object Versioning mode"

This commit is contained in:
Zuul 2020-01-27 09:16:22 +00:00 committed by Gerrit Code Review
commit 742835a6ec
33 changed files with 8307 additions and 163 deletions

View File

@ -68,6 +68,7 @@ use = egg:swift#gatekeeper
[filter:versioned_writes]
use = egg:swift#versioned_writes
allow_versioned_writes = true
allow_object_versioning = true
[filter:copy]
use = egg:swift#copy

View File

@ -140,6 +140,7 @@ SSC :ref:`copy`
SYM :ref:`symlink`
SH :ref:`sharding_doc`
S3 :ref:`s3api`
OV :ref:`object_versioning`
======================= =============================

View File

@ -278,12 +278,12 @@ Name Check (Forbidden Character Filter)
:members:
:show-inheritance:
.. _versioned_writes:
.. _object_versioning:
Object Versioning
=================
.. automodule:: swift.common.middleware.versioned_writes
.. automodule:: swift.common.middleware.versioned_writes.object_versioning
:members:
:show-inheritance:
@ -371,6 +371,15 @@ TempURL
:members:
:show-inheritance:
.. _versioned_writes:
Versioned Writes
=================
.. automodule:: swift.common.middleware.versioned_writes.legacy
:members:
:show-inheritance:
XProfile
==============

View File

@ -1072,6 +1072,8 @@ use = egg:swift#versioned_writes
# in the container configuration file, which will be eventually
# deprecated. See documentation for more details.
# allow_versioned_writes = false
# Enables Swift object-versioning API
# allow_object_versioning = false
# Note: Put after auth and before dlo and slo middlewares.
# If you don't put it in the pipeline, it will be inserted for you.

View File

@ -457,7 +457,8 @@ class Bulk(object):
failed_files.append([wsgi_quote(str_to_wsgi(obj_name)),
HTTPPreconditionFailed().status])
continue
yield (obj_name, delete_path)
yield (obj_name, delete_path,
obj_to_delete.get('version_id'))
def objs_then_containers(objs_to_delete):
# process all objects first
@ -467,13 +468,17 @@ class Bulk(object):
yield delete_filter(lambda name: '/' not in name.strip('/'),
objs_to_delete)
def do_delete(obj_name, delete_path):
def do_delete(obj_name, delete_path, version_id):
delete_obj_req = make_subrequest(
req.environ, method='DELETE',
path=wsgi_quote(str_to_wsgi(delete_path)),
headers={'X-Auth-Token': req.headers.get('X-Auth-Token')},
body='', agent='%(orig)s ' + user_agent,
swift_source=swift_source)
if version_id is None:
delete_obj_req.params = {}
else:
delete_obj_req.params = {'version-id': version_id}
return (delete_obj_req.get_response(self.app), obj_name, 0)
with StreamingPile(self.delete_concurrency) as pile:

View File

@ -87,6 +87,14 @@ class ContainerSync(object):
info = get_container_info(
req.environ, self.app, swift_source='CS')
sync_to = req.headers.get('x-container-sync-to')
if req.method in ('PUT', 'POST') and cont and not obj:
versions_cont = info.get(
'sysmeta', {}).get('versions-container')
if sync_to and versions_cont:
raise HTTPBadRequest(
'Cannot configure container sync on a container '
'with object versioning configured.',
request=req)
if not self.allow_full_urls:
if sync_to and not sync_to.startswith('//'):

View File

@ -319,6 +319,9 @@ class ServerSideCopyMiddleware(object):
if 'last-modified' in source_resp.headers:
resp_headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
if 'X-Object-Version-Id' in source_resp.headers:
resp_headers['X-Copied-From-Version-Id'] = \
source_resp.headers['X-Object-Version-Id']
# Existing sys and user meta of source object is added to response
# headers in addition to the new ones.
_copy_headers(sink_req.headers, resp_headers)
@ -374,6 +377,8 @@ class ServerSideCopyMiddleware(object):
sink_req.headers.update(req.headers)
params = sink_req.params
params_updated = False
if params.get('multipart-manifest') == 'get':
if 'X-Static-Large-Object' in source_resp.headers:
params['multipart-manifest'] = 'put'
@ -381,6 +386,13 @@ class ServerSideCopyMiddleware(object):
del params['multipart-manifest']
sink_req.headers['X-Object-Manifest'] = \
source_resp.headers['X-Object-Manifest']
params_updated = True
if 'version-id' in params:
del params['version-id']
params_updated = True
if params_updated:
sink_req.params = params
# Set swift.source, data source, content length and etag

View File

@ -170,7 +170,12 @@ class ListingFilter(object):
params['format'] = 'json'
req.params = params
# Give other middlewares a chance to be in charge
env.setdefault('swift.format_listing', True)
status, headers, resp_iter = req.call_application(self.app)
if not env.get('swift.format_listing'):
start_response(status, headers)
return resp_iter
header_to_index = {}
resp_content_type = resp_length = None

View File

@ -1417,6 +1417,9 @@ class StaticLargeObject(object):
segments = [{
'sub_slo': True,
'name': obj_path}]
if 'version-id' in req.params:
segments[0]['version_id'] = req.params['version-id']
while segments:
# We chose not to set the limit at max_manifest_segments
# in the case this value was decreased by operators.
@ -1469,6 +1472,9 @@ class StaticLargeObject(object):
new_env['REQUEST_METHOD'] = 'GET'
del(new_env['wsgi.input'])
new_env['QUERY_STRING'] = 'multipart-manifest=get'
if 'version-id' in req.params:
new_env['QUERY_STRING'] += \
'&version-id=' + req.params['version-id']
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT')

View File

@ -202,7 +202,8 @@ import os
from cgi import parse_header
from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value
from swift.common.constraints import check_account_format
from swift.common.wsgi import WSGIContext, make_subrequest
from swift.common.request_helpers import get_sys_meta_prefix, \
@ -228,6 +229,8 @@ TGT_ETAG_SYSMETA_SYMLINK_HDR = \
get_sys_meta_prefix('object') + 'symlink-target-etag'
TGT_BYTES_SYSMETA_SYMLINK_HDR = \
get_sys_meta_prefix('object') + 'symlink-target-bytes'
SYMLOOP_EXTEND = get_sys_meta_prefix('object') + 'symloop-extend'
ALLOW_RESERVED_NAMES = get_sys_meta_prefix('object') + 'allow-reserved-names'
def _validate_and_prep_request_headers(req):
@ -477,7 +480,13 @@ class SymlinkObjectContext(WSGIContext):
raise LinkIterError()
# format: /<account name>/<container name>/<object name>
new_req = build_traversal_req(symlink_target)
self._loop_count += 1
if not config_true_value(
self._response_header_value(SYMLOOP_EXTEND)):
self._loop_count += 1
if config_true_value(
self._response_header_value(ALLOW_RESERVED_NAMES)):
new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
return self._recursive_get_head(new_req, target_etag=resp_etag)
else:
final_etag = self._response_header_value('etag')
@ -516,6 +525,8 @@ class SymlinkObjectContext(WSGIContext):
new_req = make_subrequest(
req.environ, path=wsgi_quote(symlink_target_path), method='HEAD',
swift_source='SYM')
if req.allow_reserved_names:
new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
self._last_target_path = symlink_target_path
resp = self._recursive_get_head(new_req, target_etag=etag,
follow_softlinks=False)
@ -659,6 +670,9 @@ class SymlinkObjectContext(WSGIContext):
req.environ['swift.leave_relative_location'] = True
errmsg = 'The requested POST was applied to a symlink. POST ' +\
'directly to the target to apply requested metadata.'
for key, value in self._response_headers:
if key.lower().startswith('x-object-sysmeta-'):
headers[key] = value
raise HTTPTemporaryRedirect(
body=errmsg, headers=headers)
else:

View File

@ -0,0 +1,51 @@
# Copyright (c) 2019 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Implements middleware for object versioning which comprises an instance of a
:class:`~swift.common.middleware.versioned_writes.legacy.
VersionedWritesMiddleware` combined with an instance of an
:class:`~swift.common.middleware.versioned_writes.object_versioning.
ObjectVersioningMiddleware`.
"""
from swift.common.middleware.versioned_writes. \
legacy import CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC, \
VersionedWritesMiddleware
from swift.common.middleware.versioned_writes. \
object_versioning import ObjectVersioningMiddleware
from swift.common.utils import config_true_value, register_swift_info, \
get_swift_info
def filter_factory(global_conf, **local_conf):
"""Provides a factory function for loading versioning middleware."""
conf = global_conf.copy()
conf.update(local_conf)
if config_true_value(conf.get('allow_versioned_writes')):
register_swift_info('versioned_writes', allowed_flags=(
CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC))
allow_object_versioning = config_true_value(conf.get(
'allow_object_versioning'))
if allow_object_versioning:
register_swift_info('object_versioning')
def versioning_filter(app):
if allow_object_versioning:
if 'symlink' not in get_swift_info():
raise ValueError('object versioning requires symlinks')
app = ObjectVersioningMiddleware(app, conf)
return VersionedWritesMiddleware(app, conf)
return versioning_filter

View File

@ -14,6 +14,11 @@
# limitations under the License.
"""
.. note::
This middleware supports two legacy modes of object versioning that is
now replaced by a new mode. It is recommended to use the new
:ref:`Object Versioning <object_versioning>` mode for new containers.
Object versioning in swift is implemented by setting a flag on the container
to tell swift to version all objects in the container. The value of the flag is
the URL-encoded container name where the versions are stored (commonly referred
@ -225,7 +230,7 @@ import json
import time
from swift.common.utils import get_logger, Timestamp, \
register_swift_info, config_true_value, close_if_possible, FileLikeIter
config_true_value, close_if_possible, FileLikeIter
from swift.common.request_helpers import get_sys_meta_prefix, \
copy_header_subset
from swift.common.wsgi import WSGIContext, make_pre_authed_request
@ -457,6 +462,7 @@ class VersionedWritesContext(WSGIContext):
put_path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, vers_obj_name)
req.environ['QUERY_STRING'] = ''
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
self._check_response_error(req, put_resp)
@ -601,6 +607,7 @@ class VersionedWritesContext(WSGIContext):
break
obj_to_restore = bytes_to_wsgi(
version_to_restore['name'].encode('utf-8'))
req.environ['QUERY_STRING'] = ''
restored_path = self._restore_data(
req, versions_cont, api_version, account_name,
container_name, object_name, obj_to_restore)
@ -632,6 +639,7 @@ class VersionedWritesContext(WSGIContext):
# current object and delete the previous version
prev_obj_name = bytes_to_wsgi(
previous_version['name'].encode('utf-8'))
req.environ['QUERY_STRING'] = ''
restored_path = self._restore_data(
req, versions_cont, api_version, account_name,
container_name, object_name, prev_obj_name)
@ -856,16 +864,3 @@ class VersionedWritesMiddleware(object):
return error_response(env, start_response)
else:
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
if config_true_value(conf.get('allow_versioned_writes')):
register_swift_info('versioned_writes', allowed_flags=(
CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC))
def obj_versions_filter(app):
return VersionedWritesMiddleware(app, conf)
return obj_versions_filter

File diff suppressed because it is too large Load Diff

View File

@ -1012,6 +1012,33 @@ class Request(object):
self.query_string = urllib.parse.urlencode(param_pairs,
encoding='latin-1')
def ensure_x_timestamp(self):
"""
Similar to :attr:`timestamp`, but the ``X-Timestamp`` header will be
set if not present.
:raises HTTPBadRequest: if X-Timestamp is already set but not a valid
:class:`~swift.common.utils.Timestamp`
:returns: the request's X-Timestamp header,
as a :class:`~swift.common.utils.Timestamp`
"""
# The container sync feature includes an x-timestamp header with
# requests. If present this is checked and preserved, otherwise a fresh
# timestamp is added.
if 'HTTP_X_TIMESTAMP' in self.environ:
try:
self._timestamp = Timestamp(self.environ['HTTP_X_TIMESTAMP'])
except ValueError:
raise HTTPBadRequest(
request=self, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % self.environ['HTTP_X_TIMESTAMP'])
else:
self._timestamp = Timestamp.now()
# Always normalize it to the internal form
self.environ['HTTP_X_TIMESTAMP'] = self._timestamp.internal
return self._timestamp
@property
def timestamp(self):
"""

View File

@ -1376,6 +1376,11 @@ class Timestamp(object):
def __hash__(self):
return hash(self.internal)
def __invert__(self):
if self.offset:
raise ValueError('Cannot invert timestamps with offsets')
return Timestamp((999999999999999 - self.raw) * PRECISION)
def encode_timestamps(t1, t2=None, t3=None, explicit=False):
"""

View File

@ -1236,7 +1236,7 @@ class ContainerBroker(DatabaseBroker):
limit, marker, end_marker, prefix=None, delimiter=None, path=None,
reverse=False, include_deleted=include_deleted,
transform_func=self._record_to_dict, since_row=since_row,
all_policies=True
all_policies=True, allow_reserved=True
)
def _transform_record(self, record):

View File

@ -44,6 +44,8 @@ from swift.common.utils import (
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.wsgi import ConfigString
from swift.common.middleware.versioned_writes.object_versioning import (
SYSMETA_VERSIONS_CONT, SYSMETA_VERSIONS_SYMLINK)
# The default internal client config body is to support upgrades without
@ -358,6 +360,13 @@ class ContainerSync(Daemon):
break
else:
return
if broker.metadata.get(SYSMETA_VERSIONS_CONT):
self.container_skips += 1
self.logger.increment('skips')
self.logger.warning('Skipping container %s/%s with '
'object versioning configured' % (
info['account'], info['container']))
return
if not broker.is_deleted():
sync_to = None
user_key = None
@ -594,6 +603,16 @@ class ContainerSync(Daemon):
headers = {}
body = None
exc = err
# skip object_versioning links; this is in case the container
# metadata is out of date
if headers.get(SYSMETA_VERSIONS_SYMLINK):
self.logger.info(
'Skipping versioning symlink %s/%s/%s ' % (
info['account'], info['container'],
row['name']))
return True
timestamp = Timestamp(headers.get('x-timestamp', 0))
if timestamp < ts_meta:
if exc:

View File

@ -764,8 +764,9 @@ class ObjectController(BaseStorageServer):
'PUT', account, container, obj, request, update_headers,
device, policy)
# Add sysmeta to response
resp_headers = {}
# Add current content-type and sysmeta to response
resp_headers = {
'X-Backend-Content-Type': content_type_headers['Content-Type']}
for key, value in orig_metadata.items():
if is_sys_meta('object', key):
resp_headers[key] = value
@ -1276,7 +1277,9 @@ class ObjectController(BaseStorageServer):
device, policy)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
headers={'X-Backend-Timestamp': response_timestamp.internal,
'X-Backend-Content-Type': orig_metadata.get(
'Content-Type', '')})
@public
@replication

View File

@ -57,7 +57,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE, HTTP_GONE
from swift.common.swob import Request, Response, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable, \
status_map, wsgi_to_str, str_to_wsgi, wsgi_quote, normalize_etag
status_map, wsgi_to_str, str_to_wsgi, wsgi_quote, wsgi_unquote, \
normalize_etag
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
http_response_to_document_iters, is_object_transient_sysmeta, \
@ -396,6 +397,17 @@ def get_container_info(env, app, swift_source=None):
if info.get('sharding_state') is None:
info['sharding_state'] = 'unsharded'
versions_cont = info.get('sysmeta', {}).get('versions-container', '')
if versions_cont:
versions_cont = wsgi_unquote(str_to_wsgi(
versions_cont)).split('/')[0]
versions_req = _prepare_pre_auth_info_request(
env, ("/%s/%s/%s" % (version, wsgi_account, versions_cont)),
(swift_source or 'GET_CONTAINER_INFO'))
versions_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
versions_info = get_container_info(versions_req.environ, app)
info['bytes'] = info['bytes'] + versions_info['bytes']
return info

View File

@ -303,7 +303,7 @@ class BaseObjectController(Controller):
if error_response:
return error_response
req.headers['X-Timestamp'] = Timestamp.now().internal
req.ensure_x_timestamp()
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
@ -548,23 +548,6 @@ class BaseObjectController(Controller):
if detect_content_type:
req.headers.pop('x-detect-content-type')
def _update_x_timestamp(self, req):
# The container sync feature includes an x-timestamp header with
# requests. If present this is checked and preserved, otherwise a fresh
# timestamp is added.
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
except ValueError:
raise HTTPBadRequest(
request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
req.headers['X-Timestamp'] = req_timestamp.internal
else:
req.headers['X-Timestamp'] = Timestamp.now().internal
return None
def _check_failure_put_connections(self, putters, req, min_conns):
"""
Identify any failed connections and check minimum connection count.
@ -786,7 +769,7 @@ class BaseObjectController(Controller):
# update content type in case it is missing
self._update_content_type(req)
self._update_x_timestamp(req)
req.ensure_x_timestamp()
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
@ -846,7 +829,7 @@ class BaseObjectController(Controller):
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
self._update_x_timestamp(req)
req.ensure_x_timestamp()
# Include local handoff nodes if write-affinity is enabled.
node_count = len(nodes)

View File

@ -327,6 +327,7 @@ def _load_encryption(proxy_conf_file, swift_conf_file, **kwargs):
if not six.PY2:
root_secret = root_secret.decode('ascii')
conf.set('filter:keymaster', 'encryption_root_secret', root_secret)
conf.set('filter:versioned_writes', 'allow_object_versioning', 'true')
except NoSectionError as err:
msg = 'Error problem with proxy conf file %s: %s' % \
(proxy_conf_file, err)
@ -456,6 +457,8 @@ def _load_s3api(proxy_conf_file, swift_conf_file, **kwargs):
"s3api tempauth")
conf.set(section, 'pipeline', pipeline)
conf.set('filter:s3api', 's3_acl', 'true')
conf.set('filter:versioned_writes', 'allow_object_versioning', 'true')
except NoSectionError as err:
msg = 'Error problem with proxy conf file %s: %s' % \
(proxy_conf_file, err)

View File

@ -699,7 +699,7 @@ class Container(Base):
if cfg is None:
cfg = {}
format_type = parms.get('format', None)
if format_type not in [None, 'json', 'xml']:
if format_type not in [None, 'plain', 'json', 'xml']:
raise RequestError('Invalid format: %s' % format_type)
if format_type is None and 'format' in parms:
del parms['format']
@ -707,12 +707,13 @@ class Container(Base):
status = self.conn.make_request('GET', self.path, hdrs=hdrs,
parms=parms, cfg=cfg)
if status == 200:
if format_type == 'json':
if format_type == 'json' or 'versions' in parms:
files = json.loads(self.conn.response.read())
if six.PY2:
for file_item in files:
for key in ('name', 'subdir', 'content_type'):
for key in ('name', 'subdir', 'content_type',
'version_id'):
if key in file_item:
file_item[key] = file_item[key].encode('utf-8')
return files
@ -785,8 +786,10 @@ class Container(Base):
# versioning is enabled.
['versions', 'x-versions-location'],
['versions', 'x-history-location'],
['versions_enabled', 'x-versions-enabled'],
['tempurl_key', 'x-container-meta-temp-url-key'],
['tempurl_key2', 'x-container-meta-temp-url-key-2']]
['tempurl_key2', 'x-container-meta-temp-url-key-2'],
['container_quota_bytes', 'x-container-meta-quota-bytes']]
return self.header_fields(required_fields, optional_fields)
@ -853,7 +856,8 @@ class File(Base):
data.seek(0)
return checksum.hexdigest()
def copy(self, dest_cont, dest_file, hdrs=None, parms=None, cfg=None):
def copy(self, dest_cont, dest_file, hdrs=None, parms=None, cfg=None,
return_resp=False):
if hdrs is None:
hdrs = {}
if parms is None:
@ -875,6 +879,8 @@ class File(Base):
cfg=cfg, parms=parms) != 201:
raise ResponseError(self.conn.response, 'COPY',
self.conn.make_path(self.path))
if return_resp:
return self.conn.response
return True
def copy_account(self, dest_account, dest_cont, dest_file,
@ -942,6 +948,8 @@ class File(Base):
['last_modified', 'last-modified'],
['etag', 'etag']]
optional_fields = [['x_object_manifest', 'x-object-manifest'],
['x_manifest_etag', 'x-manifest-etag'],
['x_object_version_id', 'x-object-version-id'],
['x_symlink_target', 'x-symlink-target']]
header_fields = self.header_fields(fields,

File diff suppressed because it is too large Load Diff

View File

@ -31,12 +31,13 @@ from six.moves.http_client import HTTPConnection
from six.moves.urllib.parse import urlparse
from swiftclient import get_auth, head_account, client
from swift.common import internal_client
from swift.obj.diskfile import get_data_dir
from swift.common import internal_client, direct_client
from swift.common.direct_client import DirectClientException
from swift.common.ring import Ring
from swift.common.utils import readconf, renamer, rsync_module_interpolation
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.obj.diskfile import get_data_dir
from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC
@ -556,6 +557,41 @@ class ReplProbeTest(ProbeTest):
obj_required_devices = 4
policy_requirements = {'policy_type': REPL_POLICY}
def direct_container_op(self, func, account=None, container=None,
expect_failure=False):
account = account if account else self.account
container = container if container else self.container_to_shard
cpart, cnodes = self.container_ring.get_nodes(account, container)
unexpected_responses = []
results = {}
for cnode in cnodes:
try:
results[cnode['id']] = func(cnode, cpart, account, container)
except DirectClientException as err:
if not expect_failure:
unexpected_responses.append((cnode, err))
else:
if expect_failure:
unexpected_responses.append((cnode, 'success'))
if unexpected_responses:
self.fail('Unexpected responses: %s' % unexpected_responses)
return results
def direct_delete_container(self, account=None, container=None,
expect_failure=False):
self.direct_container_op(direct_client.direct_delete_container,
account, container, expect_failure)
def direct_head_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_head_container,
account, container, expect_failure)
def direct_get_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_get_container,
account, container, expect_failure)
class ECProbeTest(ProbeTest):

View File

@ -739,5 +739,155 @@ class TestContainerSyncAndSymlink(BaseTestContainerSync):
self.assertEqual(target_body, actual_target_body)
class TestContainerSyncAndVersioning(BaseTestContainerSync):
def setUp(self):
super(TestContainerSyncAndVersioning, self).setUp()
if 'object_versioning' not in self.info:
raise unittest.SkipTest("Object Versioning not enabled")
def _test_syncing(self, source_container, dest_container):
# test syncing and versioning
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, source_container, object_name,
'version1')
# cycle container-sync
Manager(['container-sync']).once()
# overwrite source
client.put_object(self.url, self.token, source_container, object_name,
'version2')
# cycle container-sync
Manager(['container-sync']).once()
resp_headers, listing = client.get_container(
self.url, self.token, dest_container,
query_string='versions')
self.assertEqual(2, len(listing))
def test_enable_versioning_while_syncing_container(self):
source_container, dest_container = self._setup_synced_containers()
version_hdr = {'X-Versions-Enabled': 'true'}
# Cannot enable versioning on source container
with self.assertRaises(ClientException) as cm:
client.post_container(self.url, self.token, source_container,
headers=version_hdr)
self.assertEqual(400, cm.exception.http_status) # sanity check
self.assertEqual(b'Cannot enable object versioning on a container '
b'configured as source of container syncing.',
cm.exception.http_response_content)
# but destination is ok!
client.post_container(self.url, self.token, dest_container,
headers=version_hdr)
headers = client.head_container(self.url, self.token,
dest_container)
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
self._test_syncing(source_container, dest_container)
def test_enable_syncing_while_versioned(self):
source_container, dest_container = self._setup_synced_containers()
container_name = 'versioned-%s' % uuid.uuid4()
version_hdr = {'X-Versions-Enabled': 'true'}
client.put_container(self.url, self.token, container_name,
headers=version_hdr)
# fails to configure as a container-sync source
sync_headers = {'X-Container-Sync-Key': 'secret'}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
dest_container)
sync_headers['X-Container-Sync-To'] = sync_to
with self.assertRaises(ClientException) as cm:
client.post_container(self.url, self.token, container_name,
headers=sync_headers)
self.assertEqual(400, cm.exception.http_status) # sanity check
# but works if it's just a container-sync destination
sync_headers = {'X-Container-Sync-Key': 'secret'}
client.post_container(self.url, self.token, container_name,
headers=sync_headers)
headers = client.head_container(self.url, self.token,
container_name)
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
# update source header to sync to versioned container
source_headers = {'X-Container-Sync-Key': 'secret'}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
container_name)
source_headers['X-Container-Sync-To'] = sync_to
client.post_container(self.url, self.token, source_container,
headers=source_headers)
self._test_syncing(source_container, container_name)
def test_skip_sync_when_misconfigured(self):
source_container, dest_container = self._setup_synced_containers()
container_name = 'versioned-%s' % uuid.uuid4()
version_hdr = {'X-Versions-Enabled': 'true'}
client.put_container(self.url, self.token, container_name,
headers=version_hdr)
# some sanity checks
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, container_name, object_name,
'version1')
client.put_object(self.url, self.token, container_name, object_name,
'version2')
resp_headers, listing = client.get_container(
self.url, self.token, container_name,
query_string='versions')
self.assertEqual(2, len(listing))
sync_headers = {}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
dest_container)
sync_headers['X-Container-Sync-To'] = sync_to
sync_headers['X-Container-Sync-Key'] = 'secret'
# use internal client to set container-sync headers
# since it doesn't have container_sync middleware in pipeline
# allowing us to bypass checks
int_client = self.make_internal_client()
# TODO: what a terrible hack, maybe we need to extend internal
# client to allow caller to become a swift_owner??
int_client.app.app.app.app.swift_owner_headers = []
int_client.set_container_metadata(self.account, container_name,
metadata=sync_headers)
headers = client.head_container(self.url, self.token,
container_name)
# This should never happen, but if it does because of eventual
# consistency or a messed up pipeline, container-sync should
# skip syncing container.
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
self.assertEqual(sync_to, headers.get('x-container-sync-to'))
# cycle container-sync
Manager(['container-sync']).once()
with self.assertRaises(ClientException) as cm:
client.get_object(
self.url, self.token, dest_container, object_name)
self.assertEqual(404, cm.exception.http_status) # sanity check
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,233 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import main
from swiftclient import client
from swift.common.request_helpers import get_reserved_name
from test.probe.common import ReplProbeTest
class TestObjectVersioning(ReplProbeTest):
def _assert_account_level(self, container_name, hdr_cont_count,
hdr_obj_count, hdr_bytes, cont_count,
cont_bytes):
headers, containers = client.get_account(self.url, self.token)
self.assertEqual(hdr_cont_count, headers['x-account-container-count'])
self.assertEqual(hdr_obj_count, headers['x-account-object-count'])
self.assertEqual(hdr_bytes, headers['x-account-bytes-used'])
self.assertEqual(len(containers), 1)
container = containers[0]
self.assertEqual(container_name, container['name'])
self.assertEqual(cont_count, container['count'])
self.assertEqual(cont_bytes, container['bytes'])
def test_account_listing(self):
versions_header_key = 'X-Versions-Enabled'
# Create container1
container_name = 'container1'
obj_name = 'object1'
client.put_container(self.url, self.token, container_name)
# Assert account level sees it
self._assert_account_level(
container_name,
hdr_cont_count='1',
hdr_obj_count='0',
hdr_bytes='0',
cont_count=0,
cont_bytes=0)
# Enable versioning
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
# write multiple versions of same obj
client.put_object(self.url, self.token, container_name, obj_name,
'version1')
client.put_object(self.url, self.token, container_name, obj_name,
'version2')
# Assert account level doesn't see object data yet, but it
# does see the update for the hidden container
self._assert_account_level(
container_name,
hdr_cont_count='2',
hdr_obj_count='0',
hdr_bytes='0',
cont_count=0,
cont_bytes=0)
# Get to final state
self.get_to_final_state()
# Assert account level now sees updated values
# N.B: Note difference in values between header and container listing
# header object count is counting both symlink + object versions
# listing count is counting only symlink (in primary container)
self._assert_account_level(
container_name,
hdr_cont_count='2',
hdr_obj_count='3',
hdr_bytes='16',
cont_count=1,
cont_bytes=16)
client.delete_object(self.url, self.token, container_name, obj_name)
_headers, current_versions = client.get_container(
self.url, self.token, container_name)
self.assertEqual(len(current_versions), 0)
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 3)
# directly delete primary container to leave an orphan hidden
# container
self.direct_delete_container(container=container_name)
# Get to final state
self.get_to_final_state()
# The container count decreases, as well as object count. But bytes
# do not. The discrepancy between header object count, container
# object count and bytes should indicate orphan hidden container is
# still around consuming storage
self._assert_account_level(
container_name,
hdr_cont_count='1',
hdr_obj_count='3',
hdr_bytes='16',
cont_count=0,
cont_bytes=16)
# Can't HEAD or list anything, though
with self.assertRaises(client.ClientException) as caught:
client.head_container(self.url, self.token, container_name)
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_container(self.url, self.token, container_name)
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_container(self.url, self.token, container_name,
query_string='versions')
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
# A little funny -- maybe this should 404 instead?
self.assertEqual(caught.exception.http_status, 400)
# Fix isn't too bad -- just make the container again!
client.put_container(self.url, self.token, container_name)
_headers, current_versions = client.get_container(
self.url, self.token, container_name)
self.assertEqual(len(current_versions), 0)
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 3)
# ... but to actually *access* the versions, you have to enable
# versioning again
with self.assertRaises(client.ClientException) as caught:
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
self.assertEqual(caught.exception.http_status, 400)
self.assertIn(b'version-aware operations require',
caught.exception.http_response_content)
client.post_container(self.url, self.token, container_name,
headers={'X-Versions-Enabled': 'true'})
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
def test_missing_versions_container(self):
versions_header_key = 'X-Versions-Enabled'
# Create container1
container_name = 'container1'
obj_name = 'object1'
client.put_container(self.url, self.token, container_name)
# Write some data
client.put_object(self.url, self.token, container_name, obj_name,
b'null version')
# Enable versioning
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
# But directly delete hidden container to leave an orphan primary
# container
self.direct_delete_container(container=get_reserved_name(
'versions', container_name))
# Could be worse; we can still list versions and GET data
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 1)
self.assertEqual(all_versions[0]['name'], obj_name)
self.assertEqual(all_versions[0]['version_id'], 'null')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name)
self.assertEqual(data, b'null version')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name,
query_string='version-id=null')
self.assertEqual(data, b'null version')
# But most any write is going to fail
with self.assertRaises(client.ClientException) as caught:
client.put_object(self.url, self.token, container_name, obj_name,
b'new version')
self.assertEqual(caught.exception.http_status, 500)
with self.assertRaises(client.ClientException) as caught:
client.delete_object(self.url, self.token, container_name,
obj_name)
self.assertEqual(caught.exception.http_status, 500)
# Version-aware delete can work, though!
client.delete_object(self.url, self.token, container_name, obj_name,
query_string='version-id=null')
# Re-enabling versioning should square us
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
client.put_object(self.url, self.token, container_name, obj_name,
b'new version')
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 1)
self.assertEqual(all_versions[0]['name'], obj_name)
self.assertNotEqual(all_versions[0]['version_id'], 'null')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name)
self.assertEqual(data, b'new version')
if __name__ == '__main__':
main()

View File

@ -25,7 +25,6 @@ from six.moves.urllib.parse import quote
from swift.common import direct_client, utils
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
from swift.common.direct_client import DirectClientException
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
quorum_size, config_true_value, Timestamp
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
@ -102,10 +101,12 @@ class BaseTestContainerSharding(ReplProbeTest):
# perform checks for skipping test before starting services
self._maybe_skip_test()
def _make_object_names(self, number):
return ['obj%s%04d' % (self.DELIM, x) for x in range(number)]
def _make_object_names(self, number, start=0):
return ['obj%s%04d' % (self.DELIM, x)
for x in range(start, start + number)]
def _setup_container_name(self):
# Container where we're PUTting objects
self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4())
def setUp(self):
@ -116,13 +117,18 @@ class BaseTestContainerSharding(ReplProbeTest):
_, self.admin_token = get_auth(
'http://127.0.0.1:8080/auth/v1.0', 'admin:admin', 'admin')
self._setup_container_name()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
None, 'container')
self.brain.put_container(policy_index=int(self.policy))
self.init_brain(self.container_name)
self.sharders = Manager(['container-sharder'])
self.internal_client = self.make_internal_client()
self.memcache = MemcacheRing(['127.0.0.1:11211'])
def init_brain(self, container_name):
self.container_to_shard = container_name
self.brain = BrainSplitter(
self.url, self.token, self.container_to_shard,
None, 'container')
self.brain.put_container(policy_index=int(self.policy))
def stop_container_servers(self, node_numbers=None):
if node_numbers:
ipports = []
@ -139,45 +145,35 @@ class BaseTestContainerSharding(ReplProbeTest):
wait_for_server_to_hangup(ipport)
def put_objects(self, obj_names, contents=None):
results = []
for obj in obj_names:
rdict = {}
client.put_object(self.url, token=self.token,
container=self.container_name, name=obj,
contents=contents)
contents=contents, response_dict=rdict)
results.append((obj, rdict['headers'].get('x-object-version-id')))
return results
def delete_objects(self, obj_names):
for obj in obj_names:
client.delete_object(
self.url, self.token, self.container_name, obj)
def delete_objects(self, obj_names_and_versions):
for obj in obj_names_and_versions:
if isinstance(obj, tuple):
obj, version = obj
client.delete_object(
self.url, self.token, self.container_name, obj,
query_string='version-id=%s' % version)
else:
client.delete_object(
self.url, self.token, self.container_name, obj)
def get_container_shard_ranges(self, account=None, container=None):
account = account if account else self.account
container = container if container else self.container_name
container = container if container else self.container_to_shard
path = self.internal_client.make_path(account, container)
resp = self.internal_client.make_request(
'GET', path + '?format=json', {'X-Backend-Record-Type': 'shard'},
[200])
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
def direct_container_op(self, func, account=None, container=None,
expect_failure=False):
account = account if account else self.account
container = container if container else self.container_name
cpart, cnodes = self.container_ring.get_nodes(account, container)
unexpected_responses = []
results = {}
for cnode in cnodes:
try:
results[cnode['id']] = func(cnode, cpart, account, container)
except DirectClientException as err:
if not expect_failure:
unexpected_responses.append((cnode, err))
else:
if expect_failure:
unexpected_responses.append((cnode, 'success'))
if unexpected_responses:
self.fail('Unexpected responses: %s' % unexpected_responses)
return results
def direct_get_container_shard_ranges(self, account=None, container=None,
expect_failure=False):
collector = ShardCollector()
@ -185,21 +181,6 @@ class BaseTestContainerSharding(ReplProbeTest):
collector, account, container, expect_failure)
return collector.ranges
def direct_delete_container(self, account=None, container=None,
expect_failure=False):
self.direct_container_op(direct_client.direct_delete_container,
account, container, expect_failure)
def direct_head_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_head_container,
account, container, expect_failure)
def direct_get_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_get_container,
account, container, expect_failure)
def get_storage_dir(self, part, node, account=None, container=None):
account = account or self.brain.account
container = container or self.container_name
@ -371,7 +352,7 @@ class BaseTestContainerSharding(ReplProbeTest):
def assert_container_state(self, node, expected_state, num_shard_ranges):
headers, shard_ranges = direct_client.direct_get_container(
node, self.brain.part, self.account, self.container_name,
node, self.brain.part, self.account, self.container_to_shard,
headers={'X-Backend-Record-Type': 'shard'})
self.assertEqual(num_shard_ranges, len(shard_ranges))
self.assertIn('X-Backend-Sharding-State', headers)
@ -560,11 +541,11 @@ class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
def _make_object_names(self, number):
def _make_object_names(self, number, start=0):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
for x in range(start, start + number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:
@ -583,6 +564,215 @@ class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
self.container_name = self.container_name.decode('utf8')
class TestContainerShardingObjectVersioning(BaseTestContainerSharding):
def _maybe_skip_test(self):
super(TestContainerShardingObjectVersioning, self)._maybe_skip_test()
try:
vw_config = utils.readconf(self.configs['proxy-server'],
'filter:versioned_writes')
except ValueError:
raise SkipTest('No [filter:versioned_writes] section found in '
'proxy-server configs')
allow_object_versioning = config_true_value(
vw_config.get('allow_object_versioning', False))
if not allow_object_versioning:
raise SkipTest('allow_object_versioning must be true '
'in all versioned_writes configs')
def init_brain(self, container_name):
client.put_container(self.url, self.token, container_name, headers={
'X-Storage-Policy': self.policy.name,
'X-Versions-Enabled': 'true',
})
self.container_to_shard = '\x00versions\x00' + container_name
self.brain = BrainSplitter(
self.url, self.token, self.container_to_shard,
None, 'container')
def test_sharding_listing(self):
# verify parameterised listing of a container during sharding
all_obj_names = self._make_object_names(3) * self.max_shard_size
all_obj_names.extend(self._make_object_names(self.max_shard_size,
start=3))
obj_names = all_obj_names[::2]
obj_names_and_versions = self.put_objects(obj_names)
def sort_key(obj_and_ver):
obj, ver = obj_and_ver
return obj, ~Timestamp(ver)
obj_names_and_versions.sort(key=sort_key)
# choose some names approx in middle of each expected shard range
markers = [
obj_names_and_versions[i]
for i in range(self.max_shard_size // 4,
2 * self.max_shard_size,
self.max_shard_size // 2)]
def check_listing(objects, **params):
params['versions'] = ''
qs = '&'.join('%s=%s' % param for param in params.items())
headers, listing = client.get_container(
self.url, self.token, self.container_name, query_string=qs)
listing = [(x['name'].encode('utf-8') if six.PY2 else x['name'],
x['version_id'])
for x in listing]
if params.get('reverse'):
marker = (
params.get('marker', ShardRange.MAX),
~Timestamp(params['version_marker'])
if 'version_marker' in params else ~Timestamp('0'),
)
end_marker = (
params.get('end_marker', ShardRange.MIN),
Timestamp('0'),
)
expected = [o for o in objects
if end_marker < sort_key(o) < marker]
expected.reverse()
else:
marker = (
params.get('marker', ShardRange.MIN),
~Timestamp(params['version_marker'])
if 'version_marker' in params else Timestamp('0'),
)
end_marker = (
params.get('end_marker', ShardRange.MAX),
~Timestamp('0'),
)
expected = [o for o in objects
if marker < sort_key(o) < end_marker]
if 'limit' in params:
expected = expected[:params['limit']]
self.assertEqual(expected, listing)
def check_listing_fails(exp_status, **params):
params['versions'] = ''
qs = '&'.join('%s=%s' % param for param in params.items())
with self.assertRaises(ClientException) as cm:
client.get_container(
self.url, self.token, self.container_name, query_string=qs)
self.assertEqual(exp_status, cm.exception.http_status)
return cm.exception
def do_listing_checks(objects):
check_listing(objects)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1])
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 10)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 4)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 2)
check_listing(objects,
marker=markers[1][0], version_marker=markers[1][1])
check_listing(objects,
marker=markers[1][0], version_marker=markers[1][1],
limit=self.max_shard_size // 10)
check_listing(objects,
marker=markers[2][0], version_marker=markers[2][1],
limit=self.max_shard_size // 4)
check_listing(objects,
marker=markers[2][0], version_marker=markers[2][1],
limit=self.max_shard_size // 2)
check_listing(objects, reverse=True)
check_listing(objects, reverse=True,
marker=markers[1][0], version_marker=markers[1][1])
check_listing(objects, prefix='obj')
check_listing([], prefix='zzz')
# delimiter
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=-')
self.assertEqual([{'subdir': 'obj-'}], listing)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=j-')
self.assertEqual([{'subdir': 'obj-'}], listing)
limit = self.cluster_info['swift']['container_listing_limit']
exc = check_listing_fails(412, limit=limit + 1)
self.assertIn(b'Maximum limit', exc.http_response_content)
exc = check_listing_fails(400, delimiter='%ff')
self.assertIn(b'not valid UTF-8', exc.http_response_content)
# sanity checks
do_listing_checks(obj_names_and_versions)
# Shard the container. Use an internal_client so we get an implicit
# X-Backend-Allow-Reserved-Names header
self.internal_client.set_container_metadata(
self.account, self.container_to_shard, {
'X-Container-Sysmeta-Sharding': 'True',
})
# First run the 'leader' in charge of scanning, which finds all shard
# ranges and cleaves first two
self.sharders.once(number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
# Then run sharder on other nodes which will also cleave first two
# shard ranges
for n in self.brain.node_numbers[1:]:
self.sharders.once(
number=n, additional_args='--partitions=%s' % self.brain.part)
# sanity check shard range states
for node in self.brain.nodes:
self.assert_container_state(node, 'sharding', 4)
shard_ranges = self.get_container_shard_ranges()
self.assertLengthEqual(shard_ranges, 4)
self.assert_shard_range_state(ShardRange.CLEAVED, shard_ranges[:2])
self.assert_shard_range_state(ShardRange.CREATED, shard_ranges[2:])
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta() # confirm no sysmeta deleted
self.assert_container_post_ok('sharding')
do_listing_checks(obj_names_and_versions)
# put some new objects spread through entire namespace
new_obj_names = all_obj_names[1::4]
new_obj_names_and_versions = self.put_objects(new_obj_names)
# new objects that fell into the first two cleaved shard ranges are
# reported in listing, new objects in the yet-to-be-cleaved shard
# ranges are not yet included in listing
exp_obj_names_and_versions = [
o for o in obj_names_and_versions + new_obj_names_and_versions
if '\x00' + o[0] <= shard_ranges[1].upper]
exp_obj_names_and_versions += [
o for o in obj_names_and_versions
if '\x00' + o[0] > shard_ranges[1].upper]
exp_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(exp_obj_names_and_versions)
# run all the sharders again and the last two shard ranges get cleaved
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
for node in self.brain.nodes:
self.assert_container_state(node, 'sharded', 4)
shard_ranges = self.get_container_shard_ranges()
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
exp_obj_names_and_versions = \
obj_names_and_versions + new_obj_names_and_versions
exp_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(exp_obj_names_and_versions)
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta()
self.assert_container_post_ok('sharded')
# delete original objects
self.delete_objects(obj_names_and_versions)
new_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(new_obj_names_and_versions)
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta()
self.assert_container_post_ok('sharded')
class TestContainerSharding(BaseTestContainerSharding):
def _test_sharded_listing(self, run_replicators=False):
obj_names = self._make_object_names(self.max_shard_size)

File diff suppressed because it is too large Load Diff

View File

@ -1310,12 +1310,9 @@ class TestSloDeleteManifest(SloTestCase):
set(self.app.calls),
set([('GET',
'/v1/AUTH_test/deltest/man?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/gone?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/man?multipart-manifest=delete')]))
('DELETE', '/v1/AUTH_test/deltest/gone'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/man')]))
self.assertEqual(resp_data['Response Status'], '200 OK')
self.assertEqual(resp_data['Number Deleted'], 2)
self.assertEqual(resp_data['Number Not Found'], 1)
@ -1328,10 +1325,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE', ('/v1/AUTH_test/deltest/' +
'man-all-there?multipart-manifest=delete'))]))
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', ('/v1/AUTH_test/deltest/man-all-there'))]))
def test_handle_multipart_delete_non_ascii(self):
if six.PY2:
@ -1356,10 +1352,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/%s/deltest/man-all-there?multipart-manifest=get' % acct),
('DELETE', '/v1/%s/deltest/b_2?multipart-manifest=delete' % acct),
('DELETE', '/v1/%s/deltest/c_3?multipart-manifest=delete' % acct),
('DELETE', ('/v1/%s/deltest/'
'man-all-there?multipart-manifest=delete' % acct))]))
('DELETE', '/v1/%s/deltest/b_2' % acct),
('DELETE', '/v1/%s/deltest/c_3' % acct),
('DELETE', ('/v1/%s/deltest/man-all-there' % acct))]))
def test_handle_multipart_delete_nested(self):
req = Request.blank(
@ -1369,24 +1364,16 @@ class TestSloDeleteManifest(SloTestCase):
self.call_slo(req)
self.assertEqual(
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=get'),
('GET', '/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/d_3?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=delete')]))
{('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=get'),
('GET', '/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/submanifest'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/manifest-with-submanifest')})
def test_handle_multipart_delete_nested_too_many_segments(self):
req = Request.blank(
@ -1410,18 +1397,15 @@ class TestSloDeleteManifest(SloTestCase):
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEqual(
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/' +
'a_1?multipart-manifest=delete'),
('GET', '/v1/AUTH_test/deltest/' +
'missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/' +
'd_3?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=delete')]))
self.assertEqual(set(self.app.calls), {
('GET', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('GET', '/v1/AUTH_test/deltest/' +
'missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/manifest-missing-submanifest'),
})
self.assertEqual(resp_data['Response Status'], '200 OK')
self.assertEqual(resp_data['Response Body'], '')
self.assertEqual(resp_data['Number Deleted'], 3)
@ -1510,12 +1494,10 @@ class TestSloDeleteManifest(SloTestCase):
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest-unauth/' +
'q_17?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest-unauth/q_17'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment?multipart-manifest=delete')]))
'manifest-with-unauth-segment')]))
self.assertEqual(resp_data['Response Status'], '400 Bad Request')
self.assertEqual(resp_data['Response Body'], '')
self.assertEqual(resp_data['Number Deleted'], 2)
@ -1537,10 +1519,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE', ('/v1/AUTH_test/deltest/' +
'man-all-there?multipart-manifest=delete'))]))
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/man-all-there')]))
class TestSloHeadOldManifest(SloTestCase):

View File

@ -60,7 +60,8 @@ class VersionedWritesBaseTestCase(unittest.TestCase):
def setUp(self):
self.app = helpers.FakeSwift()
conf = {'allow_versioned_writes': 'true'}
self.vw = versioned_writes.filter_factory(conf)(self.app)
self.vw = versioned_writes.legacy.VersionedWritesMiddleware(
self.app, conf)
def tearDown(self):
self.assertEqual(self.app.unclosed_requests, {})
@ -842,7 +843,7 @@ class VersionedWritesTestCase(VersionedWritesBaseTestCase):
self.assertTrue(path.startswith('/v1/a/ver_cont/001o/3'))
self.assertNotIn('x-if-delete-at', [h.lower() for h in req_headers])
@mock.patch('swift.common.middleware.versioned_writes.time.time',
@mock.patch('swift.common.middleware.versioned_writes.legacy.time.time',
return_value=1234)
def test_history_delete_marker_no_object_success(self, mock_time):
self.app.register(
@ -872,7 +873,7 @@ class VersionedWritesTestCase(VersionedWritesBaseTestCase):
self.assertEqual('application/x-deleted;swift_versions_deleted=1',
calls[1].headers.get('Content-Type'))
@mock.patch('swift.common.middleware.versioned_writes.time.time',
@mock.patch('swift.common.middleware.versioned_writes.legacy.time.time',
return_value=123456789.54321)
def test_history_delete_marker_over_object_success(self, mock_time):
self.app.register(

View File

@ -889,6 +889,32 @@ class TestTimestamp(unittest.TestCase):
check_is_earlier(b'-9999.999')
check_is_earlier(u'-1234_5678')
def test_inversion(self):
ts = utils.Timestamp(0)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual((~ts).internal, '9999999999.99999')
ts = utils.Timestamp(123456.789)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual(ts.internal, '0000123456.78900')
self.assertEqual((~ts).internal, '9999876543.21099')
timestamps = sorted(utils.Timestamp(random.random() * 1e10)
for _ in range(20))
self.assertEqual([x.internal for x in timestamps],
sorted(x.internal for x in timestamps))
self.assertEqual([(~x).internal for x in reversed(timestamps)],
sorted((~x).internal for x in timestamps))
ts = utils.Timestamp.now()
self.assertGreater(~ts, ts) # NB: will break around 2128
ts = utils.Timestamp.now(offset=1)
with self.assertRaises(ValueError) as caught:
~ts
self.assertEqual(caught.exception.args[0],
'Cannot invert timestamps with offsets')
class TestTimestampEncoding(unittest.TestCase):

View File

@ -1795,7 +1795,12 @@ class TestPipelineModification(unittest.TestCase):
# anywhere other than an attribute named "app", but it works for now.
pipe = []
for _ in range(1000):
pipe.append(app.__class__.__module__)
if app.__class__.__module__ == \
'swift.common.middleware.versioned_writes.legacy':
pipe.append('swift.common.middleware.versioned_writes')
else:
pipe.append(app.__class__.__module__)
if not hasattr(app, 'app'):
break
app = app.app

View File

@ -276,8 +276,7 @@ class TestObjectController(unittest.TestCase):
'X-Object-Meta-4': 'Four',
'Content-Encoding': 'gzip',
'Foo': 'fooheader',
'Bar': 'barheader',
'Content-Type': 'application/x-test'}
'Bar': 'barheader'}
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers=headers)
@ -286,6 +285,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'blue',
})
@ -321,19 +321,20 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': post_timestamp,
'X-Object-Sysmeta-Color': 'red',
'Content-Type': 'application/x-test'})
'Content-Type': 'application/x-test2'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test2',
'X-Object-Sysmeta-Color': 'blue',
})
req = Request.blank('/sda1/p/a/c/o')
resp = req.get_response(self.object_controller)
self.assertEqual(dict(resp.headers), {
'Content-Type': 'application/x-test',
'Content-Type': 'application/x-test2',
'Content-Length': '6',
'Etag': etag,
'X-Object-Sysmeta-Color': 'blue',
@ -403,6 +404,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'red',
})
@ -436,6 +438,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'red',
})