Merge "Use cached shard ranges for container GETs"

This commit is contained in:
Zuul 2021-01-08 20:50:45 +00:00 committed by Gerrit Code Review
commit d5bb644a17
15 changed files with 1659 additions and 146 deletions

View File

@ -131,12 +131,19 @@ use = egg:swift#proxy
# recheck_account_existence = 60
# recheck_container_existence = 60
#
# How long the proxy should cache a set of shard ranges for a container.
# How long the proxy should cache a set of shard ranges for a container when
# the set is to be used for directing object updates.
# Note that stale shard range info should be fine; updates will still
# eventually make their way to the correct shard. As a result, you can
# usually set this much higher than the existence checks above.
# recheck_updating_shard_ranges = 3600
#
# How long the proxy should cache a set of shard ranges for a container when
# the set is to be used for gathering object listings.
# Note that stale shard range info might result in incomplete object listings,
# so this value should be set lower than recheck_updating_shard_ranges.
# recheck_listing_shard_ranges = 600
#
# object_chunk_size = 65536
# client_chunk_size = 65536
#
@ -977,7 +984,7 @@ use = egg:swift#proxy_logging
# log_anonymization_method = MD5
#
# Salt added during log anonymization
# log_anonymization_salt =
# log_anonymization_salt =
#
# Template used to format access logs. All words surrounded by curly brackets
# will be substituted with the appropriate values

View File

@ -28,7 +28,8 @@ import six
from swift.common.header_key_dict import HeaderKeyDict
from swift import gettext_ as _
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX, \
CONTAINER_LISTING_LIMIT
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success, is_server_error
@ -61,7 +62,7 @@ else:
def get_param(req, name, default=None):
"""
Get parameters from an HTTP request ensuring proper handling UTF-8
Get a parameter from an HTTP request ensuring proper handling UTF-8
encoding.
:param req: request object
@ -94,6 +95,27 @@ def get_param(req, name, default=None):
return value
def validate_params(req, names):
    """
    Collect the named parameters that are present in an HTTP request,
    validating the encoding of each one.

    Every name is looked up with ``get_param``; names whose value is None
    are left out of the result.

    :param req: request object
    :param names: parameter names
    :returns: a dict mapping parameter names to values for each name that
        appears in the request parameters
    :raises HTTPBadRequest: if any parameter value is not a valid UTF-8 byte
        sequence
    """
    # the generator keeps the lookups lazy and in the order given by names
    looked_up = ((name, get_param(req, name)) for name in names)
    return {name: value for name, value in looked_up if value is not None}
def constrain_req_limit(req, constrained_limit):
given_limit = get_param(req, 'limit')
limit = constrained_limit
@ -105,6 +127,14 @@ def constrain_req_limit(req, constrained_limit):
return limit
def validate_container_params(req):
    """
    Validate and gather the query parameters used for container listings.

    :param req: request object
    :returns: a dict of the listing parameters found in the request, plus a
        'limit' key holding the request limit as constrained by
        CONTAINER_LISTING_LIMIT
    """
    listing_param_names = (
        'marker', 'end_marker', 'prefix', 'delimiter', 'path', 'format',
        'reverse', 'states', 'includes')
    validated = validate_params(req, listing_param_names)
    validated['limit'] = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
    return validated
def _validate_internal_name(name, type_='name'):
if RESERVED in name and not name.startswith(RESERVED):
raise HTTPBadRequest(body='Invalid reserved-namespace %s' % (type_))

View File

@ -5500,6 +5500,25 @@ def find_shard_range(item, ranges):
return None
def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
    """
    Filter a list of shard ranges by an included name or by name bounds.

    :param shard_ranges: a list of shard ranges; each item must have
        ``lower`` and ``upper`` attributes comparable with the markers
    :param includes: if truthy, the result is the single shard range found
        for this value by ``find_shard_range``, or an empty list if none is
        found; ``marker`` and ``end_marker`` are then ignored
    :param marker: if truthy, only shard ranges whose upper bound is greater
        than this value are kept
    :param end_marker: if truthy, only shard ranges whose lower bound is less
        than this value are kept
    :returns: a list of shard ranges; when no constraint is given this is the
        ``shard_ranges`` argument itself, not a copy
    """
    if includes:
        match = find_shard_range(includes, shard_ranges)
        return [match] if match else []

    if not (marker or end_marker):
        return shard_ranges

    def within_bounds(sr):
        below_end = end_marker > sr.lower if end_marker else True
        above_start = marker < sr.upper if marker else True
        return above_start and below_end

    return [sr for sr in shard_ranges if within_bounds(sr)]
def modify_priority(conf, logger):
"""
Modify priority by nice and ionice.

View File

@ -30,9 +30,9 @@ from swift.common.constraints import CONTAINER_LISTING_LIMIT
from swift.common.exceptions import LockTimeout
from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
ShardRange, renamer, find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, \
get_db_files, parse_db_filename, make_db_file_path, split_path, \
RESERVED_BYTE
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
filter_shard_ranges
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
@ -1757,14 +1757,6 @@ class ContainerBroker(DatabaseBroker):
at the tail of other shard ranges.
:return: a list of instances of :class:`swift.common.utils.ShardRange`
"""
def shard_range_filter(sr):
end = start = True
if end_marker:
end = end_marker > sr.lower
if marker:
start = marker < sr.upper
return start and end
if reverse:
marker, end_marker = end_marker, marker
if marker and end_marker and marker >= end_marker:
@ -1776,14 +1768,13 @@ class ContainerBroker(DatabaseBroker):
include_deleted=include_deleted, states=states,
include_own=include_own,
exclude_others=exclude_others)]
shard_ranges.sort(key=ShardRange.sort_key)
if includes:
shard_range = find_shard_range(includes, shard_ranges)
return [shard_range] if shard_range else []
if marker or end_marker:
shard_ranges = list(filter(shard_range_filter, shard_ranges))
if fill_gaps:
shard_ranges.sort(key=ShardRange.sort_key)
shard_ranges = filter_shard_ranges(shard_ranges, includes,
marker, end_marker)
if not includes and fill_gaps:
if shard_ranges:
last_upper = shard_ranges[-1].upper
else:

View File

@ -32,9 +32,9 @@ from swift.container.backend import ContainerBroker, DATADIR, \
from swift.container.replicator import ContainerReplicatorRpc
from swift.common.db import DatabaseAlreadyExists
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.request_helpers import get_param, \
split_and_validate_path, is_sys_or_user_meta, \
validate_internal_container, validate_internal_obj, constrain_req_limit
from swift.common.request_helpers import split_and_validate_path, \
is_sys_or_user_meta, validate_internal_container, validate_internal_obj, \
validate_container_params
from swift.common.utils import get_logger, hash_path, public, \
Timestamp, storage_directory, validate_sync_to, \
config_true_value, timing_stats, replication, \
@ -43,7 +43,6 @@ from swift.common.utils import get_logger, hash_path, public, \
ShardRange
from swift.common.constraints import valid_timestamp, check_utf8, \
check_drive, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common import constraints
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import HTTP_NO_CONTENT, HTTP_NOT_FOUND, is_success
@ -647,6 +646,19 @@ class ContainerController(BaseStorageServer):
``sharded``, then the listing will be a list of shard ranges;
otherwise the response body will be a list of objects.
* Both shard range and object listings may be filtered according to
the constraints described below. However, the
``X-Backend-Override-Shard-Name-Filter`` header may be used to override
the application of the ``marker``, ``end_marker``, ``includes`` and
``reverse`` parameters to shard range listings. These parameters will
be ignored if the header has the value 'sharded' and the current db
sharding state is also 'sharded'. Note that this header does not
override the ``states`` constraint on shard range listings.
* The order of both shard range and object listings may be reversed by
using a ``reverse`` query string parameter with a
value in :attr:`swift.common.utils.TRUE_VALUES`.
* Both shard range and object listings may be constrained to a name
range by the ``marker`` and ``end_marker`` query string parameters.
Object listings will only contain objects whose names are greater
@ -698,13 +710,14 @@ class ContainerController(BaseStorageServer):
:returns: an instance of :class:`swift.common.swob.Response`
"""
drive, part, account, container, obj = get_obj_name_and_placement(req)
path = get_param(req, 'path')
prefix = get_param(req, 'prefix')
delimiter = get_param(req, 'delimiter')
marker = get_param(req, 'marker', '')
end_marker = get_param(req, 'end_marker')
limit = constrain_req_limit(req, constraints.CONTAINER_LISTING_LIMIT)
reverse = config_true_value(get_param(req, 'reverse'))
params = validate_container_params(req)
path = params.get('path')
prefix = params.get('prefix')
delimiter = params.get('delimiter')
marker = params.get('marker', '')
end_marker = params.get('end_marker')
limit = params['limit']
reverse = config_true_value(params.get('reverse'))
out_content_type = listing_formats.get_listing_content_type(req)
try:
check_drive(self.root, drive, self.mount_check)
@ -715,8 +728,8 @@ class ContainerController(BaseStorageServer):
stale_reads_ok=True)
info, is_deleted = broker.get_info_is_deleted()
record_type = req.headers.get('x-backend-record-type', '').lower()
if record_type == 'auto' and info.get('db_state') in (SHARDING,
SHARDED):
db_state = info.get('db_state')
if record_type == 'auto' and db_state in (SHARDING, SHARDED):
record_type = 'shard'
if record_type == 'shard':
override_deleted = info and config_true_value(
@ -726,8 +739,16 @@ class ContainerController(BaseStorageServer):
if is_deleted and not override_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
resp_headers['X-Backend-Record-Type'] = 'shard'
includes = get_param(req, 'includes')
states = get_param(req, 'states')
includes = params.get('includes')
override_filter_hdr = req.headers.get(
'x-backend-override-shard-name-filter', '').lower()
if override_filter_hdr == db_state == 'sharded':
# respect the request to send back *all* ranges if the db is in
# sharded state
resp_headers['X-Backend-Override-Shard-Name-Filter'] = 'true'
marker = end_marker = includes = None
reverse = False
states = params.get('states')
fill_gaps = False
if states:
states = list_from_csv(states)

View File

@ -63,13 +63,15 @@ from swift.common.swob import Request, Response, Range, \
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
http_response_to_document_iters, is_object_transient_sysmeta, \
strip_object_transient_sysmeta_prefix, get_ip_port
strip_object_transient_sysmeta_prefix, get_ip_port, get_user_meta_prefix, \
get_sys_meta_prefix
from swift.common.storage_policy import POLICIES
DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds
DEFAULT_RECHECK_LISTING_SHARD_RANGES = 600 # seconds
def update_headers(response, headers):
@ -195,9 +197,100 @@ def headers_to_container_info(headers, status_int=HTTP_OK):
'meta': meta,
'sysmeta': sysmeta,
'sharding_state': headers.get('x-backend-sharding-state', 'unsharded'),
# the 'internal' format version of timestamps is cached since the
# normal format can be derived from this when required
'created_at': headers.get('x-backend-timestamp'),
'put_timestamp': headers.get('x-backend-put-timestamp'),
'delete_timestamp': headers.get('x-backend-delete-timestamp'),
'status_changed_at': headers.get('x-backend-status-changed-at'),
}
def headers_from_container_info(info):
    """
    Construct a HeaderKeyDict from a container info dict.

    Required items must all be present and not None, otherwise no headers
    are returned. Optional items, CORS settings, user metadata and sysmeta
    are copied when available and silently skipped when missing.

    :param info: a dict of container metadata
    :returns: a HeaderKeyDict or None if info is None or any required headers
        could not be constructed
    """
    if not info:
        return None

    required = (
        ('x-backend-timestamp', 'created_at'),
        ('x-backend-put-timestamp', 'put_timestamp'),
        ('x-backend-delete-timestamp', 'delete_timestamp'),
        ('x-backend-status-changed-at', 'status_changed_at'),
        ('x-backend-storage-policy-index', 'storage_policy'),
        ('x-container-object-count', 'object_count'),
        ('x-container-bytes-used', 'bytes'),
        ('x-backend-sharding-state', 'sharding_state'),
    )
    required_normal_format_timestamps = (
        ('x-timestamp', 'created_at'),
        ('x-put-timestamp', 'put_timestamp'),
    )
    optional = (
        ('x-container-read', 'read_acl'),
        ('x-container-write', 'write_acl'),
        ('x-container-sync-key', 'sync_key'),
        ('x-container-sync-to', 'sync_to'),
        ('x-versions-location', 'versions'),
    )
    cors_optional = (
        ('access-control-allow-origin', 'allow_origin'),
        ('access-control-expose-headers', 'expose_headers'),
        ('access-control-max-age', 'max_age')
    )

    def lookup(info, key):
        # raises KeyError or ValueError
        val = info[key]
        if val is None:
            raise ValueError
        return val

    # note: required headers may be missing from info for example during
    # upgrade when stale info is still in cache
    headers = HeaderKeyDict()
    for hdr, key in required:
        try:
            headers[hdr] = lookup(info, key)
        except (KeyError, ValueError):
            return None

    for hdr, key in required_normal_format_timestamps:
        try:
            headers[hdr] = Timestamp(lookup(info, key)).normal
        except (KeyError, ValueError):
            return None

    for hdr, key in optional:
        try:
            headers[hdr] = lookup(info, key)
        except (KeyError, ValueError):
            pass

    policy_index = info.get('storage_policy')
    headers['x-storage-policy'] = POLICIES[int(policy_index)].name

    prefix = get_user_meta_prefix('container')
    # an explicit None is treated the same as a missing entry so that
    # partial info does not raise
    headers.update(
        (prefix + k, v)
        for k, v in (info.get('meta') or {}).items())
    # a missing or None 'cors' entry means "no CORS settings"; without the
    # fallback lookup() would raise TypeError, which is not caught below
    cors = info.get('cors') or {}
    for hdr, key in cors_optional:
        try:
            headers[prefix + hdr] = lookup(cors, key)
        except (KeyError, ValueError):
            pass

    prefix = get_sys_meta_prefix('container')
    headers.update(
        (prefix + k, v)
        for k, v in (info.get('sysmeta') or {}).items())

    return headers
