New Object Versioning mode

This patch adds a new object versioning mode. This new mode provides
a new set of APIs for users to interact with older versions of an
object. It also changes the naming scheme of older versions and adds
a version-id to each object.

This new mode is not backwards compatible or interchangeable with the
other two modes (i.e., stack and history), especially due to the changes
in the namimg scheme of older versions. This new mode will also serve
as a foundation for adding S3 versioning compatibility in the s3api
middleware.

Note that this does not (yet) support using a versioned container as
a source in container-sync. Container sync should be enhanced to sync
previous versions of objects.

Change-Id: Ic7d39ba425ca324eeb4543a2ce8d03428e2225a1
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Thiago da Silva <thiagodasilva@gmail.com>
This commit is contained in:
Clay Gerrard 2019-09-13 12:25:24 -05:00 committed by Tim Burke
parent 26ff2eb1cb
commit 2759d5d51c
33 changed files with 8307 additions and 163 deletions

View File

@ -68,6 +68,7 @@ use = egg:swift#gatekeeper
[filter:versioned_writes]
use = egg:swift#versioned_writes
allow_versioned_writes = true
allow_object_versioning = true
[filter:copy]
use = egg:swift#copy

View File

@ -140,6 +140,7 @@ SSC :ref:`copy`
SYM :ref:`symlink`
SH :ref:`sharding_doc`
S3 :ref:`s3api`
OV :ref:`object_versioning`
======================= =============================

View File

@ -278,12 +278,12 @@ Name Check (Forbidden Character Filter)
:members:
:show-inheritance:
.. _versioned_writes:
.. _object_versioning:
Object Versioning
=================
.. automodule:: swift.common.middleware.versioned_writes
.. automodule:: swift.common.middleware.versioned_writes.object_versioning
:members:
:show-inheritance:
@ -371,6 +371,15 @@ TempURL
:members:
:show-inheritance:
.. _versioned_writes:
Versioned Writes
=================
.. automodule:: swift.common.middleware.versioned_writes.legacy
:members:
:show-inheritance:
XProfile
==============

View File

@ -1072,6 +1072,8 @@ use = egg:swift#versioned_writes
# in the container configuration file, which will be eventually
# deprecated. See documentation for more details.
# allow_versioned_writes = false
# Enables Swift object-versioning API
# allow_object_versioning = false
# Note: Put after auth and before dlo and slo middlewares.
# If you don't put it in the pipeline, it will be inserted for you.

View File

@ -457,7 +457,8 @@ class Bulk(object):
failed_files.append([wsgi_quote(str_to_wsgi(obj_name)),
HTTPPreconditionFailed().status])
continue
yield (obj_name, delete_path)
yield (obj_name, delete_path,
obj_to_delete.get('version_id'))
def objs_then_containers(objs_to_delete):
# process all objects first
@ -467,13 +468,17 @@ class Bulk(object):
yield delete_filter(lambda name: '/' not in name.strip('/'),
objs_to_delete)
def do_delete(obj_name, delete_path):
def do_delete(obj_name, delete_path, version_id):
delete_obj_req = make_subrequest(
req.environ, method='DELETE',
path=wsgi_quote(str_to_wsgi(delete_path)),
headers={'X-Auth-Token': req.headers.get('X-Auth-Token')},
body='', agent='%(orig)s ' + user_agent,
swift_source=swift_source)
if version_id is None:
delete_obj_req.params = {}
else:
delete_obj_req.params = {'version-id': version_id}
return (delete_obj_req.get_response(self.app), obj_name, 0)
with StreamingPile(self.delete_concurrency) as pile:

View File

@ -87,6 +87,14 @@ class ContainerSync(object):
info = get_container_info(
req.environ, self.app, swift_source='CS')
sync_to = req.headers.get('x-container-sync-to')
if req.method in ('PUT', 'POST') and cont and not obj:
versions_cont = info.get(
'sysmeta', {}).get('versions-container')
if sync_to and versions_cont:
raise HTTPBadRequest(
'Cannot configure container sync on a container '
'with object versioning configured.',
request=req)
if not self.allow_full_urls:
if sync_to and not sync_to.startswith('//'):

View File

@ -319,6 +319,9 @@ class ServerSideCopyMiddleware(object):
if 'last-modified' in source_resp.headers:
resp_headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
if 'X-Object-Version-Id' in source_resp.headers:
resp_headers['X-Copied-From-Version-Id'] = \
source_resp.headers['X-Object-Version-Id']
# Existing sys and user meta of source object is added to response
# headers in addition to the new ones.
_copy_headers(sink_req.headers, resp_headers)
@ -374,6 +377,8 @@ class ServerSideCopyMiddleware(object):
sink_req.headers.update(req.headers)
params = sink_req.params
params_updated = False
if params.get('multipart-manifest') == 'get':
if 'X-Static-Large-Object' in source_resp.headers:
params['multipart-manifest'] = 'put'
@ -381,6 +386,13 @@ class ServerSideCopyMiddleware(object):
del params['multipart-manifest']
sink_req.headers['X-Object-Manifest'] = \
source_resp.headers['X-Object-Manifest']
params_updated = True
if 'version-id' in params:
del params['version-id']
params_updated = True
if params_updated:
sink_req.params = params
# Set swift.source, data source, content length and etag

View File

@ -170,7 +170,12 @@ class ListingFilter(object):
params['format'] = 'json'
req.params = params
# Give other middlewares a chance to be in charge
env.setdefault('swift.format_listing', True)
status, headers, resp_iter = req.call_application(self.app)
if not env.get('swift.format_listing'):
start_response(status, headers)
return resp_iter
header_to_index = {}
resp_content_type = resp_length = None

View File

@ -1417,6 +1417,9 @@ class StaticLargeObject(object):
segments = [{
'sub_slo': True,
'name': obj_path}]
if 'version-id' in req.params:
segments[0]['version_id'] = req.params['version-id']
while segments:
# We chose not to set the limit at max_manifest_segments
# in the case this value was decreased by operators.
@ -1469,6 +1472,9 @@ class StaticLargeObject(object):
new_env['REQUEST_METHOD'] = 'GET'
del(new_env['wsgi.input'])
new_env['QUERY_STRING'] = 'multipart-manifest=get'
if 'version-id' in req.params:
new_env['QUERY_STRING'] += \
'&version-id=' + req.params['version-id']
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT')

View File

