Add operator tool to async-delete some or all objects in a container

Adds a tool, swift-container-deleter, that takes an account/container
and optional prefix, marker, and/or end-marker; spins up an internal
client; makes listing requests against the container; and pushes the
found objects into the object-expirer queue with a special
application/async-deleted content-type.
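
For illustration, the same flow can be driven from Python through the pieces
this change adds. A minimal sketch, assuming the stock internal-client config
path and made-up account/container names:

    from swift.cli.container_deleter import mark_for_deletion
    from swift.common.internal_client import InternalClient

    swift = InternalClient('/etc/swift/internal-client.conf',
                           'Swift Container Deleter', request_tries=3)
    # With the default yield_time, this yields (marked-so-far, last-name)
    # tuples; the final tuple carries None as the last name.
    for marked, last_name in mark_for_deletion(
            swift, 'AUTH_test', 'web-logs',
            marker='', end_marker='', prefix='logs/2019/'):
        if last_name is None:
            print('Marked %d objects for deletion.' % marked)
        else:
            # If this dies partway, resume with marker=last_name.
            print('Marked %d objects, through %r' % (marked, last_name))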

In order to do this enqueuing efficiently, a new internal-to-the-cluster
container method is introduced: UPDATE. It takes a JSON list of object
entries and runs them through merge_items.
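
As a rough sketch of the payload such an UPDATE carries (built here with the
new make_delete_jobs helper; the names are made up):

    import json
    from swift.cli.container_deleter import make_delete_jobs
    from swift.common.utils import Timestamp

    ts = Timestamp.now()
    # One row per object, in the shape merge_items expects.
    jobs = make_delete_jobs('AUTH_test', 'web-logs',
                            ['logs/2019/01.gz', 'logs/2019/02.gz'], ts)
    body = json.dumps(jobs).encode('ascii')
    # Each row looks like:
    #   {'name': '<ts>-AUTH_test/web-logs/logs/2019/01.gz', 'deleted': 0,
    #    'created_at': ts.internal, 'etag': MD5_OF_EMPTY_STRING, 'size': 0,
    #    'storage_policy_index': 0,
    #    'content_type': 'application/async-deleted'}
    # and the list is sent as the body of
    #   UPDATE /v1/.expiring_objects/<int(ts)>
    # with X-Backend-Allow-Method: UPDATE, X-Backend-Storage-Policy-Index: 0,
    # and X-Timestamp set to ts.internal.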

The object-expirer is updated to look for work items with this
content-type and skip the X-If-Deleted-At check that it would normally
do.
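
In outline, the expirer's delete path now branches on that content-type. The
following paraphrases the delete_actual_object() change below, wrapped in a
hypothetical helper for readability:

    from swift.common.utils import Timestamp

    def pick_delete_request(entry, timestamp):
        # Async-delete queue entries skip the X-If-Delete-At check and
        # tolerate 404s, since the target may already be gone.
        if entry.get('content_type') == 'application/async-deleted':
            headers = {'X-Timestamp': timestamp.normal}
            acceptable = (2, 409, 404)
        else:
            headers = {'X-Timestamp': timestamp.normal,
                       'X-If-Delete-At': timestamp.normal,
                       'X-Backend-Clean-Expiring-Object-Queue': 'no'}
            acceptable = (2, 409)
        return headers, acceptable

    headers, acceptable = pick_delete_request(
        {'content_type': 'application/async-deleted'}, Timestamp.now())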

Note that the target-container's listing will continue to show the
objects until data is actually deleted, bypassing some of the concerns
raised in the related change about clearing out a container entirely and
then deleting it.

Change-Id: Ia13ee5da3d1b5c536eccaadc7a6fdcd997374443
Related-Change: I50e403dee75585fc1ff2bb385d6b2d2f13653cf8
Tim Burke 2019-03-27 15:28:50 -07:00
parent b4c8cf192a
commit 83d0161991
12 changed files with 771 additions and 51 deletions

View File

@@ -78,6 +78,7 @@ keystone =
[entry_points]
console_scripts =
swift-manage-shard-ranges = swift.cli.manage_shard_ranges:main
swift-container-deleter = swift.cli.container_deleter:main
paste.app_factory =
proxy = swift.proxy.server:app_factory

View File

@@ -0,0 +1,174 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''
Enqueue background jobs to delete portions of a container's namespace.
Accepts prefix, marker, and end-marker args that work as in container
listings. Objects found in the listing will be marked to be deleted
by the object-expirer; until the object is actually deleted, it will
continue to appear in listings.
If there are many objects, this operation may take some time. Stats will
periodically be emitted so you know the process hasn't hung. These will
also include the last object marked for deletion; if there is a failure,
pass this as the ``--marker`` when retrying to minimize duplicative work.
'''
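# Example invocation (illustrative; the account and container are made up):
#   swift-container-deleter AUTH_test web-logs --prefix logs/2019/
# If interrupted, re-run with --marker set to the last name reported.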
import argparse
import io
import itertools
import json
import six
import time
from swift.common.internal_client import InternalClient
from swift.common.utils import Timestamp, MD5_OF_EMPTY_STRING
from swift.obj.expirer import build_task_obj, ASYNC_DELETE_TYPE
OBJECTS_PER_UPDATE = 10000
def make_delete_jobs(account, container, objects, timestamp):
'''
Create a list of async-delete jobs
:param account: (native or unicode string) account to delete from
:param container: (native or unicode string) container to delete from
:param objects: (list of native or unicode strings) objects to delete
:param timestamp: (Timestamp) time at which objects should be marked
deleted
:returns: list of dicts appropriate for an UPDATE request to an
expiring-object queue
'''
if six.PY2:
if isinstance(account, str):
account = account.decode('utf8')
if isinstance(container, str):
container = container.decode('utf8')
return [
{
'name': build_task_obj(
timestamp, account, container,
obj.decode('utf8') if six.PY2 and isinstance(obj, str)
else obj),
'deleted': 0,
'created_at': timestamp.internal,
'etag': MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': ASYNC_DELETE_TYPE,
} for obj in objects]
def mark_for_deletion(swift, account, container, marker, end_marker,
prefix, timestamp=None, yield_time=10):
'''
Enqueue jobs to async-delete some portion of a container's namespace
:param swift: InternalClient to use
:param account: account to delete from
:param container: container to delete from
:param marker: only delete objects after this name
:param end_marker: only delete objects before this name. Use ``None`` or
empty string to delete to the end of the namespace.
:param prefix: only delete objects starting with this prefix
:param timestamp: delete all objects as of this time. If ``None``, the
current time will be used.
:param yield_time: approximate period with which intermediate results
should be returned. If ``None``, disable intermediate
results.
:returns: If ``yield_time`` is ``None``, the number of objects marked for
deletion. Otherwise, a generator that will yield out tuples of
``(number of marked objects, last object name)`` approximately
every ``yield_time`` seconds. The final tuple will have ``None``
as the second element. This form allows you to retry when an
error occurs partway through while minimizing duplicate work.
'''
if timestamp is None:
timestamp = Timestamp.now()
def enqueue_deletes():
deleted = 0
obj_iter = swift.iter_objects(
account, container,
marker=marker, end_marker=end_marker, prefix=prefix)
time_marker = time.time()
while True:
to_delete = [obj['name'] for obj in itertools.islice(
obj_iter, OBJECTS_PER_UPDATE)]
if not to_delete:
break
delete_jobs = make_delete_jobs(
account, container, to_delete, timestamp)
swift.make_request(
'UPDATE',
swift.make_path('.expiring_objects', str(int(timestamp))),
headers={'X-Backend-Allow-Method': 'UPDATE',
'X-Backend-Storage-Policy-Index': '0',
'X-Timestamp': timestamp.internal},
acceptable_statuses=(2,),
body_file=io.BytesIO(json.dumps(delete_jobs).encode('ascii')))
deleted += len(delete_jobs)
if yield_time is not None and \
time.time() - time_marker > yield_time:
yield deleted, to_delete[-1]
time_marker = time.time()
yield deleted, None
if yield_time is None:
for deleted, marker in enqueue_deletes():
if marker is None:
return deleted
else:
return enqueue_deletes()
def main():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--config', default='/etc/swift/internal-client.conf',
help=('internal-client config file '
'(default: /etc/swift/internal-client.conf)'))
parser.add_argument('--request-tries', type=int, default=3,
help='(default: 3)')
parser.add_argument('account', help='account from which to delete')
parser.add_argument('container', help='container from which to delete')
parser.add_argument(
'--prefix', default='',
help='only delete objects with this prefix (default: none)')
parser.add_argument(
'--marker', default='',
help='only delete objects after this marker (default: none)')
parser.add_argument(
'--end-marker', default='',
help='only delete objects before this end-marker (default: none)')
parser.add_argument(
'--timestamp', type=Timestamp, default=Timestamp.now(),
help='delete all objects as of this time (default: now)')
args = parser.parse_args()
swift = InternalClient(
args.config, 'Swift Container Deleter', args.request_tries)
for deleted, marker in mark_for_deletion(
swift, args.account, args.container,
args.marker, args.end_marker, args.prefix, args.timestamp):
if marker is None:
print('Finished. Marked %d objects for deletion.' % deleted)
else:
print('Marked %d objects for deletion, through %r' % (
deleted, marker))
if __name__ == '__main__':
main()

View File

@@ -751,6 +751,32 @@ class ContainerController(BaseStorageServer):
ret.request = req
return ret
@public
@timing_stats()
def UPDATE(self, req):
"""
Handle HTTP UPDATE request (merge_items RPCs coming from the proxy).
"""
drive, part, account, container = split_and_validate_path(req, 4)
req_timestamp = valid_timestamp(req)
try:
check_drive(self.root, drive, self.mount_check)
except ValueError:
return HTTPInsufficientStorage(drive=drive, request=req)
if not self.check_free_space(drive):
return HTTPInsufficientStorage(drive=drive, request=req)
requested_policy_index = self.get_and_validate_policy_index(req)
broker = self._get_container_broker(drive, part, account, container)
self._maybe_autocreate(broker, req_timestamp, account,
requested_policy_index)
try:
objs = json.load(req.environ['wsgi.input'])
except ValueError as err:
return HTTPBadRequest(body=str(err), content_type='text/plain')
broker.merge_items(objs)
return HTTPAccepted(request=req)
@public
@timing_stats()
def POST(self, req):

View File

@@ -28,7 +28,7 @@ from eventlet.greenpool import GreenPool
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
Timestamp, config_true_value
Timestamp, config_true_value, normalize_delete_at_timestamp
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED
from swift.common.swob import wsgi_quote, str_to_wsgi
@@ -36,6 +36,34 @@ from swift.common.swob import wsgi_quote, str_to_wsgi
from swift.container.reconciler import direct_delete_container_entry
MAX_OBJECTS_TO_CACHE = 100000
ASYNC_DELETE_TYPE = 'application/async-deleted'
def build_task_obj(timestamp, target_account, target_container,
target_obj):
"""
:return: a task object name in format of
"<timestamp>-<target_account>/<target_container>/<target_obj>"
"""
timestamp = Timestamp(timestamp)
return '%s-%s/%s/%s' % (
normalize_delete_at_timestamp(timestamp),
target_account, target_container, target_obj)
def parse_task_obj(task_obj):
"""
:param task_obj: a task object name in format of
"<timestamp>-<target_account>/<target_container>" +
"/<target_obj>"
:return: 4-tuples of (delete_at_time, target_account, target_container,
target_obj)
"""
timestamp, target_path = task_obj.split('-', 1)
timestamp = Timestamp(timestamp)
target_account, target_container, target_obj = \
split_path('/' + target_path, 3, 3, True)
return timestamp, target_account, target_container, target_obj
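# Round-trip example with illustrative values:
#   build_task_obj(Timestamp(1560000000), 'AUTH_test', 'photos', 'cat.jpg')
#     -> '1560000000-AUTH_test/photos/cat.jpg'
#   parse_task_obj('1560000000-AUTH_test/photos/cat.jpg')
#     -> (Timestamp(1560000000), 'AUTH_test', 'photos', 'cat.jpg')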
class ObjectExpirer(Daemon):
@@ -123,18 +151,7 @@ class ObjectExpirer(Daemon):
self.report_last_time = time()
def parse_task_obj(self, task_obj):
"""
:param task_obj: a task object name in format of
"<timestamp>-<target_account>/<target_container>" +
"/<target_obj>"
:return: 4-tuples of (delete_at_time, target_account, target_container,
target_obj)
"""
timestamp, target_path = task_obj.split('-', 1)
timestamp = Timestamp(timestamp)
target_account, target_container, target_obj = \
split_path('/' + target_path, 3, 3, True)
return timestamp, target_account, target_container, target_obj
return parse_task_obj(task_obj)
def round_robin_order(self, task_iter):
"""
@@ -238,7 +255,7 @@ class ObjectExpirer(Daemon):
task_object = o['name']
try:
delete_timestamp, target_account, target_container, \
target_object = self.parse_task_obj(task_object)
target_object = parse_task_obj(task_object)
except ValueError:
self.logger.exception('Unexpected error handling task %r' %
task_object)
@@ -253,12 +270,14 @@ class ObjectExpirer(Daemon):
divisor) != my_index:
continue
is_async = o.get('content_type') == ASYNC_DELETE_TYPE
yield {'task_account': task_account,
'task_container': task_container,
'task_object': task_object,
'target_path': '/'.join([
target_account, target_container, target_object]),
'delete_timestamp': delete_timestamp}
'delete_timestamp': delete_timestamp,
'is_async_delete': is_async}
def run_once(self, *args, **kwargs):
"""
@@ -390,11 +409,13 @@ class ObjectExpirer(Daemon):
'process must be less than processes')
def delete_object(self, target_path, delete_timestamp,
task_account, task_container, task_object):
task_account, task_container, task_object,
is_async_delete):
start_time = time()
try:
try:
self.delete_actual_object(target_path, delete_timestamp)
self.delete_actual_object(target_path, delete_timestamp,
is_async_delete)
except UnexpectedResponse as err:
if err.resp.status_int not in {HTTP_NOT_FOUND,
HTTP_PRECONDITION_FAILED}:
@@ -431,7 +452,7 @@ class ObjectExpirer(Daemon):
direct_delete_container_entry(self.swift.container_ring, task_account,
task_container, task_object)
def delete_actual_object(self, actual_obj, timestamp):
def delete_actual_object(self, actual_obj, timestamp, is_async_delete):
"""
Deletes the end-user object indicated by the actual object name given
'<account>/<container>/<object>' if and only if the X-Delete-At value
@@ -442,13 +463,19 @@ class ObjectExpirer(Daemon):
:param timestamp: The swift.common.utils.Timestamp instance the
X-Delete-At value must match to perform the actual
delete.
:param is_async_delete: False if the object should be deleted because
of "normal" expiration, or True if it should
be async-deleted.
:raises UnexpectedResponse: if the delete was unsuccessful and
should be retried later
"""
path = '/v1/' + wsgi_quote(str_to_wsgi(actual_obj.lstrip('/')))
self.swift.make_request(
'DELETE', path,
{'X-If-Delete-At': timestamp.normal,
'X-Timestamp': timestamp.normal,
'X-Backend-Clean-Expiring-Object-Queue': 'no'},
(2, HTTP_CONFLICT))
if is_async_delete:
headers = {'X-Timestamp': timestamp.normal}
acceptable_statuses = (2, HTTP_CONFLICT, HTTP_NOT_FOUND)
else:
headers = {'X-Timestamp': timestamp.normal,
'X-If-Delete-At': timestamp.normal,
'X-Backend-Clean-Expiring-Object-Queue': 'no'}
acceptable_statuses = (2, HTTP_CONFLICT)
self.swift.make_request('DELETE', path, headers, acceptable_statuses)

View File

@@ -57,6 +57,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HTTPConflict, \
HTTPServerError, wsgi_to_bytes, wsgi_to_str
from swift.obj.diskfile import RESERVED_DATAFILE_META, DiskFileRouter
from swift.obj.expirer import build_task_obj
def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
@@ -493,7 +494,7 @@ class ObjectController(BaseStorageServer):
for host, contdevice in updates:
self.async_update(
op, self.expiring_objects_account, delete_at_container,
'%s-%s/%s/%s' % (delete_at, account, container, obj),
build_task_obj(delete_at, account, container, obj),
host, partition, contdevice, headers_out, objdevice,
policy)

View File

@@ -1643,7 +1643,7 @@ class Controller(object):
return info
def _make_request(self, nodes, part, method, path, headers, query,
logger_thread_locals):
body, logger_thread_locals):
"""
Iterates over the given node iterator, sending an HTTP request to one
node at a time. The first non-informational, non-server-error
@@ -1657,12 +1657,18 @@
(full path ends up being /<$device>/<$part>/<$path>)
:param headers: dictionary of headers
:param query: query string to send to the backend.
:param body: byte string to use as the request body.
Try to keep it small.
:param logger_thread_locals: The thread local values to be set on the
self.app.logger to retain transaction
logging information.
:returns: a swob.Response object, or None if no responses were received
"""
self.app.logger.thread_locals = logger_thread_locals
if body:
if not isinstance(body, bytes):
raise TypeError('body must be bytes, not %s' % type(body))
headers['Content-Length'] = str(len(body))
for node in nodes:
try:
start_node_timing = time.time()
@@ -1672,6 +1678,9 @@
headers=headers, query_string=query)
conn.node = node
self.app.set_node_timing(node, time.time() - start_node_timing)
if body:
with Timeout(self.app.node_timeout):
conn.send(body)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
if not is_informational(resp.status) and \
@@ -1698,7 +1707,7 @@
def make_requests(self, req, ring, part, method, path, headers,
query_string='', overrides=None, node_count=None,
node_iterator=None):
node_iterator=None, body=None):
"""
Sends an HTTP request to multiple nodes and aggregates the results.
It attempts the primary nodes concurrently, then iterates over the
@@ -1727,7 +1736,7 @@
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
head, query_string, self.app.logger.thread_locals)
head, query_string, body, self.app.logger.thread_locals)
response = []
statuses = []
for resp in pile:

View File

@@ -356,6 +356,26 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
return resp
def UPDATE(self, req):
"""HTTP UPDATE request handler.
Method to perform bulk operations on container DBs,
similar to a merge_items REPLICATE request.
Not client facing; internal clients or middlewares must include
``X-Backend-Allow-Method: UPDATE`` header to access.
"""
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
# Since this isn't client facing, expect callers to supply an index
policy_index = req.headers['X-Backend-Storage-Policy-Index']
headers = self._backend_requests(
req, len(containers), account_partition=None, accounts=[],
policy_index=policy_index)
return self.make_requests(
req, self.app.container_ring, container_partition, 'UPDATE',
req.swift_entity_path, headers, body=req.body)
def _backend_requests(self, req, n_outgoing, account_partition, accounts,
policy_index=None):
additional = {'X-Timestamp': Timestamp.now().internal}

View File

@@ -507,8 +507,12 @@ class Application(object):
controller.trans_id = req.environ['swift.trans_id']
self.logger.client_ip = get_remote_client(req)
if req.method not in controller.allowed_methods:
allowed_methods = set(controller.allowed_methods)
if 'X-Backend-Allow-Method' in req.headers:
allowed_methods.add(req.headers['X-Backend-Allow-Method'])
if req.method not in allowed_methods:
return HTTPMethodNotAllowed(request=req, headers={
# Only advertise the *controller's* allowed_methods
'Allow': ', '.join(controller.allowed_methods)})
handler = getattr(controller, req.method)

View File

@@ -0,0 +1,277 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import itertools
import json
import mock
import six
import unittest
from swift.cli import container_deleter
from swift.common import internal_client
from swift.common import swob
from swift.common import utils
AppCall = collections.namedtuple('AppCall', [
'method', 'path', 'query', 'headers', 'body'])
class FakeInternalClient(internal_client.InternalClient):
def __init__(self, responses):
self.resp_iter = iter(responses)
self.calls = []
def make_request(self, method, path, headers, acceptable_statuses,
body_file=None, params=None):
if body_file is None:
body = None
else:
body = body_file.read()
path, _, query = path.partition('?')
self.calls.append(AppCall(method, path, query, headers, body))
resp = next(self.resp_iter)
if isinstance(resp, Exception):
raise resp
return resp
def __enter__(self):
return self
def __exit__(self, *args):
unused_responses = [r for r in self.resp_iter]
if unused_responses:
raise Exception('Unused responses: %r' % unused_responses)
class TestContainerDeleter(unittest.TestCase):
def setUp(self):
patcher = mock.patch.object(container_deleter.time, 'time',
side_effect=itertools.count())
patcher.__enter__()
self.addCleanup(patcher.__exit__)
patcher = mock.patch.object(container_deleter, 'OBJECTS_PER_UPDATE', 5)
patcher.__enter__()
self.addCleanup(patcher.__exit__)
def test_make_delete_jobs(self):
ts = '1558463777.42739'
self.assertEqual(
container_deleter.make_delete_jobs(
'acct', 'cont', ['obj1', 'obj2'],
utils.Timestamp(ts)),
[{'name': ts.split('.')[0] + '-acct/cont/obj1',
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': ts.split('.')[0] + '-acct/cont/obj2',
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'}])
def test_make_delete_jobs_native_utf8(self):
ts = '1558463777.42739'
uacct = acct = u'acct-\U0001f334'
ucont = cont = u'cont-\N{SNOWMAN}'
uobj1 = obj1 = u'obj-\N{GREEK CAPITAL LETTER ALPHA}'
uobj2 = obj2 = u'obj-\N{GREEK CAPITAL LETTER OMEGA}'
if six.PY2:
acct = acct.encode('utf8')
cont = cont.encode('utf8')
obj1 = obj1.encode('utf8')
obj2 = obj2.encode('utf8')
self.assertEqual(
container_deleter.make_delete_jobs(
acct, cont, [obj1, obj2], utils.Timestamp(ts)),
[{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj1),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj2),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'}])
def test_make_delete_jobs_unicode_utf8(self):
ts = '1558463777.42739'
acct = u'acct-\U0001f334'
cont = u'cont-\N{SNOWMAN}'
obj1 = u'obj-\N{GREEK CAPITAL LETTER ALPHA}'
obj2 = u'obj-\N{GREEK CAPITAL LETTER OMEGA}'
self.assertEqual(
container_deleter.make_delete_jobs(
acct, cont, [obj1, obj2], utils.Timestamp(ts)),
[{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj1),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj2),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'}])
def test_mark_for_deletion_empty_no_yield(self):
with FakeInternalClient([
swob.Response(json.dumps([
])),
]) as swift:
self.assertEqual(container_deleter.mark_for_deletion(
swift,
'account',
'container',
'marker',
'end',
'prefix',
timestamp=None,
yield_time=None,
), 0)
self.assertEqual(swift.calls, [
('GET', '/v1/account/container',
'format=json&marker=marker&end_marker=end&prefix=prefix',
{}, None),
])
def test_mark_for_deletion_empty_with_yield(self):
with FakeInternalClient([
swob.Response(json.dumps([
])),
]) as swift:
self.assertEqual(list(container_deleter.mark_for_deletion(
swift,
'account',
'container',
'marker',
'end',
'prefix',
timestamp=None,
yield_time=0.5,
)), [(0, None)])
self.assertEqual(swift.calls, [
('GET', '/v1/account/container',
'format=json&marker=marker&end_marker=end&prefix=prefix',
{}, None),
])
def test_mark_for_deletion_one_update_no_yield(self):
ts = '1558463777.42739'
with FakeInternalClient([
swob.Response(json.dumps([
{'name': 'obj1'},
{'name': 'obj2'},
{'name': 'obj3'},
])),
swob.Response(json.dumps([
])),
swob.Response(status=202),
]) as swift:
self.assertEqual(container_deleter.mark_for_deletion(
swift,
'account',
'container',
'',
'',
'',
timestamp=utils.Timestamp(ts),
yield_time=None,
), 3)
self.assertEqual(swift.calls, [
('GET', '/v1/account/container',
'format=json&marker=&end_marker=&prefix=', {}, None),
('GET', '/v1/account/container',
'format=json&marker=obj3&end_marker=&prefix=', {}, None),
('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', {
'X-Backend-Allow-Method': 'UPDATE',
'X-Backend-Storage-Policy-Index': '0',
'X-Timestamp': ts}, mock.ANY),
])
self.assertEqual(
json.loads(swift.calls[-1].body),
container_deleter.make_delete_jobs(
'account', 'container', ['obj1', 'obj2', 'obj3'],
utils.Timestamp(ts)
)
)
def test_mark_for_deletion_two_updates_with_yield(self):
ts = '1558463777.42739'
with FakeInternalClient([
swob.Response(json.dumps([
{'name': 'obj1'},
{'name': 'obj2'},
{'name': 'obj3'},
{'name': u'obj4-\N{SNOWMAN}'},
{'name': 'obj5'},
{'name': 'obj6'},
])),
swob.Response(status=202),
swob.Response(json.dumps([
])),
swob.Response(status=202),
]) as swift:
self.assertEqual(list(container_deleter.mark_for_deletion(
swift,
'account',
'container',
'',
'',
'',
timestamp=utils.Timestamp(ts),
yield_time=0,
)), [(5, 'obj5'), (6, 'obj6'), (6, None)])
self.assertEqual(swift.calls, [
('GET', '/v1/account/container',
'format=json&marker=&end_marker=&prefix=', {}, None),
('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', {
'X-Backend-Allow-Method': 'UPDATE',
'X-Backend-Storage-Policy-Index': '0',
'X-Timestamp': ts}, mock.ANY),
('GET', '/v1/account/container',
'format=json&marker=obj6&end_marker=&prefix=', {}, None),
('UPDATE', '/v1/.expiring_objects/' + ts.split('.')[0], '', {
'X-Backend-Allow-Method': 'UPDATE',
'X-Backend-Storage-Policy-Index': '0',
'X-Timestamp': ts}, mock.ANY),
])
self.assertEqual(
json.loads(swift.calls[-3].body),
container_deleter.make_delete_jobs(
'account', 'container',
['obj1', 'obj2', 'obj3', u'obj4-\N{SNOWMAN}', 'obj5'],
utils.Timestamp(ts)
)
)
self.assertEqual(
json.loads(swift.calls[-1].body),
container_deleter.make_delete_jobs(
'account', 'container', ['obj6'],
utils.Timestamp(ts)
)
)

View File

@@ -354,10 +354,8 @@ class TestContainerController(unittest.TestCase):
req.content_length = 0
resp = server_handler.OPTIONS(req)
self.assertEqual(200, resp.status_int)
for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE'.split():
self.assertTrue(
verb in resp.headers['Allow'].split(', '))
self.assertEqual(len(resp.headers['Allow'].split(', ')), 7)
self.assertEqual(sorted(resp.headers['Allow'].split(', ')), sorted(
'OPTIONS GET POST PUT DELETE HEAD REPLICATE UPDATE'.split()))
self.assertEqual(resp.headers['Server'],
(self.controller.server_type + '/' + swift_version))
@@ -1477,6 +1475,115 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(mock_statvfs.mock_calls,
[mock.call(os.path.join(self.testdir, 'sda1'))])
def test_UPDATE(self):
ts_iter = make_timestamp_iter()
req = Request.blank(
'/sda1/p/a/c',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': next(ts_iter).internal})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201)
ts_iter = make_timestamp_iter()
req = Request.blank(
'/sda1/p/a/c',
environ={'REQUEST_METHOD': 'UPDATE'},
headers={'X-Timestamp': next(ts_iter).internal},
body='[invalid json')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 400)
ts_iter = make_timestamp_iter()
req = Request.blank(
'/sda1/p/a/c',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': next(ts_iter).internal})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
obj_ts = next(ts_iter)
req = Request.blank(
'/sda1/p/a/c',
environ={'REQUEST_METHOD': 'UPDATE'},
headers={'X-Timestamp': next(ts_iter).internal},
body=json.dumps([
{'name': 'some obj', 'deleted': 0,
'created_at': obj_ts.internal,
'etag': 'whatever', 'size': 1234,
'storage_policy_index': POLICIES.default.idx,
'content_type': 'foo/bar'},
{'name': 'some tombstone', 'deleted': 1,
'created_at': next(ts_iter).internal,
'etag': 'noetag', 'size': 0,
'storage_policy_index': POLICIES.default.idx,
'content_type': 'application/deleted'},
{'name': 'wrong policy', 'deleted': 0,
'created_at': next(ts_iter).internal,
'etag': 'whatever', 'size': 6789,
'storage_policy_index': 1,
'content_type': 'foo/bar'},
]))
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 202)
req = Request.blank(
'/sda1/p/a/c?format=json',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': next(ts_iter).internal})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(json.loads(resp.body), [
{'name': 'some obj', 'hash': 'whatever', 'bytes': 1234,
'content_type': 'foo/bar', 'last_modified': obj_ts.isoformat},
])
def test_UPDATE_autocreate(self):
ts_iter = make_timestamp_iter()
req = Request.blank(
'/sda1/p/.a/c',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': next(ts_iter).internal})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
obj_ts = next(ts_iter)
req = Request.blank(
'/sda1/p/.a/c',
environ={'REQUEST_METHOD': 'UPDATE'},
headers={
'X-Timestamp': next(ts_iter).internal,
'X-Backend-Storage-Policy-Index': str(POLICIES.default.idx)},
body=json.dumps([
{'name': 'some obj', 'deleted': 0,
'created_at': obj_ts.internal,
'etag': 'whatever', 'size': 1234,
'storage_policy_index': POLICIES.default.idx,
'content_type': 'foo/bar'},
{'name': 'some tombstone', 'deleted': 1,
'created_at': next(ts_iter).internal,
'etag': 'noetag', 'size': 0,
'storage_policy_index': POLICIES.default.idx,
'content_type': 'application/deleted'},
{'name': 'wrong policy', 'deleted': 0,
'created_at': next(ts_iter).internal,
'etag': 'whatever', 'size': 6789,
'storage_policy_index': 1,
'content_type': 'foo/bar'},
]))
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 202, resp.body)
req = Request.blank(
'/sda1/p/.a/c?format=json',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': next(ts_iter).internal})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(json.loads(resp.body), [
{'name': 'some obj', 'hash': 'whatever', 'bytes': 1234,
'content_type': 'foo/bar', 'last_modified': obj_ts.isoformat},
])
def test_DELETE(self):
ts_iter = make_timestamp_iter()
req = Request.blank(
@@ -4591,7 +4698,7 @@ class TestNonLegacyDefaultStoragePolicy(TestContainerController):
def _update_object_put_headers(self, req):
"""
Add policy index headers for containers created with default policy
- which in this TestCase is 1.
- which in this TestCase is 2.
"""
req.headers['X-Backend-Storage-Policy-Index'] = \
str(POLICIES.default.idx)

View File

@@ -254,7 +254,8 @@ class TestObjectExpirer(TestCase):
self.deleted_objects = {}
def delete_object(self, target_path, delete_timestamp,
task_account, task_container, task_object):
task_account, task_container, task_object,
is_async_delete):
if task_container not in self.deleted_objects:
self.deleted_objects[task_container] = set()
self.deleted_objects[task_container].add(task_object)
@@ -303,9 +304,10 @@ class TestObjectExpirer(TestCase):
with mock.patch.object(x, 'delete_actual_object',
side_effect=exc) as delete_actual:
with mock.patch.object(x, 'pop_queue') as pop_queue:
x.delete_object(actual_obj, ts, account, container, obj)
x.delete_object(actual_obj, ts, account, container, obj,
False)
delete_actual.assert_called_once_with(actual_obj, ts)
delete_actual.assert_called_once_with(actual_obj, ts, False)
log_lines = x.logger.get_lines_for_level('error')
if should_pop:
pop_queue.assert_called_once_with(account, container, obj)
@@ -377,13 +379,14 @@ class TestObjectExpirer(TestCase):
assert_parse_task_obj('1000-a/c/o', 1000, 'a', 'c', 'o')
assert_parse_task_obj('0000-acc/con/obj', 0, 'acc', 'con', 'obj')
def make_task(self, delete_at, target):
def make_task(self, delete_at, target, is_async_delete=False):
return {
'task_account': '.expiring_objects',
'task_container': delete_at,
'task_object': delete_at + '-' + target,
'delete_timestamp': Timestamp(delete_at),
'target_path': target,
'is_async_delete': is_async_delete,
}
def test_round_robin_order(self):
@@ -620,7 +623,7 @@ class TestObjectExpirer(TestCase):
# executed tasks are with past time
self.assertEqual(
mock_method.call_args_list,
[mock.call(target_path, self.past_time)
[mock.call(target_path, self.past_time, False)
for target_path in self.expired_target_path_list])
def test_failed_delete_keeps_entry(self):
@@ -638,7 +641,7 @@ class TestObjectExpirer(TestCase):
# all tasks are done
with mock.patch.object(self.expirer, 'delete_actual_object',
lambda o, t: None), \
lambda o, t, b: None), \
mock.patch.object(self.expirer, 'pop_queue') as mock_method:
self.expirer.run_once()
@@ -653,7 +656,7 @@ class TestObjectExpirer(TestCase):
self.assertEqual(self.expirer.report_objects, 0)
with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0), \
mock.patch.object(self.expirer, 'delete_actual_object',
lambda o, t: None), \
lambda o, t, b: None), \
mock.patch.object(self.expirer, 'pop_queue',
lambda a, c, o: None):
self.expirer.run_once()
@@ -662,7 +665,8 @@ class TestObjectExpirer(TestCase):
def test_delete_actual_object_gets_native_string(self):
got_str = [False]
def delete_actual_object_test_for_string(actual_obj, timestamp):
def delete_actual_object_test_for_string(actual_obj, timestamp,
is_async_delete):
if isinstance(actual_obj, str):
got_str[0] = True
@@ -681,7 +685,7 @@ class TestObjectExpirer(TestCase):
def fail_delete_container(*a, **kw):
raise Exception('failed to delete container')
def fail_delete_actual_object(actual_obj, timestamp):
def fail_delete_actual_object(actual_obj, timestamp, is_async_delete):
raise Exception('failed to delete actual object')
with mock.patch.object(self.fake_swift, 'delete_container',
@@ -761,10 +765,30 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
ts = Timestamp('1234')
x.delete_actual_object('/path/to/object', ts)
x.delete_actual_object('/path/to/object', ts, False)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
got_env[0]['HTTP_X_IF_DELETE_AT'])
self.assertEqual(
got_env[0]['HTTP_X_BACKEND_CLEAN_EXPIRING_OBJECT_QUEUE'], 'no')
def test_delete_actual_object_bulk(self):
got_env = [None]
def fake_app(env, start_response):
got_env[0] = env
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
ts = Timestamp('1234')
x.delete_actual_object('/path/to/object', ts, True)
self.assertNotIn('HTTP_X_IF_DELETE_AT', got_env[0])
self.assertNotIn('HTTP_X_BACKEND_CLEAN_EXPIRING_OBJECT_QUEUE',
got_env[0])
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'], ts.internal)
def test_delete_actual_object_nourlquoting(self):
# delete_actual_object should not do its own url quoting because
@@ -780,12 +804,41 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
ts = Timestamp('1234')
x.delete_actual_object('/path/to/object name', ts)
x.delete_actual_object('/path/to/object name', ts, False)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
got_env[0]['HTTP_X_IF_DELETE_AT'])
self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name')
def test_delete_actual_object_async_returns_expected_error(self):
def do_test(test_status, should_raise):
calls = [0]
def fake_app(env, start_response):
calls[0] += 1
calls.append(env['PATH_INFO'])
start_response(test_status, [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
ts = Timestamp('1234')
if should_raise:
with self.assertRaises(internal_client.UnexpectedResponse):
x.delete_actual_object('/path/to/object', ts, True)
else:
x.delete_actual_object('/path/to/object', ts, True)
self.assertEqual(calls[0], 1, calls)
# object was deleted and tombstone reaped
do_test('404 Not Found', False)
# object was overwritten *after* the original delete, or
# object was deleted but tombstone still exists, or ...
do_test('409 Conflict', False)
# Anything else, raise
do_test('400 Bad Request', True)
def test_delete_actual_object_returns_expected_error(self):
def do_test(test_status, should_raise):
calls = [0]
@@ -801,9 +854,9 @@ class TestObjectExpirer(TestCase):
ts = Timestamp('1234')
if should_raise:
with self.assertRaises(internal_client.UnexpectedResponse):
x.delete_actual_object('/path/to/object', ts)
x.delete_actual_object('/path/to/object', ts, False)
else:
x.delete_actual_object('/path/to/object', ts)
x.delete_actual_object('/path/to/object', ts, False)
self.assertEqual(calls[0], 1)
# object was deleted and tombstone reaped
@@ -828,7 +881,7 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
exc = None
try:
x.delete_actual_object('/path/to/object', Timestamp('1234'))
x.delete_actual_object('/path/to/object', Timestamp('1234'), False)
except Exception as err:
exc = err
finally:
@@ -841,7 +894,7 @@ class TestObjectExpirer(TestCase):
x = expirer.ObjectExpirer({})
x.swift.make_request = mock.Mock()
x.swift.make_request.return_value.status_int = 204
x.delete_actual_object(name, timestamp)
x.delete_actual_object(name, timestamp, False)
self.assertEqual(x.swift.make_request.call_count, 1)
self.assertEqual(x.swift.make_request.call_args[0][1],
'/v1/' + urllib.parse.quote(name))
@@ -851,7 +904,7 @@ class TestObjectExpirer(TestCase):
timestamp = Timestamp('1515544858.80602')
x = expirer.ObjectExpirer({})
x.swift.make_request = mock.MagicMock()
x.delete_actual_object(name, timestamp)
x.delete_actual_object(name, timestamp, False)
self.assertEqual(x.swift.make_request.call_count, 1)
header = 'X-Backend-Clean-Expiring-Object-Queue'
self.assertEqual(

View File

@@ -321,7 +321,7 @@ class TestController(unittest.TestCase):
self.controller.account_info(self.account, self.request)
set_http_connect(201, raise_timeout_exc=True)
self.controller._make_request(
nodes, partition, 'POST', '/', '', '',
nodes, partition, 'POST', '/', '', '', None,
self.controller.app.logger.thread_locals)
# tests if 200 is cached and used
@@ -668,6 +668,27 @@ class TestProxyServer(unittest.TestCase):
Request.blank('/v1/a', environ={'REQUEST_METHOD': '!invalid'}))
self.assertEqual(resp.status, '405 Method Not Allowed')
def test_private_method_request(self):
baseapp = proxy_server.Application({},
FakeMemcache(),
container_ring=FakeRing(),
account_ring=FakeRing())
baseapp.logger = debug_logger()
resp = baseapp.handle_request(
Request.blank('/v1/a/c', environ={'REQUEST_METHOD': 'UPDATE'}))
self.assertEqual(resp.status, '405 Method Not Allowed')
# Note that UPDATE definitely *isn't* advertised
self.assertEqual(sorted(resp.headers['Allow'].split(', ')), [
'DELETE', 'GET', 'HEAD', 'OPTIONS', 'POST', 'PUT'])
# But with appropriate (internal-only) overrides, you can still use it
resp = baseapp.handle_request(
Request.blank('/v1/a/c', environ={'REQUEST_METHOD': 'UPDATE'},
headers={'X-Backend-Allow-Method': 'UPDATE',
'X-Backend-Storage-Policy-Index': '0'}))
# Now we actually make the requests, but there aren't any nodes
self.assertEqual(resp.status, '503 Service Unavailable')
def test_calls_authorize_allow(self):
called = [False]