def headers_to_object_info(headers, status_int=HTTP_OK):
"""
Construct a cacheable dict of object info based on response headers.
@ -544,9 +637,7 @@ def set_info_cache(app, env, account, container, resp):
infocache = env.setdefault('swift.infocache', {})
memcache = cache_from_env(env, True)
if resp is None:
infocache.pop(cache_key, None)
if memcache:
memcache.delete(cache_key)
clear_info_cache(app, env, account, container)
return
if container:
@ -603,16 +694,24 @@ def set_object_info_cache(app, env, account, container, obj, resp):
return info
def clear_info_cache(app, env, account, container=None, shard=None):
    """
    Clear the cached info in both memcache and env

    The entry is removed directly here rather than by calling
    ``set_info_cache`` with no response: ``set_info_cache`` delegates to this
    function in that case, so calling back into it would never terminate.

    :param app: the application object
    :param env: the WSGI environment
    :param account: the account name
    :param container: the container name if clearing info for containers, or
        None
    :param shard: the sharding state if clearing info for container shard
        ranges, or None
    """
    cache_key = get_cache_key(account, container, shard=shard)
    infocache = env.setdefault('swift.infocache', {})
    memcache = cache_from_env(env, True)
    infocache.pop(cache_key, None)
    if memcache:
        memcache.delete(cache_key)
def _get_info_from_infocache(env, account, container=None):
@ -2160,6 +2259,24 @@ class Controller(object):
raise ValueError(
"server_type can only be 'account' or 'container'")
def _parse_listing_response(self, req, response):
    """
    Deserialize the JSON body of a container listing response.

    :param req: the request whose path is used in log messages
    :param response: the backend response to parse
    :returns: the deserialized list, or None if the response status is not
        a success or the body is not a JSON list
    """
    if not is_success(response.status_int):
        self.app.logger.warning(
            'Failed to get container listing from %s: %s',
            req.path_qs, response.status_int)
        return None

    try:
        listing = json.loads(response.body)
        if not isinstance(listing, list):
            raise ValueError('not a list')
    except ValueError as err:
        self.app.logger.error(
            'Problem with listing response from %s: %r',
            req.path_qs, err)
        return None
    return listing
def _get_container_listing(self, req, account, container, headers=None,
params=None):
"""
@ -2167,7 +2284,7 @@ class Controller(object):
:param req: original Request instance.
:param account: account in which `container` is stored.
:param container: container from listing should be fetched.
:param container: container from which listing should be fetched.
:param headers: headers to be included with the request
:param params: query string parameters to be used.
:return: a tuple of (deserialized json data structure, swob Response)
@ -2185,23 +2302,28 @@ class Controller(object):
self.app.logger.debug(
'Get listing from %s %s' % (subreq.path_qs, headers))
response = self.app.handle_request(subreq)
data = self._parse_listing_response(req, response)
return data, response
if not is_success(response.status_int):
self.app.logger.warning(
'Failed to get container listing from %s: %s',
subreq.path_qs, response.status_int)
return None, response
def _parse_shard_ranges(self, req, listing, response):
if listing is None:
return None
record_type = response.headers.get('x-backend-record-type')
if record_type != 'shard':
err = 'unexpected record type %r' % record_type
self.app.logger.error("Failed to get shard ranges from %s: %s",
req.path_qs, err)
return None
try:
data = json.loads(response.body)
if not isinstance(data, list):
raise ValueError('not a list')
return data, response
except ValueError as err:
return [ShardRange.from_dict(shard_range)
for shard_range in listing]
except (ValueError, TypeError, KeyError) as err:
self.app.logger.error(
'Problem with listing response from %s: %r',
subreq.path_qs, err)
return None, response
"Failed to get shard ranges from %s: invalid data: %r",
req.path_qs, err)
return None
def _get_shard_ranges(self, req, account, container, includes=None,
states=None):
@ -2229,24 +2351,7 @@ class Controller(object):
headers = {'X-Backend-Record-Type': 'shard'}
listing, response = self._get_container_listing(
req, account, container, headers=headers, params=params)
if listing is None:
return None
record_type = response.headers.get('x-backend-record-type')
if record_type != 'shard':
err = 'unexpected record type %r' % record_type
self.app.logger.error("Failed to get shard ranges from %s: %s",
req.path_qs, err)
return None
try:
return [ShardRange.from_dict(shard_range)
for shard_range in listing]
except (ValueError, TypeError, KeyError) as err:
self.app.logger.error(
"Failed to get shard ranges from %s: invalid data: %r",
req.path_qs, err)
return None
return self._parse_shard_ranges(req, listing, response)
def _get_update_shard(self, req, account, container, obj):
"""

View File

@ -15,21 +15,23 @@
from swift import gettext_ as _
import json
import math
import six
from six.moves.urllib.parse import unquote
from swift.common.utils import public, private, csv_append, Timestamp, \
config_true_value, ShardRange
config_true_value, ShardRange, cache_from_env, filter_shard_ranges
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
constrain_req_limit, validate_container_params
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, set_info_cache, clear_info_cache
cors_validation, set_info_cache, clear_info_cache, _get_info_from_caches, \
get_cache_key, headers_from_container_info, update_headers
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPNotFound, HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, \
bytes_to_wsgi
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, bytes_to_wsgi, Response
class ContainerController(Controller):
@ -87,6 +89,144 @@ class ContainerController(Controller):
return HTTPBadRequest(request=req, body=str(err))
return None
def _clear_container_info_cache(self, req):
    """
    Clear the cached container info and the cached 'listing' shard ranges
    for this controller's account and container.

    :param req: the request whose environ holds the caches
    """
    env = req.environ
    account, container = self.account_name, self.container_name
    clear_info_cache(self.app, env, account, container)
    clear_info_cache(self.app, env, account, container, 'listing')
    # TODO: should we also purge updating shards from cache?
def _GETorHEAD_from_backend(self, req):
    """
    Send a GET or HEAD request for this container to the backend via
    ``GETorHEAD_base``.

    :param req: the request to send
    :returns: the response from ``GETorHEAD_base``
    """
    ring = self.app.container_ring
    part = ring.get_part(self.account_name, self.container_name)
    # use one request per replica only when concurrent gets are enabled
    if self.app.get_policy_options(None).concurrent_gets:
        concurrency = ring.replica_count
    else:
        concurrency = 1
    node_iter = self.app.iter_nodes(ring, part)
    return self.GETorHEAD_base(
        req, _('Container'), node_iter, part,
        req.swift_entity_path, concurrency)
def _filter_resp_shard_ranges(self, req, cached_ranges):
    """
    Build a shard range listing body from a complete set of shard range
    dicts, applying the name constraints given in the request.

    :param req: the request supplying the ``marker``, ``end_marker``,
        ``includes`` and ``reverse`` parameters
    :param cached_ranges: an iterable of shard range dicts
    :returns: the filtered shard ranges serialized as JSON and encoded to
        bytes
    """
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    includes = get_param(req, 'includes')
    reverse = config_true_value(get_param(req, 'reverse'))
    if reverse:
        # a reversed listing runs from marker down to end_marker, so swap
        # the bounds before filtering
        marker, end_marker = end_marker, marker

    selected = filter_shard_ranges(
        [ShardRange.from_dict(item) for item in cached_ranges],
        includes, marker, end_marker)
    ordered = reversed(selected) if reverse else selected
    return json.dumps([dict(sr) for sr in ordered]).encode('ascii')
def _GET_using_cache(self, req):
"""
Handle a container GET that may be served from, or may populate, the
cache of 'listing' shard ranges.

Unless ``X-Newest`` is true, cached container info is consulted first. If
it shows a successful, 'sharded' container and shard ranges are found in
infocache or memcache, a response is built from the cached shard ranges,
filtered by the request parameters and marked with
``x-backend-cached-results: true``.

Otherwise the request is sent to the backend with
``x-backend-override-shard-name-filter: sharded``. If the backend response
is a shard record type from a 'sharded' container and reports a complete
listing, the shard ranges are stored in infocache, stored in memcache when
the list is not empty (for ``recheck_listing_shard_ranges`` seconds), and
the response body is replaced with the filtered listing.