@ -202,7 +202,8 @@ import os
from cgi import parse_header
from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value
from swift.common.constraints import check_account_format
from swift.common.wsgi import WSGIContext, make_subrequest
from swift.common.request_helpers import get_sys_meta_prefix, \
@ -228,6 +229,8 @@ TGT_ETAG_SYSMETA_SYMLINK_HDR = \
get_sys_meta_prefix('object') + 'symlink-target-etag'
TGT_BYTES_SYSMETA_SYMLINK_HDR = \
get_sys_meta_prefix('object') + 'symlink-target-bytes'
SYMLOOP_EXTEND = get_sys_meta_prefix('object') + 'symloop-extend'
ALLOW_RESERVED_NAMES = get_sys_meta_prefix('object') + 'allow-reserved-names'
def _validate_and_prep_request_headers(req):
@ -477,7 +480,13 @@ class SymlinkObjectContext(WSGIContext):
raise LinkIterError()
# format: /<account name>/<container name>/<object name>
new_req = build_traversal_req(symlink_target)
self._loop_count += 1
if not config_true_value(
self._response_header_value(SYMLOOP_EXTEND)):
self._loop_count += 1
if config_true_value(
self._response_header_value(ALLOW_RESERVED_NAMES)):
new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
return self._recursive_get_head(new_req, target_etag=resp_etag)
else:
final_etag = self._response_header_value('etag')
@ -516,6 +525,8 @@ class SymlinkObjectContext(WSGIContext):
new_req = make_subrequest(
req.environ, path=wsgi_quote(symlink_target_path), method='HEAD',
swift_source='SYM')
if req.allow_reserved_names:
new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
self._last_target_path = symlink_target_path
resp = self._recursive_get_head(new_req, target_etag=etag,
follow_softlinks=False)
@ -659,6 +670,9 @@ class SymlinkObjectContext(WSGIContext):
req.environ['swift.leave_relative_location'] = True
errmsg = 'The requested POST was applied to a symlink. POST ' +\
'directly to the target to apply requested metadata.'
for key, value in self._response_headers:
if key.lower().startswith('x-object-sysmeta-'):
headers[key] = value
raise HTTPTemporaryRedirect(
body=errmsg, headers=headers)
else:

View File

@ -0,0 +1,51 @@
# Copyright (c) 2019 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Implements middleware for object versioning which comprises an instance of a
:class:`~swift.common.middleware.versioned_writes.legacy.
VersionedWritesMiddleware` combined with an instance of an
:class:`~swift.common.middleware.versioned_writes.object_versioning.
ObjectVersioningMiddleware`.
"""
from swift.common.middleware.versioned_writes. \
legacy import CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC, \
VersionedWritesMiddleware
from swift.common.middleware.versioned_writes. \
object_versioning import ObjectVersioningMiddleware
from swift.common.utils import config_true_value, register_swift_info, \
get_swift_info
def filter_factory(global_conf, **local_conf):
"""Provides a factory function for loading versioning middleware."""
conf = global_conf.copy()
conf.update(local_conf)
if config_true_value(conf.get('allow_versioned_writes')):
register_swift_info('versioned_writes', allowed_flags=(
CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC))
allow_object_versioning = config_true_value(conf.get(
'allow_object_versioning'))
if allow_object_versioning:
register_swift_info('object_versioning')
def versioning_filter(app):
if allow_object_versioning:
if 'symlink' not in get_swift_info():
raise ValueError('object versioning requires symlinks')
app = ObjectVersioningMiddleware(app, conf)
return VersionedWritesMiddleware(app, conf)
return versioning_filter

View File

@ -14,6 +14,11 @@
# limitations under the License.
"""
.. note::
This middleware supports two legacy modes of object versioning that is
now replaced by a new mode. It is recommended to use the new
:ref:`Object Versioning <object_versioning>` mode for new containers.
Object versioning in swift is implemented by setting a flag on the container
to tell swift to version all objects in the container. The value of the flag is
the URL-encoded container name where the versions are stored (commonly referred
@ -225,7 +230,7 @@ import json
import time
from swift.common.utils import get_logger, Timestamp, \
register_swift_info, config_true_value, close_if_possible, FileLikeIter
config_true_value, close_if_possible, FileLikeIter
from swift.common.request_helpers import get_sys_meta_prefix, \
copy_header_subset
from swift.common.wsgi import WSGIContext, make_pre_authed_request
@ -457,6 +462,7 @@ class VersionedWritesContext(WSGIContext):
put_path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, vers_obj_name)
req.environ['QUERY_STRING'] = ''
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
self._check_response_error(req, put_resp)
@ -601,6 +607,7 @@ class VersionedWritesContext(WSGIContext):
break
obj_to_restore = bytes_to_wsgi(
version_to_restore['name'].encode('utf-8'))
req.environ['QUERY_STRING'] = ''
restored_path = self._restore_data(
req, versions_cont, api_version, account_name,
container_name, object_name, obj_to_restore)
@ -632,6 +639,7 @@ class VersionedWritesContext(WSGIContext):
# current object and delete the previous version
prev_obj_name = bytes_to_wsgi(
previous_version['name'].encode('utf-8'))
req.environ['QUERY_STRING'] = ''
restored_path = self._restore_data(
req, versions_cont, api_version, account_name,
container_name, object_name, prev_obj_name)
@ -856,16 +864,3 @@ class VersionedWritesMiddleware(object):
return error_response(env, start_response)
else:
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
if config_true_value(conf.get('allow_versioned_writes')):
register_swift_info('versioned_writes', allowed_flags=(
CLIENT_VERSIONS_LOC, CLIENT_HISTORY_LOC))
def obj_versions_filter(app):
return VersionedWritesMiddleware(app, conf)
return obj_versions_filter

File diff suppressed because it is too large Load Diff

View File

@ -1012,6 +1012,33 @@ class Request(object):
self.query_string = urllib.parse.urlencode(param_pairs,
encoding='latin-1')
def ensure_x_timestamp(self):
"""
Similar to :attr:`timestamp`, but the ``X-Timestamp`` header will be
set if not present.
:raises HTTPBadRequest: if X-Timestamp is already set but not a valid
:class:`~swift.common.utils.Timestamp`
:returns: the request's X-Timestamp header,
as a :class:`~swift.common.utils.Timestamp`
"""
# The container sync feature includes an x-timestamp header with
# requests. If present this is checked and preserved, otherwise a fresh
# timestamp is added.
if 'HTTP_X_TIMESTAMP' in self.environ:
try:
self._timestamp = Timestamp(self.environ['HTTP_X_TIMESTAMP'])
except ValueError:
raise HTTPBadRequest(
request=self, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % self.environ['HTTP_X_TIMESTAMP'])
else:
self._timestamp = Timestamp.now()
# Always normalize it to the internal form
self.environ['HTTP_X_TIMESTAMP'] = self._timestamp.internal
return self._timestamp
@property
def timestamp(self):
"""

View File

@ -1376,6 +1376,11 @@ class Timestamp(object):
def __hash__(self):
return hash(self.internal)
def __invert__(self):
if self.offset:
raise ValueError('Cannot invert timestamps with offsets')
return Timestamp((999999999999999 - self.raw) * PRECISION)
def encode_timestamps(t1, t2=None, t3=None, explicit=False):
"""

