Merge "Refactor expirer's task round robin implementation"
This commit is contained in:
commit
d296ec8be4
@ -19,6 +19,7 @@ from random import random
|
||||
from time import time
|
||||
from os.path import join
|
||||
from swift import gettext_ as _
|
||||
from collections import defaultdict, deque
|
||||
import hashlib
|
||||
|
||||
from eventlet import sleep, Timeout
|
||||
@ -26,7 +27,8 @@ from eventlet.greenpool import GreenPool
|
||||
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||
from swift.common.utils import get_logger, dump_recon_cache, split_path
|
||||
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
||||
Timestamp
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||
HTTP_PRECONDITION_FAILED
|
||||
|
||||
@ -93,65 +95,84 @@ class ObjectExpirer(Daemon):
|
||||
'time': elapsed, 'objects': self.report_objects})
|
||||
self.report_last_time = time()
|
||||
|
||||
def iter_cont_objs_to_expire(self):
|
||||
def round_robin_order(self, task_iter):
|
||||
"""
|
||||
Yields (container, obj) tuples to be deleted
|
||||
Change order of expiration tasks to avoid deleting objects in a
|
||||
certain container continuously.
|
||||
|
||||
:param task_iter: An iterator of delete-task dicts, which should each
|
||||
have a ``target_path`` key.
|
||||
"""
|
||||
obj_cache = {}
|
||||
obj_cache = defaultdict(deque)
|
||||
cnt = 0
|
||||
|
||||
all_containers = set()
|
||||
def dump_obj_cache_in_round_robin():
|
||||
while obj_cache:
|
||||
for key in sorted(obj_cache):
|
||||
if obj_cache[key]:
|
||||
yield obj_cache[key].popleft()
|
||||
else:
|
||||
del obj_cache[key]
|
||||
|
||||
for delete_task in task_iter:
|
||||
try:
|
||||
target_account, target_container, _junk = \
|
||||
split_path('/' + delete_task['target_path'], 3, 3, True)
|
||||
cache_key = '%s/%s' % (target_account, target_container)
|
||||
except ValueError:
|
||||
cache_key = None
|
||||
|
||||
obj_cache[cache_key].append(delete_task)
|
||||
cnt += 1
|
||||
|
||||
if cnt > MAX_OBJECTS_TO_CACHE:
|
||||
for task in dump_obj_cache_in_round_robin():
|
||||
yield task
|
||||
cnt = 0
|
||||
|
||||
for task in dump_obj_cache_in_round_robin():
|
||||
yield task
|
||||
|
||||
def iter_task_containers_to_expire(self):
|
||||
"""
|
||||
Yields container name under the expiring_objects_account if
|
||||
the container name (i.e. timestamp) is in the past.
|
||||
"""
|
||||
for c in self.swift.iter_containers(self.expiring_objects_account):
|
||||
container = str(c['name'])
|
||||
timestamp = int(container)
|
||||
if timestamp > int(time()):
|
||||
task_container = str(c['name'])
|
||||
timestamp = Timestamp(task_container)
|
||||
if timestamp > Timestamp.now():
|
||||
break
|
||||
all_containers.add(container)
|
||||
yield task_container
|
||||
|
||||
def iter_task_to_expire(self, task_containers):
|
||||
"""
|
||||
Yields task expire info dict which consists of task_container,
|
||||
task_object, delete_timestamp, and target_path
|
||||
"""
|
||||
|
||||
for task_container in task_containers:
|
||||
for o in self.swift.iter_objects(self.expiring_objects_account,
|
||||
container):
|
||||
obj = o['name'].encode('utf8')
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
task_container):
|
||||
task_object = o['name'].encode('utf8')
|
||||
delete_timestamp, target_path = task_object.split('-', 1)
|
||||
delete_timestamp = Timestamp(delete_timestamp)
|
||||
if delete_timestamp > Timestamp.now():
|
||||
# we shouldn't yield an object that hasn't reached
|
||||
# its expiration date yet.
|
||||
break
|
||||
try:
|
||||
cust_account, cust_cont, cust_obj = \
|
||||
split_path('/' + actual_obj, 3, 3, True)
|
||||
cache_key = '%s/%s' % (cust_account, cust_cont)
|
||||
except ValueError:
|
||||
cache_key = None
|
||||
|
||||
if self.processes > 0:
|
||||
obj_process = int(
|
||||
hashlib.md5('%s/%s' % (container, obj)).
|
||||
hashlib.md5('%s/%s' % (task_container, task_object)).
|
||||
hexdigest(), 16)
|
||||
if obj_process % self.processes != self.process:
|
||||
continue
|
||||
|
||||
if cache_key not in obj_cache:
|
||||
obj_cache[cache_key] = []
|
||||
obj_cache[cache_key].append((container, obj))
|
||||
cnt += 1
|
||||
|
||||
if cnt > MAX_OBJECTS_TO_CACHE:
|
||||
while obj_cache:
|
||||
for key in obj_cache.keys():
|
||||
if obj_cache[key]:
|
||||
yield obj_cache[key].pop()
|
||||
cnt -= 1
|
||||
else:
|
||||
del obj_cache[key]
|
||||
|
||||
while obj_cache:
|
||||
for key in obj_cache.keys():
|
||||
if obj_cache[key]:
|
||||
yield obj_cache[key].pop()
|
||||
else:
|
||||
del obj_cache[key]
|
||||
|
||||
for container in all_containers:
|
||||
yield (container, None)
|
||||
yield {'task_container': task_container,
|
||||
'task_object': task_object,
|
||||
'target_path': target_path,
|
||||
'delete_timestamp': delete_timestamp}
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
"""
|
||||
@ -166,7 +187,6 @@ class ObjectExpirer(Daemon):
|
||||
"""
|
||||
self.get_process_values(kwargs)
|
||||
pool = GreenPool(self.concurrency)
|
||||
containers_to_delete = set([])
|
||||
self.report_first_time = self.report_last_time = time()
|
||||
self.report_objects = 0
|
||||
try:
|
||||
@ -178,22 +198,19 @@ class ObjectExpirer(Daemon):
|
||||
'%(objects)s possible objects') % {
|
||||
'containers': containers, 'objects': objects})
|
||||
|
||||
for container, obj in self.iter_cont_objs_to_expire():
|
||||
containers_to_delete.add(container)
|
||||
task_containers = list(self.iter_task_containers_to_expire())
|
||||
|
||||
if not obj:
|
||||
continue
|
||||
# delete_task_iter is a generator to yield a dict of
|
||||
# task_container, task_object, delete_timestamp, target_path
|
||||
# used to delete the actual object and pop the task from the queue.
|
||||
delete_task_iter = self.round_robin_order(
|
||||
self.iter_task_to_expire(task_containers))
|
||||
|
||||
timestamp, actual_obj = obj.split('-', 1)
|
||||
timestamp = int(timestamp)
|
||||
if timestamp > int(time()):
|
||||
break
|
||||
pool.spawn_n(
|
||||
self.delete_object, actual_obj, timestamp,
|
||||
container, obj)
|
||||
for delete_task in delete_task_iter:
|
||||
pool.spawn_n(self.delete_object, **delete_task)
|
||||
|
||||
pool.waitall()
|
||||
for container in containers_to_delete:
|
||||
for container in task_containers:
|
||||
try:
|
||||
self.swift.delete_container(
|
||||
self.expiring_objects_account,
|
||||
@ -257,33 +274,35 @@ class ObjectExpirer(Daemon):
|
||||
raise ValueError(
|
||||
'process must be less than processes')
|
||||
|
||||
def delete_object(self, actual_obj, timestamp, container, obj):
|
||||
def delete_object(self, target_path, delete_timestamp,
|
||||
task_container, task_object):
|
||||
start_time = time()
|
||||
try:
|
||||
try:
|
||||
self.delete_actual_object(actual_obj, timestamp)
|
||||
self.delete_actual_object(target_path, delete_timestamp)
|
||||
except UnexpectedResponse as err:
|
||||
if err.resp.status_int not in {HTTP_NOT_FOUND,
|
||||
HTTP_PRECONDITION_FAILED}:
|
||||
raise
|
||||
if float(timestamp) > time() - self.reclaim_age:
|
||||
if float(delete_timestamp) > time() - self.reclaim_age:
|
||||
# we'll have to retry the DELETE later
|
||||
raise
|
||||
self.pop_queue(container, obj)
|
||||
self.pop_queue(task_container, task_object)
|
||||
self.report_objects += 1
|
||||
self.logger.increment('objects')
|
||||
except UnexpectedResponse as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(
|
||||
'Unexpected response while deleting object %(container)s '
|
||||
'%(obj)s: %(err)s' % {'container': container, 'obj': obj,
|
||||
'err': str(err.resp.status_int)})
|
||||
'%(obj)s: %(err)s' % {
|
||||
'container': task_container, 'obj': task_object,
|
||||
'err': str(err.resp.status_int)})
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(
|
||||
'Exception while deleting object %(container)s %(obj)s'
|
||||
' %(err)s' % {'container': container,
|
||||
'obj': obj, 'err': str(err)})
|
||||
' %(err)s' % {'container': task_container,
|
||||
'obj': task_object, 'err': str(err)})
|
||||
self.logger.timing_since('timing', start_time)
|
||||
self.report()
|
||||
|
||||
@ -304,15 +323,16 @@ class ObjectExpirer(Daemon):
|
||||
|
||||
:param actual_obj: The name of the end-user object to delete:
|
||||
'<account>/<container>/<object>'
|
||||
:param timestamp: The timestamp the X-Delete-At value must match to
|
||||
perform the actual delete.
|
||||
:param timestamp: The swift.common.utils.Timestamp instance the
|
||||
X-Delete-At value must match to perform the actual
|
||||
delete.
|
||||
:raises UnexpectedResponse: if the delete was unsuccessful and
|
||||
should be retried later
|
||||
"""
|
||||
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
|
||||
self.swift.make_request(
|
||||
'DELETE', path,
|
||||
{'X-If-Delete-At': str(timestamp),
|
||||
'X-Timestamp': str(timestamp),
|
||||
{'X-If-Delete-At': timestamp.normal,
|
||||
'X-Timestamp': timestamp.normal,
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'no'},
|
||||
(2, HTTP_CONFLICT))
|
||||
|
@ -26,6 +26,7 @@ import six
|
||||
from six.moves import urllib
|
||||
|
||||
from swift.common import internal_client, utils, swob
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.obj import expirer
|
||||
|
||||
|
||||
@ -215,11 +216,12 @@ class TestObjectExpirer(TestCase):
|
||||
self.deleted_objects = {}
|
||||
self.obj_containers_in_order = []
|
||||
|
||||
def delete_object(self, actual_obj, timestamp, container, obj):
|
||||
if container not in self.deleted_objects:
|
||||
self.deleted_objects[container] = set()
|
||||
self.deleted_objects[container].add(obj)
|
||||
self.obj_containers_in_order.append(container)
|
||||
def delete_object(self, target_path, delete_timestamp,
|
||||
task_container, task_object):
|
||||
if task_container not in self.deleted_objects:
|
||||
self.deleted_objects[task_container] = set()
|
||||
self.deleted_objects[task_container].add(task_object)
|
||||
self.obj_containers_in_order.append(task_container)
|
||||
|
||||
aco_dict = {
|
||||
'.expiring_objects': {
|
||||
@ -321,6 +323,45 @@ class TestObjectExpirer(TestCase):
|
||||
self.assertTrue(
|
||||
'so far' in str(x.logger.get_lines_for_level('info')))
|
||||
|
||||
def test_round_robin_order(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
task_con_obj_list = [
|
||||
# objects in 0000 timestamp container
|
||||
{'task_container': '0000', 'task_object': '0000-a/c0/o0',
|
||||
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'},
|
||||
{'task_container': '0000', 'task_object': '0000-a/c0/o1',
|
||||
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'},
|
||||
# objects in 0001 timestamp container
|
||||
{'task_container': '0001', 'task_object': '0001-a/c1/o0',
|
||||
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'},
|
||||
{'task_container': '0001', 'task_object': '0001-a/c1/o1',
|
||||
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'},
|
||||
# objects in 0002 timestamp container
|
||||
{'task_container': '0002', 'task_object': '0002-a/c2/o0',
|
||||
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'},
|
||||
{'task_container': '0002', 'task_object': '0002-a/c2/o1',
|
||||
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'},
|
||||
]
|
||||
result = list(x.round_robin_order(task_con_obj_list))
|
||||
|
||||
# sorted by popping one object to delete for each target_container
|
||||
expected = [
|
||||
# objects in 0000 timestamp container
|
||||
{'task_container': '0000', 'task_object': '0000-a/c0/o0',
|
||||
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o0'},
|
||||
{'task_container': '0001', 'task_object': '0001-a/c1/o0',
|
||||
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o0'},
|
||||
{'task_container': '0002', 'task_object': '0002-a/c2/o0',
|
||||
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o0'},
|
||||
{'task_container': '0000', 'task_object': '0000-a/c0/o1',
|
||||
'delete_timestamp': Timestamp('0000'), 'target_path': 'a/c0/o1'},
|
||||
{'task_container': '0001', 'task_object': '0001-a/c1/o1',
|
||||
'delete_timestamp': Timestamp('0001'), 'target_path': 'a/c1/o1'},
|
||||
{'task_container': '0002', 'task_object': '0002-a/c2/o1',
|
||||
'delete_timestamp': Timestamp('0002'), 'target_path': 'a/c2/o1'},
|
||||
]
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_run_once_nothing_to_do(self):
|
||||
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
|
||||
x.swift = 'throw error because a string does not have needed methods'
|
||||
@ -621,7 +662,7 @@ class TestObjectExpirer(TestCase):
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
ts = '1234'
|
||||
ts = Timestamp('1234')
|
||||
x.delete_actual_object('/path/to/object', ts)
|
||||
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
|
||||
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
|
||||
@ -640,7 +681,7 @@ class TestObjectExpirer(TestCase):
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
ts = '1234'
|
||||
ts = Timestamp('1234')
|
||||
x.delete_actual_object('/path/to/object name', ts)
|
||||
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
|
||||
self.assertEqual(got_env[0]['HTTP_X_TIMESTAMP'],
|
||||
@ -659,11 +700,12 @@ class TestObjectExpirer(TestCase):
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
ts = Timestamp('1234')
|
||||
if should_raise:
|
||||
with self.assertRaises(internal_client.UnexpectedResponse):
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
x.delete_actual_object('/path/to/object', ts)
|
||||
else:
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
x.delete_actual_object('/path/to/object', ts)
|
||||
self.assertEqual(calls[0], 1)
|
||||
|
||||
# object was deleted and tombstone reaped
|
||||
@ -688,7 +730,7 @@ class TestObjectExpirer(TestCase):
|
||||
x = expirer.ObjectExpirer({})
|
||||
exc = None
|
||||
try:
|
||||
x.delete_actual_object('/path/to/object', '1234')
|
||||
x.delete_actual_object('/path/to/object', Timestamp('1234'))
|
||||
except Exception as err:
|
||||
exc = err
|
||||
finally:
|
||||
@ -697,7 +739,7 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
def test_delete_actual_object_quotes(self):
|
||||
name = 'this name should get quoted'
|
||||
timestamp = '1366063156.863045'
|
||||
timestamp = Timestamp('1366063156.863045')
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.swift.make_request = mock.Mock()
|
||||
x.swift.make_request.return_value.status_int = 204
|
||||
@ -708,7 +750,7 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
def test_delete_actual_object_queue_cleaning(self):
|
||||
name = 'something'
|
||||
timestamp = '1515544858.80602'
|
||||
timestamp = Timestamp('1515544858.80602')
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.swift.make_request = mock.MagicMock()
|
||||
x.delete_actual_object(name, timestamp)
|
||||
|
Loading…
x
Reference in New Issue
Block a user