:param req: the GET request
:returns: a response built from cache or returned by the backend
"""
# It may be possible to fulfil the request from cache: we only reach
# here if request record_type is 'shard' or 'auto', so if the container
# state is 'sharded' then look for cached shard ranges. However, if
# X-Newest is true then we always fetch from the backend servers.
get_newest = config_true_value(req.headers.get('x-newest', False))
if get_newest:
self.app.logger.debug(
'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
info = None
else:
info = _get_info_from_caches(self.app, req.environ,
self.account_name,
self.container_name)
if (info and is_success(info['status']) and
info.get('sharding_state') == 'sharded'):
# container is sharded so we may have the shard ranges cached
headers = headers_from_container_info(info)
if headers:
# only use cached values if all required headers available
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cache_key = get_cache_key(self.account_name,
self.container_name,
shard='listing')
cached_ranges = infocache.get(cache_key)
if cached_ranges is None and memcache:
cached_ranges = memcache.get(cache_key)
if cached_ranges is not None:
infocache[cache_key] = tuple(cached_ranges)
# shard ranges can be returned from cache
self.app.logger.debug('Found %d shards in cache for %s',
len(cached_ranges), req.path_qs)
headers.update({'x-backend-record-type': 'shard',
'x-backend-cached-results': 'true'})
shard_range_body = self._filter_resp_shard_ranges(
req, cached_ranges)
# mimic GetOrHeadHandler.get_working_response...
# note: server sets charset with content_type but proxy
# GETorHEAD_base does not, so don't set it here either
resp = Response(request=req, body=shard_range_body)
update_headers(resp, headers)
resp.last_modified = math.ceil(
float(headers['x-put-timestamp']))
resp.environ['swift_x_timestamp'] = headers.get(
'x-timestamp')
resp.accept_ranges = 'bytes'
resp.content_type = 'application/json'
return resp
# The request was not fulfilled from cache so send to the backend
# server, but instruct the backend server to ignore name constraints in
# request params if returning shard ranges so that the response can
# potentially be cached. Only do this if the container state is
# 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
# container as they may include the container itself as a 'gap filler'
# for shard ranges that have not yet cleaved; listings from 'gap
# filler' shard ranges are likely to become stale as the container
# continues to cleave objects to its shards and caching them is
# therefore more likely to result in stale or incomplete listings on
# subsequent container GETs.
req.headers['x-backend-override-shard-name-filter'] = 'sharded'
resp = self._GETorHEAD_from_backend(req)
sharding_state = resp.headers.get(
'x-backend-sharding-state', '').lower()
resp_record_type = resp.headers.get(
'x-backend-record-type', '').lower()
# the override header is removed from the response headers here
complete_listing = config_true_value(resp.headers.pop(
'x-backend-override-shard-name-filter', False))
# given that we sent 'x-backend-override-shard-name-filter=sharded' we
# should only receive back 'x-backend-override-shard-name-filter=true'
# if the sharding state is 'sharded', but check them both anyway...
if (resp_record_type == 'shard' and
sharding_state == 'sharded' and
complete_listing):
# backend returned unfiltered listing state shard ranges so parse
# them and replace response body with filtered listing
cache_key = get_cache_key(self.account_name, self.container_name,
shard='listing')
data = self._parse_listing_response(req, resp)
backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
if backend_shard_ranges is not None:
cached_ranges = [dict(sr) for sr in backend_shard_ranges]
# NOTE(review): this compares the raw header value, whereas
# sharding_state above was lower-cased before comparison
if resp.headers.get('x-backend-sharding-state') == 'sharded':
# cache in infocache even if no shard ranges returned; this
# is unexpected but use that result for this request
infocache = req.environ.setdefault('swift.infocache', {})
infocache[cache_key] = tuple(cached_ranges)
memcache = cache_from_env(req.environ, True)
if memcache and cached_ranges:
# cache in memcache only if shard ranges as expected
self.app.logger.debug('Caching %d shards for %s',
len(cached_ranges), req.path_qs)
memcache.set(
cache_key, cached_ranges,
time=self.app.recheck_listing_shard_ranges)
# filter returned shard ranges according to request constraints
resp.body = self._filter_resp_shard_ranges(req, cached_ranges)
return resp
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
ai = self.account_info(self.account_name, req)
@ -102,33 +242,51 @@ class ContainerController(Controller):
# Don't cache this. The lack of account will be cached, and that
# is sufficient.
return HTTPNotFound(request=req)
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
concurrency = self.app.container_ring.replica_count \
if self.app.get_policy_options(None).concurrent_gets else 1
node_iter = self.app.iter_nodes(self.app.container_ring, part)
# The read-modify-write of params here is because the Request.params
# getter dynamically generates a dict of params from the query string;
# the setter must be called for new params to update the query string.
params = req.params
params['format'] = 'json'
# x-backend-record-type may be sent via internal client e.g. from
# the sharder or in probe tests
record_type = req.headers.get('X-Backend-Record-Type', '').lower()
if not record_type:
record_type = 'auto'
req.headers['X-Backend-Record-Type'] = 'auto'
params['states'] = 'listing'
req.params = params
resp = self.GETorHEAD_base(
req, _('Container'), node_iter, part,
req.swift_entity_path, concurrency)
memcache = cache_from_env(req.environ, True)
if (req.method == 'GET' and
record_type != 'object' and
self.app.recheck_listing_shard_ranges > 0 and
memcache and
get_param(req, 'states') == 'listing' and
not config_true_value(
req.headers.get('x-backend-include-deleted', False))):
# This GET might be served from cache or might populate cache.
# 'x-backend-include-deleted' is not usually expected in requests
# to the proxy (it is used from sharder to container servers) but
# it is included in the conditions just in case because we don't
# cache deleted shard ranges.
resp = self._GET_using_cache(req)
else:
resp = self._GETorHEAD_from_backend(req)
resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
if all((req.method == "GET", record_type == 'auto',
resp_record_type.lower() == 'shard')):
resp = self._get_from_shards(req, resp)
# Cache this. We just made a request to a storage node and got
# up-to-date information for the container.
resp.headers['X-Backend-Recheck-Container-Existence'] = str(
self.app.recheck_container_existence)
set_info_cache(self.app, req.environ, self.account_name,
self.container_name, resp)
if not config_true_value(
resp.headers.get('X-Backend-Cached-Results')):
# Cache container metadata. We just made a request to a storage
# node and got up-to-date information for the container.
resp.headers['X-Backend-Recheck-Container-Existence'] = str(
self.app.recheck_container_existence)
set_info_cache(self.app, req.environ, self.account_name,
self.container_name, resp)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)
@ -171,7 +329,7 @@ class ContainerController(Controller):
return resp
objects = []
req_limit = int(req.params.get('limit') or CONTAINER_LISTING_LIMIT)
req_limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
params = req.params.copy()
params.pop('states', None)
req.headers.pop('X-Backend-Record-Type', None)
@ -181,7 +339,7 @@ class ContainerController(Controller):
prefix = wsgi_to_str(params.get('prefix'))
limit = req_limit
for shard_range in shard_ranges:
for i, shard_range in enumerate(shard_ranges):
params['limit'] = limit
# Always set marker to ensure that object names less than or equal
# to those already in the listing are not fetched; if the listing
@ -207,12 +365,13 @@ class ContainerController(Controller):
else:
params['end_marker'] = str_to_wsgi(shard_range.end_marker)
headers = {}
if ((shard_range.account, shard_range.container) in
shard_listing_history):
# directed back to same container - force GET of objects
headers = {'X-Backend-Record-Type': 'object'}
else:
headers = None
headers['X-Backend-Record-Type'] = 'object'
if config_true_value(req.headers.get('x-newest', False)):
headers['X-Newest'] = 'true'
if prefix:
if prefix > shard_range:
@ -225,14 +384,33 @@ class ContainerController(Controller):
if just_past < shard_range:
continue
self.app.logger.debug('Getting from %s %s with %s',
shard_range, shard_range.name, headers)
self.app.logger.debug(
'Getting listing part %d from shard %s %s with %s',
i, shard_range, shard_range.name, headers)
objs, shard_resp = self._get_container_listing(
req, shard_range.account, shard_range.container,
headers=headers, params=params)
shard_state = 'unknown'
try:
shard_state = shard_resp.headers['x-backend-sharding-state']
shard_state = ShardRange.resolve_state(shard_state)
except (AttributeError, ValueError, KeyError):
pass
if objs is None:
# tolerate errors
self.app.logger.debug(
'Failed to get objects from shard (state=%s), total = %d',
shard_state, len(objects))
continue
self.app.logger.debug(
'Found %d objects in shard (state=%s), total = %d',
len(objs), shard_state, len(objs) + len(objects))
if not objs:
# tolerate errors or empty shard containers
# tolerate empty shard containers
continue
objects.extend(objs)
@ -270,6 +448,8 @@ class ContainerController(Controller):
@cors_validation
def GET(self, req):
"""Handler for HTTP GET requests."""
# early checks for request validity
validate_container_params(req)
return self.GETorHEAD(req)
@public
@ -328,8 +508,7 @@ class ContainerController(Controller):
resp = self.make_requests(
req, self.app.container_ring,
container_partition, 'PUT', req.swift_entity_path, headers)
clear_info_cache(self.app, req.environ,
self.account_name, self.container_name)
self._clear_container_info_cache(req)
return resp
@public
@ -354,8 +533,7 @@ class ContainerController(Controller):
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = self.generate_request_headers(req, transfer=True)
clear_info_cache(self.app, req.environ,
self.account_name, self.container_name)
self._clear_container_info_cache(req)
resp = self.make_requests(
req, self.app.container_ring, container_partition, 'POST',
req.swift_entity_path, [headers] * len(containers))
@ -373,8 +551,7 @@ class ContainerController(Controller):
self.account_name, self.container_name)
headers = self._backend_requests(req, len(containers),
account_partition, accounts)
clear_info_cache(self.app, req.environ,
self.account_name, self.container_name)
self._clear_container_info_cache(req)
resp = self.make_requests(
req, self.app.container_ring, container_partition, 'DELETE',
req.swift_entity_path, headers)

View File

@ -41,7 +41,7 @@ from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController
from swift.proxy.controllers.base import get_container_info, NodeIter, \
DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \
DEFAULT_RECHECK_UPDATING_SHARD_RANGES
DEFAULT_RECHECK_UPDATING_SHARD_RANGES, DEFAULT_RECHECK_LISTING_SHARD_RANGES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \
@ -221,6 +221,9 @@ class Application(object):
self.recheck_updating_shard_ranges = \
int(conf.get('recheck_updating_shard_ranges',
DEFAULT_RECHECK_UPDATING_SHARD_RANGES))
self.recheck_listing_shard_ranges = \
int(conf.get('recheck_listing_shard_ranges',
DEFAULT_RECHECK_LISTING_SHARD_RANGES))
self.recheck_account_existence = \
int(conf.get('recheck_account_existence',
DEFAULT_RECHECK_ACCOUNT_EXISTENCE))

View File

@ -288,9 +288,10 @@ class BaseTestContainerSharding(ReplProbeTest):
actual = sum(sr['object_count'] for sr in shard_ranges)
self.assertEqual(expected_object_count, actual)
def assert_container_listing(self, expected_listing):
def assert_container_listing(self, expected_listing, req_hdrs=None):
req_hdrs = req_hdrs if req_hdrs else {}
headers, actual_listing = client.get_container(
self.url, self.token, self.container_name)
self.url, self.token, self.container_name, headers=req_hdrs)
self.assertIn('x-container-object-count', headers)
expected_obj_count = len(expected_listing)
self.assertEqual(expected_listing, [
@ -390,18 +391,21 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
# verify parameterised listing of a container during sharding
all_obj_names = self._make_object_names(4 * self.max_shard_size)
obj_names = all_obj_names[::2]
self.put_objects(obj_names)
obj_content = 'testing'
self.put_objects(obj_names, contents=obj_content)
# choose some names approx in middle of each expected shard range
markers = [
obj_names[i] for i in range(self.max_shard_size // 4,
2 * self.max_shard_size,
self.max_shard_size // 2)]
def check_listing(objects, **params):
def check_listing(objects, req_hdrs=None, **params):
req_hdrs = req_hdrs if req_hdrs else {}
qs = '&'.join('%s=%s' % (k, quote(str(v)))
for k, v in params.items())
headers, listing = client.get_container(
self.url, self.token, self.container_name, query_string=qs)
self.url, self.token, self.container_name, query_string=qs,
headers=req_hdrs)
listing = [x['name'].encode('utf-8') if six.PY2 else x['name']
for x in listing]
if params.get('reverse'):
@ -416,6 +420,12 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
if 'limit' in params:
expected = expected[:params['limit']]
self.assertEqual(expected, listing)
self.assertIn('x-timestamp', headers)
self.assertIn('last-modified', headers)
self.assertIn('x-trans-id', headers)
self.assertEqual('bytes', headers.get('accept-ranges'))
self.assertEqual('application/json; charset=utf-8',
headers.get('content-type'))
def check_listing_fails(exp_status, **params):
qs = '&'.join(['%s=%s' % param for param in params.items()])
@ -425,38 +435,39 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.assertEqual(exp_status, cm.exception.http_status)
return cm.exception
def do_listing_checks(objects):
check_listing(objects)
check_listing(objects, marker=markers[0], end_marker=markers[1])
check_listing(objects, marker=markers[0], end_marker=markers[2])
check_listing(objects, marker=markers[1], end_marker=markers[3])
check_listing(objects, marker=markers[1], end_marker=markers[3],
def do_listing_checks(objs, hdrs=None):
hdrs = hdrs if hdrs else {}
check_listing(objs, hdrs)
check_listing(objs, hdrs, marker=markers[0], end_marker=markers[1])
check_listing(objs, hdrs, marker=markers[0], end_marker=markers[2])
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3])
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3],
limit=self.max_shard_size // 4)
check_listing(objects, marker=markers[1], end_marker=markers[3],
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3],
limit=self.max_shard_size // 4)
check_listing(objects, marker=markers[1], end_marker=markers[2],
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[2],
limit=self.max_shard_size // 2)
check_listing(objects, marker=markers[1], end_marker=markers[1])
check_listing(objects, reverse=True)
check_listing(objects, reverse=True, end_marker=markers[1])
check_listing(objects, reverse=True, marker=markers[3],
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[1])
check_listing(objs, hdrs, reverse=True)
check_listing(objs, hdrs, reverse=True, end_marker=markers[1])
check_listing(objs, hdrs, reverse=True, marker=markers[3],
end_marker=markers[1],
limit=self.max_shard_size // 4)
check_listing(objects, reverse=True, marker=markers[3],
check_listing(objs, hdrs, reverse=True, marker=markers[3],
end_marker=markers[1], limit=0)
check_listing([], marker=markers[0], end_marker=markers[0])
check_listing([], marker=markers[0], end_marker=markers[1],
check_listing([], hdrs, marker=markers[0], end_marker=markers[0])
check_listing([], hdrs, marker=markers[0], end_marker=markers[1],
reverse=True)
check_listing(objects, prefix='obj')
check_listing([], prefix='zzz')
check_listing(objs, hdrs, prefix='obj')
check_listing([], hdrs, prefix='zzz')
# delimiter
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=' + quote(self.DELIM))
query_string='delimiter=' + quote(self.DELIM), headers=hdrs)
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=j' + quote(self.DELIM))
query_string='delimiter=j' + quote(self.DELIM), headers=hdrs)
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
limit = self.cluster_info['swift']['container_listing_limit']
@ -494,13 +505,16 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.assert_container_post_ok('sharding')
do_listing_checks(obj_names)
# put some new objects spread through entire namespace
# put some new objects spread through entire namespace; object updates
# should be directed to the shard container (both the cleaved and the
# created shards)
new_obj_names = all_obj_names[1::4]
self.put_objects(new_obj_names)
self.put_objects(new_obj_names, obj_content)
# new objects that fell into the first two cleaved shard ranges are
# reported in listing, new objects in the yet-to-be-cleaved shard
# ranges are not yet included in listing
# reported in listing; new objects in the yet-to-be-cleaved shard
# ranges are not yet included in listing because listings prefer the
# root over the final two shards that are not yet-cleaved
exp_obj_names = [o for o in obj_names + new_obj_names
if o <= shard_ranges[1].upper]
exp_obj_names += [o for o in obj_names
@ -515,9 +529,53 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
shard_ranges = self.get_container_shard_ranges()
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
# listings are now gathered from all four shard ranges so should have
# all the specified objects
exp_obj_names = obj_names + new_obj_names
exp_obj_names.sort()
do_listing_checks(exp_obj_names)
# shard ranges may now be cached by proxy so do listings checks again
# forcing backend request
do_listing_checks(exp_obj_names, hdrs={'X-Newest': 'true'})
# post more metadata to the container and check that it is read back
# correctly from backend (using x-newest) and cache
test_headers = {'x-container-meta-test': 'testing',
'x-container-read': 'read_acl',
'x-container-write': 'write_acl',
'x-container-sync-key': 'sync_key',
# 'x-container-sync-to': 'sync_to',
'x-versions-location': 'versions',
'x-container-meta-access-control-allow-origin': 'aa',
'x-container-meta-access-control-expose-headers': 'bb',
'x-container-meta-access-control-max-age': '123'}
client.post_container(self.url, self.admin_token, self.container_name,
headers=test_headers)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
headers={'X-Newest': 'true'})
exp_headers = dict(test_headers)
exp_headers.update({
'x-container-object-count': str(len(exp_obj_names)),
'x-container-bytes-used':
str(len(exp_obj_names) * len(obj_content))
})
for k, v in exp_headers.items():
self.assertIn(k, headers)
self.assertEqual(v, headers[k], dict(headers))
cache_headers, listing = client.get_container(
self.url, self.token, self.container_name)
for k, v in exp_headers.items():
self.assertIn(k, cache_headers)
self.assertEqual(v, cache_headers[k], dict(exp_headers))
# we don't expect any of these headers to be equal...
for k in ('x-timestamp', 'last-modified', 'date', 'x-trans-id',
'x-openstack-request-id'):
headers.pop(k, None)
cache_headers.pop(k, None)
self.assertEqual(headers, cache_headers)
self.assert_container_delete_fails()
self.assert_container_has_shard_sysmeta()
self.assert_container_post_ok('sharded')
@ -881,20 +939,29 @@ class TestContainerSharding(BaseTestContainerSharding):
# ... and check some other container properties
self.assertEqual(headers['last-modified'],
pre_sharding_headers['last-modified'])
# It even works in reverse!
headers, listing = client.get_container(self.url, self.token,
self.container_name,
query_string='reverse=on')
self.assertEqual(pre_sharding_listing[::-1], listing)
# and repeat checks to use shard ranges now cached in proxy
headers, actual_listing = self.assert_container_listing(obj_names)
self.assertEqual(headers['last-modified'],
pre_sharding_headers['last-modified'])
headers, listing = client.get_container(self.url, self.token,
self.container_name,
query_string='reverse=on')
self.assertEqual(pre_sharding_listing[::-1], listing)
# Now put some new objects into first shard, taking its count to
# 3 shard ranges' worth
more_obj_names = [
'beta%03d' % x for x in range(self.max_shard_size)]
self.put_objects(more_obj_names)
# The listing includes new objects...
# The listing includes new objects (shard ranges haven't changed, just
# their object content, so cached shard ranges are still correct)...
headers, listing = self.assert_container_listing(
more_obj_names + obj_names)
self.assertEqual(pre_sharding_listing, listing[len(more_obj_names):])
@ -2002,10 +2069,14 @@ class TestContainerSharding(BaseTestContainerSharding):
# then run sharder on the shard node without the alpha object
self.sharders.once(additional_args='--partitions=%s' % shard_part,
number=shard_nodes[2])
# root sees first shard has shrunk, only second shard range used for
# listing so alpha object not in listing
# root sees first shard has shrunk
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
self.assert_container_listing([])
# cached shard ranges still show first shard range as active so listing
# will include 'alpha' if the shard listing is fetched from node (0,1)
# but not if fetched from node 2; to achieve predictability we use
# x-newest to use shard ranges from the root so that only the second
# shard range is used for listing, so alpha object not in listing
self.assert_container_listing([], req_hdrs={'x-newest': 'true'})
self.assert_container_object_count(0)
# run the updaters: the async pending update will be redirected from

View File

@ -396,18 +396,29 @@ class FakeMemcache(object):
def __init__(self):
    """Start with an empty key/value store and an empty call log."""
    # log of calls made on this fake, appended to by _called()
    self.calls = []
    # the cached key -> value mapping
    self.store = {}
def clear_calls(self):
    """Empty the call log in place, keeping the same list object."""
    self.calls[:] = []
def _called(self, method, key=None, value=None, time=None):
    """
    Append a ``(method, key, value, time)`` tuple to the call log.

    :param method: name of the method that was called
    :param key: the key argument of the call, if any
    :param value: the value argument of the call, if any
    :param time: the time argument of the call, if any
    """
    record = (method, key, value, time)
    self.calls.append(record)
def get(self, key):
    """
    Record the call and look up ``key`` in the store.

    :param key: the key to look up
    :returns: the stored value, or None if the key is not stored
    """
    self._called('get', key)
    value = self.store.get(key)
    return value
def keys(self):
    """
    Record the call and return the keys of the store.

    :returns: the result of ``self.store.keys()``
    """
    self._called('keys')
    stored_keys = self.store.keys()
    return stored_keys
def set(self, key, value, time=0):
    """
    Record the call and store ``value`` under ``key``.

    :param key: the key to store under
    :param value: the value to store
    :param time: only recorded in the call log; the store ignores it
    :returns: True
    """
    self._called('set', key, value, time)
    self.store.update({key: value})
    return True
def incr(self, key, time=0):
    """
    Record the call and add one to the counter stored under ``key``.

    A key that is not yet stored starts from zero.

    :param key: the counter key
    :param time: only recorded in the call log; the store ignores it
    :returns: the new value of the counter
    """
    self._called('incr', key, time=time)
    bumped = self.store.setdefault(key, 0) + 1
    self.store[key] = bumped
    return self.store[key]
@ -416,12 +427,16 @@ class FakeMemcache(object):
yield True
def delete(self, key):
    """
    Record the call and remove ``key`` from the store.

    Removal is best-effort: any error raised while removing the key, such
    as the key not being stored, is ignored.

    :param key: the key to remove
    :returns: True, whether or not anything was removed
    """
    self._called('delete', key)
    try:
        self.store.pop(key)
    except Exception:
        # deliberately ignored; deleting an absent key is not an error
        pass
    return True
def delete_all(self):
    """
    Remove every key from the store.

    Unlike get/set/incr/delete this call is not added to ``self.calls``,
    and the call log itself is left untouched.
    """
    self.store.clear()
class FakeIterable(object):
def __init__(self, values):

View File

@ -42,6 +42,70 @@ class TestRequestHelpers(unittest.TestCase):
rh.constrain_req_limit(req, 10)
self.assertEqual(raised.exception.status_int, 412)
def test_validate_params(self):
    """
    Check that rh.validate_params returns only the wanted parameters that
    are present in the query string, and rejects a wanted parameter whose
    value is not valid.
    """
    wanted = ('limit', 'marker', 'end_marker')

    # no query string at all
    req = Request.blank('')
    self.assertEqual({}, rh.validate_params(req, wanted))

    # (query string, names to validate, expected result)
    cases = (
        ('limit=1;junk=here;marker=foo', (), {}),
        ('limit=1;junk=here;marker=foo', wanted,
         {'limit': '1', 'marker': 'foo'}),
        ('limit=1;junk=here;marker=', wanted,
         {'limit': '1', 'marker': ''}),
        # ignore bad junk
        ('limit=1;junk=%ff;marker=foo', wanted,
         {'limit': '1', 'marker': 'foo'}),
    )
    for query_string, names, expected in cases:
        req = Request.blank('', query_string=query_string)
        actual = rh.validate_params(req, names)
        self.assertEqual(expected, actual)

    # error on bad wanted parameter
    req = Request.blank('', query_string='limit=1;junk=here;marker=%ff')
    with self.assertRaises(HTTPException) as raised:
        rh.validate_params(req, wanted)
    self.assertEqual(raised.exception.status_int, 400)
def test_validate_container_params(self):
    """
    Check that rh.validate_container_params returns the container listing
    parameters with an integer limit (10000 when none is given), and that
    bad parameter values and over-large limits are rejected.
    """
    # no query string at all: only the default limit is returned
    req = Request.blank('')
    self.assertEqual({'limit': 10000}, rh.validate_container_params(req))

    # (query string, expected result)
    good_cases = (
        ('limit=1;junk=here;marker=foo', {'limit': 1, 'marker': 'foo'}),
        ('limit=1;junk=here;marker=', {'limit': 1, 'marker': ''}),
        # ignore bad junk
        ('limit=1;junk=%ff;marker=foo', {'limit': 1, 'marker': 'foo'}),
    )
    for query_string, expected in good_cases:
        req = Request.blank('', query_string=query_string)
        actual = rh.validate_container_params(req)
        self.assertEqual(expected, actual)

    # (query string, expected error status)
    bad_cases = (
        # error on bad wanted parameter
        ('limit=1;junk=here;marker=%ff', 400),
        # error on bad limit
        ('limit=10001', 412),
    )
    for query_string, exp_status in bad_cases:
        req = Request.blank('', query_string=query_string)
        with self.assertRaises(HTTPException) as raised:
            rh.validate_container_params(req)
        self.assertEqual(raised.exception.status_int, exp_status)
def test_is_user_meta(self):
m_type = 'meta'
for st in server_types:

View File

@ -2643,7 +2643,8 @@ class TestContainerController(unittest.TestCase):
# make a container
ts_iter = make_timestamp_iter()
ts_now = Timestamp.now() # used when mocking Timestamp.now()
headers = {'X-Timestamp': next(ts_iter).normal}
ts_put = next(ts_iter)
headers = {'X-Timestamp': ts_put.normal}
req = Request.blank('/sda1/p/a/c', method='PUT', headers=headers)
self.assertEqual(201, req.get_response(self.controller).status_int)
# PUT some objects
@ -2713,6 +2714,25 @@ class TestContainerController(unittest.TestCase):
self.assertIn('X-Backend-Record-Type', resp.headers)
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
def check_shard_GET_override_filter(
        expected_shard_ranges, path, state, params=''):
    # Helper: GET shard records from the container server with the
    # X-Backend-Override-Shard-Name-Filter header set to ``state`` and
    # check the response is a 200 JSON listing of ``expected_shard_ranges``
    # with record type 'shard'. The response is returned so that callers
    # can make further assertions about its headers.
    url = '/sda1/p/%s?format=json%s' % (path, params)
    req = Request.blank(
        url, method='GET',
        headers={'X-Backend-Record-Type': 'shard',
                 'X-Backend-Override-Shard-Name-Filter': state})
    with mock_timestamp_now(ts_now):
        resp = req.get_response(self.controller)
    self.assertEqual(resp.status_int, 200)
    self.assertEqual(resp.content_type, 'application/json')
    # each listed shard range also carries a last_modified item derived
    # from its timestamp
    expected = []
    for sr in expected_shard_ranges:
        expected.append(
            dict(sr, last_modified=Timestamp(sr.timestamp).isoformat))
    self.assertEqual(expected, json.loads(resp.body))
    self.assertIn('X-Backend-Record-Type', resp.headers)
    self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
    return resp
# all shards
check_shard_GET(shard_ranges, 'a/c')
check_shard_GET(reversed(shard_ranges), 'a/c', params='&reverse=true')
@ -2862,6 +2882,72 @@ class TestContainerController(unittest.TestCase):
check_shard_GET([], 'a/c',
params='&marker=cheese&end_marker=egg&reverse=true')
# now vary the sharding state and check the consequences of sending the
# x-backend-override-shard-name-filter header:
# in unsharded & sharding state the header should be ignored
self.assertEqual('unsharded', broker.get_db_state())
check_shard_GET(
reversed(shard_ranges[:2]), 'a/c',
params='&states=listing&reverse=true&marker=egg')
resp = check_shard_GET_override_filter(
reversed(shard_ranges[:2]), 'a/c', state='unsharded',
params='&states=listing&reverse=true&marker=egg')
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
resp = check_shard_GET_override_filter(
reversed(shard_ranges[:2]), 'a/c', state='sharded',
params='&states=listing&reverse=true&marker=egg')
self.assertIsNone(
resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
ts_epoch = next(ts_iter)
broker.enable_sharding(ts_epoch)
self.assertTrue(broker.set_sharding_state())
check_shard_GET(
reversed(shard_ranges[:2]), 'a/c',
params='&states=listing&reverse=true&marker=egg')
resp = check_shard_GET_override_filter(
reversed(shard_ranges[:2]), 'a/c', state='sharding',
params='&states=listing&reverse=true&marker=egg')
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
resp = check_shard_GET_override_filter(
reversed(shard_ranges[:2]), 'a/c', state='sharded',
params='&states=listing&reverse=true&marker=egg')
self.assertIsNone(
resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
# in sharded state the server *will* override the marker and reverse
# params and return listing shard ranges for entire namespace if
# X-Backend-Override-Shard-Name-Filter == 'sharded'
self.assertTrue(broker.set_sharded_state())
ts_now = next(ts_iter)
with mock_timestamp_now(ts_now):
extra_shard_range = broker.get_own_shard_range()
extra_shard_range.lower = shard_ranges[2].upper
extra_shard_range.upper = ShardRange.MAX
check_shard_GET(
reversed(shard_ranges[:2]), 'a/c',
params='&states=listing&reverse=true&marker=egg')
expected = shard_ranges[:3] + [extra_shard_range]
resp = check_shard_GET_override_filter(
reversed(shard_ranges[:2]), 'a/c', state='sharding',
params='&states=listing&reverse=true&marker=egg')
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
resp = check_shard_GET_override_filter(
expected, 'a/c', state='sharded',
params='&states=listing&reverse=true&marker=egg')
self.assertEqual(
'true', resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
# updating state excludes the first shard which has 'shrinking' state
# but includes the fourth which has 'created' state
extra_shard_range.lower = shard_ranges[3].upper
check_shard_GET(
shard_ranges[1:2], 'a/c',
params='&states=updating&includes=egg')
expected = shard_ranges[1:4] + [extra_shard_range]
resp = check_shard_GET_override_filter(
expected, 'a/c', state='sharded',
params='&states=updating&includes=egg')
self.assertEqual(
'true', resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
# delete a shard range
shard_range = shard_ranges[1]
shard_range.set_deleted(timestamp=next(ts_iter))

View File

@ -27,7 +27,7 @@ from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \
get_cache_key, get_account_info, get_info, get_object_info, \
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
set_info_cache, NodeIter
set_info_cache, NodeIter, headers_from_container_info
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
bytes_to_wsgi
from swift.common import exceptions
@ -470,6 +470,16 @@ class TestFuncs(BaseTest):
u"\U0001F334".encode('utf8')),
expected)
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
'shard-listing/account/cont')
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
'shard-updating/account/cont')
self.assertRaises(ValueError,
get_cache_key, "account", shard="listing")
self.assertRaises(ValueError,
get_cache_key, "account", "cont", "obj",
shard="listing")
def test_get_container_info_env(self):
cache_key = get_cache_key("account", "cont")
req = Request.blank(
@ -509,6 +519,16 @@ class TestFuncs(BaseTest):
check_not_in_cache(req, acct_cache_key)
check_not_in_cache(req, cont_cache_key)
# check shard cache-keys
shard_cache_key = get_cache_key('account', 'cont', shard='listing')
shard_data = [{'shard': 'ranges'}]
req.environ['swift.infocache'][shard_cache_key] = shard_data
req.environ['swift.cache'].set(shard_cache_key, shard_data, time=600)
check_in_cache(req, shard_cache_key)
clear_info_cache('app-is-unused', req.environ, 'account', 'cont',
shard='listing')
check_not_in_cache(req, shard_cache_key)
def test_get_account_info_swift_source(self):
app = FakeApp()
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()})
@ -718,6 +738,101 @@ class TestFuncs(BaseTest):
resp,
headers_to_container_info(headers.items(), 200))
def test_headers_from_container_info(self):
    """
    Check the headers that headers_from_container_info builds from a
    container info dict: None for missing or incomplete info, otherwise a
    header for every item of info, with optional items simply omitted.
    """
    for empty_info in (None, {}):
        self.assertIsNone(headers_from_container_info(empty_info))

    info = {
        'status': 200,
        'read_acl': 'my-read-acl',
        'write_acl': 'my-write-acl',
        'sync_to': 'my-sync-to',
        'sync_key': 'my-sync-key',
        'object_count': 99,
        'bytes': 999,
        'versions': 'my-versions',
        'storage_policy': '0',
        'cors': {
            'allow_origin': 'my-cors-origin',
            'expose_headers': 'my-cors-hdrs',
            'max_age': 'my-cors-age'},
        'created_at': '123.456_12',
        'put_timestamp': '234.567_34',
        'delete_timestamp': '345_67',
        'status_changed_at': '246.8_9',
        'meta': {'fruit': 'cake'},
        'sysmeta': {'green': 'land'},
        'sharding_state': 'unsharded'
    }

    expected = {
        'X-Backend-Delete-Timestamp': '345_67',
        'X-Backend-Put-Timestamp': '234.567_34',
        'X-Backend-Sharding-State': 'unsharded',
        'X-Backend-Status-Changed-At': '246.8_9',
        'X-Backend-Storage-Policy-Index': '0',
        'X-Backend-Timestamp': '123.456_12',
        'X-Container-Bytes-Used': '999',
        'X-Container-Meta-Fruit': 'cake',
        'X-Container-Object-Count': '99',
        'X-Container-Read': 'my-read-acl',
        'X-Container-Sync-Key': 'my-sync-key',
        'X-Container-Sync-To': 'my-sync-to',
        'X-Container-Sysmeta-Green': 'land',
        'X-Container-Write': 'my-write-acl',
        'X-Put-Timestamp': '0000000234.56700',
        'X-Storage-Policy': 'zero',
        'X-Timestamp': '0000000123.45600',
        'X-Versions-Location': 'my-versions',
        'X-Container-Meta-Access-Control-Allow-Origin': 'my-cors-origin',
        'X-Container-Meta-Access-Control-Expose-Headers': 'my-cors-hdrs',
        'X-Container-Meta-Access-Control-Max-Age': 'my-cors-age',
    }
    self.assertEqual(expected, headers_from_container_info(info))

    # without any one of the required items no headers are returned
    required_keys = (
        'created_at', 'put_timestamp', 'delete_timestamp',
        'status_changed_at', 'storage_policy', 'object_count', 'bytes',
        'sharding_state')
    for required in required_keys:
        partial_info = dict(info)
        del partial_info[required]
        self.assertIsNone(headers_from_container_info(partial_info))

    # without an optional item only the corresponding header is dropped
    optional_items = (
        ('X-Container-Read', 'read_acl'),
        ('X-Container-Write', 'write_acl'),
        ('X-Container-Sync-Key', 'sync_key'),
        ('X-Container-Sync-To', 'sync_to'),
        ('X-Versions-Location', 'versions'),
        ('X-Container-Meta-Fruit', 'meta'),
        ('X-Container-Sysmeta-Green', 'sysmeta'),
    )
    for hdr, info_key in optional_items:
        partial_info = dict(info)
        del partial_info[info_key]
        partial_expected = dict(expected)
        del partial_expected[hdr]
        self.assertEqual(partial_expected,
                         headers_from_container_info(partial_info))

    # likewise for each optional item of the nested cors dict
    cors_items = (
        ('Access-Control-Allow-Origin', 'allow_origin'),
        ('Access-Control-Expose-Headers', 'expose_headers'),
        ('Access-Control-Max-Age', 'max_age'),
    )
    for hdr, cors_key in cors_items:
        partial_cors = dict(info['cors'])
        del partial_cors[cors_key]
        partial_info = dict(info, cors=partial_cors)
        partial_expected = dict(expected)
        del partial_expected['X-Container-Meta-' + hdr]
        self.assertEqual(partial_expected,
                         headers_from_container_info(partial_info))
def test_container_info_needs_req(self):
base = Controller(self.app)
base.account_name = 'a'

View File

@ -580,6 +580,7 @@ class TestContainerController(TestRingBase):
limit = CONTAINER_LISTING_LIMIT
expected_objects = all_objects
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
# pretend root object stats are not yet updated
'X-Container-Object-Count': num_all_objects - 1,
'X-Container-Bytes-Used': size_all_objects - 1,
@ -918,6 +919,7 @@ class TestContainerController(TestRingBase):
limit = CONTAINER_LISTING_LIMIT
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
# pretend root object stats are not yet updated
'X-Container-Object-Count': 6,
'X-Container-Bytes-Used': 12,
@ -1152,6 +1154,7 @@ class TestContainerController(TestRingBase):
num_all_objects = len(all_objects)
limit = CONTAINER_LISTING_LIMIT
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
# pretend root object stats are not yet updated
'X-Container-Object-Count': num_all_objects - 1,
'X-Container-Bytes-Used': size_all_objects - 1,
@ -1258,6 +1261,7 @@ class TestContainerController(TestRingBase):
num_all_objects = len(all_objects)
limit = CONTAINER_LISTING_LIMIT
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': num_all_objects,
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1320,6 +1324,7 @@ class TestContainerController(TestRingBase):
all_objects = sr_objs[1] + sr_objs[2]
size_all_objects = sum([obj['bytes'] for obj in all_objects])
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': len(all_objects),
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1359,6 +1364,7 @@ class TestContainerController(TestRingBase):
all_objects = sr_objs[0] + sr_objs[1]
size_all_objects = sum([obj['bytes'] for obj in all_objects])
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': len(all_objects),
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1399,6 +1405,7 @@ class TestContainerController(TestRingBase):
all_objects = sr_objs[0] + sr_objs[2]
size_all_objects = sum([obj['bytes'] for obj in all_objects])
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': len(all_objects),
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1513,6 +1520,7 @@ class TestContainerController(TestRingBase):
num_all_objects = len(all_objects)
limit = CONTAINER_LISTING_LIMIT
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': num_all_objects,
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1595,6 +1603,7 @@ class TestContainerController(TestRingBase):
num_all_objects = len(all_objects)
limit = CONTAINER_LISTING_LIMIT
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
'X-Backend-Timestamp': '99',
'X-Container-Object-Count': num_all_objects,
'X-Container-Bytes-Used': size_all_objects,
'X-Container-Meta-Flavour': 'peach',
@ -1652,6 +1661,789 @@ class TestContainerController(TestRingBase):
# root object count will be overridden by actual length of listing
self.check_response(resp, root_resp_hdrs)
def _build_request(self, headers, params, infocache=None):
    """
    Helper to make a GET request for /v1/a/c with caches set in environ.

    :param headers: dict of request headers
    :param params: dict of query parameters; items are joined as
        ``key=value`` pairs separated by ';' without any quoting
    :param infocache: dict to install as the request's 'swift.infocache';
        a new empty dict is used when this is None
    :returns: the request, with 'swift.cache' set to ``self.memcache``
    """
    query_string = '?' + ';'.join(
        '%s=%s' % (k, v) for k, v in params.items())
    request = Request.blank('/v1/a/c' + query_string, headers=headers)
    request.environ['swift.cache'] = self.memcache
    # only substitute a fresh dict when the caller supplied none, so that
    # an empty dict passed in by the caller is the one the request uses
    request.environ['swift.infocache'] = \
        {} if infocache is None else infocache
    return request
def _check_response(self, resp, exp_shards, extra_hdrs):
    """
    Helper to check a shard listing response.

    The response body must decode to ``exp_shards`` and the response
    headers must equal ``self.root_resp_hdrs`` adjusted as described in
    the comments below and overlaid with ``extra_hdrs``. Headers are
    popped from ``resp.headers`` during the check, so ``resp`` is modified.

    :param resp: the response from the proxy
    :param exp_shards: the expected decoded JSON body
    :param extra_hdrs: dict of headers expected in addition to, or in
        place of, those in ``self.root_resp_hdrs``
    """
    self.assertEqual(exp_shards, json.loads(resp.body))

    expected = dict(self.root_resp_hdrs)
    # x-put-timestamp is sent from backend but removed in proxy base
    # controller GETorHEAD_base so not expected in response from proxy
    expected.pop('X-Put-Timestamp')

    # x-timestamp is compared as Timestamp instances, not as raw strings
    self.assertIn('X-Timestamp', resp.headers)
    got_timestamp = resp.headers.pop('X-Timestamp')
    want_timestamp = expected.pop('X-Timestamp')
    self.assertEqual(Timestamp(want_timestamp), Timestamp(got_timestamp))

    expected.update(extra_hdrs)
    # added in container controller
    expected['X-Storage-Policy'] = 'zero'
    body_length = len(json.dumps(exp_shards).encode('ascii'))
    expected['Content-Length'] = str(body_length)
    # we expect this header to be removed by proxy
    expected.pop('X-Backend-Override-Shard-Name-Filter', None)

    # FakeConn adds these
    fake_conn_hdrs = ('x-account-container-count', 'x-object-meta-test',
                      'x-delete-at', 'etag', 'x-works')
    for name in fake_conn_hdrs:
        resp.headers.pop(name, None)
    self.assertEqual(expected, resp.headers)
def _capture_backend_request(self, req, resp_status, resp_body,
                             resp_extra_hdrs, num_resp=1):
    """
    Send ``req`` to the proxy app with stubbed backend responses and
    return the first backend request along with the proxy's response.

    :param req: the request to send to ``self.app``
    :param resp_status: status used for every stubbed backend response
    :param resp_body: body used for every stubbed backend response
    :param resp_extra_hdrs: dict of headers added to
        ``self.root_resp_hdrs`` for every stubbed backend response
    :param num_resp: number of backend responses to stub; exactly this
        many backend requests are expected to be made
    :returns: a tuple of (first captured backend request, response)
    """
    self.assertGreater(num_resp, 0)  # sanity check
    backend_hdrs = dict(self.root_resp_hdrs)
    backend_hdrs.update(resp_extra_hdrs)
    statuses = [resp_status] * num_resp
    bodies = [resp_body] * num_resp
    hdrs_list = [backend_hdrs] * num_resp
    with mocked_http_conn(
            *statuses, body_iter=bodies, headers=hdrs_list) as fake_conn:
        resp = req.get_response(self.app)
    self.assertEqual(statuses[0], resp.status_int)
    captured = fake_conn.requests
    self.assertEqual(num_resp, len(captured))
    return captured[0], resp
def _check_backend_req(self, req, backend_req, extra_params=None,
                       extra_hdrs=None):
    """
    Helper to check a captured backend request for container a/c.

    The 'Referer', 'X-Timestamp' and 'User-Agent' headers are popped from
    ``backend_req['headers']`` while being checked, so the captured
    request is modified.

    :param req: the client request; its X-Trans-Id is expected to be
        passed on to the backend
    :param backend_req: the captured backend request, a dict with 'path',
        'qs' and 'headers' items
    :param extra_params: dict of query parameters expected in addition to
        ``states=listing`` and ``format=json``
    :param extra_hdrs: dict of backend request headers expected in
        addition to Connection, Host and X-Trans-Id
    """
    # the first 7 characters of the path are skipped; presumably they are
    # the device and partition prefix
    self.assertEqual('a/c', backend_req['path'][7:])

    expected_params = {'states': 'listing', 'format': 'json'}
    if extra_params:
        expected_params.update(extra_params)
    # the encoding argument is only passed to parse_qsl on py3
    parse_kwargs = {} if six.PY2 else {'encoding': 'latin1'}
    backend_params = dict(urllib.parse.parse_qsl(
        backend_req['qs'], True, **parse_kwargs))
    self.assertEqual(expected_params, backend_params)

    backend_hdrs = backend_req['headers']
    # these headers vary between requests so just check they are present
    self.assertIsNotNone(backend_hdrs.pop('Referer', None))
    self.assertIsNotNone(backend_hdrs.pop('X-Timestamp', None))
    self.assertTrue(backend_hdrs.pop('User-Agent', '').startswith(
        'proxy-server'))
    expected_headers = {
        'Connection': 'close',
        'Host': 'localhost:80',
        'X-Trans-Id': req.headers['X-Trans-Id']}
    if extra_hdrs:
        expected_headers.update(extra_hdrs)
    # whole-dict equality covers both the presence and the value of every
    # expected header, and also rejects unexpected ones
    self.assertEqual(expected_headers, backend_hdrs)
def _setup_shard_range_stubs(self):
    """
    Set up the fixtures used by the shard range caching tests.

    Sets ``self.memcache`` to a new FakeMemcache, ``self.sr_dicts`` to the
    dict form of three contiguous shard ranges covering the whole
    namespace, ``self._stub_shards_dump`` to their JSON encoding as bytes
    and ``self.root_resp_hdrs`` to a stub set of root container response
    headers.
    """
    self.memcache = FakeMemcache()

    shard_ranges = []
    for lower, upper in (('', 'ham'), ('ham', 'pie'), ('pie', '')):
        name = '.shards_a/c_%s' % upper
        shard_ranges.append(
            ShardRange(name, Timestamp.now(), lower, upper))
    self.sr_dicts = [dict(shard_range) for shard_range in shard_ranges]
    self._stub_shards_dump = json.dumps(self.sr_dicts).encode('ascii')

    root_hdrs = {}
    root_hdrs['Accept-Ranges'] = 'bytes'
    root_hdrs['Content-Type'] = 'application/json'
    root_hdrs['Last-Modified'] = 'Thu, 01 Jan 1970 00:00:03 GMT'
    root_hdrs['X-Backend-Timestamp'] = '2'
    root_hdrs['X-Backend-Put-Timestamp'] = '3'
    root_hdrs['X-Backend-Delete-Timestamp'] = '0'
    root_hdrs['X-Backend-Status-Changed-At'] = '0'
    root_hdrs['X-Timestamp'] = '2'
    root_hdrs['X-Put-Timestamp'] = '3'
    root_hdrs['X-Container-Object-Count'] = '6'
    root_hdrs['X-Container-Bytes-Used'] = '12'
    root_hdrs['X-Backend-Storage-Policy-Index'] = '0'
    self.root_resp_hdrs = root_hdrs
def _do_test_caching(self, record_type, exp_recheck_listing):
    """
    Get shard ranges into cache via a backend GET, read them back from
    cache with a second GET, then check that a container DELETE evicts
    them from cache.

    :param record_type: value sent in the X-Backend-Record-Type header of
        the GET requests.
    :param exp_recheck_listing: the time with which the shard ranges are
        expected to be set in memcache.
    """
    sharding_state = 'sharded'
    listing_key = 'shard-listing/a/c'

    def assert_infocache_has_shard_ranges(req):
        # the shard ranges are expected in infocache as a tuple
        self.assertIn('swift.infocache', req.environ)
        self.assertIn(listing_key, req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache'][listing_key])

    self.memcache.delete_all()
    self.memcache.clear_calls()

    # container is sharded but proxy does not have that state cached;
    # expect a backend request and expect shard ranges to be cached
    req = self._build_request({'X-Backend-Record-Type': record_type},
                              {'states': 'listing'}, {})
    backend_req, resp = self._capture_backend_request(
        req, 200, self._stub_shards_dump,
        {'X-Backend-Record-Type': 'shard',
         'X-Backend-Sharding-State': sharding_state,
         'X-Backend-Override-Shard-Name-Filter': 'true'})
    self._check_backend_req(
        req, backend_req,
        extra_hdrs={'X-Backend-Record-Type': record_type,
                    'X-Backend-Override-Shard-Name-Filter': 'sharded'})
    self._check_response(resp, self.sr_dicts, {
        'X-Backend-Recheck-Container-Existence': '60',
        'X-Backend-Record-Type': 'shard',
        'X-Backend-Sharding-State': sharding_state})
    self.assertEqual(
        [('get', 'container/a/c', None, None),
         ('set', 'shard-listing/a/c', self.sr_dicts,
          exp_recheck_listing),
         ('set', 'container/a/c', mock.ANY, 60)],
        self.memcache.calls)
    self.assertEqual(self.sr_dicts, self.memcache.calls[1][2])
    self.assertEqual(sharding_state,
                     self.memcache.calls[2][2]['sharding_state'])
    assert_infocache_has_shard_ranges(req)

    # container is sharded and proxy does have that state cached and
    # also has shard ranges cached; expect a read from cache
    self.memcache.clear_calls()
    req = self._build_request({'X-Backend-Record-Type': record_type},
                              {'states': 'listing'}, {})
    resp = req.get_response(self.app)
    self._check_response(resp, self.sr_dicts, {
        'X-Backend-Cached-Results': 'true',
        'X-Backend-Record-Type': 'shard',
        'X-Backend-Sharding-State': sharding_state})
    self.assertEqual(
        [('get', 'container/a/c', None, None),
         ('get', 'shard-listing/a/c', None, None)],
        self.memcache.calls)
    assert_infocache_has_shard_ranges(req)

    # delete the container; check that shard ranges are evicted from cache
    self.memcache.clear_calls()
    infocache = {}
    req = Request.blank('/v1/a/c', method='DELETE')
    req.environ['swift.cache'] = self.memcache
    req.environ['swift.infocache'] = infocache
    self._capture_backend_request(req, 204, b'', {},
                                  num_resp=self.CONTAINER_REPLICAS)
    self.assertEqual(
        [('delete', 'container/a/c', None, None),
         ('delete', 'shard-listing/a/c', None, None)],
        self.memcache.calls)
def test_GET_shard_ranges(self):
    # exercise the caching scenario for record types 'shard' and 'auto',
    # with default and configured shard range cache times
    self._setup_shard_range_stubs()

    # expect shard ranges cache time to be default value of 600
    self._do_test_caching('shard', 600)

    # expect shard ranges cache time to be configured value of 120
    self.app.recheck_listing_shard_ranges = 120
    self._do_test_caching('shard', 120)

    def mock_get_from_shards(self, req, resp):
        # for the purposes of these tests we override _get_from_shards so
        # that the response contains the shard listing even though the
        # record_type is 'auto'; these tests are verifying the content and
        # caching of the backend shard range response so we're not
        # interested in gathering object from the shards
        return resp

    patch_target = ('swift.proxy.controllers.container.'
                    'ContainerController._get_from_shards')
    with mock.patch(patch_target, mock_get_from_shards):
        self.app.recheck_listing_shard_ranges = 600
        self._do_test_caching('auto', 600)
def test_GET_shard_ranges_404_response(self):
    # pre-warm cache with container info but not shard ranges so that the
    # backend request tries to get a cacheable listing, but backend 404's
    self._setup_shard_range_stubs()
    self.memcache.delete_all()
    cached_info = headers_to_container_info(self.root_resp_hdrs)
    cached_info['status'] = 200
    cached_info['sharding_state'] = 'sharded'
    self.memcache.set('container/a/c', cached_info)
    self.memcache.clear_calls()

    req_hdrs = {'X-Backend-Record-Type': 'shard'}
    req = self._build_request(req_hdrs, {'states': 'listing'}, {})
    backend_req, resp = self._capture_backend_request(
        req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS)
    exp_backend_hdrs = {
        'X-Backend-Record-Type': 'shard',
        'X-Backend-Override-Shard-Name-Filter': 'sharded'}
    self._check_backend_req(req, backend_req, extra_hdrs=exp_backend_hdrs)
    self.assertNotIn('X-Backend-Cached-Results', resp.headers)

    # Note: container metadata is updated in cache but shard ranges are not
    # deleted from cache
    exp_calls = [
        ('get', 'container/a/c', None, None),
        ('get', 'shard-listing/a/c', None, None),
        ('set', 'container/a/c', mock.ANY, 6.0),
    ]
    self.assertEqual(exp_calls, self.memcache.calls)
    self.assertEqual(404, self.memcache.calls[2][2]['status'])
    self.assertEqual(b'', resp.body)
    self.assertEqual(404, resp.status_int)
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
    """
    Pre-warm cache with container metadata and shard ranges, make a GET
    request and verify that the only memcache calls are the two lookups.

    :param params: params passed to ``_build_request`` for the request.
    :param record_type: value for the request's X-Backend-Record-Type
        header.
    :returns: the response to the request.
    """
    self.memcache.delete_all()
    cached_info = headers_to_container_info(self.root_resp_hdrs)
    cached_info['status'] = 200
    cached_info['sharding_state'] = 'sharded'
    self.memcache.set('container/a/c', cached_info)
    self.memcache.set('shard-listing/a/c', self.sr_dicts)
    # only calls made while handling the request are of interest
    self.memcache.clear_calls()

    req = self._build_request(
        {'X-Backend-Record-Type': record_type}, params, {})
    resp = req.get_response(self.app)

    exp_calls = [('get', 'container/a/c', None, None),
                 ('get', 'shard-listing/a/c', None, None)]
    self.assertEqual(exp_calls, self.memcache.calls)
    return resp
def test_GET_shard_ranges_read_from_cache(self):
    # verify the shard ranges returned from cache for a variety of request
    # params, for record types 'shard' and 'auto'
    self._setup_shard_range_stubs()
    exp_hdrs = {'X-Backend-Cached-Results': 'true',
                'X-Backend-Record-Type': 'shard',
                'X-Backend-Override-Shard-Name-Filter': 'true',
                'X-Backend-Sharding-State': 'sharded'}

    # each case is (request params, slice of self.sr_dicts that is expected
    # in the response)
    filtered_cases = (
        ({'states': 'listing', 'reverse': 'true'},
         slice(None, None, -1)),
        ({'states': 'listing', 'marker': 'jam'},
         slice(1, None)),
        ({'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
         slice(1, 2)),
        ({'states': 'listing', 'includes': 'egg'},
         slice(None, 1)),
    )

    def check_filtered_cases(record_type):
        for params, exp_slice in filtered_cases:
            # pass a copy of the params so each request gets its own dict
            resp = self._do_test_GET_shard_ranges_read_from_cache(
                dict(params), record_type)
            self._check_response(resp, self.sr_dicts[exp_slice], exp_hdrs)

    resp = self._do_test_GET_shard_ranges_read_from_cache(
        {'states': 'listing'}, 'shard')
    self._check_response(resp, self.sr_dicts, exp_hdrs)
    check_filtered_cases('shard')

    # override _get_from_shards so that the response contains the shard
    # listing that we want to verify even though the record_type is 'auto'
    def mock_get_from_shards(self, req, resp):
        return resp

    patch_target = ('swift.proxy.controllers.container.'
                    'ContainerController._get_from_shards')
    with mock.patch(patch_target, mock_get_from_shards):
        check_filtered_cases('auto')
def _do_test_GET_shard_ranges_write_to_cache(self, params, record_type):
    """
    Verify that shard range listings are written to cache when
    appropriate.

    Starts with an empty cache, makes a GET request whose backend response
    indicates a cacheable listing, and checks the backend request and the
    resulting memcache calls.

    :param params: params passed to ``_build_request`` for the request;
        also expected in the backend request's query string.
    :param record_type: value for the request's X-Backend-Record-Type
        header.
    :returns: the response to the request.
    """
    self.memcache.delete_all()
    self.memcache.clear_calls()
    # set request up for cacheable listing
    req_hdrs = {'X-Backend-Record-Type': record_type}
    req = self._build_request(req_hdrs, params, {})
    # response indicates cacheable listing
    resp_hdrs = {'X-Backend-Record-Type': 'shard',
                 'X-Backend-Override-Shard-Name-Filter': 'true',
                 'X-Backend-Sharding-State': 'sharded'}
    backend_req, resp = self._capture_backend_request(
        req, 200, self._stub_shards_dump, resp_hdrs)
    self._check_backend_req(
        req, backend_req,
        extra_params=params,
        extra_hdrs={'X-Backend-Record-Type': record_type,
                    'X-Backend-Override-Shard-Name-Filter': 'sharded'})
    self.assertEqual(
        [('get', 'container/a/c', None, None),
         ('set', 'shard-listing/a/c', self.sr_dicts, 600),
         ('set', 'container/a/c', mock.ANY, 60)],
        self.memcache.calls)
    # shards were cached
    self.assertEqual(self.sr_dicts, self.memcache.calls[1][2])
    self.assertEqual('sharded',
                     self.memcache.calls[2][2]['sharding_state'])
    # response headers and body are left for the caller to check
    return resp
def test_GET_shard_ranges_write_to_cache(self):
    # verify the shard ranges returned when the listing is fetched from the
    # backend and written to cache, for a variety of request params and for
    # record types 'shard' and 'auto'
    self._setup_shard_range_stubs()
    exp_hdrs = {'X-Backend-Recheck-Container-Existence': '60',
                'X-Backend-Record-Type': 'shard',
                'X-Backend-Override-Shard-Name-Filter': 'true',
                'X-Backend-Sharding-State': 'sharded'}

    # each case is (request params, slice of self.sr_dicts that is expected
    # in the response)
    filtered_cases = (
        ({'states': 'listing', 'reverse': 'true'},
         slice(None, None, -1)),
        ({'states': 'listing', 'marker': 'jam'},
         slice(1, None)),
        ({'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
         slice(1, 2)),
        ({'states': 'listing', 'includes': 'egg'},
         slice(None, 1)),
    )

    def check_filtered_cases(record_type):
        for params, exp_slice in filtered_cases:
            # pass a copy of the params so each request gets its own dict
            resp = self._do_test_GET_shard_ranges_write_to_cache(
                dict(params), record_type)
            self._check_response(resp, self.sr_dicts[exp_slice], exp_hdrs)

    resp = self._do_test_GET_shard_ranges_write_to_cache(
        {'states': 'listing'}, 'shard')
    self._check_response(resp, self.sr_dicts, exp_hdrs)
    check_filtered_cases('shard')

    # override _get_from_shards so that the response contains the shard
    # listing that we want to verify even though the record_type is 'auto'
    def mock_get_from_shards(self, req, resp):
        return resp

    patch_target = ('swift.proxy.controllers.container.'
                    'ContainerController._get_from_shards')
    with mock.patch(patch_target, mock_get_from_shards):
        check_filtered_cases('auto')
def test_GET_shard_ranges_write_to_cache_with_x_newest(self):
    # when x-newest is sent, verify that there is no cache lookup to check
    # sharding state but then backend requests are made requesting complete
    # shard list which can be cached
    self._setup_shard_range_stubs()
    self.memcache.delete_all()
    self.memcache.clear_calls()

    client_hdrs = {'X-Backend-Record-Type': 'shard',
                   'X-Newest': 'true'}
    req = self._build_request(client_hdrs, {'states': 'listing'}, {})
    backend_resp_hdrs = {'X-Backend-Record-Type': 'shard',
                         'X-Backend-Override-Shard-Name-Filter': 'true',
                         'X-Backend-Sharding-State': 'sharded'}
    backend_req, resp = self._capture_backend_request(
        req, 200, self._stub_shards_dump, backend_resp_hdrs,
        num_resp=2 * self.CONTAINER_REPLICAS)

    exp_backend_req_hdrs = {
        'X-Backend-Record-Type': 'shard',
        'X-Newest': 'true',
        'X-Backend-Override-Shard-Name-Filter': 'sharded'}
    self._check_backend_req(
        req, backend_req, extra_hdrs=exp_backend_req_hdrs)

    exp_resp_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
    exp_resp_hdrs.update(backend_resp_hdrs)
    self._check_response(resp, self.sr_dicts, exp_resp_hdrs)

    # only 'set' calls are expected, i.e. no cache lookups
    exp_calls = [('set', 'shard-listing/a/c', self.sr_dicts, 600),
                 ('set', 'container/a/c', mock.ANY, 60)]
    self.assertEqual(exp_calls, self.memcache.calls)
    self.assertEqual(self.sr_dicts, self.memcache.calls[0][2])
    self.assertEqual('sharded',
                     self.memcache.calls[1][2]['sharding_state'])
def _do_test_GET_shard_ranges_no_cache_write(self, resp_hdrs):
    """
    Verify that there is a cache lookup to check container info but then
    a backend request is made requesting complete shard list, and that
    shard ranges are not cached; also check that marker, end_marker etc
    are passed to backend.

    The cache is emptied at the end of the check.

    :param resp_hdrs: dict of headers for the stubbed backend response;
        they are also expected in the response to the client.
    """
    self.memcache.clear_calls()
    req_params = {'states': 'listing', 'marker': 'egg',
                  'end_marker': 'jam', 'reverse': 'true'}
    req = self._build_request(
        {'X-Backend-Record-Type': 'shard'}, req_params, {})

    # the backend returns the first two shard ranges in reverse order
    backend_shards = list(reversed(self.sr_dicts[:2]))
    backend_req, resp = self._capture_backend_request(
        req, 200, json.dumps(backend_shards).encode('ascii'),
        resp_hdrs)
    self._check_backend_req(
        req, backend_req,
        extra_params={'marker': 'egg', 'end_marker': 'jam',
                      'reverse': 'true'},
        extra_hdrs={'X-Backend-Record-Type': 'shard',
                    'X-Backend-Override-Shard-Name-Filter': 'sharded'})

    exp_shards = list(reversed(self.sr_dicts[:2]))
    exp_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
    exp_hdrs.update(resp_hdrs)
    self._check_response(resp, exp_shards, exp_hdrs)

    # container metadata is looked up in memcache for sharding state
    # container metadata is set in memcache
    exp_calls = [('get', 'container/a/c', None, None),
                 ('set', 'container/a/c', mock.ANY, 60)]
    self.assertEqual(exp_calls, self.memcache.calls)
    self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
                     self.memcache.calls[1][2]['sharding_state'])
    self.memcache.delete_all()
def test_GET_shard_ranges_no_cache_write_with_cached_container_info(self):
    # pre-warm cache with container info, but verify that shard range cache
    # lookup is only attempted when the cached sharding state and status
    # are suitable, and full set of headers can be constructed from cache;
    # Note: backend response has state unsharded so no shard ranges cached
    self._setup_shard_range_stubs()

    def check_with_cached_info(cached_info):
        self._setup_shard_range_stubs()
        self.memcache.set('container/a/c', cached_info)
        # expect the same outcomes as if there was no cached container info
        backend_resp_hdrs = {
            'X-Backend-Record-Type': 'shard',
            'X-Backend-Override-Shard-Name-Filter': 'true',
            'X-Backend-Sharding-State': 'unsharded'}
        self._do_test_GET_shard_ranges_no_cache_write(backend_resp_hdrs)

    # setup a default 'good' info
    good_info = headers_to_container_info(self.root_resp_hdrs)
    good_info['status'] = 200
    good_info['sharding_state'] = 'sharded'

    # unsuitable status
    check_with_cached_info(dict(good_info, status=404))

    # unsuitable sharding states
    for state in ('unsharded', 'sharding', 'collapsed', 'unexpected'):
        check_with_cached_info(dict(good_info, sharding_state=state))

    # info that is missing one of the timestamp items
    for missing_key in ('created_at', 'put_timestamp',
                        'delete_timestamp', 'status_changed_at'):
        stale_info = dict(good_info)
        del stale_info[missing_key]
        check_with_cached_info(stale_info)
def test_GET_shard_ranges_no_cache_write_for_non_sharded_states(self):
    # verify that shard ranges are not written to cache when container
    # state returned by backend is not 'sharded'; we don't expect
    # 'X-Backend-Override-Shard-Name-Filter': 'true' to be returned unless
    # the sharding state is 'sharded' but include it in this test to check
    # that the state is checked by proxy controller
    self._setup_shard_range_stubs()
    non_sharded_states = ('unsharded', 'sharding', 'collapsed',
                          'unexpected')
    for state in non_sharded_states:
        backend_resp_hdrs = {
            'X-Backend-Record-Type': 'shard',
            'X-Backend-Override-Shard-Name-Filter': 'true',
            'X-Backend-Sharding-State': state}
        self._do_test_GET_shard_ranges_no_cache_write(backend_resp_hdrs)
def test_GET_shard_ranges_no_cache_write_for_incomplete_listing(self):
    # verify that shard ranges are not written to cache when container
    # response does not acknowledge x-backend-override-shard-name-filter
    # e.g. container server not upgraded
    self._setup_shard_range_stubs()
    backend_resp_hdrs_cases = [
        # header is absent
        {'X-Backend-Record-Type': 'shard',
         'X-Backend-Sharding-State': 'sharded'},
        # header has a value other than 'true'
        {'X-Backend-Record-Type': 'shard',
         'X-Backend-Override-Shard-Name-Filter': 'false',
         'X-Backend-Sharding-State': 'sharded'},
        {'X-Backend-Record-Type': 'shard',
         'X-Backend-Override-Shard-Name-Filter': 'rogue',
         'X-Backend-Sharding-State': 'sharded'},
    ]
    for backend_resp_hdrs in backend_resp_hdrs_cases:
        self._do_test_GET_shard_ranges_no_cache_write(backend_resp_hdrs)
def test_GET_shard_ranges_no_cache_write_for_object_listing(self):
    # verify that shard ranges are not written to cache when container
    # response does not return shard ranges
    self._setup_shard_range_stubs()
    backend_resp_hdrs_cases = [
        # record type is something other than 'shard'
        {'X-Backend-Record-Type': 'object',
         'X-Backend-Override-Shard-Name-Filter': 'true',
         'X-Backend-Sharding-State': 'sharded'},
        {'X-Backend-Record-Type': 'other',
         'X-Backend-Override-Shard-Name-Filter': 'true',
         'X-Backend-Sharding-State': 'sharded'},
        {'X-Backend-Record-Type': 'true',
         'X-Backend-Override-Shard-Name-Filter': 'true',
         'X-Backend-Sharding-State': 'sharded'},
        # record type header is absent
        {'X-Backend-Override-Shard-Name-Filter': 'true',
         'X-Backend-Sharding-State': 'sharded'},
    ]
    for backend_resp_hdrs in backend_resp_hdrs_cases:
        self._do_test_GET_shard_ranges_no_cache_write(backend_resp_hdrs)
def _do_test_GET_shard_ranges_bad_response_body(self, resp_body):
    """
    Verify that resp body is not cached if shard range parsing fails, and
    that the original unparseable response body is returned.

    The stubs are re-initialised at the start of the check and the cache
    is emptied at the end.

    :param resp_body: a JSON-serializable object; its JSON dump is used as
        the body of the stubbed backend response.
    """
    self._setup_shard_range_stubs()
    self.memcache.clear_calls()
    req = self._build_request(
        {'X-Backend-Record-Type': 'shard'},
        {'states': 'listing'}, {})
    backend_resp_hdrs = {'X-Backend-Record-Type': 'shard',
                         'X-Backend-Override-Shard-Name-Filter': 'true',
                         'X-Backend-Sharding-State': 'sharded'}
    encoded_body = json.dumps(resp_body).encode('ascii')
    backend_req, resp = self._capture_backend_request(
        req, 200, encoded_body, backend_resp_hdrs)
    self._check_backend_req(
        req, backend_req,
        extra_hdrs={'X-Backend-Record-Type': 'shard',
                    'X-Backend-Override-Shard-Name-Filter': 'sharded'})

    exp_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
    exp_hdrs.update(backend_resp_hdrs)
    self._check_response(resp, resp_body, exp_hdrs)

    # container metadata is looked up in memcache for sharding state
    # container metadata is set in memcache
    exp_calls = [('get', 'container/a/c', None, None),
                 ('set', 'container/a/c', mock.ANY, 60)]
    self.assertEqual(exp_calls, self.memcache.calls)
    self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
                     self.memcache.calls[1][2]['sharding_state'])
    self.memcache.delete_all()
def test_GET_shard_ranges_bad_response_body(self):
    # verify that exactly one error is logged, with the expected message,
    # for each kind of bad backend response body
    def check_logged_error(resp_body, exp_message):
        self._do_test_GET_shard_ranges_bad_response_body(resp_body)
        error_lines = self.logger.get_lines_for_level('error')
        self.assertEqual(1, len(error_lines), error_lines)
        self.assertIn(exp_message, error_lines[0])

    # body is JSON but not a list
    check_logged_error({'bad': 'data', 'not': ' a list'},
                       'Problem with listing response')
    self.logger.clear()

    # body is a list but its item is not a shard range
    check_logged_error([{'not': ' a shard range'}],
                       'Failed to get shard ranges')
    self.logger.clear()

    # body is a JSON string rather than a list
    check_logged_error('not json', 'Problem with listing response')
def _do_test_GET_shards_no_cache(self, sharding_state, req_params,
                                 req_hdrs=None):
    """
    Verify that a shard GET request does not lookup in cache or attempt
    to cache shard ranges fetched from backend.

    :param sharding_state: value for the X-Backend-Sharding-State header
        of the stubbed backend response.
    :param req_params: dict of request params; ``marker=egg`` and
        ``end_marker=jam`` are added to a copy of these, the given dict is
        not modified.
    :param req_hdrs: optional dict of request headers to send in addition
        to ``X-Backend-Record-Type: shard``.
    """
    self.memcache.delete_all()
    self.memcache.clear_calls()
    # build a new dict rather than updating the caller's dict in place
    req_params = dict(req_params, marker='egg', end_marker='jam')
    hdrs = {'X-Backend-Record-Type': 'shard'}
    if req_hdrs:
        hdrs.update(req_hdrs)
    req = self._build_request(hdrs, req_params, {})
    resp_shards = self.sr_dicts[:2]
    backend_req, resp = self._capture_backend_request(
        req, 200, json.dumps(resp_shards).encode('ascii'),
        {'X-Backend-Record-Type': 'shard',
         'X-Backend-Sharding-State': sharding_state})
    self._check_backend_req(
        req, backend_req, extra_hdrs=hdrs, extra_params=req_params)
    expected_shards = self.sr_dicts[:2]
    self._check_response(resp, expected_shards, {
        'X-Backend-Recheck-Container-Existence': '60',
        'X-Backend-Record-Type': 'shard',
        'X-Backend-Sharding-State': sharding_state})
    # container metadata from backend response is set in memcache
    self.assertEqual(
        [('set', 'container/a/c', mock.ANY, 60)],
        self.memcache.calls)
    self.assertEqual(sharding_state,
                     self.memcache.calls[0][2]['sharding_state'])
def test_GET_shard_ranges_no_cache_recheck_listing_shard_ranges(self):
    # verify that a GET for shards does not lookup or store in cache when
    # cache expiry time is set to zero
    self._setup_shard_range_stubs()
    self.app.recheck_listing_shard_ranges = 0
    backend_states = ('unsharded', 'sharding', 'sharded', 'collapsed',
                      'unexpected')
    for state in backend_states:
        # a new params dict is passed for each backend state
        self._do_test_GET_shards_no_cache(state, {'states': 'listing'})
def test_GET_shard_ranges_no_cache_when_requesting_updating_shards(self):
    # verify that a GET for shards in updating states does not lookup or
    # store in cache
    self._setup_shard_range_stubs()
    backend_states = ('unsharded', 'sharding', 'sharded', 'collapsed',
                      'unexpected')
    for state in backend_states:
        # a new params dict is passed for each backend state
        self._do_test_GET_shards_no_cache(state, {'states': 'updating'})
def test_GET_shard_ranges_no_cache_when_include_deleted_shards(self):
    # verify that a GET for shards in listing states does not lookup or
    # store in cache if x-backend-include-deleted is true
    self._setup_shard_range_stubs()
    backend_states = ('unsharded', 'sharding', 'sharded', 'collapsed',
                      'unexpected')
    for state in backend_states:
        # new params and headers dicts are passed for each backend state
        req_params = {'states': 'listing'}
        req_hdrs = {'X-Backend-Include-Deleted': 'true'}
        self._do_test_GET_shards_no_cache(state, req_params, req_hdrs)
def test_GET_objects_makes_no_cache_lookup(self):
    # verify that an object GET request does not lookup container metadata
    # in cache
    self._setup_shard_range_stubs()
    self.memcache.delete_all()
    self.memcache.clear_calls()

    client_hdrs = {'X-Backend-Record-Type': 'object'}
    # we would not expect states=listing to be used with an object request
    # but include it here to verify that it is ignored
    req = self._build_request(client_hdrs, {'states': 'listing'}, {})
    listing = ['object listing']
    backend_resp_hdrs = {'X-Backend-Record-Type': 'object',
                         'X-Backend-Sharding-State': 'sharded'}
    backend_req, resp = self._capture_backend_request(
        req, 200, json.dumps(listing).encode('ascii'), backend_resp_hdrs)
    self._check_backend_req(req, backend_req, extra_hdrs=client_hdrs)

    exp_resp_hdrs = {
        'X-Backend-Recheck-Container-Existence': '60',
        'X-Backend-Record-Type': 'object',
        'X-Backend-Sharding-State': 'sharded'}
    self._check_response(resp, ['object listing'], exp_resp_hdrs)

    # container metadata from backend response is set in memcache
    exp_calls = [('set', 'container/a/c', mock.ANY, 60)]
    self.assertEqual(exp_calls, self.memcache.calls)
    self.assertEqual('sharded',
                     self.memcache.calls[0][2]['sharding_state'])
def test_GET_shard_ranges_no_memcache_available(self):
    # verify that shard ranges are fetched from the backend and returned
    # when the request environ has no cache
    self._setup_shard_range_stubs()
    self.memcache.clear_calls()

    req_hdrs = {'X-Backend-Record-Type': 'shard'}
    req_params = {'states': 'listing'}
    req = self._build_request(req_hdrs, req_params, {})
    req.environ['swift.cache'] = None

    backend_resp_hdrs = {'X-Backend-Record-Type': 'shard',
                         'X-Backend-Sharding-State': 'sharded'}
    backend_req, resp = self._capture_backend_request(
        req, 200, self._stub_shards_dump, backend_resp_hdrs)
    self._check_backend_req(
        req, backend_req, extra_params=req_params, extra_hdrs=req_hdrs)

    exp_resp_hdrs = {
        'X-Backend-Recheck-Container-Existence': '60',
        'X-Backend-Record-Type': 'shard',
        'X-Backend-Sharding-State': 'sharded'}
    self._check_response(resp, self.sr_dicts, exp_resp_hdrs)
    self.assertEqual([], self.memcache.calls)  # sanity check
def test_cache_clearing(self):
    # verify that both metadata and shard ranges are purged from memcache
    # on PUT, POST and DELETE
    def check_cache_purged(method, resp_status, num_resp):
        self.assertGreater(num_resp, 0)  # sanity check
        cache = FakeMemcache()
        cont_key = get_cache_key('a', 'c')
        shard_key = get_cache_key('a', 'c', shard='listing')
        cache.set(cont_key, 'container info', 60)
        cache.set(shard_key, 'shard ranges', 600)
        req = Request.blank('/v1/a/c', method=method)
        req.environ['swift.cache'] = cache
        # both items are in cache before the request is handled...
        for key in (cont_key, shard_key):
            self.assertIn(key, req.environ['swift.cache'].store)

        statuses = [resp_status] * num_resp
        with mocked_http_conn(
                *statuses, body_iter=[b''] * num_resp,
                headers=[{}] * num_resp):
            resp = req.get_response(self.app)
        self.assertEqual(statuses[0], resp.status_int)
        # ...and neither is in cache afterwards
        for key in (cont_key, shard_key):
            self.assertNotIn(key, req.environ['swift.cache'].store)

    for method, status in (('DELETE', 204), ('POST', 204), ('PUT', 202)):
        check_cache_purged(method, status, self.CONTAINER_REPLICAS)
def test_GET_bad_requests(self):
    # verify that the proxy controller enforces checks on request params
    over_limit_path = '/v1/a/c?limit=%d' % (CONTAINER_LISTING_LIMIT + 1)
    resp = Request.blank(over_limit_path).get_response(self.app)
    self.assertEqual(412, resp.status_int)

    # each of these params is given the value %ff
    checked_params = ('delimiter', 'marker', 'end_marker', 'prefix',
                      'format', 'path', 'includes', 'states')
    for param in checked_params:
        req = Request.blank('/v1/a/c?%s=%%ff' % param)
        resp = req.get_response(self.app)
        self.assertEqual(400, resp.status_int)
@patch_policies(
[StoragePolicy(0, 'zero', True, object_ring=FakeRing(replicas=4))])

View File

@ -622,6 +622,23 @@ class TestProxyServerConfiguration(unittest.TestCase):
set(app.cors_expose_headers))
self.assertFalse(app.strict_cors_mode)
def test_memcache_recheck_options(self):
    # verify the default and custom values of the cache recheck options
    def assert_options(app, expected):
        for option, value in expected:
            self.assertEqual(getattr(app, option), value)

    # check default options
    assert_options(self._make_app({}), (
        ('recheck_account_existence', 60),
        ('recheck_container_existence', 60),
        ('recheck_updating_shard_ranges', 3600),
        ('recheck_listing_shard_ranges', 600),
    ))

    # check custom options
    custom_conf = {'recheck_account_existence': '30',
                   'recheck_container_existence': '40',
                   'recheck_updating_shard_ranges': '1800',
                   'recheck_listing_shard_ranges': ' 900'}
    assert_options(self._make_app(custom_conf), (
        ('recheck_account_existence', 30),
        ('recheck_container_existence', 40),
        ('recheck_updating_shard_ranges', 1800),
        ('recheck_listing_shard_ranges', 900),
    ))
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestProxyServer(unittest.TestCase):