View File

@ -1236,7 +1236,7 @@ class ContainerBroker(DatabaseBroker):
limit, marker, end_marker, prefix=None, delimiter=None, path=None,
reverse=False, include_deleted=include_deleted,
transform_func=self._record_to_dict, since_row=since_row,
all_policies=True
all_policies=True, allow_reserved=True
)
def _transform_record(self, record):

View File

@ -44,6 +44,8 @@ from swift.common.utils import (
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.wsgi import ConfigString
from swift.common.middleware.versioned_writes.object_versioning import (
SYSMETA_VERSIONS_CONT, SYSMETA_VERSIONS_SYMLINK)
# The default internal client config body is to support upgrades without
@ -358,6 +360,13 @@ class ContainerSync(Daemon):
break
else:
return
if broker.metadata.get(SYSMETA_VERSIONS_CONT):
self.container_skips += 1
self.logger.increment('skips')
self.logger.warning('Skipping container %s/%s with '
'object versioning configured' % (
info['account'], info['container']))
return
if not broker.is_deleted():
sync_to = None
user_key = None
@ -594,6 +603,16 @@ class ContainerSync(Daemon):
headers = {}
body = None
exc = err
# skip object_versioning links; this is in case the container
# metadata is out of date
if headers.get(SYSMETA_VERSIONS_SYMLINK):
self.logger.info(
'Skipping versioning symlink %s/%s/%s ' % (
info['account'], info['container'],
row['name']))
return True
timestamp = Timestamp(headers.get('x-timestamp', 0))
if timestamp < ts_meta:
if exc:

View File

@ -764,8 +764,9 @@ class ObjectController(BaseStorageServer):
'PUT', account, container, obj, request, update_headers,
device, policy)
# Add sysmeta to response
resp_headers = {}
# Add current content-type and sysmeta to response
resp_headers = {
'X-Backend-Content-Type': content_type_headers['Content-Type']}
for key, value in orig_metadata.items():
if is_sys_meta('object', key):
resp_headers[key] = value
@ -1276,7 +1277,9 @@ class ObjectController(BaseStorageServer):
device, policy)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
headers={'X-Backend-Timestamp': response_timestamp.internal,
'X-Backend-Content-Type': orig_metadata.get(
'Content-Type', '')})
@public
@replication

View File

@ -57,7 +57,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED, HTTP_CONTINUE, HTTP_GONE
from swift.common.swob import Request, Response, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable, HTTPServiceUnavailable, \
status_map, wsgi_to_str, str_to_wsgi, wsgi_quote, normalize_etag
status_map, wsgi_to_str, str_to_wsgi, wsgi_quote, wsgi_unquote, \
normalize_etag
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
http_response_to_document_iters, is_object_transient_sysmeta, \
@ -396,6 +397,17 @@ def get_container_info(env, app, swift_source=None):
if info.get('sharding_state') is None:
info['sharding_state'] = 'unsharded'
versions_cont = info.get('sysmeta', {}).get('versions-container', '')
if versions_cont:
versions_cont = wsgi_unquote(str_to_wsgi(
versions_cont)).split('/')[0]
versions_req = _prepare_pre_auth_info_request(
env, ("/%s/%s/%s" % (version, wsgi_account, versions_cont)),
(swift_source or 'GET_CONTAINER_INFO'))
versions_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
versions_info = get_container_info(versions_req.environ, app)
info['bytes'] = info['bytes'] + versions_info['bytes']
return info

View File

