Merge "Refactor object updater's async-pending finder"

Zuul, 2018-06-05 12:14:43 +00:00, committed by Gerrit Code Review
commit 3e9fdfceb4
1 changed file with 49 additions and 36 deletions

swift/obj/updater.py

@@ -28,7 +28,7 @@ from swift.common.constraints import check_drive
 from swift.common.exceptions import ConnectionTimeout
 from swift.common.ring import Ring
 from swift.common.utils import get_logger, renamer, write_pickle, \
-    dump_recon_cache, config_true_value, ratelimit_sleep, split_path, \
+    dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
     eventlet_monkey_patch, get_redirect_data
 from swift.common.daemon import Daemon
 from swift.common.header_key_dict import HeaderKeyDict
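
A note on the import swap above: ratelimit_sleep throttles by sleeping inside the caller's loop, while RateLimitedIterator wraps an iterable and paces how fast items come out of it, which is what lets the rewritten sweep loop below stay flat. A minimal usage sketch, assuming only what this diff shows (the wrapper lives in swift.common.utils and takes the iterable plus an elements_per_second keyword); the stand-in generator and the rate of 2 per second are invented for illustration:

    import time

    from swift.common.utils import RateLimitedIterator


    def fake_pendings():
        # Stand-in for _iter_async_pendings(device); any generator works here.
        for name in ('update-a', 'update-b', 'update-c'):
            yield name


    start = time.time()
    # The iterator sleeps as needed so at most 2 items per second reach the
    # loop body; the body itself contains no rate-limiting calls.
    for item in RateLimitedIterator(fake_pendings(), elements_per_second=2):
        print('%.2fs %s' % (time.time() - start, item))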
@@ -200,19 +200,14 @@ class ObjectUpdater(Daemon):
             dump_recon_cache({'object_updater_sweep': elapsed},
                              self.rcache, self.logger)
 
-    def object_sweep(self, device):
+    def _iter_async_pendings(self, device):
         """
-        If there are async pendings on the device, walk each one and update.
+        Locate and yield all the async pendings on the device. Multiple updates
+        for the same object will come out in reverse-chronological order
+        (i.e. newest first) so that callers can skip stale async_pendings.
 
-        :param device: path to device
+        Tries to clean up empty directories as it goes.
         """
-        start_time = time.time()
-        last_status_update = start_time
-        start_stats = self.stats.copy()
-        my_pid = os.getpid()
-        self.logger.info("Object update sweep starting on %s (pid: %d)",
-                         device, my_pid)
-
         # loop through async pending dirs for all policies
         for asyncdir in self._listdir(device):
             # we only care about directories
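
For orientation, the directory shape _iter_async_pendings walks matches the "async pending dirs for all policies" comment above. A sketch of how one pending-update path is composed; the device, hash, and timestamp values are invented, and the per-policy suffix and three-character hash prefix are Swift naming conventions rather than something this diff establishes:

    import os

    device = '/srv/node/sdb1'
    async_dir = 'async_pending-1'    # policy index 1; policy 0 uses no suffix
    obj_hash = 'd41d8cd98f00b204e9800998ecf8427e'
    prefix = obj_hash[-3:]           # last three hash characters name the subdir
    timestamp = '1528200000.00000'

    # <device>/async_pending[-<policy>]/<prefix>/<obj_hash>-<timestamp>
    update_path = os.path.join(device, async_dir, prefix,
                               '%s-%s' % (obj_hash, timestamp))
    print(update_path)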
@@ -235,7 +230,6 @@ class ObjectUpdater(Daemon):
                 prefix_path = os.path.join(async_pending, prefix)
                 if not os.path.isdir(prefix_path):
                     continue
-                last_obj_hash = None
                 for update in sorted(self._listdir(prefix_path), reverse=True):
                     update_path = os.path.join(prefix_path, update)
                     if not os.path.isfile(update_path):
@@ -250,34 +244,53 @@ class ObjectUpdater(Daemon):
                               'name %s')
                             % (update_path))
                         continue
-                    if obj_hash == last_obj_hash:
-                        self.stats.unlinks += 1
-                        self.logger.increment('unlinks')
-                        os.unlink(update_path)
-                    else:
-                        self.process_object_update(update_path, device,
-                                                   policy)
-                        last_obj_hash = obj_hash
-                    self.objects_running_time = ratelimit_sleep(
-                        self.objects_running_time,
-                        self.max_objects_per_second)
-                    now = time.time()
-                    if now - last_status_update >= self.report_interval:
-                        this_sweep = self.stats.since(start_stats)
-                        self.logger.info(
-                            ('Object update sweep progress on %(device)s: '
-                             '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
-                            {'device': device,
-                             'elapsed': now - start_time,
-                             'pid': my_pid,
-                             'stats': this_sweep})
-                        last_status_update = now
+                    yield {'device': device, 'policy': policy,
+                           'path': update_path,
+                           'obj_hash': obj_hash, 'timestamp': timestamp}
                 try:
                     os.rmdir(prefix_path)
                 except OSError:
                     pass
 
+    def object_sweep(self, device):
+        """
+        If there are async pendings on the device, walk each one and update.
+
+        :param device: path to device
+        """
+        start_time = time.time()
+        last_status_update = start_time
+        start_stats = self.stats.copy()
+        my_pid = os.getpid()
+        self.logger.info("Object update sweep starting on %s (pid: %d)",
+                         device, my_pid)
+
+        last_obj_hash = None
+        ap_iter = RateLimitedIterator(
+            self._iter_async_pendings(device),
+            elements_per_second=self.max_objects_per_second)
+        for update in ap_iter:
+            if update['obj_hash'] == last_obj_hash:
+                self.stats.unlinks += 1
+                self.logger.increment('unlinks')
+                os.unlink(update['path'])
+            else:
+                self.process_object_update(update['path'], update['device'],
+                                           update['policy'])
+                last_obj_hash = update['obj_hash']
+
+            now = time.time()
+            if now - last_status_update >= self.report_interval:
+                this_sweep = self.stats.since(start_stats)
+                self.logger.info(
+                    ('Object update sweep progress on %(device)s: '
+                     '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
+                    {'device': device,
+                     'elapsed': now - start_time,
+                     'pid': my_pid,
+                     'stats': this_sweep})
+                last_status_update = now
+
         self.logger.timing_since('timing', start_time)
         sweep_totals = self.stats.since(start_stats)
         self.logger.info(
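
The refactor leans on the ordering guarantee in the new docstring: each prefix directory is listed with sorted(..., reverse=True), and since file names begin with the object hash followed by a timestamp, all updates for one object come out adjacent and newest first. Remembering only the previous obj_hash is therefore enough for object_sweep to spot stale entries. A self-contained sketch of that dedup pattern; the file names and the print stand-ins for processing and unlinking are invented:

    names = [
        'aaa111-1528200300.00000',
        'aaa111-1528200100.00000',   # older update for the same object
        'bbb222-1528200200.00000',
    ]

    last_obj_hash = None
    for name in sorted(names, reverse=True):
        obj_hash, _sep, timestamp = name.partition('-')
        if obj_hash == last_obj_hash:
            # A newer update for this object was already handled; the real
            # sweep just unlinks this stale async_pending file.
            print('skip  ', name)
        else:
            # Newest update for this object wins.
            print('handle', name)
            last_obj_hash = obj_hash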