Deprecate expirer options
The following configuration options are deprecated: * expiring_objects_container_divisor * expiring_objects_account_name The upstream maintainers are not aware of any clusters where these have been configured to non-default values. UpgradeImpact: Operators are encouraged to remove their "container_divisor" setting and use the default value of 86400. If a cluster was deployed with a non-standard "account_name", operators should remove the option from all configs so they are using a supported configuration going forward, but will need to deploy stand-alone expirer processes with legacy expirer config to clean-up old expiration tasks from the previously configured account name. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Jianjian Huo <jhuo@nvidia.com> Change-Id: I5ea9e6dc8b44c8c5f55837debe24dd76be7d6248
This commit is contained in:
parent
ae6300af86
commit
b69a2bef45
doc
manpages
source/config
etc
swift
test/unit
common
obj
proxy
@ -211,8 +211,6 @@ This is normally \fBegg:swift#proxy_logging\fR. See proxy-server.conf-sample for
|
||||
.RS 3
|
||||
.IP \fBinterval\fR
|
||||
Replaces run_pause with the more standard "interval", which means the replicator won't pause unless it takes less than the interval set. The default is 300.
|
||||
.IP \fBexpiring_objects_account_name\fR
|
||||
The default is 'expiring_objects'.
|
||||
.IP \fBreport_interval\fR
|
||||
The default is 300 seconds.
|
||||
.IP \fBrequest_tries\fR
|
||||
|
@ -86,11 +86,6 @@ Whether or not check if the devices are mounted to prevent accidentally writing
|
||||
the root device. The default is set to true.
|
||||
.IP \fBdisable_fallocate\fR
|
||||
Disable pre-allocate disk space for a file. The default is false.
|
||||
.IP \fBexpiring_objects_container_divisor\fR
|
||||
The default is 86400.
|
||||
.IP \fBexpiring_objects_account_name\fR
|
||||
Account name used for legacy style expirer task queue.
|
||||
The default is 'expiring_objects'.
|
||||
.IP \fBservers_per_port\fR
|
||||
Make object-server run this many worker processes per unique port of "local"
|
||||
ring devices across all storage policies. The default value of 0 disables this
|
||||
|
@ -96,10 +96,6 @@ disabled by default.
|
||||
.IP \fBkey_file\fR
|
||||
Location of the SSL certificate key file. The default path is /etc/swift/proxy.key. This is
|
||||
disabled by default.
|
||||
.IP \fBexpiring_objects_container_divisor\fR
|
||||
The default is 86400.
|
||||
.IP \fBexpiring_objects_account_name\fR
|
||||
The default is 'expiring_objects'.
|
||||
.IP \fBlog_name\fR
|
||||
Label used when logging. The default is swift.
|
||||
.IP \fBlog_facility\fR
|
||||
|
@ -703,8 +703,8 @@ interval 300 Time in seconds to
|
||||
report_interval 300 Frequency of status logs in seconds.
|
||||
concurrency 1 Level of concurrency to use to do the work,
|
||||
this value must be set to at least 1
|
||||
expiring_objects_account_name expiring_objects name for legacy expirer task queue
|
||||
dequeue_from_legacy False This service will look for jobs on the legacy expirer task queue.
|
||||
dequeue_from_legacy False This service will look for jobs on the
|
||||
legacy expirer task queue.
|
||||
round_robin_task_cache_size 100000 Number of tasks objects to cache before processing.
|
||||
processes 0 How many parts to divide the legacy work into,
|
||||
one part per process that will be doing the work.
|
||||
|
@ -124,8 +124,6 @@ disallowed_sections swift.valid_api_versions Allows the abili
|
||||
public calls to /info. You can
|
||||
withhold subsections by separating
|
||||
the dict level with a ".".
|
||||
expiring_objects_container_divisor 86400
|
||||
expiring_objects_account_name expiring_objects
|
||||
nice_priority None Scheduling priority of server
|
||||
processes.
|
||||
Niceness values range from -20 (most
|
||||
|
@ -39,7 +39,6 @@
|
||||
|
||||
[object-expirer]
|
||||
# interval = 300.0
|
||||
# expiring_objects_account_name = expiring_objects
|
||||
# report_interval = 300.0
|
||||
#
|
||||
# request_tries is the number of times the expirer's internal client will
|
||||
|
@ -9,8 +9,6 @@ bind_port = 6200
|
||||
# devices = /srv/node
|
||||
# mount_check = true
|
||||
# disable_fallocate = false
|
||||
# expiring_objects_container_divisor = 86400
|
||||
# expiring_objects_account_name = expiring_objects
|
||||
#
|
||||
# Use an integer to override the number of pre-forked processes that will
|
||||
# accept connections. NOTE: if servers_per_port is set, this setting is
|
||||
|
@ -37,8 +37,6 @@ bind_port = 8080
|
||||
# cert_file = /etc/swift/proxy.crt
|
||||
# key_file = /etc/swift/proxy.key
|
||||
#
|
||||
# expiring_objects_container_divisor = 86400
|
||||
# expiring_objects_account_name = expiring_objects
|
||||
#
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
|
@ -362,18 +362,19 @@ from swift.common.utils import get_logger, config_true_value, \
|
||||
override_bytes_from_content_type, split_path, \
|
||||
RateLimitedIterator, quote, closing_if_possible, \
|
||||
LRUCache, StreamingPile, strict_b64decode, Timestamp, friendly_close, \
|
||||
get_expirer_container, md5
|
||||
md5
|
||||
from swift.common.registry import register_swift_info
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
|
||||
get_container_update_override_key, update_ignore_range_header, \
|
||||
get_param, get_valid_part_num
|
||||
from swift.common.constraints import check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
|
||||
from swift.common.constraints import check_utf8
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
|
||||
make_pre_authed_request
|
||||
from swift.common.middleware.bulk import get_response_body, \
|
||||
ACCEPTABLE_FORMATS, Bulk
|
||||
from swift.obj import expirer
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
|
||||
|
||||
@ -1322,11 +1323,7 @@ class StaticLargeObject(object):
|
||||
delete_concurrency=delete_concurrency,
|
||||
logger=self.logger)
|
||||
|
||||
prefix = AUTO_CREATE_ACCOUNT_PREFIX
|
||||
self.expiring_objects_account = prefix + (
|
||||
conf.get('expiring_objects_account_name') or 'expiring_objects')
|
||||
self.expiring_objects_container_divisor = int(
|
||||
conf.get('expiring_objects_container_divisor', 86400))
|
||||
self.expirer_config = expirer.ExpirerConfig(conf, logger=self.logger)
|
||||
|
||||
def handle_multipart_get_or_head(self, req, start_response):
|
||||
"""
|
||||
@ -1774,13 +1771,14 @@ class StaticLargeObject(object):
|
||||
ts = req.ensure_x_timestamp()
|
||||
expirer_jobs = make_delete_jobs(
|
||||
wsgi_to_str(account), segment_container, segment_objects, ts)
|
||||
expirer_cont = get_expirer_container(
|
||||
ts, self.expiring_objects_container_divisor,
|
||||
wsgi_to_str(account), wsgi_to_str(container), wsgi_to_str(obj))
|
||||
expiring_objects_account, expirer_cont = \
|
||||
self.expirer_config.get_expirer_account_and_container(
|
||||
ts, wsgi_to_str(account), wsgi_to_str(container),
|
||||
wsgi_to_str(obj))
|
||||
enqueue_req = make_pre_authed_request(
|
||||
req.environ,
|
||||
method='UPDATE',
|
||||
path="/v1/%s/%s" % (self.expiring_objects_account, expirer_cont),
|
||||
path="/v1/%s/%s" % (expiring_objects_account, expirer_cont),
|
||||
body=json.dumps(expirer_jobs),
|
||||
headers={'Content-Type': 'application/json',
|
||||
'X-Backend-Storage-Policy-Index': '0',
|
||||
|
@ -2928,16 +2928,6 @@ def clean_content_type(value):
|
||||
return value
|
||||
|
||||
|
||||
def get_expirer_container(x_delete_at, expirer_divisor, acc, cont, obj):
|
||||
"""
|
||||
Returns an expiring object container name for given X-Delete-At and
|
||||
(native string) a/c/o.
|
||||
"""
|
||||
shard_int = int(hash_path(acc, cont, obj), 16) % 100
|
||||
return normalize_delete_at_timestamp(
|
||||
int(x_delete_at) // expirer_divisor * expirer_divisor - shard_int)
|
||||
|
||||
|
||||
class _MultipartMimeFileLikeObject(object):
|
||||
|
||||
def __init__(self, wsgi_input, boundary, input_buffer, read_chunk_size):
|
||||
|
@ -27,10 +27,11 @@ from eventlet.greenpool import GreenPool
|
||||
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
|
||||
from swift.common.daemon import Daemon, run_daemon
|
||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||
from swift.common import utils
|
||||
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
||||
Timestamp, config_true_value, normalize_delete_at_timestamp, \
|
||||
RateLimitedIterator, md5, non_negative_float, non_negative_int, \
|
||||
parse_content_type, parse_options
|
||||
parse_content_type, parse_options, config_positive_int_value
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||
HTTP_PRECONDITION_FAILED
|
||||
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
|
||||
@ -41,6 +42,117 @@ MAX_OBJECTS_TO_CACHE = 100000
|
||||
X_DELETE_TYPE = 'text/plain'
|
||||
ASYNC_DELETE_TYPE = 'application/async-deleted'
|
||||
|
||||
# expiring_objects_account_name used to be a supported configuration across
|
||||
# proxy/expirer configs, but AUTO_CREATE_ACCOUNT_PREFIX is configured in
|
||||
# swift.conf constraints; neither should be changed
|
||||
EXPIRER_ACCOUNT_NAME = AUTO_CREATE_ACCOUNT_PREFIX + 'expiring_objects'
|
||||
# Most clusters use the default "expiring_objects_container_divisor" of 86400
|
||||
EXPIRER_CONTAINER_DIVISOR = 86400
|
||||
EXPIRER_CONTAINER_PER_DIVISOR = 100
|
||||
|
||||
|
||||
class ExpirerConfig(object):
|
||||
|
||||
def __init__(self, conf, container_ring=None, logger=None):
|
||||
"""
|
||||
Read the configurable object-expirer values consistently and issue
|
||||
warnings appropriately when we encounter deprecated options.
|
||||
|
||||
This class is used in multiple contexts on proxy and object servers.
|
||||
|
||||
:param conf: a config dictionary
|
||||
:param container_ring: optional, required in proxy context to lookup
|
||||
task container (part, nodes)
|
||||
:param logger: optional, will create one from the conf if not given
|
||||
"""
|
||||
logger = logger or get_logger(conf)
|
||||
if 'expiring_objects_container_divisor' in conf:
|
||||
logger.warning(
|
||||
'expiring_objects_container_divisor is deprecated')
|
||||
expirer_divisor = config_positive_int_value(
|
||||
conf['expiring_objects_container_divisor'])
|
||||
else:
|
||||
expirer_divisor = EXPIRER_CONTAINER_DIVISOR
|
||||
|
||||
if 'expiring_objects_account_name' in conf:
|
||||
logger.warning(
|
||||
'expiring_objects_account_name is deprecated; you need '
|
||||
'to migrate to the standard .expiring_objects account')
|
||||
account_name = (AUTO_CREATE_ACCOUNT_PREFIX +
|
||||
conf['expiring_objects_account_name'])
|
||||
else:
|
||||
account_name = EXPIRER_ACCOUNT_NAME
|
||||
self.account_name = account_name
|
||||
self.expirer_divisor = expirer_divisor
|
||||
self.task_container_per_day = EXPIRER_CONTAINER_PER_DIVISOR
|
||||
if self.task_container_per_day >= self.expirer_divisor:
|
||||
msg = 'expiring_objects_container_divisor MUST be greater than 100'
|
||||
if self.expirer_divisor != 86400:
|
||||
msg += '; expiring_objects_container_divisor (%s) SHOULD be ' \
|
||||
'default value of %d' \
|
||||
% (self.expirer_divisor, EXPIRER_CONTAINER_DIVISOR)
|
||||
raise ValueError(msg)
|
||||
self.container_ring = container_ring
|
||||
|
||||
def get_expirer_container(self, x_delete_at, acc, cont, obj):
|
||||
"""
|
||||
Returns an expiring object task container name for given X-Delete-At
|
||||
and (native string) a/c/o.
|
||||
"""
|
||||
# offset backwards from the expected day is a hash of size "per day"
|
||||
shard_int = (int(utils.hash_path(acc, cont, obj), 16) %
|
||||
self.task_container_per_day)
|
||||
# even though the attr is named "task_container_per_day" it's actually
|
||||
# "task_container_per_divisor" if for some reason the deprecated config
|
||||
# "expirer_divisor" option doesn't have the default value of 86400
|
||||
return normalize_delete_at_timestamp(
|
||||
int(x_delete_at) // self.expirer_divisor *
|
||||
self.expirer_divisor - shard_int)
|
||||
|
||||
def get_expirer_account_and_container(self, x_delete_at, acc, cont, obj):
|
||||
"""
|
||||
Calculates the expected expirer account and container for the target
|
||||
given the current configuration.
|
||||
|
||||
:returns: a tuple, (account_name, task_container)
|
||||
"""
|
||||
task_container = self.get_expirer_container(
|
||||
x_delete_at, acc, cont, obj)
|
||||
return self.account_name, task_container
|
||||
|
||||
def is_expected_task_container(self, task_container_int):
|
||||
"""
|
||||
Validate the task_container timestamp as an expected value given the
|
||||
current configuration. Changing the expirer configuration will lead to
|
||||
orphaned x-delete-at task objects on overwrite, which may stick around
|
||||
a whole reclaim age.
|
||||
|
||||
:params task_container_int: an int, all task_containers are expected
|
||||
to be integer timestamps
|
||||
|
||||
:returns: a boolean, True if name fits with the given config
|
||||
"""
|
||||
# calculate seconds offset into previous divisor window
|
||||
r = (task_container_int - 1) % self.expirer_divisor
|
||||
# seconds offset should be no more than task_container_per_day i.e.
|
||||
# given % 86400, r==86359 is ok (because 41 is less than 100), but
|
||||
# 49768 would be unexpected
|
||||
return self.expirer_divisor - r <= self.task_container_per_day
|
||||
|
||||
def get_delete_at_nodes(self, x_delete_at, acc, cont, obj):
|
||||
"""
|
||||
Get the task_container part, nodes, and name.
|
||||
|
||||
:returns: a tuple, (part, nodes, task_container_name)
|
||||
"""
|
||||
if not self.container_ring:
|
||||
raise RuntimeError('%s was not created with container_ring' % self)
|
||||
account_name, task_container = self.get_expirer_account_and_container(
|
||||
x_delete_at, acc, cont, obj)
|
||||
part, nodes = self.container_ring.get_nodes(
|
||||
account_name, task_container)
|
||||
return part, nodes, task_container
|
||||
|
||||
|
||||
def build_task_obj(timestamp, target_account, target_container,
|
||||
target_obj, high_precision=False):
|
||||
@ -157,6 +269,7 @@ class ObjectExpirer(Daemon):
|
||||
self.logger = logger or get_logger(conf, log_route=self.log_route)
|
||||
self.interval = float(conf.get('interval') or 300)
|
||||
self.tasks_per_second = float(conf.get('tasks_per_second', 50.0))
|
||||
self.expirer_config = ExpirerConfig(conf, logger=self.logger)
|
||||
|
||||
self.conf_path = \
|
||||
self.conf.get('__file__') or '/etc/swift/object-expirer.conf'
|
||||
@ -301,29 +414,44 @@ class ObjectExpirer(Daemon):
|
||||
only one expirer.
|
||||
"""
|
||||
if self.processes > 0:
|
||||
yield self.expiring_objects_account, self.process, self.processes
|
||||
yield (self.expirer_config.account_name,
|
||||
self.process, self.processes)
|
||||
else:
|
||||
yield self.expiring_objects_account, 0, 1
|
||||
yield self.expirer_config.account_name, 0, 1
|
||||
|
||||
def delete_at_time_of_task_container(self, task_container):
|
||||
def get_task_containers_to_expire(self, task_account):
|
||||
"""
|
||||
get delete_at timestamp from task_container name
|
||||
"""
|
||||
# task_container name is timestamp
|
||||
return Timestamp(task_container)
|
||||
|
||||
def iter_task_containers_to_expire(self, task_account):
|
||||
"""
|
||||
Yields task_container names under the task_account if the delete at
|
||||
Collects task_container names under the task_account if the delete at
|
||||
timestamp of task_container is past.
|
||||
"""
|
||||
container_list = []
|
||||
unexpected_task_containers = {
|
||||
'examples': [],
|
||||
'count': 0,
|
||||
}
|
||||
for c in self.swift.iter_containers(task_account,
|
||||
prefix=self.task_container_prefix):
|
||||
task_container = str(c['name'])
|
||||
timestamp = self.delete_at_time_of_task_container(task_container)
|
||||
if timestamp > Timestamp.now():
|
||||
try:
|
||||
task_container_int = int(Timestamp(c['name']))
|
||||
except ValueError:
|
||||
self.logger.error('skipping invalid task container: %s/%s',
|
||||
task_account, c['name'])
|
||||
continue
|
||||
if not self.expirer_config.is_expected_task_container(
|
||||
task_container_int):
|
||||
unexpected_task_containers['count'] += 1
|
||||
if unexpected_task_containers['count'] < 5:
|
||||
unexpected_task_containers['examples'].append(c['name'])
|
||||
if task_container_int > Timestamp.now():
|
||||
break
|
||||
yield task_container
|
||||
container_list.append(str(task_container_int))
|
||||
|
||||
if unexpected_task_containers['count']:
|
||||
self.logger.info(
|
||||
'processing %s unexpected task containers (e.g. %s)',
|
||||
unexpected_task_containers['count'],
|
||||
' '.join(unexpected_task_containers['examples']))
|
||||
return container_list
|
||||
|
||||
def get_delay_reaping(self, target_account, target_container):
|
||||
return get_delay_reaping(self.delay_reaping_times, target_account,
|
||||
@ -470,7 +598,7 @@ class ObjectExpirer(Daemon):
|
||||
|
||||
task_account_container_list = \
|
||||
[(task_account, task_container) for task_container in
|
||||
self.iter_task_containers_to_expire(task_account)]
|
||||
self.get_task_containers_to_expire(task_account)]
|
||||
|
||||
# delete_task_iter is a generator to yield a dict of
|
||||
# task_account, task_container, task_object, delete_timestamp,
|
||||
|
@ -31,7 +31,7 @@ from eventlet.greenthread import spawn
|
||||
from swift.common.utils import public, get_logger, \
|
||||
config_true_value, config_percent_value, timing_stats, replication, \
|
||||
normalize_delete_at_timestamp, get_log_line, Timestamp, \
|
||||
get_expirer_container, parse_mime_headers, \
|
||||
parse_mime_headers, \
|
||||
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
|
||||
config_auto_int_value, split_path, get_redirect_data, \
|
||||
normalize_timestamp, md5, parse_options, CooperativeIterator
|
||||
@ -44,7 +44,7 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
|
||||
ChunkReadError, DiskFileXattrNotSupported
|
||||
from swift.common.request_helpers import resolve_ignore_range_header, \
|
||||
OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX
|
||||
from swift.obj import ssync_receiver
|
||||
from swift.obj import ssync_receiver, expirer
|
||||
from swift.common.http import is_success, HTTP_MOVED_PERMANENTLY
|
||||
from swift.common.base_storage_server import BaseStorageServer
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
@ -181,10 +181,7 @@ class ObjectController(BaseStorageServer):
|
||||
self.allowed_headers.add(header)
|
||||
|
||||
self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
|
||||
self.expiring_objects_account = self.auto_create_account_prefix + \
|
||||
(conf.get('expiring_objects_account_name') or 'expiring_objects')
|
||||
self.expiring_objects_container_divisor = \
|
||||
int(conf.get('expiring_objects_container_divisor') or 86400)
|
||||
self.expirer_config = expirer.ExpirerConfig(conf, logger=self.logger)
|
||||
# Initialization was successful, so now apply the network chunk size
|
||||
# parameter as the default read / write buffer size for the network
|
||||
# sockets.
|
||||
@ -462,11 +459,9 @@ class ObjectController(BaseStorageServer):
|
||||
if config_true_value(
|
||||
request.headers.get('x-backend-replication', 'f')):
|
||||
return
|
||||
delete_at = normalize_delete_at_timestamp(delete_at)
|
||||
updates = [(None, None)]
|
||||
|
||||
partition = None
|
||||
hosts = contdevices = [None]
|
||||
delete_at = normalize_delete_at_timestamp(delete_at)
|
||||
|
||||
headers_in = request.headers
|
||||
headers_out = HeaderKeyDict({
|
||||
# system accounts are always Policy-0
|
||||
@ -474,26 +469,42 @@ class ObjectController(BaseStorageServer):
|
||||
'x-timestamp': request.timestamp.internal,
|
||||
'x-trans-id': headers_in.get('x-trans-id', '-'),
|
||||
'referer': request.as_referer()})
|
||||
|
||||
expiring_objects_account_name, delete_at_container = \
|
||||
self.expirer_config.get_expirer_account_and_container(
|
||||
delete_at, account, container, obj)
|
||||
if op != 'DELETE':
|
||||
hosts = headers_in.get('X-Delete-At-Host', None)
|
||||
if hosts is None:
|
||||
# If header is missing, no update needed as sufficient other
|
||||
# object servers should perform the required update.
|
||||
return
|
||||
delete_at_container = headers_in.get('X-Delete-At-Container', None)
|
||||
if not delete_at_container:
|
||||
# older proxy servers did not send X-Delete-At-Container so for
|
||||
# backwards compatibility calculate the value here, but also
|
||||
# log a warning because this is prone to inconsistent
|
||||
# expiring_objects_container_divisor configurations.
|
||||
# See https://bugs.launchpad.net/swift/+bug/1187200
|
||||
self.logger.warning(
|
||||
'X-Delete-At-Container header must be specified for '
|
||||
'expiring objects background %s to work properly. Making '
|
||||
'best guess as to the container name for now.' % op)
|
||||
delete_at_container = get_expirer_container(
|
||||
delete_at, self.expiring_objects_container_divisor,
|
||||
account, container, obj)
|
||||
|
||||
proxy_delete_at_container = headers_in.get(
|
||||
'X-Delete-At-Container', None)
|
||||
if delete_at_container != proxy_delete_at_container:
|
||||
if not proxy_delete_at_container:
|
||||
# We carry this warning around for pre-2013 proxies
|
||||
self.logger.warning(
|
||||
'X-Delete-At-Container header must be specified for '
|
||||
'expiring objects background %s to work properly. '
|
||||
'Making best guess as to the container name '
|
||||
'for now.', op)
|
||||
proxy_delete_at_container = delete_at_container
|
||||
else:
|
||||
# Inconsistent configuration may lead to orphaned expirer
|
||||
# task queue objects when X-Delete-At is updated, which can
|
||||
# stick around for a whole reclaim age.
|
||||
self.logger.debug(
|
||||
'Proxy X-Delete-At-Container %r does not match '
|
||||
'expected %r for current expirer_config.',
|
||||
proxy_delete_at_container, delete_at_container)
|
||||
# it's not possible to say which is "more correct", this will
|
||||
# at least match the host/part/device
|
||||
delete_at_container = normalize_delete_at_timestamp(
|
||||
proxy_delete_at_container)
|
||||
|
||||
# new updates need to enqueue new x-delete-at
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
updates = [upd for upd in
|
||||
@ -512,23 +523,13 @@ class ObjectController(BaseStorageServer):
|
||||
request.headers.get(
|
||||
'X-Backend-Clean-Expiring-Object-Queue', 't')):
|
||||
return
|
||||
|
||||
# DELETEs of old expiration data have no way of knowing what the
|
||||
# old X-Delete-At-Container was at the time of the initial setting
|
||||
# of the data, so a best guess is made here.
|
||||
# Worst case is a DELETE is issued now for something that doesn't
|
||||
# exist there and the original data is left where it is, where
|
||||
# it will be ignored when the expirer eventually tries to issue the
|
||||
# object DELETE later since the X-Delete-At value won't match up.
|
||||
delete_at_container = get_expirer_container(
|
||||
delete_at, self.expiring_objects_container_divisor,
|
||||
account, container, obj)
|
||||
delete_at_container = normalize_delete_at_timestamp(
|
||||
delete_at_container)
|
||||
# DELETE op always go directly to async_pending
|
||||
partition = None
|
||||
updates = [(None, None)]
|
||||
|
||||
for host, contdevice in updates:
|
||||
self.async_update(
|
||||
op, self.expiring_objects_account, delete_at_container,
|
||||
op, expiring_objects_account_name, delete_at_container,
|
||||
build_task_obj(delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice,
|
||||
policy)
|
||||
|
@ -42,7 +42,7 @@ from eventlet.timeout import Timeout
|
||||
from swift.common.utils import (
|
||||
clean_content_type, config_true_value, ContextPool, csv_append,
|
||||
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
|
||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||
normalize_delete_at_timestamp, public,
|
||||
document_iters_to_http_response_body, parse_content_range,
|
||||
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
|
||||
NamespaceBoundList, CooperativeIterator)
|
||||
@ -630,13 +630,10 @@ class BaseObjectController(Controller):
|
||||
|
||||
append_log_info(req.environ, 'x-delete-at:%s' % x_delete_at)
|
||||
|
||||
delete_at_container = get_expirer_container(
|
||||
x_delete_at, self.app.expiring_objects_container_divisor,
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
|
||||
delete_at_part, delete_at_nodes = \
|
||||
self.app.container_ring.get_nodes(
|
||||
self.app.expiring_objects_account, delete_at_container)
|
||||
delete_at_part, delete_at_nodes, delete_at_container = \
|
||||
self.app.expirer_config.get_delete_at_nodes(
|
||||
x_delete_at, self.account_name, self.container_name,
|
||||
self.object_name)
|
||||
|
||||
return req, delete_at_container, delete_at_part, delete_at_nodes
|
||||
|
||||
|
@ -51,6 +51,7 @@ from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
|
||||
wsgi_to_str
|
||||
from swift.common.exceptions import APIVersionError
|
||||
from swift.common.wsgi import run_wsgi
|
||||
from swift.obj import expirer
|
||||
|
||||
|
||||
# List of entry points for mandatory middlewares.
|
||||
@ -264,10 +265,8 @@ class Application(object):
|
||||
config_true_value(conf.get('account_autocreate', 'no'))
|
||||
self.auto_create_account_prefix = \
|
||||
constraints.AUTO_CREATE_ACCOUNT_PREFIX
|
||||
self.expiring_objects_account = self.auto_create_account_prefix + \
|
||||
(conf.get('expiring_objects_account_name') or 'expiring_objects')
|
||||
self.expiring_objects_container_divisor = \
|
||||
int(conf.get('expiring_objects_container_divisor') or 86400)
|
||||
self.expirer_config = expirer.ExpirerConfig(
|
||||
conf, container_ring=self.container_ring, logger=self.logger)
|
||||
self.max_containers_per_account = \
|
||||
int(conf.get('max_containers_per_account') or 0)
|
||||
self.max_containers_whitelist = [
|
||||
|
@ -33,7 +33,7 @@ from swift.common.swob import Request, HTTPException, str_to_wsgi, \
|
||||
bytes_to_wsgi
|
||||
from swift.common.utils import quote, closing_if_possible, close_if_possible, \
|
||||
parse_content_type, iter_multipart_mime_documents, parse_mime_headers, \
|
||||
Timestamp, get_expirer_container, md5
|
||||
Timestamp, md5
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
@ -1609,8 +1609,8 @@ class TestSloDeleteManifest(SloTestCase):
|
||||
def test_handle_async_delete_whole(self):
|
||||
self.slo.allow_async_delete = True
|
||||
now = Timestamp(time.time())
|
||||
exp_obj_cont = get_expirer_container(
|
||||
int(now), 86400, 'AUTH_test', 'deltest', 'man-all-there')
|
||||
exp_obj_cont = self.slo.expirer_config.get_expirer_container(
|
||||
int(now), 'AUTH_test', 'deltest', 'man-all-there')
|
||||
self.app.register(
|
||||
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
|
||||
swob.HTTPNoContent, {}, None)
|
||||
@ -1663,8 +1663,8 @@ class TestSloDeleteManifest(SloTestCase):
|
||||
unicode_acct = u'AUTH_test-un\u00efcode'
|
||||
wsgi_acct = bytes_to_wsgi(unicode_acct.encode('utf-8'))
|
||||
now = Timestamp(time.time())
|
||||
exp_obj_cont = get_expirer_container(
|
||||
int(now), 86400, unicode_acct, 'deltest', 'man-all-there')
|
||||
exp_obj_cont = self.slo.expirer_config.get_expirer_container(
|
||||
int(now), unicode_acct, 'deltest', 'man-all-there')
|
||||
self.app.register(
|
||||
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
|
||||
swob.HTTPNoContent, {}, None)
|
||||
@ -1737,8 +1737,8 @@ class TestSloDeleteManifest(SloTestCase):
|
||||
unicode_acct = u'AUTH_test-un\u00efcode'
|
||||
wsgi_acct = bytes_to_wsgi(unicode_acct.encode('utf-8'))
|
||||
now = Timestamp(time.time())
|
||||
exp_obj_cont = get_expirer_container(
|
||||
int(now), 86400, unicode_acct, u'\N{SNOWMAN}', 'same-container')
|
||||
exp_obj_cont = self.slo.expirer_config.get_expirer_container(
|
||||
int(now), unicode_acct, u'\N{SNOWMAN}', 'same-container')
|
||||
self.app.register(
|
||||
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
|
||||
swob.HTTPNoContent, {}, None)
|
||||
@ -1802,6 +1802,32 @@ class TestSloDeleteManifest(SloTestCase):
|
||||
'storage_policy_index': 0},
|
||||
])
|
||||
|
||||
def test_handle_async_delete_alternative_expirer_config(self):
|
||||
# Test that SLO async delete operation will send UPDATE requests to the
|
||||
# alternative expirer container when using a non-default account name
|
||||
# and container divisor.
|
||||
slo_conf = {
|
||||
'expiring_objects_account_name': 'exp',
|
||||
'expiring_objects_container_divisor': '5400',
|
||||
}
|
||||
self.slo = slo.filter_factory(slo_conf)(self.app)
|
||||
now = Timestamp(time.time())
|
||||
exp_obj_cont = self.slo.expirer_config.get_expirer_container(
|
||||
int(now), 'AUTH_test', 'deltest', 'man-all-there')
|
||||
self.app.register(
|
||||
'UPDATE', '/v1/.exp/%s' % exp_obj_cont,
|
||||
swob.HTTPNoContent, {}, None)
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/deltest/man-all-there',
|
||||
method='DELETE')
|
||||
with patch('swift.common.utils.Timestamp.now', return_value=now):
|
||||
self.slo.handle_async_delete(req)
|
||||
self.assertEqual([
|
||||
('GET', '/v1/AUTH_test/deltest/man-all-there'
|
||||
'?multipart-manifest=get'),
|
||||
('UPDATE', '/v1/.exp/%s' % exp_obj_cont),
|
||||
], self.app.calls)
|
||||
|
||||
def test_handle_async_delete_nested(self):
|
||||
self.slo.allow_async_delete = True
|
||||
req = Request.blank(
|
||||
|
@ -4213,16 +4213,6 @@ class TestParseContentDisposition(unittest.TestCase):
|
||||
self.assertEqual(attrs, {'name': 'somefile', 'filename': 'test.html'})
|
||||
|
||||
|
||||
class TestGetExpirerContainer(unittest.TestCase):
|
||||
|
||||
@mock.patch.object(utils, 'hash_path', return_value=hex(101)[2:])
|
||||
def test_get_expirer_container(self, mock_hash_path):
|
||||
container = utils.get_expirer_container(1234, 20, 'a', 'c', 'o')
|
||||
self.assertEqual(container, '0000001219')
|
||||
container = utils.get_expirer_container(1234, 200, 'a', 'c', 'o')
|
||||
self.assertEqual(container, '0000001199')
|
||||
|
||||
|
||||
class TestIterMultipartMimeDocuments(unittest.TestCase):
|
||||
|
||||
def test_bad_start(self):
|
||||
|
@ -32,6 +32,7 @@ from swift.common import internal_client, utils, swob
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.swob import Response
|
||||
from swift.obj import expirer, diskfile
|
||||
from swift.obj.expirer import ExpirerConfig
|
||||
|
||||
|
||||
def not_random():
|
||||
@ -114,6 +115,166 @@ class FakeInternalClient(object):
|
||||
)
|
||||
|
||||
|
||||
class TestExpirerConfig(TestCase):
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
|
||||
@mock.patch('swift.obj.expirer.utils.hash_path', return_value=hex(101)[2:])
|
||||
def test_get_expirer_container(self, mock_hash_path):
|
||||
expirer_config = ExpirerConfig(
|
||||
{'expiring_objects_container_divisor': 200}, logger=self.logger)
|
||||
container = expirer_config.get_expirer_container(
|
||||
12340, 'a', 'c', 'o')
|
||||
self.assertEqual(container, '0000012199')
|
||||
expirer_config = ExpirerConfig(
|
||||
{'expiring_objects_container_divisor': 2000}, logger=self.logger)
|
||||
container = expirer_config.get_expirer_container(
|
||||
12340, 'a', 'c', 'o')
|
||||
self.assertEqual(container, '0000011999')
|
||||
|
||||
def test_is_expected_task_container(self):
|
||||
expirer_config = ExpirerConfig({}, logger=self.logger)
|
||||
self.assertEqual('.expiring_objects', expirer_config.account_name)
|
||||
self.assertEqual(86400, expirer_config.expirer_divisor)
|
||||
self.assertEqual(100, expirer_config.task_container_per_day)
|
||||
self.assertFalse(expirer_config.is_expected_task_container(172801))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(172800))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(172799))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(172701))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(172700))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(86401))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(86400))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(86399))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(86301))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(86300))
|
||||
|
||||
expirer_config = ExpirerConfig({
|
||||
'expiring_objects_container_divisor': 1000,
|
||||
}, logger=self.logger)
|
||||
self.assertEqual('.expiring_objects', expirer_config.account_name)
|
||||
self.assertEqual(1000, expirer_config.expirer_divisor)
|
||||
self.assertEqual(100, expirer_config.task_container_per_day)
|
||||
self.assertFalse(expirer_config.is_expected_task_container(2001))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(2000))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(1999))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(1901))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(1900))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(1001))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(1000))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(999))
|
||||
self.assertTrue(expirer_config.is_expected_task_container(901))
|
||||
self.assertFalse(expirer_config.is_expected_task_container(900))
|
||||
|
||||
def test_get_expirer_container_legacy_config(self):
|
||||
per_divisor = 100
|
||||
expirer_config = ExpirerConfig({
|
||||
'expiring_objects_container_divisor': 86400 * 2,
|
||||
}, logger=self.logger)
|
||||
delete_at = time()
|
||||
found = set()
|
||||
for i in range(per_divisor * 10):
|
||||
c = expirer_config.get_expirer_container(
|
||||
delete_at, 'a', 'c', 'obj%s' % i)
|
||||
found.add(c)
|
||||
self.assertEqual(per_divisor, len(found))
|
||||
|
||||
def test_get_expirer_config_default(self):
|
||||
conf = {}
|
||||
config = ExpirerConfig(conf, logger=self.logger)
|
||||
self.assertEqual('.expiring_objects', config.account_name)
|
||||
self.assertEqual(86400, config.expirer_divisor)
|
||||
self.assertEqual(100, config.task_container_per_day)
|
||||
self.assertFalse(self.logger.all_log_lines())
|
||||
|
||||
def test_get_expirer_config_legacy(self):
|
||||
conf = {
|
||||
'expiring_objects_account_name': 'exp',
|
||||
'expiring_objects_container_divisor': '1000',
|
||||
}
|
||||
config = ExpirerConfig(conf, logger=self.logger)
|
||||
self.assertEqual('.exp', config.account_name)
|
||||
self.assertEqual(1000, config.expirer_divisor)
|
||||
self.assertEqual(100, config.task_container_per_day)
|
||||
self.assertEqual([
|
||||
'expiring_objects_container_divisor is deprecated',
|
||||
'expiring_objects_account_name is deprecated; you need to '
|
||||
'migrate to the standard .expiring_objects account',
|
||||
], self.logger.get_lines_for_level('warning'))
|
||||
|
||||
def test_get_expirer_config_legacy_no_logger_given(self):
|
||||
# verify that a logger is constructed from conf if not given
|
||||
conf = {
|
||||
'expiring_objects_account_name': 'exp',
|
||||
'expiring_objects_container_divisor': '1000',
|
||||
'log_route': 'test',
|
||||
}
|
||||
|
||||
with mock.patch(
|
||||
'swift.obj.expirer.get_logger', return_value=self.logger
|
||||
) as mock_get_logger:
|
||||
config = ExpirerConfig(conf, logger=None)
|
||||
self.assertEqual('.exp', config.account_name)
|
||||
self.assertEqual(1000, config.expirer_divisor)
|
||||
self.assertEqual(100, config.task_container_per_day)
|
||||
self.assertEqual([
|
||||
'expiring_objects_container_divisor is deprecated',
|
||||
'expiring_objects_account_name is deprecated; you need to '
|
||||
'migrate to the standard .expiring_objects account',
|
||||
], self.logger.get_lines_for_level('warning'))
|
||||
self.assertEqual([mock.call(conf)], mock_get_logger.call_args_list)
|
||||
|
||||
def test_get_expirer_account_and_container_default(self):
|
||||
expirer_config = ExpirerConfig({}, logger=self.logger)
|
||||
delete_at = time()
|
||||
account, container = \
|
||||
expirer_config.get_expirer_account_and_container(
|
||||
delete_at, 'a', 'c', 'o')
|
||||
self.assertEqual('.expiring_objects', account)
|
||||
self.assertTrue(expirer_config.is_expected_task_container(
|
||||
int(container)))
|
||||
|
||||
def test_get_expirer_account_and_container_legacy(self):
|
||||
expirer_config = ExpirerConfig({
|
||||
'expiring_objects_account_name': 'exp',
|
||||
'expiring_objects_container_divisor': 1000,
|
||||
}, logger=self.logger)
|
||||
delete_at = time()
|
||||
account, container = expirer_config.get_expirer_account_and_container(
|
||||
delete_at, 'a', 'c', 'o')
|
||||
self.assertEqual('.exp', account)
|
||||
self.assertEqual(1000, expirer_config.expirer_divisor)
|
||||
self.assertEqual(100, expirer_config.task_container_per_day)
|
||||
self.assertTrue(expirer_config.is_expected_task_container(
|
||||
int(container)))
|
||||
|
||||
def test_get_delete_at_nodes(self):
|
||||
container_ring = FakeRing()
|
||||
# it seems default FakeRing is very predictable
|
||||
self.assertEqual(32, container_ring._part_shift)
|
||||
self.assertEqual(3, container_ring.replicas)
|
||||
self.assertEqual(3, len(container_ring.devs))
|
||||
expirer_config = ExpirerConfig(
|
||||
{}, logger=self.logger, container_ring=container_ring)
|
||||
delete_at = time()
|
||||
part, nodes, task_container = expirer_config.get_delete_at_nodes(
|
||||
delete_at, 'a', 'c', 'o2')
|
||||
self.assertEqual(0, part) # only one part
|
||||
self.assertEqual([
|
||||
dict(n, index=i) for i, n in enumerate(container_ring.devs)
|
||||
], nodes) # assigned to all ring devices
|
||||
self.assertTrue(expirer_config.is_expected_task_container(
|
||||
int(task_container)))
|
||||
|
||||
def test_get_delete_at_nodes_no_ring(self):
|
||||
expirer_config = ExpirerConfig({}, logger=self.logger)
|
||||
delete_at = time()
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
expirer_config.get_delete_at_nodes(
|
||||
delete_at, 'a', 'c', 'o2')
|
||||
self.assertIn('ExpirerConfig', str(ctx.exception))
|
||||
self.assertIn('container_ring', str(ctx.exception))
|
||||
|
||||
|
||||
class TestExpirerHelpers(TestCase):
|
||||
|
||||
def test_add_expirer_bytes_to_ctype(self):
|
||||
@ -225,14 +386,15 @@ class TestObjectExpirer(TestCase):
|
||||
internal_client = None
|
||||
|
||||
def get_expirer_container(self, delete_at, target_account='a',
|
||||
target_container='c', target_object='o',
|
||||
expirer_divisor=86400):
|
||||
target_container='c', target_object='o'):
|
||||
# the actual target a/c/o used only matters for consistent
|
||||
# distribution, tests typically only create one task container per-day,
|
||||
# but we want the task container names to be realistic
|
||||
return utils.get_expirer_container(
|
||||
delete_at, expirer_divisor,
|
||||
target_account, target_container, target_object)
|
||||
expirer = getattr(self, 'expirer', None)
|
||||
expirer_config = expirer.expirer_config if expirer else \
|
||||
ExpirerConfig(self.conf, self.logger)
|
||||
return expirer_config.get_expirer_container(
|
||||
delete_at, target_account, target_container, target_object)
|
||||
|
||||
def setUp(self):
|
||||
global not_sleep
|
||||
@ -250,9 +412,11 @@ class TestObjectExpirer(TestCase):
|
||||
self.now = now = int(time())
|
||||
|
||||
self.empty_time = str(now - 864000)
|
||||
self.empty_time_container = self.get_expirer_container(self.empty_time)
|
||||
self.empty_time_container = self.get_expirer_container(
|
||||
self.empty_time)
|
||||
self.past_time = str(now - 86400)
|
||||
self.past_time_container = self.get_expirer_container(self.past_time)
|
||||
self.past_time_container = self.get_expirer_container(
|
||||
self.past_time)
|
||||
self.just_past_time = str(now - 1)
|
||||
self.just_past_time_container = self.get_expirer_container(
|
||||
self.just_past_time)
|
||||
@ -260,7 +424,7 @@ class TestObjectExpirer(TestCase):
|
||||
self.future_time_container = self.get_expirer_container(
|
||||
self.future_time)
|
||||
# Dummy task queue for test
|
||||
self.fake_swift = FakeInternalClient({
|
||||
self._setup_fake_swift({
|
||||
'.expiring_objects': {
|
||||
# this task container will be checked
|
||||
self.empty_time_container: [],
|
||||
@ -285,8 +449,6 @@ class TestObjectExpirer(TestCase):
|
||||
self.future_time_container: [
|
||||
self.future_time + '-a11/c11/o11']}
|
||||
})
|
||||
self.expirer = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=self.fake_swift)
|
||||
|
||||
# map of times to target object paths which should be expirerd now
|
||||
self.expired_target_paths = {
|
||||
@ -303,6 +465,12 @@ class TestObjectExpirer(TestCase):
|
||||
],
|
||||
}
|
||||
|
||||
def _setup_fake_swift(self, aco_dict):
|
||||
self.fake_swift = FakeInternalClient(aco_dict)
|
||||
self.expirer = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=self.fake_swift)
|
||||
self.expirer_config = self.expirer.expirer_config
|
||||
|
||||
def make_fake_ic(self, app):
|
||||
app._pipeline_final_app = mock.MagicMock()
|
||||
return internal_client.InternalClient(None, 'fake-ic', 1, app=app)
|
||||
@ -320,7 +488,7 @@ class TestObjectExpirer(TestCase):
|
||||
use_replication_network=True,
|
||||
global_conf={'log_name': 'object-expirer-ic'})])
|
||||
self.assertEqual(self.logger.get_lines_for_level('warning'), [])
|
||||
self.assertEqual(x.expiring_objects_account, '.expiring_objects')
|
||||
self.assertEqual(x.expirer_config.account_name, '.expiring_objects')
|
||||
self.assertIs(x.swift, self.fake_swift)
|
||||
|
||||
def test_init_default_round_robin_cache_default(self):
|
||||
@ -1054,13 +1222,6 @@ class TestObjectExpirer(TestCase):
|
||||
results = [_ for _ in x.iter_task_accounts_to_expire()]
|
||||
self.assertEqual(results, [('.expiring_objects', 1, 2)])
|
||||
|
||||
def test_delete_at_time_of_task_container(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=self.fake_swift)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('0000'), 0)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('0001'), 1)
|
||||
self.assertEqual(x.delete_at_time_of_task_container('1000'), 1000)
|
||||
|
||||
def test_run_once_nothing_to_do(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
|
||||
swift=self.fake_swift)
|
||||
@ -1123,6 +1284,102 @@ class TestObjectExpirer(TestCase):
|
||||
'Pass completed in 0s; 0 objects expired',
|
||||
])
|
||||
|
||||
def test_get_task_containers_unexpected_container(self):
|
||||
expected = self.get_expirer_container(time())
|
||||
unexpected = str(int(expected) - 200)
|
||||
for name in (expected, unexpected):
|
||||
self.assertTrue(name.isdigit()) # sanity
|
||||
|
||||
container_list = [{'name': unexpected}, {'name': expected}]
|
||||
with mock.patch.object(self.expirer.swift, 'iter_containers',
|
||||
return_value=container_list):
|
||||
self.assertEqual(
|
||||
self.expirer.get_task_containers_to_expire('task_account'),
|
||||
[unexpected, expected])
|
||||
self.assertEqual(self.expirer.logger.all_log_lines(), {'info': [
|
||||
'processing 1 unexpected task containers (e.g. %s)' % unexpected,
|
||||
]})
|
||||
|
||||
def test_get_task_containers_invalid_container(self):
|
||||
ok_names = ['86301', '86400']
|
||||
bad_names = ['-1', 'rogue']
|
||||
unexpected = ['86300', '86401']
|
||||
|
||||
container_list = [{'name': name} for name in bad_names] + \
|
||||
[{'name': name} for name in ok_names] + \
|
||||
[{'name': name} for name in unexpected]
|
||||
with mock.patch.object(self.expirer.swift, 'iter_containers',
|
||||
return_value=container_list):
|
||||
self.assertEqual(
|
||||
self.expirer.get_task_containers_to_expire('task_account'),
|
||||
ok_names + unexpected)
|
||||
lines = self.expirer.logger.get_lines_for_level('error')
|
||||
self.assertEqual(lines, [
|
||||
'skipping invalid task container: task_account/-1',
|
||||
'skipping invalid task container: task_account/rogue',
|
||||
])
|
||||
lines = self.expirer.logger.get_lines_for_level('info')
|
||||
self.assertEqual(lines, [
|
||||
'processing 2 unexpected task containers (e.g. 86300 86401)'
|
||||
])
|
||||
|
||||
def _expirer_run_once_with_mocks(self, now=None, stub_pop_queue=None):
|
||||
"""
|
||||
call self.expirer.run_once() with some things (optionally) stubbed out
|
||||
"""
|
||||
now = now or time()
|
||||
# IME abuse of MagicMock's call tracking will pop OOM
|
||||
memory_efficient_noop = lambda *args, **kwargs: None
|
||||
stub_pop_queue = stub_pop_queue or memory_efficient_noop
|
||||
memory_efficient_time = lambda: now
|
||||
with mock.patch.object(self.expirer, 'pop_queue', stub_pop_queue), \
|
||||
mock.patch('eventlet.sleep', memory_efficient_noop), \
|
||||
mock.patch('swift.common.utils.timestamp.time.time',
|
||||
memory_efficient_time), \
|
||||
mock.patch('swift.obj.expirer.time', memory_efficient_time):
|
||||
self.expirer.run_once()
|
||||
|
||||
def test_run_once_with_invalid_container(self):
|
||||
now = time()
|
||||
t0 = Timestamp(now - 100000)
|
||||
t1 = Timestamp(now - 10000)
|
||||
normal_task_container = self.get_expirer_container(t0)
|
||||
self.assertTrue(normal_task_container.isdigit())
|
||||
next_task_container = self.get_expirer_container(t1)
|
||||
for name in (normal_task_container, next_task_container):
|
||||
self.assertTrue(name.isdigit()) # sanity
|
||||
|
||||
strange_task_container = normal_task_container + '-crazy'
|
||||
self.assertFalse(strange_task_container.isdigit())
|
||||
|
||||
task_per_container = 3
|
||||
self._setup_fake_swift({
|
||||
'.expiring_objects': {
|
||||
normal_task_container: [
|
||||
expirer.build_task_obj(t0, 'a', 'c1', 'o%s' % i)
|
||||
for i in range(task_per_container)
|
||||
],
|
||||
strange_task_container: [
|
||||
expirer.build_task_obj(t0, 'a', 'c2', 'o%s' % i)
|
||||
for i in range(task_per_container)
|
||||
],
|
||||
next_task_container: [
|
||||
expirer.build_task_obj(t1, 'a', 'c3', 'o%s' % i)
|
||||
for i in range(task_per_container)
|
||||
],
|
||||
}
|
||||
})
|
||||
# sanity
|
||||
self.assertEqual(
|
||||
sorted(self.expirer.swift.aco_dict['.expiring_objects'].keys()), [
|
||||
normal_task_container,
|
||||
strange_task_container,
|
||||
next_task_container,
|
||||
])
|
||||
self._expirer_run_once_with_mocks(now=now)
|
||||
# we processed all tasks in all valid containers
|
||||
self.assertEqual(task_per_container * 2, self.expirer.report_objects)
|
||||
|
||||
def test_iter_task_to_expire(self):
|
||||
# In this test, all tasks are assigned to the tested expirer
|
||||
my_index = 0
|
||||
|
@ -39,6 +39,7 @@ from eventlet.green.http import client as http_client
|
||||
|
||||
from swift import __version__ as swift_version
|
||||
from swift.common.http import is_success
|
||||
from swift.obj.expirer import ExpirerConfig
|
||||
from test import listen_zero, BaseTestCase
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import mocked_http_conn, \
|
||||
@ -1388,10 +1389,10 @@ class TestObjectController(BaseTestCase):
|
||||
def _update_delete_at_headers(self, headers, a='a', c='c', o='o',
|
||||
node_count=1):
|
||||
delete_at = headers['X-Delete-At']
|
||||
delete_at_container = utils.get_expirer_container(delete_at, 84600,
|
||||
a, c, o)
|
||||
part, nodes = self.container_ring.get_nodes(
|
||||
'.expiring_objects', delete_at_container)
|
||||
expirer_config = ExpirerConfig(
|
||||
self.conf, logger=self.logger, container_ring=self.container_ring)
|
||||
part, nodes, delete_at_container = expirer_config.get_delete_at_nodes(
|
||||
delete_at, a, c, o)
|
||||
# proxy assigns each replica a node, index 0 for test stability
|
||||
nodes = nodes[:node_count]
|
||||
headers.update({
|
||||
@ -5769,7 +5770,7 @@ class TestObjectController(BaseTestCase):
|
||||
self.object_controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.object_controller.logger)
|
||||
policy = random.choice(list(POLICIES))
|
||||
self.object_controller.expiring_objects_account = 'exp'
|
||||
self.object_controller.expirer_config.account_name = 'exp'
|
||||
|
||||
http_connect_args = []
|
||||
|
||||
@ -6806,9 +6807,10 @@ class TestObjectController(BaseTestCase):
|
||||
self.object_controller.delete_at_update(
|
||||
'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', policy)
|
||||
expiring_obj_container = given_args.pop(2)
|
||||
expected_exp_cont = utils.get_expirer_container(
|
||||
utils.normalize_delete_at_timestamp(12345678901),
|
||||
86400, 'a', 'c', 'o')
|
||||
expected_exp_cont = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
utils.normalize_delete_at_timestamp(12345678901),
|
||||
'a', 'c', 'o')
|
||||
self.assertEqual(expiring_obj_container, expected_exp_cont)
|
||||
|
||||
self.assertEqual(given_args, [
|
||||
@ -6886,6 +6888,9 @@ class TestObjectController(BaseTestCase):
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
# proxy servers started sending the x-delete-at-container along with
|
||||
# host/part/device in 2013 Ia0081693f01631d3f2a59612308683e939ced76a
|
||||
# it may be no longer necessary to say "warning: upgrade faster"
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('warning'),
|
||||
['X-Delete-At-Container header must be specified for expiring '
|
||||
@ -6906,6 +6911,60 @@ class TestObjectController(BaseTestCase):
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
'sda1', policy])
|
||||
|
||||
def test_delete_at_update_put_with_info_but_wrong_container(self):
|
||||
# Same as test_delete_at_update_put_with_info, but the
|
||||
# X-Delete-At-Container is "wrong"
|
||||
policy = random.choice(list(POLICIES))
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.logger = self.logger
|
||||
delete_at = time()
|
||||
req_headers = {
|
||||
'X-Timestamp': 1,
|
||||
'X-Trans-Id': '1234',
|
||||
'X-Delete-At': delete_at,
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
}
|
||||
self._update_delete_at_headers(req_headers)
|
||||
delete_at = str(int(time() + 30))
|
||||
expected_container = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
delete_at, 'a', 'c', 'o')
|
||||
unexpected_container = str(int(delete_at) + 100)
|
||||
req_headers['X-Delete-At-Container'] = unexpected_container
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers=req_headers)
|
||||
self.object_controller.delete_at_update('PUT', delete_at,
|
||||
'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
self.assertEqual({'debug': [
|
||||
"Proxy X-Delete-At-Container '%s' does not match expected "
|
||||
"'%s' for current expirer_config." % (unexpected_container,
|
||||
expected_container)
|
||||
]}, self.logger.all_log_lines())
|
||||
self.assertEqual(
|
||||
given_args, [
|
||||
'PUT', '.expiring_objects', unexpected_container,
|
||||
'%s-a/c/o' % delete_at,
|
||||
req_headers['X-Delete-At-Host'],
|
||||
req_headers['X-Delete-At-Partition'],
|
||||
req_headers['X-Delete-At-Device'], HeaderKeyDict({
|
||||
# the .expiring_objects account is always policy-0
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-size': '0',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain',
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
'sda1', policy])
|
||||
|
||||
def test_delete_at_update_put_with_info_but_missing_host(self):
|
||||
# Same as test_delete_at_update_put_with_info, but just
|
||||
# missing the X-Delete-At-Host header.
|
||||
@ -6948,8 +7007,9 @@ class TestObjectController(BaseTestCase):
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.logger = self.logger
|
||||
delete_at_container = utils.get_expirer_container(
|
||||
'1', 84600, 'a', 'c', 'o')
|
||||
delete_at_container = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
'1', 'a', 'c', 'o')
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
@ -7773,10 +7833,9 @@ class TestObjectController(BaseTestCase):
|
||||
def test_extra_headers_contain_object_bytes(self):
|
||||
timestamp1 = next(self.ts).normal
|
||||
delete_at_timestamp1 = int(time() + 1000)
|
||||
delete_at_container1 = str(
|
||||
delete_at_timestamp1 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
delete_at_container1 = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
delete_at_timestamp1, 'a', 'c', 'o')
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp1,
|
||||
@ -7861,8 +7920,9 @@ class TestObjectController(BaseTestCase):
|
||||
|
||||
policy = random.choice(list(POLICIES))
|
||||
delete_at = int(next(self.ts)) + 30
|
||||
delete_at_container = utils.get_expirer_container(delete_at, 86400,
|
||||
'a', 'c', 'o')
|
||||
delete_at_container = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
delete_at, 'a', 'c', 'o')
|
||||
base_headers = {
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'Content-Type': 'application/octet-stream',
|
||||
@ -7957,8 +8017,9 @@ class TestObjectController(BaseTestCase):
|
||||
put_ts = next(self.ts)
|
||||
put_size = 1548
|
||||
put_delete_at = int(next(self.ts)) + 30
|
||||
put_delete_at_container = utils.get_expirer_container(
|
||||
put_delete_at, 86400, 'a', 'c', 'o')
|
||||
put_delete_at_container = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
put_delete_at, 'a', 'c', 'o')
|
||||
put_req = Request.blank(
|
||||
'/sda1/p/a/c/o', method='PUT', body='\x01' * put_size,
|
||||
headers={
|
||||
@ -8008,8 +8069,9 @@ class TestObjectController(BaseTestCase):
|
||||
|
||||
delete_at = int(next(self.ts)) + 100
|
||||
self.assertNotEqual(delete_at, put_delete_at) # sanity
|
||||
delete_at_container = utils.get_expirer_container(
|
||||
delete_at, 86400, 'a', 'c', 'o')
|
||||
delete_at_container = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
delete_at, 'a', 'c', 'o')
|
||||
|
||||
base_headers = {
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
@ -8119,10 +8181,9 @@ class TestObjectController(BaseTestCase):
|
||||
self.object_controller.delete_at_update = fake_delete_at_update
|
||||
timestamp1 = normalize_timestamp(time())
|
||||
delete_at_timestamp1 = int(time() + 1000)
|
||||
delete_at_container1 = str(
|
||||
delete_at_timestamp1 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
delete_at_container1 = \
|
||||
self.object_controller.expirer_config.get_expirer_container(
|
||||
delete_at_timestamp1, 'a', 'c', 'o')
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp1,
|
||||
|
@ -191,15 +191,8 @@ class BaseObjectControllerMixin(object):
|
||||
self.logger = debug_logger('proxy-server')
|
||||
self.logger.thread_locals = ('txn1', '127.0.0.2')
|
||||
# increase connection timeout to avoid intermittent failures
|
||||
conf = {'conn_timeout': 1.0}
|
||||
self.app = PatchedObjControllerApp(
|
||||
conf, account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), logger=self.logger)
|
||||
self.logger.clear() # startup/loading debug msgs not helpful
|
||||
|
||||
# you can over-ride the container_info just by setting it on the app
|
||||
# (see PatchedObjControllerApp for details)
|
||||
self.app.container_info = dict(self.fake_container_info())
|
||||
self.conf = {'conn_timeout': 1.0}
|
||||
self._make_app()
|
||||
|
||||
# default policy and ring references
|
||||
self.policy = POLICIES.default
|
||||
@ -207,6 +200,16 @@ class BaseObjectControllerMixin(object):
|
||||
self._ts_iter = (utils.Timestamp(t) for t in
|
||||
itertools.count(int(time.time())))
|
||||
|
||||
def _make_app(self):
|
||||
self.app = PatchedObjControllerApp(
|
||||
self.conf, account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), logger=self.logger)
|
||||
self.logger.clear() # startup/loading debug msgs not helpful
|
||||
|
||||
# you can over-ride the container_info just by setting it on the app
|
||||
# (see PatchedObjControllerApp for details)
|
||||
self.app.container_info = dict(self.fake_container_info())
|
||||
|
||||
def ts(self):
|
||||
return next(self._ts_iter)
|
||||
|
||||
@ -2587,6 +2590,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
||||
|
||||
def test_PUT_delete_at(self):
|
||||
t = str(int(time.time() + 100))
|
||||
expected_part, expected_nodes, expected_delete_at_container = \
|
||||
self.app.expirer_config.get_delete_at_nodes(t, 'a', 'c', 'o')
|
||||
req = swob.Request.blank('/v1/a/c/o', method='PUT', body=b'',
|
||||
headers={'Content-Type': 'foo/bar',
|
||||
'X-Delete-At': t})
|
||||
@ -2600,12 +2605,52 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
found_host_device = set()
|
||||
for given_headers in put_headers:
|
||||
found_host_device.add('%s/%s' % (
|
||||
given_headers['X-Delete-At-Host'],
|
||||
given_headers['X-Delete-At-Device']))
|
||||
self.assertEqual(given_headers.get('X-Delete-At'), t)
|
||||
self.assertIn('X-Delete-At-Host', given_headers)
|
||||
self.assertIn('X-Delete-At-Device', given_headers)
|
||||
self.assertIn('X-Delete-At-Partition', given_headers)
|
||||
self.assertIn('X-Delete-At-Container', given_headers)
|
||||
self.assertEqual(str(expected_part),
|
||||
given_headers['X-Delete-At-Partition'])
|
||||
self.assertEqual(expected_delete_at_container,
|
||||
given_headers['X-Delete-At-Container'])
|
||||
self.assertEqual({'%(ip)s:%(port)s/%(device)s' % n
|
||||
for n in expected_nodes},
|
||||
found_host_device)
|
||||
|
||||
def test_POST_delete_at_configure_task_container_per_day(self):
|
||||
self.assertEqual(100, self.app.expirer_config.task_container_per_day)
|
||||
t = str(int(time.time() + 100))
|
||||
expected_part, expected_nodes, expected_delete_at_container = \
|
||||
self.app.expirer_config.get_delete_at_nodes(t, 'a', 'c', 'o')
|
||||
req = swob.Request.blank('/v1/a/c/o', method='POST', body=b'',
|
||||
headers={'Content-Type': 'foo/bar',
|
||||
'X-Delete-At': t})
|
||||
post_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path, headers,
|
||||
**kwargs):
|
||||
if method == 'POST':
|
||||
post_headers.append(headers)
|
||||
codes = [201] * self.obj_ring.replicas
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
found_host_device = set()
|
||||
for given_headers in post_headers:
|
||||
found_host_device.add('%s/%s' % (
|
||||
given_headers['X-Delete-At-Host'],
|
||||
given_headers['X-Delete-At-Device']))
|
||||
self.assertEqual(given_headers.get('X-Delete-At'), t)
|
||||
self.assertEqual(str(expected_part),
|
||||
given_headers['X-Delete-At-Partition'])
|
||||
self.assertEqual(expected_delete_at_container,
|
||||
given_headers['X-Delete-At-Container'])
|
||||
self.assertEqual({'%(ip)s:%(port)s/%(device)s' % n
|
||||
for n in expected_nodes},
|
||||
found_host_device)
|
||||
|
||||
def test_POST_delete_at_with_x_open_expired(self):
|
||||
t_delete = str(int(time.time() + 30))
|
||||
@ -2635,11 +2680,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
||||
self.assertIn('X-Delete-At-Container', given_headers)
|
||||
|
||||
# Check when allow_open_expired config is set to true
|
||||
conf = {'allow_open_expired': 'true'}
|
||||
self.app = PatchedObjControllerApp(
|
||||
conf, account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), logger=None)
|
||||
self.app.container_info = dict(self.fake_container_info())
|
||||
self.conf['allow_open_expired'] = 'true'
|
||||
self._make_app()
|
||||
self.obj_ring = self.app.get_object_ring(int(self.policy))
|
||||
|
||||
post_headers = []
|
||||
|
@ -7575,9 +7575,8 @@ class TestReplicatedObjectController(
|
||||
self.app.container_ring.set_replicas(2)
|
||||
|
||||
delete_at_timestamp = int(time.time()) + 100000
|
||||
delete_at_container = utils.get_expirer_container(
|
||||
delete_at_timestamp, self.app.expiring_objects_container_divisor,
|
||||
'a', 'c', 'o')
|
||||
delete_at_container = self.app.expirer_config.get_expirer_container(
|
||||
delete_at_timestamp, 'a', 'c', 'o')
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': '0',
|
||||
@ -7608,13 +7607,22 @@ class TestReplicatedObjectController(
|
||||
@mock.patch('time.time', new=lambda: STATIC_TIME)
|
||||
def test_PUT_x_delete_at_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
self.app.expiring_objects_account = 'expires'
|
||||
self.app.expiring_objects_container_divisor = 60
|
||||
self.app.expirer_config = proxy_server.expirer.ExpirerConfig(
|
||||
{
|
||||
'expiring_objects_account_name': 'expires',
|
||||
'expiring_objects_container_divisor': 600,
|
||||
}, logger=self.logger, container_ring=self.app.container_ring)
|
||||
self.assertEqual([
|
||||
'expiring_objects_container_divisor is deprecated',
|
||||
'expiring_objects_account_name is deprecated; you need to migrate '
|
||||
'to the standard .expiring_objects account',
|
||||
], self.logger.get_lines_for_level('warning'))
|
||||
self.assertIs(self.app.container_ring,
|
||||
self.app.expirer_config.container_ring)
|
||||
|
||||
delete_at_timestamp = int(time.time()) + 100000
|
||||
delete_at_container = utils.get_expirer_container(
|
||||
delete_at_timestamp, self.app.expiring_objects_container_divisor,
|
||||
'a', 'c', 'o')
|
||||
delete_at_container = self.app.expirer_config.get_expirer_container(
|
||||
delete_at_timestamp, 'a', 'c', 'o')
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': 0,
|
||||
|
Loading…
x
Reference in New Issue
Block a user