@ -303,7 +303,7 @@ class BaseObjectController(Controller):
if error_response:
return error_response
req.headers['X-Timestamp'] = Timestamp.now().internal
req.ensure_x_timestamp()
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
@ -547,23 +547,6 @@ class BaseObjectController(Controller):
if detect_content_type:
req.headers.pop('x-detect-content-type')
def _update_x_timestamp(self, req):
# The container sync feature includes an x-timestamp header with
# requests. If present this is checked and preserved, otherwise a fresh
# timestamp is added.
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
except ValueError:
raise HTTPBadRequest(
request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
req.headers['X-Timestamp'] = req_timestamp.internal
else:
req.headers['X-Timestamp'] = Timestamp.now().internal
return None
def _check_failure_put_connections(self, putters, req, min_conns):
"""
Identify any failed connections and check minimum connection count.
@ -785,7 +768,7 @@ class BaseObjectController(Controller):
# update content type in case it is missing
self._update_content_type(req)
self._update_x_timestamp(req)
req.ensure_x_timestamp()
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
@ -845,7 +828,7 @@ class BaseObjectController(Controller):
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
self._update_x_timestamp(req)
req.ensure_x_timestamp()
# Include local handoff nodes if write-affinity is enabled.
node_count = len(nodes)

View File

@ -327,6 +327,7 @@ def _load_encryption(proxy_conf_file, swift_conf_file, **kwargs):
if not six.PY2:
root_secret = root_secret.decode('ascii')
conf.set('filter:keymaster', 'encryption_root_secret', root_secret)
conf.set('filter:versioned_writes', 'allow_object_versioning', 'true')
except NoSectionError as err:
msg = 'Error problem with proxy conf file %s: %s' % \
(proxy_conf_file, err)
@ -456,6 +457,8 @@ def _load_s3api(proxy_conf_file, swift_conf_file, **kwargs):
"s3api tempauth")
conf.set(section, 'pipeline', pipeline)
conf.set('filter:s3api', 's3_acl', 'true')
conf.set('filter:versioned_writes', 'allow_object_versioning', 'true')
except NoSectionError as err:
msg = 'Error problem with proxy conf file %s: %s' % \
(proxy_conf_file, err)

View File

@ -699,7 +699,7 @@ class Container(Base):
if cfg is None:
cfg = {}
format_type = parms.get('format', None)
if format_type not in [None, 'json', 'xml']:
if format_type not in [None, 'plain', 'json', 'xml']:
raise RequestError('Invalid format: %s' % format_type)
if format_type is None and 'format' in parms:
del parms['format']
@ -707,12 +707,13 @@ class Container(Base):
status = self.conn.make_request('GET', self.path, hdrs=hdrs,
parms=parms, cfg=cfg)
if status == 200:
if format_type == 'json':
if format_type == 'json' or 'versions' in parms:
files = json.loads(self.conn.response.read())
if six.PY2:
for file_item in files:
for key in ('name', 'subdir', 'content_type'):
for key in ('name', 'subdir', 'content_type',
'version_id'):
if key in file_item:
file_item[key] = file_item[key].encode('utf-8')
return files
@ -785,8 +786,10 @@ class Container(Base):
# versioning is enabled.
['versions', 'x-versions-location'],
['versions', 'x-history-location'],
['versions_enabled', 'x-versions-enabled'],
['tempurl_key', 'x-container-meta-temp-url-key'],
['tempurl_key2', 'x-container-meta-temp-url-key-2']]
['tempurl_key2', 'x-container-meta-temp-url-key-2'],
['container_quota_bytes', 'x-container-meta-quota-bytes']]
return self.header_fields(required_fields, optional_fields)
@ -853,7 +856,8 @@ class File(Base):
data.seek(0)
return checksum.hexdigest()
def copy(self, dest_cont, dest_file, hdrs=None, parms=None, cfg=None):
def copy(self, dest_cont, dest_file, hdrs=None, parms=None, cfg=None,
return_resp=False):
if hdrs is None:
hdrs = {}
if parms is None:
@ -875,6 +879,8 @@ class File(Base):
cfg=cfg, parms=parms) != 201:
raise ResponseError(self.conn.response, 'COPY',
self.conn.make_path(self.path))
if return_resp:
return self.conn.response
return True
def copy_account(self, dest_account, dest_cont, dest_file,
@ -942,6 +948,8 @@ class File(Base):
['last_modified', 'last-modified'],
['etag', 'etag']]
optional_fields = [['x_object_manifest', 'x-object-manifest'],
['x_manifest_etag', 'x-manifest-etag'],
['x_object_version_id', 'x-object-version-id'],
['x_symlink_target', 'x-symlink-target']]
header_fields = self.header_fields(fields,

File diff suppressed because it is too large Load Diff

View File

@ -31,12 +31,13 @@ from six.moves.http_client import HTTPConnection
from six.moves.urllib.parse import urlparse
from swiftclient import get_auth, head_account, client
from swift.common import internal_client
from swift.obj.diskfile import get_data_dir
from swift.common import internal_client, direct_client
from swift.common.direct_client import DirectClientException
from swift.common.ring import Ring
from swift.common.utils import readconf, renamer, rsync_module_interpolation
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.obj.diskfile import get_data_dir
from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC
@ -556,6 +557,41 @@ class ReplProbeTest(ProbeTest):
obj_required_devices = 4
policy_requirements = {'policy_type': REPL_POLICY}
def direct_container_op(self, func, account=None, container=None,
expect_failure=False):
account = account if account else self.account
container = container if container else self.container_to_shard
cpart, cnodes = self.container_ring.get_nodes(account, container)
unexpected_responses = []
results = {}
for cnode in cnodes:
try:
results[cnode['id']] = func(cnode, cpart, account, container)
except DirectClientException as err:
if not expect_failure:
unexpected_responses.append((cnode, err))
else:
if expect_failure:
unexpected_responses.append((cnode, 'success'))
if unexpected_responses:
self.fail('Unexpected responses: %s' % unexpected_responses)
return results
def direct_delete_container(self, account=None, container=None,
expect_failure=False):
self.direct_container_op(direct_client.direct_delete_container,
account, container, expect_failure)
def direct_head_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_head_container,
account, container, expect_failure)
def direct_get_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_get_container,
account, container, expect_failure)
class ECProbeTest(ProbeTest):

View File

@ -739,5 +739,155 @@ class TestContainerSyncAndSymlink(BaseTestContainerSync):
self.assertEqual(target_body, actual_target_body)
class TestContainerSyncAndVersioning(BaseTestContainerSync):
def setUp(self):
super(TestContainerSyncAndVersioning, self).setUp()
if 'object_versioning' not in self.info:
raise unittest.SkipTest("Object Versioning not enabled")
def _test_syncing(self, source_container, dest_container):
# test syncing and versioning
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, source_container, object_name,
'version1')
# cycle container-sync
Manager(['container-sync']).once()
# overwrite source
client.put_object(self.url, self.token, source_container, object_name,
'version2')
# cycle container-sync
Manager(['container-sync']).once()
resp_headers, listing = client.get_container(
self.url, self.token, dest_container,
query_string='versions')
self.assertEqual(2, len(listing))
def test_enable_versioning_while_syncing_container(self):
source_container, dest_container = self._setup_synced_containers()
version_hdr = {'X-Versions-Enabled': 'true'}
# Cannot enable versioning on source container
with self.assertRaises(ClientException) as cm:
client.post_container(self.url, self.token, source_container,
headers=version_hdr)
self.assertEqual(400, cm.exception.http_status) # sanity check
self.assertEqual(b'Cannot enable object versioning on a container '
b'configured as source of container syncing.',
cm.exception.http_response_content)
# but destination is ok!
client.post_container(self.url, self.token, dest_container,
headers=version_hdr)
headers = client.head_container(self.url, self.token,
dest_container)
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
self._test_syncing(source_container, dest_container)
def test_enable_syncing_while_versioned(self):
source_container, dest_container = self._setup_synced_containers()
container_name = 'versioned-%s' % uuid.uuid4()
version_hdr = {'X-Versions-Enabled': 'true'}
client.put_container(self.url, self.token, container_name,
headers=version_hdr)
# fails to configure as a container-sync source
sync_headers = {'X-Container-Sync-Key': 'secret'}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
dest_container)
sync_headers['X-Container-Sync-To'] = sync_to
with self.assertRaises(ClientException) as cm:
client.post_container(self.url, self.token, container_name,
headers=sync_headers)
self.assertEqual(400, cm.exception.http_status) # sanity check
# but works if it's just a container-sync destination
sync_headers = {'X-Container-Sync-Key': 'secret'}
client.post_container(self.url, self.token, container_name,
headers=sync_headers)
headers = client.head_container(self.url, self.token,
container_name)
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
# update source header to sync to versioned container
source_headers = {'X-Container-Sync-Key': 'secret'}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
container_name)
source_headers['X-Container-Sync-To'] = sync_to
client.post_container(self.url, self.token, source_container,
headers=source_headers)
self._test_syncing(source_container, container_name)
def test_skip_sync_when_misconfigured(self):
source_container, dest_container = self._setup_synced_containers()
container_name = 'versioned-%s' % uuid.uuid4()
version_hdr = {'X-Versions-Enabled': 'true'}
client.put_container(self.url, self.token, container_name,
headers=version_hdr)
# some sanity checks
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, container_name, object_name,
'version1')
client.put_object(self.url, self.token, container_name, object_name,
'version2')
resp_headers, listing = client.get_container(
self.url, self.token, container_name,
query_string='versions')
self.assertEqual(2, len(listing))
sync_headers = {}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
dest_container)
sync_headers['X-Container-Sync-To'] = sync_to
sync_headers['X-Container-Sync-Key'] = 'secret'
# use internal client to set container-sync headers
# since it doesn't have container_sync middleware in pipeline
# allowing us to bypass checks
int_client = self.make_internal_client()
# TODO: what a terrible hack, maybe we need to extend internal
# client to allow caller to become a swift_owner??
int_client.app.app.app.app.swift_owner_headers = []
int_client.set_container_metadata(self.account, container_name,
metadata=sync_headers)
headers = client.head_container(self.url, self.token,
container_name)
# This should never happen, but if it does because of eventual
# consistency or a messed up pipeline, container-sync should
# skip syncing container.
self.assertEqual('True', headers.get('x-versions-enabled'))
self.assertEqual('secret', headers.get('x-container-sync-key'))
self.assertEqual(sync_to, headers.get('x-container-sync-to'))
# cycle container-sync
Manager(['container-sync']).once()
with self.assertRaises(ClientException) as cm:
client.get_object(
self.url, self.token, dest_container, object_name)
self.assertEqual(404, cm.exception.http_status) # sanity check
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,233 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import main
from swiftclient import client
from swift.common.request_helpers import get_reserved_name
from test.probe.common import ReplProbeTest
class TestObjectVersioning(ReplProbeTest):
def _assert_account_level(self, container_name, hdr_cont_count,
hdr_obj_count, hdr_bytes, cont_count,
cont_bytes):
headers, containers = client.get_account(self.url, self.token)
self.assertEqual(hdr_cont_count, headers['x-account-container-count'])
self.assertEqual(hdr_obj_count, headers['x-account-object-count'])
self.assertEqual(hdr_bytes, headers['x-account-bytes-used'])
self.assertEqual(len(containers), 1)
container = containers[0]
self.assertEqual(container_name, container['name'])
self.assertEqual(cont_count, container['count'])
self.assertEqual(cont_bytes, container['bytes'])
def test_account_listing(self):
versions_header_key = 'X-Versions-Enabled'
# Create container1
container_name = 'container1'
obj_name = 'object1'
client.put_container(self.url, self.token, container_name)
# Assert account level sees it
self._assert_account_level(
container_name,
hdr_cont_count='1',
hdr_obj_count='0',
hdr_bytes='0',
cont_count=0,
cont_bytes=0)
# Enable versioning
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
# write multiple versions of same obj
client.put_object(self.url, self.token, container_name, obj_name,
'version1')
client.put_object(self.url, self.token, container_name, obj_name,
'version2')
# Assert account level doesn't see object data yet, but it
# does see the update for the hidden container
self._assert_account_level(
container_name,
hdr_cont_count='2',
hdr_obj_count='0',
hdr_bytes='0',
cont_count=0,
cont_bytes=0)
# Get to final state
self.get_to_final_state()
# Assert account level now sees updated values
# N.B: Note difference in values between header and container listing
# header object count is counting both symlink + object versions
# listing count is counting only symlink (in primary container)
self._assert_account_level(
container_name,
hdr_cont_count='2',
hdr_obj_count='3',
hdr_bytes='16',
cont_count=1,
cont_bytes=16)
client.delete_object(self.url, self.token, container_name, obj_name)
_headers, current_versions = client.get_container(
self.url, self.token, container_name)
self.assertEqual(len(current_versions), 0)
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 3)
# directly delete primary container to leave an orphan hidden
# container
self.direct_delete_container(container=container_name)
# Get to final state
self.get_to_final_state()
# The container count decreases, as well as object count. But bytes
# do not. The discrepancy between header object count, container
# object count and bytes should indicate orphan hidden container is
# still around consuming storage
self._assert_account_level(
container_name,
hdr_cont_count='1',
hdr_obj_count='3',
hdr_bytes='16',
cont_count=0,
cont_bytes=16)
# Can't HEAD or list anything, though
with self.assertRaises(client.ClientException) as caught:
client.head_container(self.url, self.token, container_name)
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_container(self.url, self.token, container_name)
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_container(self.url, self.token, container_name,
query_string='versions')
self.assertEqual(caught.exception.http_status, 404)
with self.assertRaises(client.ClientException) as caught:
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
# A little funny -- maybe this should 404 instead?
self.assertEqual(caught.exception.http_status, 400)
# Fix isn't too bad -- just make the container again!
client.put_container(self.url, self.token, container_name)
_headers, current_versions = client.get_container(
self.url, self.token, container_name)
self.assertEqual(len(current_versions), 0)
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 3)
# ... but to actually *access* the versions, you have to enable
# versioning again
with self.assertRaises(client.ClientException) as caught:
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
self.assertEqual(caught.exception.http_status, 400)
self.assertIn(b'version-aware operations require',
caught.exception.http_response_content)
client.post_container(self.url, self.token, container_name,
headers={'X-Versions-Enabled': 'true'})
client.get_object(
self.url, self.token, container_name, all_versions[1]['name'],
query_string='version-id=%s' % all_versions[1]['version_id'])
def test_missing_versions_container(self):
versions_header_key = 'X-Versions-Enabled'
# Create container1
container_name = 'container1'
obj_name = 'object1'
client.put_container(self.url, self.token, container_name)
# Write some data
client.put_object(self.url, self.token, container_name, obj_name,
b'null version')
# Enable versioning
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
# But directly delete hidden container to leave an orphan primary
# container
self.direct_delete_container(container=get_reserved_name(
'versions', container_name))
# Could be worse; we can still list versions and GET data
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 1)
self.assertEqual(all_versions[0]['name'], obj_name)
self.assertEqual(all_versions[0]['version_id'], 'null')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name)
self.assertEqual(data, b'null version')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name,
query_string='version-id=null')
self.assertEqual(data, b'null version')
# But most any write is going to fail
with self.assertRaises(client.ClientException) as caught:
client.put_object(self.url, self.token, container_name, obj_name,
b'new version')
self.assertEqual(caught.exception.http_status, 500)
with self.assertRaises(client.ClientException) as caught:
client.delete_object(self.url, self.token, container_name,
obj_name)
self.assertEqual(caught.exception.http_status, 500)
# Version-aware delete can work, though!
client.delete_object(self.url, self.token, container_name, obj_name,
query_string='version-id=null')
# Re-enabling versioning should square us
hdrs = {versions_header_key: 'True'}
client.post_container(self.url, self.token, container_name, hdrs)
client.put_object(self.url, self.token, container_name, obj_name,
b'new version')
_headers, all_versions = client.get_container(
self.url, self.token, container_name, query_string='versions')
self.assertEqual(len(all_versions), 1)
self.assertEqual(all_versions[0]['name'], obj_name)
self.assertNotEqual(all_versions[0]['version_id'], 'null')
_headers, data = client.get_object(
self.url, self.token, container_name, obj_name)
self.assertEqual(data, b'new version')
if __name__ == '__main__':
main()

View File

@ -25,7 +25,6 @@ from six.moves.urllib.parse import quote
from swift.common import direct_client, utils
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
from swift.common.direct_client import DirectClientException
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
quorum_size, config_true_value, Timestamp
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
@ -102,10 +101,12 @@ class BaseTestContainerSharding(ReplProbeTest):
# perform checks for skipping test before starting services
self._maybe_skip_test()
def _make_object_names(self, number):
return ['obj%s%04d' % (self.DELIM, x) for x in range(number)]
def _make_object_names(self, number, start=0):
return ['obj%s%04d' % (self.DELIM, x)
for x in range(start, start + number)]
def _setup_container_name(self):
# Container where we're PUTting objects
self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4())
def setUp(self):
@ -116,13 +117,18 @@ class BaseTestContainerSharding(ReplProbeTest):
_, self.admin_token = get_auth(
'http://127.0.0.1:8080/auth/v1.0', 'admin:admin', 'admin')
self._setup_container_name()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
None, 'container')
self.brain.put_container(policy_index=int(self.policy))
self.init_brain(self.container_name)
self.sharders = Manager(['container-sharder'])
self.internal_client = self.make_internal_client()
self.memcache = MemcacheRing(['127.0.0.1:11211'])
def init_brain(self, container_name):
self.container_to_shard = container_name
self.brain = BrainSplitter(
self.url, self.token, self.container_to_shard,
None, 'container')
self.brain.put_container(policy_index=int(self.policy))
def stop_container_servers(self, node_numbers=None):
if node_numbers:
ipports = []
@ -139,45 +145,35 @@ class BaseTestContainerSharding(ReplProbeTest):
wait_for_server_to_hangup(ipport)
def put_objects(self, obj_names, contents=None):
results = []
for obj in obj_names:
rdict = {}
client.put_object(self.url, token=self.token,
container=self.container_name, name=obj,
contents=contents)
contents=contents, response_dict=rdict)
results.append((obj, rdict['headers'].get('x-object-version-id')))
return results
def delete_objects(self, obj_names):
for obj in obj_names:
client.delete_object(
self.url, self.token, self.container_name, obj)
def delete_objects(self, obj_names_and_versions):
for obj in obj_names_and_versions:
if isinstance(obj, tuple):
obj, version = obj
client.delete_object(
self.url, self.token, self.container_name, obj,
query_string='version-id=%s' % version)
else:
client.delete_object(
self.url, self.token, self.container_name, obj)
def get_container_shard_ranges(self, account=None, container=None):
account = account if account else self.account
container = container if container else self.container_name
container = container if container else self.container_to_shard
path = self.internal_client.make_path(account, container)
resp = self.internal_client.make_request(
'GET', path + '?format=json', {'X-Backend-Record-Type': 'shard'},
[200])
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
def direct_container_op(self, func, account=None, container=None,
expect_failure=False):
account = account if account else self.account
container = container if container else self.container_name
cpart, cnodes = self.container_ring.get_nodes(account, container)
unexpected_responses = []
results = {}
for cnode in cnodes:
try:
results[cnode['id']] = func(cnode, cpart, account, container)
except DirectClientException as err:
if not expect_failure:
unexpected_responses.append((cnode, err))
else:
if expect_failure:
unexpected_responses.append((cnode, 'success'))
if unexpected_responses:
self.fail('Unexpected responses: %s' % unexpected_responses)
return results
def direct_get_container_shard_ranges(self, account=None, container=None,
expect_failure=False):
collector = ShardCollector()
@ -185,21 +181,6 @@ class BaseTestContainerSharding(ReplProbeTest):
collector, account, container, expect_failure)
return collector.ranges
def direct_delete_container(self, account=None, container=None,
expect_failure=False):
self.direct_container_op(direct_client.direct_delete_container,
account, container, expect_failure)
def direct_head_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_head_container,
account, container, expect_failure)
def direct_get_container(self, account=None, container=None,
expect_failure=False):
return self.direct_container_op(direct_client.direct_get_container,
account, container, expect_failure)
def get_storage_dir(self, part, node, account=None, container=None):
account = account or self.brain.account
container = container or self.container_name
@ -371,7 +352,7 @@ class BaseTestContainerSharding(ReplProbeTest):
def assert_container_state(self, node, expected_state, num_shard_ranges):
headers, shard_ranges = direct_client.direct_get_container(
node, self.brain.part, self.account, self.container_name,
node, self.brain.part, self.account, self.container_to_shard,
headers={'X-Backend-Record-Type': 'shard'})
self.assertEqual(num_shard_ranges, len(shard_ranges))
self.assertIn('X-Backend-Sharding-State', headers)
@ -560,11 +541,11 @@ class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
def _make_object_names(self, number):
def _make_object_names(self, number, start=0):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
for x in range(start, start + number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:
@ -583,6 +564,215 @@ class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
self.container_name = self.container_name.decode('utf8')
class TestContainerShardingObjectVersioning(BaseTestContainerSharding):
def _maybe_skip_test(self):
super(TestContainerShardingObjectVersioning, self)._maybe_skip_test()
try:
vw_config = utils.readconf(self.configs['proxy-server'],
'filter:versioned_writes')
except ValueError:
raise SkipTest('No [filter:versioned_writes] section found in '
'proxy-server configs')
allow_object_versioning = config_true_value(
vw_config.get('allow_object_versioning', False))
if not allow_object_versioning:
raise SkipTest('allow_object_versioning must be true '
'in all versioned_writes configs')
def init_brain(self, container_name):
client.put_container(self.url, self.token, container_name, headers={
'X-Storage-Policy': self.policy.name,
'X-Versions-Enabled': 'true',
})
self.container_to_shard = '\x00versions\x00' + container_name
self.brain = BrainSplitter(
self.url, self.token, self.container_to_shard,
None, 'container')
def test_sharding_listing(self):
# verify parameterised listing of a container during sharding
all_obj_names = self._make_object_names(3) * self.max_shard_size
all_obj_names.extend(self._make_object_names(self.max_shard_size,
start=3))
obj_names = all_obj_names[::2]
obj_names_and_versions = self.put_objects(obj_names)
def sort_key(obj_and_ver):
obj, ver = obj_and_ver
return obj, ~Timestamp(ver)
obj_names_and_versions.sort(key=sort_key)
# choose some names approx in middle of each expected shard range
markers = [
obj_names_and_versions[i]
for i in range(self.max_shard_size // 4,
2 * self.max_shard_size,
self.max_shard_size // 2)]
def check_listing(objects, **params):
params['versions'] = ''
qs = '&'.join('%s=%s' % param for param in params.items())
headers, listing = client.get_container(
self.url, self.token, self.container_name, query_string=qs)
listing = [(x['name'].encode('utf-8') if six.PY2 else x['name'],
x['version_id'])
for x in listing]
if params.get('reverse'):
marker = (
params.get('marker', ShardRange.MAX),
~Timestamp(params['version_marker'])
if 'version_marker' in params else ~Timestamp('0'),
)
end_marker = (
params.get('end_marker', ShardRange.MIN),
Timestamp('0'),
)
expected = [o for o in objects
if end_marker < sort_key(o) < marker]
expected.reverse()
else:
marker = (
params.get('marker', ShardRange.MIN),
~Timestamp(params['version_marker'])
if 'version_marker' in params else Timestamp('0'),
)
end_marker = (
params.get('end_marker', ShardRange.MAX),
~Timestamp('0'),
)
expected = [o for o in objects
if marker < sort_key(o) < end_marker]
if 'limit' in params:
expected = expected[:params['limit']]
self.assertEqual(expected, listing)
def check_listing_fails(exp_status, **params):
params['versions'] = ''
qs = '&'.join('%s=%s' % param for param in params.items())
with self.assertRaises(ClientException) as cm:
client.get_container(
self.url, self.token, self.container_name, query_string=qs)
self.assertEqual(exp_status, cm.exception.http_status)
return cm.exception
def do_listing_checks(objects):
check_listing(objects)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1])
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 10)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 4)
check_listing(objects,
marker=markers[0][0], version_marker=markers[0][1],
limit=self.max_shard_size // 2)
check_listing(objects,
marker=markers[1][0], version_marker=markers[1][1])
check_listing(objects,
marker=markers[1][0], version_marker=markers[1][1],
limit=self.max_shard_size // 10)
check_listing(objects,
marker=markers[2][0], version_marker=markers[2][1],
limit=self.max_shard_size // 4)
check_listing(objects,
marker=markers[2][0], version_marker=markers[2][1],
limit=self.max_shard_size // 2)
check_listing(objects, reverse=True)
check_listing(objects, reverse=True,
marker=markers[1][0], version_marker=markers[1][1])
check_listing(objects, prefix='obj')
check_listing([], prefix='zzz')
# delimiter
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=-')
self.assertEqual([{'subdir': 'obj-'}], listing)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=j-')
self.assertEqual([{'subdir': 'obj-'}], listing)
limit = self.cluster_info['swift']['container_listing_limit']
exc = check_listing_fails(412, limit=limit + 1)
self.assertIn(b'Maximum limit', exc.http_response_content)
exc = check_listing_fails(400, delimiter='%ff')
self.assertIn(b'not valid UTF-8', exc.http_response_content)
# sanity checks
do_listing_checks(obj_names_and_versions)
# Shard the container. Use an internal_client so we get an implicit
# X-Backend-Allow-Reserved-Names header
self.internal_client.set_container_metadata(
self.account, self.container_to_shard, {
'X-Container-Sysmeta-Sharding': 'True',
})
# First run the 'leader' in charge of scanning, which finds all shard
# ranges and cleaves first two
self.sharders.once(number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
# Then run sharder on other nodes which will also cleave first two
# shard ranges
for n in self.brain.node_numbers[1:]:
self.sharders.once(
number=n, additional_args='--partitions=%s' % self.brain.part)
# sanity check shard range states
for node in self.brain.nodes:
self.assert_container_state(node, 'sharding', 4)
shard_ranges = self.get_container_shard_ranges()
self.assertLengthEqual(shard_ranges, 4)
self.assert_shard_range_state(ShardRange.CLEAVED, shard_ranges[:2])
self.assert_shard_range_state(ShardRange.CREATED, shard_ranges[2:])
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta() # confirm no sysmeta deleted
self.assert_container_post_ok('sharding')
do_listing_checks(obj_names_and_versions)
# put some new objects spread through entire namespace
new_obj_names = all_obj_names[1::4]
new_obj_names_and_versions = self.put_objects(new_obj_names)
# new objects that fell into the first two cleaved shard ranges are
# reported in listing, new objects in the yet-to-be-cleaved shard
# ranges are not yet included in listing
exp_obj_names_and_versions = [
o for o in obj_names_and_versions + new_obj_names_and_versions
if '\x00' + o[0] <= shard_ranges[1].upper]
exp_obj_names_and_versions += [
o for o in obj_names_and_versions
if '\x00' + o[0] > shard_ranges[1].upper]
exp_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(exp_obj_names_and_versions)
# run all the sharders again and the last two shard ranges get cleaved
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
for node in self.brain.nodes:
self.assert_container_state(node, 'sharded', 4)
shard_ranges = self.get_container_shard_ranges()
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
exp_obj_names_and_versions = \
obj_names_and_versions + new_obj_names_and_versions
exp_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(exp_obj_names_and_versions)
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta()
self.assert_container_post_ok('sharded')
# delete original objects
self.delete_objects(obj_names_and_versions)
new_obj_names_and_versions.sort(key=sort_key)
do_listing_checks(new_obj_names_and_versions)
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta()
self.assert_container_post_ok('sharded')
class TestContainerSharding(BaseTestContainerSharding):
def _test_sharded_listing(self, run_replicators=False):
obj_names = self._make_object_names(self.max_shard_size)

File diff suppressed because it is too large Load Diff

View File

@ -1310,12 +1310,9 @@ class TestSloDeleteManifest(SloTestCase):
set(self.app.calls),
set([('GET',
'/v1/AUTH_test/deltest/man?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/gone?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/man?multipart-manifest=delete')]))
('DELETE', '/v1/AUTH_test/deltest/gone'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/man')]))
self.assertEqual(resp_data['Response Status'], '200 OK')
self.assertEqual(resp_data['Number Deleted'], 2)
self.assertEqual(resp_data['Number Not Found'], 1)
@ -1328,10 +1325,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE', ('/v1/AUTH_test/deltest/' +
'man-all-there?multipart-manifest=delete'))]))
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', ('/v1/AUTH_test/deltest/man-all-there'))]))
def test_handle_multipart_delete_non_ascii(self):
if six.PY2:
@ -1356,10 +1352,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/%s/deltest/man-all-there?multipart-manifest=get' % acct),
('DELETE', '/v1/%s/deltest/b_2?multipart-manifest=delete' % acct),
('DELETE', '/v1/%s/deltest/c_3?multipart-manifest=delete' % acct),
('DELETE', ('/v1/%s/deltest/'
'man-all-there?multipart-manifest=delete' % acct))]))
('DELETE', '/v1/%s/deltest/b_2' % acct),
('DELETE', '/v1/%s/deltest/c_3' % acct),
('DELETE', ('/v1/%s/deltest/man-all-there' % acct))]))
def test_handle_multipart_delete_nested(self):
req = Request.blank(
@ -1369,24 +1364,16 @@ class TestSloDeleteManifest(SloTestCase):
self.call_slo(req)
self.assertEqual(
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=get'),
('GET', '/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/d_3?multipart-manifest=delete'),
('DELETE',
'/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=delete')]))
{('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=get'),
('GET', '/v1/AUTH_test/deltest/' +
'submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/submanifest'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/manifest-with-submanifest')})
def test_handle_multipart_delete_nested_too_many_segments(self):
req = Request.blank(
@ -1410,18 +1397,15 @@ class TestSloDeleteManifest(SloTestCase):
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEqual(
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/' +
'a_1?multipart-manifest=delete'),
('GET', '/v1/AUTH_test/deltest/' +
'missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/' +
'd_3?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=delete')]))
self.assertEqual(set(self.app.calls), {
('GET', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('GET', '/v1/AUTH_test/deltest/' +
'missing-submanifest?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/manifest-missing-submanifest'),
})
self.assertEqual(resp_data['Response Status'], '200 OK')
self.assertEqual(resp_data['Response Body'], '')
self.assertEqual(resp_data['Number Deleted'], 3)
@ -1510,12 +1494,10 @@ class TestSloDeleteManifest(SloTestCase):
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment?multipart-manifest=get'),
('DELETE',
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest-unauth/' +
'q_17?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest-unauth/q_17'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment?multipart-manifest=delete')]))
'manifest-with-unauth-segment')]))
self.assertEqual(resp_data['Response Status'], '400 Bad Request')
self.assertEqual(resp_data['Response Body'], '')
self.assertEqual(resp_data['Number Deleted'], 2)
@ -1537,10 +1519,9 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/b_2?multipart-manifest=delete'),
('DELETE', '/v1/AUTH_test/deltest/c_3?multipart-manifest=delete'),
('DELETE', ('/v1/AUTH_test/deltest/' +
'man-all-there?multipart-manifest=delete'))]))
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/man-all-there')]))
class TestSloHeadOldManifest(SloTestCase):

View File

@ -60,7 +60,8 @@ class VersionedWritesBaseTestCase(unittest.TestCase):
def setUp(self):
self.app = helpers.FakeSwift()
conf = {'allow_versioned_writes': 'true'}
self.vw = versioned_writes.filter_factory(conf)(self.app)
self.vw = versioned_writes.legacy.VersionedWritesMiddleware(
self.app, conf)
def tearDown(self):
self.assertEqual(self.app.unclosed_requests, {})
@ -842,7 +843,7 @@ class VersionedWritesTestCase(VersionedWritesBaseTestCase):
self.assertTrue(path.startswith('/v1/a/ver_cont/001o/3'))
self.assertNotIn('x-if-delete-at', [h.lower() for h in req_headers])
@mock.patch('swift.common.middleware.versioned_writes.time.time',
@mock.patch('swift.common.middleware.versioned_writes.legacy.time.time',
return_value=1234)
def test_history_delete_marker_no_object_success(self, mock_time):
self.app.register(
@ -872,7 +873,7 @@ class VersionedWritesTestCase(VersionedWritesBaseTestCase):
self.assertEqual('application/x-deleted;swift_versions_deleted=1',
calls[1].headers.get('Content-Type'))
@mock.patch('swift.common.middleware.versioned_writes.time.time',
@mock.patch('swift.common.middleware.versioned_writes.legacy.time.time',
return_value=123456789.54321)
def test_history_delete_marker_over_object_success(self, mock_time):
self.app.register(

View File

@ -889,6 +889,32 @@ class TestTimestamp(unittest.TestCase):
check_is_earlier(b'-9999.999')
check_is_earlier(u'-1234_5678')
def test_inversion(self):
ts = utils.Timestamp(0)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual((~ts).internal, '9999999999.99999')
ts = utils.Timestamp(123456.789)
self.assertIsInstance(~ts, utils.Timestamp)
self.assertEqual(ts.internal, '0000123456.78900')
self.assertEqual((~ts).internal, '9999876543.21099')
timestamps = sorted(utils.Timestamp(random.random() * 1e10)
for _ in range(20))
self.assertEqual([x.internal for x in timestamps],
sorted(x.internal for x in timestamps))
self.assertEqual([(~x).internal for x in reversed(timestamps)],
sorted((~x).internal for x in timestamps))
ts = utils.Timestamp.now()
self.assertGreater(~ts, ts) # NB: will break around 2128
ts = utils.Timestamp.now(offset=1)
with self.assertRaises(ValueError) as caught:
~ts
self.assertEqual(caught.exception.args[0],
'Cannot invert timestamps with offsets')
class TestTimestampEncoding(unittest.TestCase):

View File

@ -1795,7 +1795,12 @@ class TestPipelineModification(unittest.TestCase):
# anywhere other than an attribute named "app", but it works for now.
pipe = []
for _ in range(1000):
pipe.append(app.__class__.__module__)
if app.__class__.__module__ == \
'swift.common.middleware.versioned_writes.legacy':
pipe.append('swift.common.middleware.versioned_writes')
else:
pipe.append(app.__class__.__module__)
if not hasattr(app, 'app'):
break
app = app.app

View File

@ -276,8 +276,7 @@ class TestObjectController(unittest.TestCase):
'X-Object-Meta-4': 'Four',
'Content-Encoding': 'gzip',
'Foo': 'fooheader',
'Bar': 'barheader',
'Content-Type': 'application/x-test'}
'Bar': 'barheader'}
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers=headers)
@ -286,6 +285,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'blue',
})
@ -321,19 +321,20 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': post_timestamp,
'X-Object-Sysmeta-Color': 'red',
'Content-Type': 'application/x-test'})
'Content-Type': 'application/x-test2'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test2',
'X-Object-Sysmeta-Color': 'blue',
})
req = Request.blank('/sda1/p/a/c/o')
resp = req.get_response(self.object_controller)
self.assertEqual(dict(resp.headers), {
'Content-Type': 'application/x-test',
'Content-Type': 'application/x-test2',
'Content-Length': '6',
'Etag': etag,
'X-Object-Sysmeta-Color': 'blue',
@ -403,6 +404,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'red',
})
@ -436,6 +438,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(dict(resp.headers), {
'Content-Type': 'text/html; charset=UTF-8',
'Content-Length': str(len(resp.body)),
'X-Backend-Content-Type': 'application/x-test',
'X-Object-Sysmeta-Color': 'red',
})