Merge branch 'master' into feature/deep

Most of the conflicts had to do with Sam's recent replicator
improvements.

Related-Change: Ic108f5c38f700ac4c7bcf8315bf4c55306951361
Related-Change: Ib66ee035229d0718b68450587550176c91ce9ca6
Change-Id: I756cf9837799ebb32547e27cbc8b5a980acd3f2f
Tim Burke 2018-04-05 14:12:52 -07:00
commit 04c4ec3f9f
25 changed files with 950 additions and 612 deletions

View File

@@ -14,10 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import optparse
from swift.account.replicator import AccountReplicator
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
parser = optparse.OptionParser("%prog CONFIG [options]")
parser.add_option('-d', '--devices',
help=('Replicate only given devices. '
'Comma-separated list. '
'Only has effect if --once is used.'))
parser.add_option('-p', '--partitions',
help=('Replicate only given partitions. '
'Comma-separated list. '
'Only has effect if --once is used.'))
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(AccountReplicator, conf_file, **options)

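The hunk above gives the account replicator a small CLI for targeted runs. As a minimal illustration of the optparse wiring (standard library only; the config filename and device names here are invented):

import optparse

parser = optparse.OptionParser("%prog CONFIG [options]")
parser.add_option('-d', '--devices',
                  help='Replicate only given devices. Comma-separated list.')
parser.add_option('-p', '--partitions',
                  help='Replicate only given partitions. Comma-separated list.')

# e.g. swift-account-replicator account-server.conf --once --devices sdb1,sdb2
options, args = parser.parse_args(['account-server.conf',
                                   '--devices', 'sdb1,sdb2'])
assert options.devices == 'sdb1,sdb2'
assert args == ['account-server.conf']

In the daemon itself, parse_options(parser=parser, once=True) folds these values into the options dict that run_daemon() forwards to run_once() as kwargs.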
View File

@@ -14,10 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import optparse
from swift.container.replicator import ContainerReplicator
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
parser = optparse.OptionParser("%prog CONFIG [options]")
parser.add_option('-d', '--devices',
help=('Replicate only given devices. '
'Comma-separated list. '
'Only has effect if --once is used.'))
parser.add_option('-p', '--partitions',
help=('Replicate only given partitions. '
'Comma-separated list. '
'Only has effect if --once is used.'))
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ContainerReplicator, conf_file, **options)

View File

@@ -163,6 +163,25 @@ use = egg:swift#recon
# Work only with ionice_class.
# ionice_class =
# ionice_priority =
#
# The handoffs_only mode option is for special-case emergency
# situations such as full disks in the cluster. This option SHOULD NOT
# BE ENABLED except in emergencies. When handoffs_only mode is enabled
# the replicator will *only* replicate from handoff nodes to primary
# nodes and will not sync primary nodes with other primary nodes.
#
# This has two main effects: first, the replicator becomes much more
# effective at removing misplaced databases, thereby freeing up disk
# space at a much faster pace than normal. Second, the replicator does
# not sync data between primary nodes, so out-of-sync account and
# container listings will not resolve while handoffs_only is enabled.
#
# This mode is intended to allow operators to temporarily sacrifice
# consistency in order to gain faster rebalancing, such as during a
# capacity addition with nearly-full disks. It is not intended for
# long-term use.
#
# handoffs_only = no
[account-auditor]
# You can override the default log routing for this app here (don't use set!):

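For reference, the daemon-side reading of the handoffs_only flag documented above is a one-liner (it appears later in this commit in Replicator.__init__); a minimal sketch, with a hypothetical conf dict:

from swift.common.utils import config_true_value

conf = {'handoffs_only': 'yes'}  # hypothetical operator override
handoffs_only = config_true_value(conf.get('handoffs_only', 'no'))
assert handoffs_only is True  # 'yes', 'true', '1', 'on', 't', 'y' all parse as True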
View File

@@ -176,6 +176,25 @@ use = egg:swift#recon
# Work only with ionice_class.
# ionice_class =
# ionice_priority =
#
# The handoffs_only mode option is for special-case emergency
# situations such as full disks in the cluster. This option SHOULD NOT
# BE ENABLED except in emergencies. When handoffs_only mode is enabled
# the replicator will *only* replicate from handoff nodes to primary
# nodes and will not sync primary nodes with other primary nodes.
#
# This has two main effects: first, the replicator becomes much more
# effective at removing misplaced databases, thereby freeing up disk
# space at a much faster pace than normal. Second, the replicator does
# not sync data between primary nodes, so out-of-sync account and
# container listings will not resolve while handoffs_only is enabled.
#
# This mode is intended to allow operators to temporarily sacrifice
# consistency in order to gain faster rebalancing, such as during a
# capacity addition with nearly-full disks. It is not intended for
# long-term use.
#
# handoffs_only = no
[container-updater]
# You can override the default log routing for this app here (don't use set!):

View File

@@ -593,7 +593,7 @@ use = egg:swift#cname_lookup
#
# Specify the nameservers to use to do the CNAME resolution. If unset, the
# system configuration is used. Multiple nameservers can be specified
# separated by a comma. Default port 53 can be overriden. IPv6 is accepted.
# separated by a comma. Default port 53 can be overridden. IPv6 is accepted.
# Example: 127.0.0.1, 127.0.0.2, 127.0.0.3:5353, [::1], [::1]:5353
# nameservers =

View File

@@ -17,6 +17,7 @@ Script for generating a form signature for use with FormPost middleware.
"""
from __future__ import print_function
import hmac
import six
from hashlib import sha1
from os.path import basename
from time import time
@@ -92,8 +93,14 @@ def main(argv):
print('For example: /v1/account/container')
print(' Or: /v1/account/container/object_prefix')
return 1
sig = hmac.new(key, '%s\n%s\n%s\n%s\n%s' % (path, redirect, max_file_size,
max_file_count, expires),
data = '%s\n%s\n%s\n%s\n%s' % (path, redirect, max_file_size,
max_file_count, expires)
if six.PY3:
data = data if isinstance(data, six.binary_type) else \
data.encode('utf8')
key = key if isinstance(key, six.binary_type) else \
key.encode('utf8')
sig = hmac.new(key, data,
sha1).hexdigest()
print(' Expires:', expires)
print('Signature:', sig)

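The fix above boils down to: on Python 3, hmac.new() requires bytes for both key and message. A self-contained sketch with invented values:

import hmac
from hashlib import sha1

key = u'mykey'
data = u'/v1/AUTH_test/container\n\n104857600\n10\n1523000000'
# encode only if needed, so the same code runs on Python 2 and 3
if not isinstance(key, bytes):
    key = key.encode('utf8')
if not isinstance(data, bytes):
    data = data.encode('utf8')
sig = hmac.new(key, data, sha1).hexdigest()  # 40-char hex signature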
View File

@@ -33,7 +33,8 @@ from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
json, Timestamp, get_db_files, parse_db_filename
json, Timestamp, parse_overrides, round_robin_iter, get_db_files, \
parse_db_filename
from swift.common import ring
from swift.common.ring.utils import is_local_device
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
@@ -87,13 +88,13 @@ def roundrobin_datadirs(datadirs):
found (in their proper places). The partitions within each data
dir are walked randomly, however.
:param datadirs: a list of (path, node_id) to walk
:param datadirs: a list of (path, node_id, partition_filter) to walk
:returns: A generator of (partition, path_to_db_file, node_id)
"""
def walk_datadir(datadir, node_id):
def walk_datadir(datadir, node_id, part_filter):
partitions = [pd for pd in os.listdir(datadir)
if looks_like_partition(pd)]
if looks_like_partition(pd) and part_filter(pd)]
random.shuffle(partitions)
for partition in partitions:
part_dir = os.path.join(datadir, partition)
@@ -130,13 +131,12 @@ def roundrobin_datadirs(datadirs):
if e.errno != errno.ENOTEMPTY:
raise
its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
while its:
for it in its:
try:
yield next(it)
except StopIteration:
its.remove(it)
its = [walk_datadir(datadir, node_id, filt)
for datadir, node_id, filt in datadirs]
rr_its = round_robin_iter(its)
for datadir in rr_its:
yield datadir
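A hedged sketch of the new call shape: each datadir tuple now carries a partition filter, and round_robin_iter() interleaves the per-datadir walkers. The paths and filters below are hypothetical:

datadirs = [
    ('/srv/node/sda/containers', 0, lambda part: True),             # no filter
    ('/srv/node/sdb/containers', 1, lambda part: int(part) < 512),  # subset
]
for partition, db_file, node_id in roundrobin_datadirs(datadirs):
    print(partition, db_file, node_id)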
class ReplConnection(BufferedHTTPConnection):
@@ -211,6 +211,7 @@ class Replicator(Daemon):
self.recon_replicator)
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
self.root, os.path.sep, os.path.sep))
self.handoffs_only = config_true_value(conf.get('handoffs_only', 'no'))
def _zero_stats(self):
"""Zero out the stats."""
@@ -731,17 +732,44 @@
return match.groups()[0]
return "UNKNOWN"
def _partition_dir_filter(self, device_id, partitions_to_replicate):
def filt(partition_dir):
partition = int(partition_dir)
if self.handoffs_only:
primary_node_ids = [
d['id'] for d in self.ring.get_part_nodes(partition)]
if device_id in primary_node_ids:
return False
if partition not in partitions_to_replicate:
return False
return True
return filt
def report_up_to_date(self, full_info):
return True
def run_once(self, *args, **kwargs):
"""Run a replication pass once."""
devices_to_replicate, partitions_to_replicate = parse_overrides(
**kwargs)
self._zero_stats()
dirs = []
ips = whataremyips(self.bind_ip)
if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?'))
return
if self.handoffs_only:
self.logger.warning(
'Starting replication pass with handoffs_only enabled. '
'This mode is not intended for normal '
'operation; use handoffs_only with care.')
self._local_device_ids = set()
found_local = False
for node in self.ring.devs:
@@ -758,13 +786,20 @@
self.logger.warning(
_('Skipping %(device)s as it is not mounted') % node)
continue
if node['device'] not in devices_to_replicate:
self.logger.debug(
'Skipping device %s due to given arguments',
node['device'])
continue
unlink_older_than(
os.path.join(self.root, node['device'], 'tmp'),
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
self._local_device_ids.add(node['id'])
dirs.append((datadir, node['id']))
part_filt = self._partition_dir_filter(
node['id'], partitions_to_replicate)
dirs.append((datadir, node['id'], part_filt))
if not found_local:
self.logger.error("Can't find itself %s with port %s in ring "
"file, not replicating",
@@ -775,6 +810,10 @@
self._replicate_object, part, object_file, node_id)
self.cpool.waitall()
self.logger.info(_('Replication run OVER'))
if self.handoffs_only:
self.logger.warning(
'Finished replication pass with handoffs_only enabled. '
'If handoffs_only is no longer required, disable it.')
self._report_stats()
def run_forever(self, *args, **kwargs):

View File

@@ -314,7 +314,7 @@ class Bulk(object):
resp = head_cont_req.get_response(self.app)
if resp.is_success:
return False
if resp.status_int == 404:
if resp.status_int == HTTP_NOT_FOUND:
new_env = req.environ.copy()
new_env['PATH_INFO'] = container_path
new_env['swift.source'] = 'EA'

View File

@@ -47,7 +47,7 @@ import ctypes
import ctypes.util
from optparse import OptionParser
from tempfile import mkstemp, NamedTemporaryFile
from tempfile import gettempdir, mkstemp, NamedTemporaryFile
import glob
import itertools
import stat
@@ -73,7 +73,7 @@ if not six.PY2:
from six.moves import cPickle as pickle
from six.moves.configparser import (ConfigParser, NoSectionError,
NoOptionError, RawConfigParser)
from six.moves import range
from six.moves import range, http_client
from six.moves.urllib.parse import ParseResult
from six.moves.urllib.parse import quote as _quote
from six.moves.urllib.parse import urlparse as stdlib_urlparse
@@ -1788,12 +1788,19 @@ class LogAdapter(logging.LoggerAdapter, object):
emsg = str(exc)
elif exc.errno == errno.ECONNREFUSED:
emsg = _('Connection refused')
elif exc.errno == errno.ECONNRESET:
emsg = _('Connection reset')
elif exc.errno == errno.EHOSTUNREACH:
emsg = _('Host unreachable')
elif exc.errno == errno.ENETUNREACH:
emsg = _('Network unreachable')
elif exc.errno == errno.ETIMEDOUT:
emsg = _('Connection timeout')
else:
call = self._exception
elif isinstance(exc, http_client.BadStatusLine):
# Use error(); not really exceptional
emsg = '%s: %s' % (exc.__class__.__name__, exc.line)
elif isinstance(exc, eventlet.Timeout):
emsg = exc.__class__.__name__
if hasattr(exc, 'seconds'):
@@ -3428,6 +3435,15 @@ def get_valid_utf8_str(str_or_unicode):
return valid_unicode_str.encode('utf-8')
class Everything(object):
"""
A container that contains everything. If "e" is an instance of
Everything, then "x in e" is true for all x.
"""
def __contains__(self, element):
return True
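In other words, membership tests always succeed; a two-line illustration:

e = Everything()
assert 'sdb1' in e and 1017 in e and None in e  # always True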
def list_from_csv(comma_separated_str):
"""
Splits the str given and returns a properly stripped list of the comma
@@ -3438,6 +3454,27 @@ def list_from_csv(comma_separated_str):
return []
def parse_overrides(devices='', partitions='', **kwargs):
"""
Given daemon kwargs, parse out device and partition overrides or Everything.
:returns: a tuple of (devices, partitions) which can be used like containers
to check if a given partition (integer) or device (string) is "in"
the collection on which we should act.
"""
devices = list_from_csv(devices)
if not devices:
devices = Everything()
partitions = [
int(part) for part in
list_from_csv(partitions)]
if not partitions:
partitions = Everything()
return devices, partitions
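A short sketch of the intended use, with invented override values:

devices, partitions = parse_overrides(devices='sdb1,sdb2', partitions='')
assert 'sdb1' in devices and 'sdc1' not in devices
assert 1017 in partitions   # no partition override, so Everything matches

devices, partitions = parse_overrides()   # no overrides at all
assert 'anything' in devices and 42 in partitions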
def csv_append(csv_string, item):
"""
Appends an item to a comma-separated string.
@@ -4841,6 +4878,43 @@ def modify_priority(conf, logger):
_ioprio_set(io_class, io_priority)
def o_tmpfile_in_path_supported(dirpath):
if not hasattr(os, 'O_TMPFILE'):
return False
testfile = os.path.join(dirpath, ".o_tmpfile.test")
hasO_TMPFILE = True
fd = None
try:
fd = os.open(testfile, os.O_CREAT | os.O_WRONLY | os.O_TMPFILE)
except OSError as e:
if e.errno == errno.EINVAL:
hasO_TMPFILE = False
else:
raise Exception("Error on '%(path)s' while checking "
"O_TMPFILE: '%(ex)s'",
{'path': dirpath, 'ex': e})
except Exception as e:
raise Exception("Error on '%(path)s' while checking O_TMPFILE: "
"'%(ex)s'", {'path': dirpath, 'ex': e})
finally:
if fd is not None:
os.close(fd)
# ensure closing the fd will actually remove the file
if os.path.isfile(testfile):
return False
return hasO_TMPFILE
def o_tmpfile_in_tmpdir_supported():
return o_tmpfile_in_path_supported(gettempdir())
def o_tmpfile_supported():
"""
Returns True if O_TMPFILE flag is supported.
@@ -5060,6 +5134,20 @@ class ThreadSafeSysLogHandler(SysLogHandler):
self.lock = PipeMutex()
def round_robin_iter(its):
"""
Takes a list of iterators and yields an element from each in a
round-robin fashion until all of them are exhausted.
:param its: list of iterators
"""
while its:
for it in its:
try:
yield next(it)
except StopIteration:
its.remove(it)
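For example (this mirrors the unit test added later in this commit):

its = [iter([1, 2, 3]), iter([4, 5]), iter([6, 7, 8, 9])]
assert list(round_robin_iter(its)) == [1, 4, 6, 2, 5, 7, 3, 8, 9]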
def get_redirect_data(response):
"""
Extract a redirect location from a response's headers.

View File

@@ -41,7 +41,7 @@ from swift.common.constraints import valid_timestamp, check_utf8, check_drive
from swift.common import constraints
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.http import HTTP_NO_CONTENT, HTTP_NOT_FOUND, is_success
from swift.common.middleware import listing_formats
from swift.common.storage_policy import POLICIES
from swift.common.base_storage_server import BaseStorageServer
@@ -657,7 +657,7 @@ class ContainerController(BaseStorageServer):
content_type=out_content_type, charset='utf-8')
ret.last_modified = math.ceil(float(resp_headers['X-PUT-Timestamp']))
if not ret.body:
ret.status_int = 204
ret.status_int = HTTP_NO_CONTENT
return ret
@public

View File

@@ -34,8 +34,8 @@ from swift.common.constraints import check_drive, CONTAINER_LISTING_LIMIT
from swift.common.ring.utils import is_local_device
from swift.common.utils import get_logger, config_true_value, \
dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
config_float_value, config_positive_int_value, list_from_csv, \
quorum_size
config_float_value, config_positive_int_value, \
quorum_size, parse_overrides, Everything
def sharding_enabled(broker):
@@ -873,10 +873,10 @@ class ContainerSharder(ContainerReplicator):
finally:
self.logger.increment('scanned')
def _check_node(self, node, override_devices):
def _check_node(self, node, devices_to_shard):
if not node:
return False
if override_devices and node['device'] not in override_devices:
if node['device'] not in devices_to_shard:
return False
if not is_local_device(self.ips, self.port,
node['replication_ip'],
@@ -889,8 +889,8 @@ class ContainerSharder(ContainerReplicator):
return False
return True
def _one_shard_cycle(self, override_devices=None,
override_partitions=None):
def _one_shard_cycle(self, devices_to_shard=None,
partitions_to_shard=None):
"""
The main function, everything the sharder does forks from this method.
@@ -907,30 +907,31 @@
self.logger.info('Container sharder cycle starting, auto-sharding %s',
self.auto_shard)
self._zero_stats()
if override_devices:
if isinstance(devices_to_shard, (list, tuple)):
self.logger.info('(Override devices: %s)',
', '.join(override_devices))
if override_partitions:
', '.join(str(d) for d in devices_to_shard))
if isinstance(partitions_to_shard, (list, tuple)):
self.logger.info('(Override partitions: %s)',
', '.join(override_partitions))
', '.join(str(p) for p in partitions_to_shard))
dirs = []
self.shard_cleanups = dict()
self.ips = whataremyips(bind_ip=self.bind_ip)
for node in self.ring.devs:
if not self._check_node(node, override_devices):
if not self._check_node(node, devices_to_shard):
continue
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
# Populate self._local_device_ids so we can find
# handoffs for shards later
self._local_device_ids.add(node['id'])
dirs.append((datadir, node['id']))
part_filt = self._partition_dir_filter(
node['id'],
partitions_to_shard)
dirs.append((datadir, node['id'], part_filt))
if not dirs:
self.logger.warning('Found no data dirs!')
for part, path, node_id in db_replicator.roundrobin_datadirs(dirs):
# NB: get_part_nodes always provides an 'index' key
if override_partitions and part not in override_partitions:
continue
for node in self.ring.get_part_nodes(int(part)):
if node['id'] == node_id:
break
@@ -1359,7 +1360,8 @@ class ContainerSharder(ContainerReplicator):
while True:
begin = time.time()
try:
self._one_shard_cycle()
self._one_shard_cycle(devices_to_shard=Everything(),
partitions_to_shard=Everything())
except (Exception, Timeout):
self.logger.increment('errors')
self.logger.exception('Exception in sharder')
@@ -1372,11 +1374,11 @@
def run_once(self, *args, **kwargs):
"""Run the container sharder once."""
self.logger.info('Begin container sharder "once" mode')
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions'))
devices_to_shard, partitions_to_shard = parse_overrides(
**kwargs)
begin = self.reported = time.time()
self._one_shard_cycle(override_devices=override_devices,
override_partitions=override_partitions)
self._one_shard_cycle(devices_to_shard=devices_to_shard,
partitions_to_shard=partitions_to_shard)
elapsed = time.time() - begin
self.logger.info(
'Container sharder "once" mode completed: %.02fs', elapsed)

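Putting it together, a once-mode invocation under this change might look like the following sketch (sharder is a hypothetical ContainerSharder instance; device and partition values are invented):

sharder.run_once(devices='sdb1,sdb2', partitions='10,42')  # act on a subset
sharder.run_once()  # no overrides: parse_overrides() yields Everything() twice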
View File

@@ -27,7 +27,7 @@ from eventlet import Timeout
from swift.obj import diskfile, replicator
from swift.common.utils import (
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
unlink_paths_older_than, readconf, config_auto_int_value)
unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
DiskFileDeleted, DiskFileExpired
from swift.common.daemon import Daemon
@@ -120,18 +120,17 @@ class AuditorWorker(object):
total_quarantines = 0
total_errors = 0
time_auditing = 0
# TODO: we should move audit-location generation to the storage policy,
# as we may (conceivably) have a different filesystem layout for each.
# We'd still need to generate the policies to audit from the actual
# directories found on-disk, and have appropriate error reporting if we
# find a directory that doesn't correspond to any known policy. This
# will require a sizable refactor, but currently all diskfile managers
# can find all diskfile locations regardless of policy -- so for now
# just use Policy-0's manager.
all_locs = (self.diskfile_router[POLICIES[0]]
# get AuditLocations for each policy
loc_generators = []
for policy in POLICIES:
loc_generators.append(
self.diskfile_router[policy]
.object_audit_location_generator(
device_dirs=device_dirs,
policy, device_dirs=device_dirs,
auditor_type=self.auditor_type))
all_locs = round_robin_iter(loc_generators)
for location in all_locs:
loop_time = time.time()
self.failsafe_object_audit(location)
@@ -192,8 +191,11 @@
self.logger.info(
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
# Unset remaining partitions to not skip them in the next run
diskfile.clear_auditor_status(self.devices, self.auditor_type)
for policy in POLICIES:
# Unset remaining partitions to not skip them in the next run
self.diskfile_router[policy].clear_auditor_status(
policy,
self.auditor_type)
def record_stats(self, obj_size):
"""

View File

@@ -453,18 +453,20 @@ class AuditLocation(object):
return str(self.path)
def object_audit_location_generator(devices, mount_check=True, logger=None,
device_dirs=None, auditor_type="ALL"):
def object_audit_location_generator(devices, datadir, mount_check=True,
logger=None, device_dirs=None,
auditor_type="ALL"):
"""
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
objects stored under that directory if device_dirs isn't set. If
device_dirs is set, only yield AuditLocation for the objects under the
entries in device_dirs. The AuditLocation only knows the path to the hash
directory, not to the .data file therein (if any). This is to avoid a
double listdir(hash_dir); the DiskFile object will always do one, so
we don't.
objects stored under that directory for the given datadir (policy),
if device_dirs isn't set. If device_dirs is set, only yield AuditLocation
for the objects under the entries in device_dirs. The AuditLocation only
knows the path to the hash directory, not to the .data file therein
(if any). This is to avoid a double listdir(hash_dir); the DiskFile object
will always do one, so we don't.
:param devices: parent directory of the devices to be audited
:param datadir: objects directory
:param mount_check: flag to check if a mount check should be performed
on devices
:param logger: a logger object
@@ -480,6 +482,7 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
# randomize devices in case of process restart before sweep completed
shuffle(device_dirs)
base, policy = split_policy_string(datadir)
for device in device_dirs:
if not check_drive(devices, device, mount_check):
if logger:
@@ -487,55 +490,37 @@
'Skipping %s as it is not %s', device,
'mounted' if mount_check else 'a dir')
continue
# loop through object dirs for all policies
device_dir = os.path.join(devices, device)
try:
dirs = os.listdir(device_dir)
except OSError as e:
if logger:
logger.debug(
_('Skipping %(dir)s: %(err)s') % {'dir': device_dir,
'err': e.strerror})
datadir_path = os.path.join(devices, device, datadir)
if not os.path.exists(datadir_path):
continue
for dir_ in dirs:
if not dir_.startswith(DATADIR_BASE):
continue
partitions = get_auditor_status(datadir_path, logger, auditor_type)
for pos, partition in enumerate(partitions):
update_auditor_status(datadir_path, logger,
partitions[pos:], auditor_type)
part_path = os.path.join(datadir_path, partition)
try:
base, policy = split_policy_string(dir_)
except PolicyError as e:
if logger:
logger.warning(_('Directory %(directory)r does not map '
'to a valid policy (%(error)s)') % {
'directory': dir_, 'error': e})
suffixes = listdir(part_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
datadir_path = os.path.join(devices, device, dir_)
partitions = get_auditor_status(datadir_path, logger, auditor_type)
for pos, partition in enumerate(partitions):
update_auditor_status(datadir_path, logger,
partitions[pos:], auditor_type)
part_path = os.path.join(datadir_path, partition)
for asuffix in suffixes:
suff_path = os.path.join(part_path, asuffix)
try:
suffixes = listdir(part_path)
hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
for asuffix in suffixes:
suff_path = os.path.join(part_path, asuffix)
try:
hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
for hsh in hashes:
hsh_path = os.path.join(suff_path, hsh)
yield AuditLocation(hsh_path, device, partition,
policy)
for hsh in hashes:
hsh_path = os.path.join(suff_path, hsh)
yield AuditLocation(hsh_path, device, partition,
policy)
update_auditor_status(datadir_path, logger, [], auditor_type)
update_auditor_status(datadir_path, logger, [], auditor_type)
def get_auditor_status(datadir_path, logger, auditor_type):
@@ -589,15 +574,13 @@ def update_auditor_status(datadir_path, logger, partitions, auditor_type):
{'auditor_status': auditor_status, 'err': e})
def clear_auditor_status(devices, auditor_type="ALL"):
for device in os.listdir(devices):
for dir_ in os.listdir(os.path.join(devices, device)):
if not dir_.startswith("objects"):
continue
datadir_path = os.path.join(devices, device, dir_)
auditor_status = os.path.join(
datadir_path, "auditor_status_%s.json" % auditor_type)
remove_file(auditor_status)
def clear_auditor_status(devices, datadir, auditor_type="ALL"):
device_dirs = listdir(devices)
for device in device_dirs:
datadir_path = os.path.join(devices, device, datadir)
auditor_status = os.path.join(
datadir_path, "auditor_status_%s.json" % auditor_type)
remove_file(auditor_status)
def strip_self(f):
@@ -1340,15 +1323,22 @@ class BaseDiskFileManager(object):
pipe_size=self.pipe_size,
use_linkat=self.use_linkat, **kwargs)
def object_audit_location_generator(self, device_dirs=None,
def clear_auditor_status(self, policy, auditor_type="ALL"):
datadir = get_data_dir(policy)
clear_auditor_status(self.devices, datadir, auditor_type)
def object_audit_location_generator(self, policy, device_dirs=None,
auditor_type="ALL"):
"""
Yield an AuditLocation for all objects stored under device_dirs.
:param policy: the StoragePolicy instance
:param device_dirs: directory of target device
:param auditor_type: either ALL or ZBF
"""
return object_audit_location_generator(self.devices, self.mount_check,
datadir = get_data_dir(policy)
return object_audit_location_generator(self.devices, datadir,
self.mount_check,
self.logger, device_dirs,
auditor_type)

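A sketch of the resulting per-policy call pattern, matching the AuditorWorker change earlier in this commit (df_router stands in for a DiskFileRouter instance; the audit step itself is elided):

from swift.common.storage_policy import POLICIES
from swift.common.utils import round_robin_iter

loc_generators = [
    df_router[policy].object_audit_location_generator(
        policy, auditor_type='ALL')
    for policy in POLICIES]
for location in round_robin_iter(loc_generators):
    ...  # audit each AuditLocation, interleaved across policies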
View File

@@ -25,9 +25,8 @@ import six.moves.cPickle as pickle
from swift import gettext_ as _
import eventlet
from eventlet import GreenPool, tpool, Timeout, sleep
from eventlet import GreenPool, queue, tpool, Timeout, sleep
from eventlet.green import subprocess
from eventlet.support.greenlets import GreenletExit
from swift.common.constraints import check_drive
from swift.common.ring.utils import is_local_device
@@ -90,7 +89,6 @@ class ObjectReplicator(Daemon):
if not self.rsync_module:
self.rsync_module = '{replication_ip}::object'
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
@@ -113,6 +111,7 @@
'handoff_delete before the next '
'normal rebalance')
self._df_router = DiskFileRouter(conf, self.logger)
self._child_process_reaper_queue = queue.LightQueue()
def _zero_stats(self):
"""Zero out the stats."""
@@ -138,6 +137,36 @@
my_replication_ips.add(local_dev['replication_ip'])
return list(my_replication_ips)
def _child_process_reaper(self):
"""
Consume processes from self._child_process_reaper_queue and wait() for
them
"""
procs = set()
done = False
while not done:
timeout = 60 if procs else None
try:
new_proc = self._child_process_reaper_queue.get(
timeout=timeout)
if new_proc is not None:
procs.add(new_proc)
else:
done = True
except queue.Empty:
pass
reaped_procs = set()
for proc in procs:
try:
# this will reap the process if it has exited, but
# otherwise will not wait
proc.wait(timeout=0)
reaped_procs.add(proc)
except subprocess.TimeoutExpired:
pass
procs -= reaped_procs
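The hand-off works roughly like this toy version (simplified: the real reaper above polls with wait(timeout=0) so it can watch many children at once; the queue name and sleep child below are stand-ins):

import eventlet
from eventlet import queue
from eventlet.green import subprocess

reaper_queue = queue.LightQueue()          # stands in for _child_process_reaper_queue
proc = subprocess.Popen(['sleep', '0.1'])  # stands in for a stubborn rsync child

def reap():
    child = reaper_queue.get()
    while child is not None:               # None is the drain-and-exit sentinel
        child.wait()
        child = reaper_queue.get()

reaper = eventlet.spawn(reap)
reaper_queue.put(proc)                     # hand the child off; don't block the pool
reaper_queue.put(None)
reaper.wait()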
# Just exists for doc anchor point
def sync(self, node, job, suffixes, *args, **kwargs):
"""
@@ -169,7 +198,7 @@
:returns: return code of rsync process. 0 is successful
"""
start_time = time.time()
proc = ret_val = None
proc = None
try:
with Timeout(self.rsync_timeout):
@@ -178,25 +207,28 @@
stderr=subprocess.STDOUT)
results = proc.stdout.read()
ret_val = proc.wait()
except GreenletExit:
self.logger.error(_("Killing by lockup detector"))
if proc:
# Assume rsync is still responsive and give it a chance
# to shut down gracefully
proc.terminate()
# Final good-faith effort to clean up the process table.
# Note that this blocks, but worst-case we wait for the
# lockup detector to come around and kill us. This can
# happen if the process is stuck down in kernel-space
# waiting on I/O or something.
proc.wait()
raise
except Timeout:
self.logger.error(_("Killing long-running rsync: %s"), str(args))
if proc:
proc.kill()
proc.wait()
try:
# Note: Python 2.7's subprocess.Popen class doesn't take
# any arguments for wait(), but Python 3's does.
# However, Eventlet's replacement Popen takes a timeout
# argument regardless of Python version, so we don't
# need any conditional code here.
proc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
# Sometimes a process won't die immediately even after a
# SIGKILL. This can be due to failing disks, high load,
# or other reasons. We can't wait for it forever since
# we're taking up a slot in the (green)thread pool, so
# we send it over to another greenthread, not part of
# our pool, whose sole duty is to wait for child
# processes to exit.
self._child_process_reaper_queue.put(proc)
return 1 # failure response code
total_time = time.time() - start_time
for result in results.split('\n'):
if result == '':
@@ -554,14 +586,6 @@ class ObjectReplicator(Daemon):
_("Nothing replicated for %s seconds."),
(time.time() - self.start))
def kill_coros(self):
"""Utility function that kills all coroutines currently running."""
for coro in list(self.run_pool.coroutines_running):
try:
coro.kill(GreenletExit)
except GreenletExit:
pass
def heartbeat(self):
"""
Loop that runs in the background during replication. It periodically
@@ -571,19 +595,6 @@
eventlet.sleep(self.stats_interval)
self.stats_line()
def detect_lockups(self):
"""
In testing, the pool.waitall() call very occasionally failed to return.
This is an attempt to make sure the replicator finishes its replication
pass in some eventuality.
"""
while True:
eventlet.sleep(self.lockup_timeout)
if self.replication_count == self.last_replication_count:
self.logger.error(_("Lockup detected.. killing live coros."))
self.kill_coros()
self.last_replication_count = self.replication_count
def build_replication_jobs(self, policy, ips, override_devices=None,
override_partitions=None):
"""
@@ -734,7 +745,6 @@
self.handoffs_remaining = 0
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle
current_nodes = None
@@ -745,11 +755,6 @@
override_policies=override_policies)
for job in jobs:
current_nodes = job['nodes']
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and \
job['partition'] not in override_partitions:
continue
dev_path = check_drive(self.devices_dir, job['device'],
self.mount_check)
if not dev_path:
@@ -788,8 +793,7 @@
else:
self.run_pool.spawn(self.update, job)
current_nodes = None
with Timeout(self.lockup_timeout):
self.run_pool.waitall()
self.run_pool.waitall()
except (Exception, Timeout):
if current_nodes:
self._add_failure_stats([(failure_dev['replication_ip'],
@@ -798,14 +802,14 @@
else:
self._add_failure_stats(self.all_devs_info)
self.logger.exception(_("Exception in top-level replication loop"))
self.kill_coros()
finally:
stats.kill()
lockup_detector.kill()
self.stats_line()
self.stats['attempted'] = self.replication_count
def run_once(self, *args, **kwargs):
rsync_reaper = eventlet.spawn(self._child_process_reaper)
self._zero_stats()
self.logger.info(_("Running object replicator in script mode."))
@@ -835,8 +839,14 @@
'object_replication_last': replication_last},
self.rcache, self.logger)
# Give rsync processes one last chance to exit, then bail out and
# let them be init's problem
self._child_process_reaper_queue.put(None)
rsync_reaper.wait()
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object replicator in daemon mode."))
eventlet.spawn_n(self._child_process_reaper)
# Run the replicator continually
while True:
self._zero_stats()

View File

@@ -587,10 +587,8 @@ class TestContainerTempurl(Base):
def test_tempurl_keys_hidden_from_acl_readonly(self):
if not tf.cluster_info.get('tempauth'):
raise SkipTest('TEMP AUTH SPECIFIC TEST')
original_token = self.env.container.conn.storage_token
self.env.container.conn.storage_token = self.env.conn2.storage_token
metadata = self.env.container.info()
self.env.container.conn.storage_token = original_token
metadata = self.env.container.info(cfg={
'use_token': self.env.conn2.storage_token})
self.assertNotIn(
'tempurl_key', metadata,

View File

@@ -1090,6 +1090,15 @@ class Timeout(object):
raise TimeoutException
def requires_o_tmpfile_support_in_tmp(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not utils.o_tmpfile_in_tmpdir_supported():
raise SkipTest('Requires O_TMPFILE support in TMPDIR')
return func(*args, **kwargs)
return wrapper
def requires_o_tmpfile_support(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):

View File

@@ -17,7 +17,7 @@
import hashlib
import hmac
import mock
from six import StringIO
import six
import unittest
from swift.cli import form_signature
@@ -33,14 +33,19 @@ class TestFormSignature(unittest.TestCase):
max_file_size = str(int(1024 * 1024 * 1024 * 3.14159)) # π GiB
max_file_count = '3'
expected_signature = hmac.new(
key,
"\n".join((
path, redirect, max_file_size, max_file_count,
str(int(the_time + expires)))),
hashlib.sha1).hexdigest()
data = "\n".join((
path, redirect, max_file_size, max_file_count,
str(int(the_time + expires))))
out = StringIO()
if six.PY3:
data = data if isinstance(data, six.binary_type) else \
data.encode('utf8')
key = key if isinstance(key, six.binary_type) else \
key.encode('utf8')
expected_signature = hmac.new(key, data, hashlib.sha1).hexdigest()
out = six.StringIO()
with mock.patch('swift.cli.form_signature.time', lambda: the_time):
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([
@@ -59,7 +64,7 @@
self.assertIn(sig_input, out.getvalue())
def test_too_few_args(self):
out = StringIO()
out = six.StringIO()
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([
'/path/to/swift-form-signature',
@@ -70,7 +75,7 @@
self.assertIn(usage, out.getvalue())
def test_invalid_filesize_arg(self):
out = StringIO()
out = six.StringIO()
key = 'secret squirrel'
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([
@@ -79,7 +84,7 @@
self.assertNotEqual(exitcode, 0)
def test_invalid_filecount_arg(self):
out = StringIO()
out = six.StringIO()
key = 'secret squirrel'
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([
@@ -88,7 +93,7 @@
self.assertNotEqual(exitcode, 0)
def test_invalid_path_arg(self):
out = StringIO()
out = six.StringIO()
key = 'secret squirrel'
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([
@@ -97,7 +102,7 @@
self.assertNotEqual(exitcode, 0)
def test_invalid_seconds_arg(self):
out = StringIO()
out = six.StringIO()
key = 'secret squirrel'
with mock.patch('sys.stdout', out):
exitcode = form_signature.main([

View File

@@ -1375,7 +1375,8 @@ class TestDBReplicator(unittest.TestCase):
self.assertTrue(os.path.isdir(dirpath))
node_id = 1
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
results = list(db_replicator.roundrobin_datadirs(
[(datadir, node_id, lambda p: True)]))
expected = [
('450', os.path.join(datadir, db_path), node_id),
]
@@ -1396,12 +1397,14 @@
self.assertEqual({'18', '1054', '1060', '450'},
set(os.listdir(datadir)))
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
results = list(db_replicator.roundrobin_datadirs(
[(datadir, node_id, lambda p: True)]))
self.assertEqual(results, expected)
self.assertEqual({'1054', '1060', '450'},
set(os.listdir(datadir)))
results = list(db_replicator.roundrobin_datadirs([(datadir, node_id)]))
results = list(db_replicator.roundrobin_datadirs(
[(datadir, node_id, lambda p: True)]))
self.assertEqual(results, expected)
# non db file in '1060' dir is not deleted and exception is handled
self.assertEqual({'1060', '450'},
@@ -1488,8 +1491,8 @@
mock.patch(base + 'random.shuffle', _shuffle), \
mock.patch(base + 'os.rmdir', _rmdir):
datadirs = [('/srv/node/sda/containers', 1),
('/srv/node/sdb/containers', 2)]
datadirs = [('/srv/node/sda/containers', 1, lambda p: True),
('/srv/node/sdb/containers', 2, lambda p: True)]
results = list(db_replicator.roundrobin_datadirs(datadirs))
# The results show that the .db files are returned, the devices
# interleaved.
@@ -1593,6 +1596,215 @@
mock.call(node, partition, expected_hsh, replicator.logger)])
class TestHandoffsOnly(unittest.TestCase):
class FakeRing3Nodes(object):
_replicas = 3
# Three nodes, two disks each
devs = [
dict(id=0, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.1', port=6201,
replication_ip='10.0.0.1', replication_port=6201,
device='sdp'),
dict(id=1, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.1', port=6201,
replication_ip='10.0.0.1', replication_port=6201,
device='sdq'),
dict(id=2, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.2', port=6201,
replication_ip='10.0.0.2', replication_port=6201,
device='sdp'),
dict(id=3, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.2', port=6201,
replication_ip='10.0.0.2', replication_port=6201,
device='sdq'),
dict(id=4, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.3', port=6201,
replication_ip='10.0.0.3', replication_port=6201,
device='sdp'),
dict(id=5, region=1, zone=1,
meta='', weight=500.0, ip='10.0.0.3', port=6201,
replication_ip='10.0.0.3', replication_port=6201,
device='sdq'),
]
def __init__(self, *a, **kw):
pass
def get_part(self, account, container=None, obj=None):
return 0
def get_part_nodes(self, part):
nodes = []
for offset in range(self._replicas):
i = (part + offset) % len(self.devs)
nodes.append(self.devs[i])
return nodes
def get_more_nodes(self, part):
for offset in range(self._replicas, len(self.devs)):
i = (part + offset) % len(self.devs)
yield self.devs[i]
def _make_fake_db(self, disk, partition, db_hash):
directories = [
os.path.join(self.root, disk),
os.path.join(self.root, disk, 'containers'),
os.path.join(self.root, disk, 'containers', str(partition)),
os.path.join(self.root, disk, 'containers', str(partition),
db_hash[-3:]),
os.path.join(self.root, disk, 'containers', str(partition),
db_hash[-3:], db_hash)]
for d in directories:
try:
os.mkdir(d)
except OSError as err:
if err.errno != errno.EEXIST:
raise
file_path = os.path.join(directories[-1], db_hash + ".db")
with open(file_path, 'w'):
pass
def setUp(self):
self.root = mkdtemp()
# object disks; they're just here to make sure they don't trip us up
os.mkdir(os.path.join(self.root, 'sdc'))
os.mkdir(os.path.join(self.root, 'sdc', 'objects'))
os.mkdir(os.path.join(self.root, 'sdd'))
os.mkdir(os.path.join(self.root, 'sdd', 'objects'))
# part 0 belongs on sdp
self._make_fake_db('sdp', 0, '010101013cf2b7979af9eaa71cb67220')
# part 1 does not belong on sdp
self._make_fake_db('sdp', 1, 'abababab2b5368158355e799323b498d')
# part 1 belongs on sdq
self._make_fake_db('sdq', 1, '02020202e30f696a3cfa63d434a3c94e')
# part 2 does not belong on sdq
self._make_fake_db('sdq', 2, 'bcbcbcbc15d3835053d568c57e2c83b5')
def tearDown(self):
rmtree(self.root, ignore_errors=True)
def test_scary_warnings(self):
logger = unit.FakeLogger()
replicator = TestReplicator({
'handoffs_only': 'yes',
'devices': self.root,
'bind_port': 6201,
'mount_check': 'no',
}, logger=logger)
with patch.object(db_replicator, 'whataremyips',
return_value=['10.0.0.1']), \
patch.object(replicator, '_replicate_object'), \
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
replicator.run_once()
self.assertEqual(
logger.get_lines_for_level('warning'),
[('Starting replication pass with handoffs_only enabled. This '
'mode is not intended for normal operation; use '
'handoffs_only with care.'),
('Finished replication pass with handoffs_only enabled. '
'If handoffs_only is no longer required, disable it.')])
def test_skips_primary_partitions(self):
replicator = TestReplicator({
'handoffs_only': 'yes',
'devices': self.root,
'bind_port': 6201,
'mount_check': 'no',
})
with patch.object(db_replicator, 'whataremyips',
return_value=['10.0.0.1']), \
patch.object(replicator, '_replicate_object') as mock_repl, \
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
replicator.run_once()
self.assertEqual(sorted(mock_repl.mock_calls), [
mock.call('1', os.path.join(
self.root, 'sdp', 'containers', '1', '98d',
'abababab2b5368158355e799323b498d',
'abababab2b5368158355e799323b498d.db'), 0),
mock.call('2', os.path.join(
self.root, 'sdq', 'containers', '2', '3b5',
'bcbcbcbc15d3835053d568c57e2c83b5',
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
def test_override_partitions(self):
replicator = TestReplicator({
'devices': self.root,
'bind_port': 6201,
'mount_check': 'no',
})
with patch.object(db_replicator, 'whataremyips',
return_value=['10.0.0.1']), \
patch.object(replicator, '_replicate_object') as mock_repl, \
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
replicator.run_once(partitions="0,2")
self.assertEqual(sorted(mock_repl.mock_calls), [
mock.call('0', os.path.join(
self.root, 'sdp', 'containers', '0', '220',
'010101013cf2b7979af9eaa71cb67220',
'010101013cf2b7979af9eaa71cb67220.db'), 0),
mock.call('2', os.path.join(
self.root, 'sdq', 'containers', '2', '3b5',
'bcbcbcbc15d3835053d568c57e2c83b5',
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
def test_override_devices(self):
replicator = TestReplicator({
'devices': self.root,
'bind_port': 6201,
'mount_check': 'no',
})
with patch.object(db_replicator, 'whataremyips',
return_value=['10.0.0.1']), \
patch.object(replicator, '_replicate_object') as mock_repl, \
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
replicator.run_once(devices="sdp")
self.assertEqual(sorted(mock_repl.mock_calls), [
mock.call('0', os.path.join(
self.root, 'sdp', 'containers', '0', '220',
'010101013cf2b7979af9eaa71cb67220',
'010101013cf2b7979af9eaa71cb67220.db'), 0),
mock.call('1', os.path.join(
self.root, 'sdp', 'containers', '1', '98d',
'abababab2b5368158355e799323b498d',
'abababab2b5368158355e799323b498d.db'), 0)])
def test_override_devices_and_partitions(self):
replicator = TestReplicator({
'devices': self.root,
'bind_port': 6201,
'mount_check': 'no',
})
with patch.object(db_replicator, 'whataremyips',
return_value=['10.0.0.1']), \
patch.object(replicator, '_replicate_object') as mock_repl, \
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
replicator.run_once(partitions="0,2", devices="sdp")
self.assertEqual(sorted(mock_repl.mock_calls), [
mock.call('0', os.path.join(
self.root, 'sdp', 'containers', '0', '220',
'010101013cf2b7979af9eaa71cb67220',
'010101013cf2b7979af9eaa71cb67220.db'), 0)])
class TestReplToNode(unittest.TestCase):
def setUp(self):
db_replicator.ring = FakeRing()

View File

@@ -25,7 +25,7 @@ from tempfile import gettempdir
from swift.common.linkat import linkat
from swift.common.utils import O_TMPFILE
from test.unit import requires_o_tmpfile_support
from test.unit import requires_o_tmpfile_support_in_tmp
class TestLinkat(unittest.TestCase):
@@ -38,7 +38,7 @@
def test_available(self):
self.assertFalse(linkat.available)
@requires_o_tmpfile_support
@requires_o_tmpfile_support_in_tmp
def test_errno(self):
with open('/dev/null', 'r') as fd:
self.assertRaises(IOError, linkat,
@@ -77,7 +77,7 @@
mock_cdll.assert_called_once_with(libc_name, use_errno=True)
self.assertTrue(libc.linkat_retrieved)
@requires_o_tmpfile_support
@requires_o_tmpfile_support_in_tmp
def test_linkat_success(self):
fd = None

View File

@@ -47,6 +47,7 @@ import inspect
import six
from six import BytesIO, StringIO
from six.moves.queue import Queue, Empty
from six.moves import http_client
from six.moves import range
from textwrap import dedent
@@ -76,7 +77,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import POLICIES, reload_storage_policies
from swift.common.swob import Request, Response
from test.unit import FakeLogger, requires_o_tmpfile_support, \
quiet_eventlet_exceptions
requires_o_tmpfile_support_in_tmp, quiet_eventlet_exceptions
threading = eventlet.patcher.original('threading')
@@ -1719,6 +1720,13 @@ class TestUtils(unittest.TestCase):
self.assertTrue('my error message' in log_msg)
message_timeout.cancel()
# test BadStatusLine
log_exception(http_client.BadStatusLine(''))
log_msg = strip_value(sio)
self.assertNotIn('Traceback', log_msg)
self.assertIn('BadStatusLine', log_msg)
self.assertIn("''", log_msg)
# test unhandled
log_exception(Exception('my error message'))
log_msg = strip_value(sio)
@@ -3609,6 +3617,22 @@ cluster_dfw1 = http://dfw1.host/v1/
utils.get_hmac('GET', '/path', 1, 'abc'),
'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f')
def test_parse_overrides(self):
devices, partitions = utils.parse_overrides(devices='sdb1,sdb2')
self.assertIn('sdb1', devices)
self.assertIn('sdb2', devices)
self.assertNotIn('sdb3', devices)
self.assertIn(1, partitions)
self.assertIn('1', partitions) # matches because of Everything
self.assertIn(None, partitions) # matches because of Everything
devices, partitions = utils.parse_overrides(partitions='1,2,3')
self.assertIn('sdb1', devices)
self.assertIn('1', devices) # matches because of Everything
self.assertIn(None, devices) # matches because of Everything
self.assertIn(1, partitions)
self.assertNotIn('1', partitions)
self.assertNotIn(None, partitions)
def test_get_policy_index(self):
# Account has no information about a policy
req = Request.blank(
@@ -3929,7 +3953,7 @@
patch('platform.architecture', return_value=('64bit', '')):
self.assertRaises(OSError, utils.NR_ioprio_set)
@requires_o_tmpfile_support
@requires_o_tmpfile_support_in_tmp
def test_link_fd_to_path_linkat_success(self):
tempdir = mkdtemp()
fd = os.open(tempdir, utils.O_TMPFILE | os.O_WRONLY)
@@ -3949,7 +3973,7 @@
os.close(fd)
shutil.rmtree(tempdir)
@requires_o_tmpfile_support
@requires_o_tmpfile_support_in_tmp
def test_link_fd_to_path_target_exists(self):
tempdir = mkdtemp()
# Create and write to a file
@@ -3984,7 +4008,7 @@
self.fail("Expecting IOError exception")
self.assertTrue(_m_linkat.called)
@requires_o_tmpfile_support
@requires_o_tmpfile_support_in_tmp
def test_linkat_race_dir_not_exists(self):
tempdir = mkdtemp()
target_dir = os.path.join(tempdir, uuid4().hex)
@@ -4102,6 +4126,19 @@
self.assertEqual(utils.replace_partition_in_path(old, 10), old)
self.assertEqual(utils.replace_partition_in_path(new, 11), new)
def test_round_robin_iter(self):
it1 = iter([1, 2, 3])
it2 = iter([4, 5])
it3 = iter([6, 7, 8, 9])
it4 = iter([])
rr_its = utils.round_robin_iter([it1, it2, it3, it4])
got = list(rr_its)
# Expect that items get fetched in a round-robin fashion from the
# iterators
self.assertListEqual([1, 4, 6, 2, 5, 7, 3, 8, 9], got)
@with_tempdir
def test_get_db_files(self, tempdir):
dbdir = os.path.join(tempdir, 'dbdir')

View File

@@ -852,7 +852,7 @@ class TestAuditor(unittest.TestCase):
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path))
del(kwargs['zero_byte_fps'])
clear_auditor_status(self.devices)
clear_auditor_status(self.devices, 'objects')
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path))

View File

@@ -375,17 +375,6 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
"fcd938702024c25fef6c32fef05298eb"))
os.makedirs(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
"4a943bc72c2e647c4675923d58cf4ca5"))
os.makedirs(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb",
"fcd938702024c25fef6c32fef05298eb"))
os.makedirs(os.path.join(tmpdir, "sdq", "objects-99", "9972",
"8eb",
"fcd938702024c25fef6c32fef05298eb"))
# the bad
os.makedirs(os.path.join(tmpdir, "sdq", "objects-", "1135",
"6c3",
"fcd938702024c25fef6c32fef05298eb"))
os.makedirs(os.path.join(tmpdir, "sdq", "objects-fud", "foo"))
os.makedirs(os.path.join(tmpdir, "sdq", "objects-+1", "foo"))
self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519",
"fed"))
@@ -404,26 +393,18 @@
"4f9eee668b66c6f0250bfa3c7ab9e51e"))
logger = debug_logger()
locations = [(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=False,
logger=logger)]
locations.sort()
loc_generators = []
datadirs = ["objects", "objects-1"]
for datadir in datadirs:
loc_generators.append(
diskfile.object_audit_location_generator(
devices=tmpdir, datadir=datadir, mount_check=False,
logger=logger))
# expect some warnings about those bad dirs
warnings = logger.get_lines_for_level('warning')
self.assertEqual(set(warnings), set([
("Directory 'objects-' does not map to a valid policy "
"(Unknown policy, for index '')"),
("Directory 'objects-2' does not map to a valid policy "
"(Unknown policy, for index '2')"),
("Directory 'objects-99' does not map to a valid policy "
"(Unknown policy, for index '99')"),
("Directory 'objects-fud' does not map to a valid policy "
"(Unknown policy, for index 'fud')"),
("Directory 'objects-+1' does not map to a valid policy "
"(Unknown policy, for index '+1')"),
]))
all_locs = itertools.chain(*loc_generators)
locations = [(loc.path, loc.device, loc.partition, loc.policy) for
loc in all_locs]
locations.sort()
expected = \
[(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
@@ -448,12 +429,19 @@
self.assertEqual(locations, expected)
# Reset status file for next run
diskfile.clear_auditor_status(tmpdir)
for datadir in datadirs:
diskfile.clear_auditor_status(tmpdir, datadir)
# now without a logger
locations = [(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=False)]
for datadir in datadirs:
loc_generators.append(
diskfile.object_audit_location_generator(
devices=tmpdir, datadir=datadir, mount_check=False,
logger=logger))
all_locs = itertools.chain(*loc_generators)
locations = [(loc.path, loc.device, loc.partition, loc.policy) for
loc in all_locs]
locations.sort()
self.assertEqual(locations, expected)
@@ -470,7 +458,7 @@
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=True)]
devices=tmpdir, datadir="objects", mount_check=True)]
locations.sort()
self.assertEqual(
@@ -485,7 +473,8 @@
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=True, logger=logger)]
devices=tmpdir, datadir="objects", mount_check=True,
logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping sdq as it is not mounted',
@@ -502,7 +491,7 @@
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=False)]
devices=tmpdir, datadir="objects", mount_check=False)]
self.assertEqual(
locations,
@@ -516,30 +505,22 @@
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=False, logger=logger)]
devices=tmpdir, datadir="objects", mount_check=False,
logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping garbage as it is not a dir',
], debug_lines)
logger.clear()
with mock_check_drive(isdir=True):
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=False, logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping %s: Not a directory' % os.path.join(
tmpdir, "garbage"),
], debug_lines)
logger.clear()
with mock_check_drive() as mocks:
mocks['ismount'].side_effect = lambda path: (
False if path.endswith('garbage') else True)
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=tmpdir, mount_check=True, logger=logger)]
devices=tmpdir, datadir="objects", mount_check=True,
logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping garbage as it is not mounted',
@@ -550,10 +531,10 @@
# so that errors get logged and a human can see what's going wrong;
# only normal FS corruption should be skipped over silently.
def list_locations(dirname):
def list_locations(dirname, datadir):
return [(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
devices=dirname, mount_check=False)]
devices=dirname, datadir=datadir, mount_check=False)]
real_listdir = os.listdir
@@ -570,30 +551,34 @@
"2607", "b54",
"fe450ec990a88cc4b252b181bab04b54"))
with mock.patch('os.listdir', splode_if_endswith("sdf/objects")):
self.assertRaises(OSError, list_locations, tmpdir)
self.assertRaises(OSError, list_locations, tmpdir, "objects")
with mock.patch('os.listdir', splode_if_endswith("2607")):
self.assertRaises(OSError, list_locations, tmpdir)
self.assertRaises(OSError, list_locations, tmpdir, "objects")
with mock.patch('os.listdir', splode_if_endswith("b54")):
self.assertRaises(OSError, list_locations, tmpdir)
self.assertRaises(OSError, list_locations, tmpdir, "objects")
def test_auditor_status(self):
with temptree([]) as tmpdir:
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
datadir = "objects"
# Pretend that some time passed between each partition
with mock.patch('os.stat') as mock_stat, \
mock_check_drive(isdir=True):
mock_stat.return_value.st_mtime = time() - 60
# Auditor starts, there are two partitions to check
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen = diskfile.object_audit_location_generator(tmpdir,
datadir,
False)
gen.next()
gen.next()
# Auditor stopped for some reason without raising StopIteration in
# the generator and restarts. There is now only one remaining
# partition to check
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
False)
with mock_check_drive(isdir=True):
gen.next()
@@ -602,17 +587,19 @@
# There are no partitions to check if the auditor restarts another
# time and the status files have not been cleared
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
False)
with mock_check_drive(isdir=True):
self.assertRaises(StopIteration, gen.next)
# Reset status file
diskfile.clear_auditor_status(tmpdir)
diskfile.clear_auditor_status(tmpdir, datadir)
# If the auditor restarts another time, we expect to
# check two partitions again, because the remaining
# partitions were empty and a new listdir was executed
gen = diskfile.object_audit_location_generator(tmpdir, False)
gen = diskfile.object_audit_location_generator(tmpdir, datadir,
False)
with mock_check_drive(isdir=True):
gen.next()
gen.next()
@@ -985,7 +972,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
self.df_mgr.logger.increment.assert_called_with('async_pendings')
def test_object_audit_location_generator(self):
locations = list(self.df_mgr.object_audit_location_generator())
locations = list(
self.df_mgr.object_audit_location_generator(POLICIES[0]))
self.assertEqual(locations, [])
def test_replication_one_per_device_deprecation(self):

View File

@@ -19,6 +19,7 @@ from test.unit import FakeRing, mocked_http_conn, debug_logger
from tempfile import mkdtemp
from shutil import rmtree
from collections import defaultdict
from copy import deepcopy
import mock
import six
@ -68,8 +69,8 @@ class FakeInternalClient(object):
def iter_containers(self, account, prefix=''):
acc_dict = self.aco_dict[account]
return [{'name': six.text_type(container)} for container in
acc_dict if container.startswith(prefix)]
return sorted([{'name': six.text_type(container)} for container in
acc_dict if container.startswith(prefix)])
def delete_container(*a, **kw):
pass
@ -100,6 +101,41 @@ class TestObjectExpirer(TestCase):
self.conf = {'recon_cache_path': self.rcache}
self.logger = debug_logger('test-expirer')
self.past_time = str(int(time() - 86400))
self.future_time = str(int(time() + 86400))
# Dummy task queue for test
self.fake_swift = FakeInternalClient({
'.expiring_objects': {
# this task container will be checked
self.past_time: [
# tasks ready for execution
self.past_time + '-a0/c0/o0',
self.past_time + '-a1/c1/o1',
self.past_time + '-a2/c2/o2',
self.past_time + '-a3/c3/o3',
self.past_time + '-a4/c4/o4',
self.past_time + '-a5/c5/o5',
self.past_time + '-a6/c6/o6',
self.past_time + '-a7/c7/o7',
# task objects for unicode test
self.past_time + u'-a8/c8/o8\u2661',
self.past_time + u'-a9/c9/o9\xf8',
# this task will be skipped
self.future_time + '-a10/c10/o10'],
# this task container will be skipped
self.future_time: [
self.future_time + '-a11/c11/o11']}
})
self.expirer = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
# target object paths which should be expired now
self.expired_target_path_list = [
'a0/c0/o0', 'a1/c1/o1', 'a2/c2/o2', 'a3/c3/o3', 'a4/c4/o4',
'a5/c5/o5', 'a6/c6/o6', 'a7/c7/o7',
'a8/c8/o8\xe2\x99\xa1', 'a9/c9/o9\xc3\xb8',
]
def tearDown(self):
rmtree(self.rcache)
internal_client.sleep = self.old_sleep
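The dummy queue built in setUp mirrors the real task-queue layout: task containers are named by timestamp, and task objects by '<delete-at>-<account>/<container>/<object>'. A sketch of composing an entry; the bucketing divisor is an assumption for illustration, not a value taken from this change:

    def task_entry(delete_at, account, container, obj, divisor=86400):
        # Task containers bucket delete-at times; task objects embed the
        # full timestamp plus the target path.
        task_container = str(delete_at // divisor * divisor)
        task_object = '%d-%s/%s/%s' % (delete_at, account, container, obj)
        return task_container, task_object

    # e.g. task_entry(90000, 'a', 'c', 'o') == ('86400', '90000-a/c/o')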
@ -220,16 +256,7 @@ class TestObjectExpirer(TestCase):
self.deleted_objects[task_container] = set()
self.deleted_objects[task_container].add(task_object)
aco_dict = {
'.expiring_objects': {
'0': set('1-a/c/one 2-a/c/two 3-a/c/three'.split()),
'1': set('2-a/c/two 3-a/c/three 4-a/c/four'.split()),
'2': set('5-a/c/five 6-a/c/six'.split()),
'3': set(u'7-a/c/seven\u2661'.split()),
},
}
fake_swift = FakeInternalClient(aco_dict)
x = ObjectExpirer(self.conf, swift=fake_swift)
x = ObjectExpirer(self.conf, swift=self.fake_swift)
deleted_objects = defaultdict(set)
for i in range(3):
@ -240,9 +267,15 @@ class TestObjectExpirer(TestCase):
for task_container, deleted in x.deleted_objects.items():
self.assertFalse(deleted_objects[task_container] & deleted)
deleted_objects[task_container] |= deleted
self.assertEqual(aco_dict['.expiring_objects']['3'].pop(),
deleted_objects['3'].pop().decode('utf8'))
self.assertEqual(aco_dict['.expiring_objects'], deleted_objects)
# sort for comparison
deleted_objects = {
con: sorted(o_set) for con, o_set in deleted_objects.items()}
expected = {
self.past_time: [
self.past_time + '-' + target_path
for target_path in self.expired_target_path_list]}
self.assertEqual(deleted_objects, expected)
def test_delete_object(self):
x = expirer.ObjectExpirer({}, logger=self.logger)
@ -341,66 +374,66 @@ class TestObjectExpirer(TestCase):
assert_parse_task_obj('1000-a/c/o', 1000, 'a', 'c', 'o')
assert_parse_task_obj('0000-acc/con/obj', 0, 'acc', 'con', 'obj')
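Going the other way, the naming convention asserted above unpacks with two splits; a standalone sketch mirroring the asserted behavior (not Swift's own parser):

    def parse_task_obj(task_obj):
        # '<delete-at>-<account>/<container>/<object>'
        timestamp, target_path = task_obj.split('-', 1)
        account, container, obj = target_path.split('/', 2)
        return int(timestamp), account, container, obj

    assert parse_task_obj('1000-a/c/o') == (1000, 'a', 'c', 'o')
    assert parse_task_obj('0000-acc/con/obj') == (0, 'acc', 'con', 'obj')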
def test_round_robin_order(self):
def make_task(delete_at, target):
return {
'task_account': '.expiring_objects',
'task_container': delete_at,
'task_object': delete_at + '-' + target,
'delete_timestamp': Timestamp(delete_at),
'target_path': target,
}
def make_task(self, delete_at, target):
return {
'task_account': '.expiring_objects',
'task_container': delete_at,
'task_object': delete_at + '-' + target,
'delete_timestamp': Timestamp(delete_at),
'target_path': target,
}
def test_round_robin_order(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
task_con_obj_list = [
# objects in 0000 timestamp container
make_task('0000', 'a/c0/o0'),
make_task('0000', 'a/c0/o1'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0000', 'a/c0/o1'),
# objects in 0001 timestamp container
make_task('0001', 'a/c1/o0'),
make_task('0001', 'a/c1/o1'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0001', 'a/c1/o1'),
# objects in 0002 timestamp container
make_task('0002', 'a/c2/o0'),
make_task('0002', 'a/c2/o1'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0002', 'a/c2/o1'),
]
result = list(x.round_robin_order(task_con_obj_list))
# sorted by popping one object to delete for each target_container
expected = [
make_task('0000', 'a/c0/o0'),
make_task('0001', 'a/c1/o0'),
make_task('0002', 'a/c2/o0'),
make_task('0000', 'a/c0/o1'),
make_task('0001', 'a/c1/o1'),
make_task('0002', 'a/c2/o1'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0000', 'a/c0/o1'),
self.make_task('0001', 'a/c1/o1'),
self.make_task('0002', 'a/c2/o1'),
]
self.assertEqual(expected, result)
# task containers have some task objects with invalid target paths
task_con_obj_list = [
# objects in 0000 timestamp container
make_task('0000', 'invalid0'),
make_task('0000', 'a/c0/o0'),
make_task('0000', 'a/c0/o1'),
self.make_task('0000', 'invalid0'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0000', 'a/c0/o1'),
# objects in 0001 timestamp container
make_task('0001', 'a/c1/o0'),
make_task('0001', 'invalid1'),
make_task('0001', 'a/c1/o1'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0001', 'invalid1'),
self.make_task('0001', 'a/c1/o1'),
# objects in 0002 timestamp container
make_task('0002', 'a/c2/o0'),
make_task('0002', 'a/c2/o1'),
make_task('0002', 'invalid2'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0002', 'a/c2/o1'),
self.make_task('0002', 'invalid2'),
]
result = list(x.round_robin_order(task_con_obj_list))
# the invalid task objects are ignored
expected = [
make_task('0000', 'a/c0/o0'),
make_task('0001', 'a/c1/o0'),
make_task('0002', 'a/c2/o0'),
make_task('0000', 'a/c0/o1'),
make_task('0001', 'a/c1/o1'),
make_task('0002', 'a/c2/o1'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0000', 'a/c0/o1'),
self.make_task('0001', 'a/c1/o1'),
self.make_task('0002', 'a/c2/o1'),
]
self.assertEqual(expected, result)
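The expected ordering above comes from grouping tasks by *target* container and popping one task per group per cycle. A compact, illustrative sketch of that interleaving (not the expirer's actual code):

    from collections import OrderedDict, deque

    def round_robin_by_container(tasks):
        groups = OrderedDict()
        for task in tasks:
            # 'a/c0/o0' -> target container 'c0'
            container = task['target_path'].split('/')[1]
            groups.setdefault(container, deque()).append(task)
        while groups:
            for container in list(groups):
                yield groups[container].popleft()
                if not groups[container]:
                    del groups[container]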
@ -408,51 +441,51 @@ class TestObjectExpirer(TestCase):
# the same timestamp container
task_con_obj_list = [
# objects in 0000 timestamp container
make_task('0000', 'a/c0/o0'),
make_task('0000', 'a/c0/o1'),
make_task('0000', 'a/c2/o2'),
make_task('0000', 'a/c2/o3'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0000', 'a/c0/o1'),
self.make_task('0000', 'a/c2/o2'),
self.make_task('0000', 'a/c2/o3'),
# objects in 0001 timestamp container
make_task('0001', 'a/c0/o2'),
make_task('0001', 'a/c0/o3'),
make_task('0001', 'a/c1/o0'),
make_task('0001', 'a/c1/o1'),
self.make_task('0001', 'a/c0/o2'),
self.make_task('0001', 'a/c0/o3'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0001', 'a/c1/o1'),
# objects in 0002 timestamp container
make_task('0002', 'a/c2/o0'),
make_task('0002', 'a/c2/o1'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0002', 'a/c2/o1'),
]
result = list(x.round_robin_order(task_con_obj_list))
# so we go around popping by *target* container, not *task* container
expected = [
make_task('0000', 'a/c0/o0'),
make_task('0001', 'a/c1/o0'),
make_task('0000', 'a/c2/o2'),
make_task('0000', 'a/c0/o1'),
make_task('0001', 'a/c1/o1'),
make_task('0000', 'a/c2/o3'),
make_task('0001', 'a/c0/o2'),
make_task('0002', 'a/c2/o0'),
make_task('0001', 'a/c0/o3'),
make_task('0002', 'a/c2/o1'),
self.make_task('0000', 'a/c0/o0'),
self.make_task('0001', 'a/c1/o0'),
self.make_task('0000', 'a/c2/o2'),
self.make_task('0000', 'a/c0/o1'),
self.make_task('0001', 'a/c1/o1'),
self.make_task('0000', 'a/c2/o3'),
self.make_task('0001', 'a/c0/o2'),
self.make_task('0002', 'a/c2/o0'),
self.make_task('0001', 'a/c0/o3'),
self.make_task('0002', 'a/c2/o1'),
]
self.assertEqual(expected, result)
# all of the work to be done could be for different target containers
task_con_obj_list = [
# objects in 0000 timestamp container
make_task('0000', 'a/c0/o'),
make_task('0000', 'a/c1/o'),
make_task('0000', 'a/c2/o'),
make_task('0000', 'a/c3/o'),
self.make_task('0000', 'a/c0/o'),
self.make_task('0000', 'a/c1/o'),
self.make_task('0000', 'a/c2/o'),
self.make_task('0000', 'a/c3/o'),
# objects in 0001 timestamp container
make_task('0001', 'a/c4/o'),
make_task('0001', 'a/c5/o'),
make_task('0001', 'a/c6/o'),
make_task('0001', 'a/c7/o'),
self.make_task('0001', 'a/c4/o'),
self.make_task('0001', 'a/c5/o'),
self.make_task('0001', 'a/c6/o'),
self.make_task('0001', 'a/c7/o'),
# objects in 0002 timestamp container
make_task('0002', 'a/c8/o'),
make_task('0002', 'a/c9/o'),
self.make_task('0002', 'a/c8/o'),
self.make_task('0002', 'a/c9/o'),
]
result = list(x.round_robin_order(task_con_obj_list))
@ -500,18 +533,14 @@ class TestObjectExpirer(TestCase):
"'str' object has no attribute 'get_account_info'")
def test_run_once_calls_report(self):
fake_swift = FakeInternalClient({
'.expiring_objects': {u'1234': [u'1234-a/c/troms\xf8']}
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
with mock.patch.object(x, 'pop_queue', lambda a, c, o: None):
x.run_once()
with mock.patch.object(self.expirer, 'pop_queue',
lambda a, c, o: None):
self.expirer.run_once()
self.assertEqual(
x.logger.get_lines_for_level('info'), [
self.expirer.logger.get_lines_for_level('info'), [
'Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 1 objects expired',
'2 possible containers; 12 possible objects',
'Pass completed in 0s; 10 objects expired',
])
def test_skip_task_account_without_task_container(self):
@ -528,62 +557,30 @@ class TestObjectExpirer(TestCase):
])
def test_iter_task_to_expire(self):
fake_swift = FakeInternalClient({
'.expiring_objects': {
u'1234': ['1234-a0/c0/o0', '1234-a1/c1/o1'],
u'2000': ['2000-a2/c2/o2', '2000-a3/c3/o3'],
}
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
# In this test, all tasks are assigned to the tested expirer
my_index = 0
divisor = 1
task_account_container_list = [('.expiring_objects', u'1234'),
('.expiring_objects', u'2000')]
task_account_container_list = [('.expiring_objects', self.past_time)]
expected = [{
'task_account': '.expiring_objects',
'task_container': u'1234',
'task_object': '1234-a0/c0/o0',
'target_path': 'a0/c0/o0',
'delete_timestamp': Timestamp(1234),
}, {
'task_account': '.expiring_objects',
'task_container': u'1234',
'task_object': '1234-a1/c1/o1',
'target_path': 'a1/c1/o1',
'delete_timestamp': Timestamp(1234),
}, {
'task_account': '.expiring_objects',
'task_container': u'2000',
'task_object': '2000-a2/c2/o2',
'target_path': 'a2/c2/o2',
'delete_timestamp': Timestamp(2000),
}, {
'task_account': '.expiring_objects',
'task_container': u'2000',
'task_object': '2000-a3/c3/o3',
'target_path': 'a3/c3/o3',
'delete_timestamp': Timestamp(2000),
}]
expected = [
self.make_task(self.past_time, target_path)
for target_path in self.expired_target_path_list]
self.assertEqual(
list(x.iter_task_to_expire(
list(self.expirer.iter_task_to_expire(
task_account_container_list, my_index, divisor)),
expected)
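The (my_index, divisor) pair shards the task queue across concurrent expirer processes so each task is claimed exactly once. One plausible assignment scheme, shown purely for illustration and not necessarily Swift's exact hashing:

    import hashlib

    def assigned_to_me(task_object, my_index, divisor):
        if divisor <= 1:
            return True  # a single process owns every task
        digest = hashlib.md5(task_object.encode('utf8')).hexdigest()
        return int(digest, 16) % divisor == my_index

    # With my_index=0 and divisor=1, as in this test, every task matches.
    assert assigned_to_me('1000-a/c/o', 0, 1)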
# the task queue has invalid task object
fake_swift = FakeInternalClient({
'.expiring_objects': {
u'1234': ['1234-invalid', '1234-a0/c0/o0', '1234-a1/c1/o1'],
u'2000': ['2000-a2/c2/o2', '2000-invalid', '2000-a3/c3/o3'],
}
})
invalid_aco_dict = deepcopy(self.fake_swift.aco_dict)
invalid_aco_dict['.expiring_objects'][self.past_time].insert(
0, self.past_time + '-invalid0')
invalid_aco_dict['.expiring_objects'][self.past_time].insert(
5, self.past_time + '-invalid1')
invalid_fake_swift = FakeInternalClient(invalid_aco_dict)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
swift=invalid_fake_swift)
# but the invalid tasks are skipped
self.assertEqual(
@ -592,161 +589,72 @@ class TestObjectExpirer(TestCase):
expected)
def test_run_once_unicode_problem(self):
fake_swift = FakeInternalClient({
'.expiring_objects': {u'1234': [u'1234-a/c/troms\xf8']}
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
requests = []
def capture_requests(ipaddr, port, method, path, *args, **kwargs):
requests.append((method, path))
with mocked_http_conn(200, 200, 200, give_connect=capture_requests):
x.run_once()
self.assertEqual(len(requests), 3)
# 3 DELETE requests for each of the 10 executed task objects, including pop_queue
code_list = [200] * 3 * 10
with mocked_http_conn(*code_list, give_connect=capture_requests):
self.expirer.run_once()
self.assertEqual(len(requests), 30)
def test_container_timestamp_break(self):
def fail_to_iter_objects(*a, **kw):
raise Exception('This should not have been called')
with mock.patch.object(self.fake_swift, 'iter_objects') as mock_method:
self.expirer.run_once()
fake_swift = FakeInternalClient({
'.expiring_objects': {str(int(time() + 86400)): ['1234-a/c/o']}
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
with mock.patch.object(fake_swift, 'iter_objects',
fail_to_iter_objects):
x.run_once()
logs = x.logger.all_log_lines()
self.assertEqual(logs['info'], [
'Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 0 objects expired',
])
self.assertNotIn('error', logs)
# Reverse test to be sure it would still blow up as expected.
fake_swift = FakeInternalClient({
'.expiring_objects': {str(int(time() - 86400)): ['1234-a/c/o']}
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
with mock.patch.object(fake_swift, 'iter_objects',
fail_to_iter_objects):
x.run_once()
self.assertEqual(
x.logger.get_lines_for_level('error'), ['Unhandled exception: '])
log_args, log_kwargs = x.logger.log_dict['error'][-1]
self.assertEqual(str(log_kwargs['exc_info'][1]),
'This should not have been called')
# iter_objects is called only for past_time, not future_time
self.assertEqual(mock_method.call_args_list,
[mock.call('.expiring_objects', self.past_time)])
def test_object_timestamp_break(self):
def should_not_be_called(*a, **kw):
raise Exception('This should not have been called')
with mock.patch.object(self.expirer, 'delete_actual_object') \
as mock_method, \
mock.patch.object(self.expirer, 'pop_queue'):
self.expirer.run_once()
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): [
'%d-a/c/actual-obj' % int(time() + 86400)],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.run_once()
self.assertNotIn('error', x.logger.all_log_lines())
self.assertEqual(x.logger.get_lines_for_level('info'), [
'Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 0 objects expired',
])
# Reverse test to be sure it would still blow up as expected.
ts = int(time() - 86400)
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): ['%d-a/c/actual-obj' % ts],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = should_not_be_called
x.run_once()
# only the tasks with past timestamps are executed
self.assertEqual(
x.logger.get_lines_for_level('error'),
['Exception while deleting object .expiring_objects '
'%d %d-a/c/actual-obj This should not have been called: ' %
(ts, ts)])
mock_method.call_args_list,
[mock.call(target_path, self.past_time)
for target_path in self.expired_target_path_list])
def test_failed_delete_keeps_entry(self):
def deliberately_blow_up(actual_obj, timestamp):
raise Exception('failed to delete actual object')
def should_not_get_called(account, container, obj):
raise Exception('This should not have been called')
# no tasks are done
with mock.patch.object(self.expirer, 'delete_actual_object',
deliberately_blow_up), \
mock.patch.object(self.expirer, 'pop_queue') as mock_method:
self.expirer.run_once()
ts = int(time() - 86400)
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): ['%d-a/c/actual-obj' % ts],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = deliberately_blow_up
x.pop_queue = should_not_get_called
x.run_once()
self.assertEqual(
x.logger.get_lines_for_level('error'),
['Exception while deleting object .expiring_objects '
'%d %d-a/c/actual-obj failed to delete actual object: ' %
(ts, ts)])
self.assertEqual(
x.logger.get_lines_for_level('info'), [
'Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 0 objects expired',
])
# no tasks are popped from the queue
self.assertEqual(mock_method.call_args_list, [])
# Reverse test to be sure it would still blow up as expected.
ts = int(time() - 86400)
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): ['%d-a/c/actual-obj' % ts],
},
})
self.logger._clear()
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = lambda o, t: None
x.pop_queue = should_not_get_called
x.run_once()
# all tasks are done
with mock.patch.object(self.expirer, 'delete_actual_object',
lambda o, t: None), \
mock.patch.object(self.expirer, 'pop_queue') as mock_method:
self.expirer.run_once()
# all tasks are popped from the queue
self.assertEqual(
self.logger.get_lines_for_level('error'),
['Exception while deleting object .expiring_objects '
'%d %d-a/c/actual-obj This should not have been called: ' %
(ts, ts)])
mock_method.call_args_list,
[mock.call('.expiring_objects', self.past_time,
self.past_time + '-' + target_path)
for target_path in self.expired_target_path_list])
def test_success_gets_counted(self):
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): [
'%d-acc/c/actual-obj' % int(time() - 86400)],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = lambda o, t: None
x.pop_queue = lambda a, c, o: None
self.assertEqual(x.report_objects, 0)
with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0):
x.run_once()
self.assertEqual(x.report_objects, 1)
self.assertEqual(
x.logger.get_lines_for_level('info'),
['Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 1 objects expired'])
self.assertEqual(self.expirer.report_objects, 0)
with mock.patch('swift.obj.expirer.MAX_OBJECTS_TO_CACHE', 0), \
mock.patch.object(self.expirer, 'delete_actual_object',
lambda o, t: None), \
mock.patch.object(self.expirer, 'pop_queue',
lambda a, c, o: None):
self.expirer.run_once()
self.assertEqual(self.expirer.report_objects, 10)
def test_delete_actual_object_does_not_get_unicode(self):
got_unicode = [False]
@ -755,25 +663,15 @@ class TestObjectExpirer(TestCase):
if isinstance(actual_obj, six.text_type):
got_unicode[0] = True
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(int(time() - 86400)): [
'%d-a/c/actual-obj' % int(time() - 86400)],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = delete_actual_object_test_for_unicode
x.pop_queue = lambda a, c, o: None
self.assertEqual(x.report_objects, 0)
x.run_once()
self.assertEqual(x.report_objects, 1)
self.assertEqual(
x.logger.get_lines_for_level('info'), [
'Pass beginning for task account .expiring_objects; '
'1 possible containers; 1 possible objects',
'Pass completed in 0s; 1 objects expired',
])
self.assertEqual(self.expirer.report_objects, 0)
with mock.patch.object(self.expirer, 'delete_actual_object',
delete_actual_object_test_for_unicode), \
mock.patch.object(self.expirer, 'pop_queue',
lambda a, c, o: None):
self.expirer.run_once()
self.assertEqual(self.expirer.report_objects, 10)
self.assertFalse(got_unicode[0])
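The flag above guards the invariant that delete_actual_object is handed UTF-8 byte strings, never unicode. The normalization being exercised looks roughly like this (an illustrative helper, not the expirer's code):

    import six

    def ensure_bytes(name):
        # Encode unicode task names to UTF-8 before issuing requests.
        if isinstance(name, six.text_type):
            return name.encode('utf8')
        return name

    assert ensure_bytes(u'troms\xf8') == b'troms\xc3\xb8'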
def test_failed_delete_continues_on(self):
@ -783,42 +681,26 @@ class TestObjectExpirer(TestCase):
def fail_delete_actual_object(actual_obj, timestamp):
raise Exception('failed to delete actual object')
cts = int(time() - 86400)
ots = int(time() - 86400)
with mock.patch.object(self.fake_swift, 'delete_container',
fail_delete_container), \
mock.patch.object(self.expirer, 'delete_actual_object',
fail_delete_actual_object):
self.expirer.run_once()
fake_swift = FakeInternalClient({
'.expiring_objects': {
str(cts): [
'%d-a/c/actual-obj' % ots, '%d-a/c/next-obj' % ots],
str(cts + 1): [
'%d-a/c/actual-obj' % ots, '%d-a/c/next-obj' % ots],
},
})
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=fake_swift)
x.delete_actual_object = fail_delete_actual_object
with mock.patch.object(fake_swift, 'delete_container',
fail_delete_container):
x.run_once()
error_lines = x.logger.get_lines_for_level('error')
self.assertEqual(sorted(error_lines), sorted([
'Exception while deleting object .expiring_objects %d '
'%d-a/c/actual-obj failed to delete actual object: ' % (cts, ots),
'Exception while deleting object .expiring_objects %d '
'%d-a/c/next-obj failed to delete actual object: ' % (cts, ots),
'Exception while deleting object .expiring_objects %d '
'%d-a/c/actual-obj failed to delete actual object: ' %
(cts + 1, ots),
'Exception while deleting object .expiring_objects %d '
'%d-a/c/next-obj failed to delete actual object: ' %
(cts + 1, ots),
'Exception while deleting container .expiring_objects %d '
'failed to delete container: ' % (cts,),
'Exception while deleting container .expiring_objects %d '
'failed to delete container: ' % (cts + 1,)]))
self.assertEqual(x.logger.get_lines_for_level('info'), [
error_lines = self.expirer.logger.get_lines_for_level('error')
self.assertEqual(error_lines, [
'Exception while deleting object %s %s %s '
'failed to delete actual object: ' % (
'.expiring_objects', self.past_time,
self.past_time + '-' + target_path)
for target_path in self.expired_target_path_list] + [
'Exception while deleting container %s %s '
'failed to delete container: ' % (
'.expiring_objects', self.past_time)])
self.assertEqual(self.expirer.logger.get_lines_for_level('info'), [
'Pass beginning for task account .expiring_objects; '
'2 possible containers; 4 possible objects',
'2 possible containers; 12 possible objects',
'Pass completed in 0s; 0 objects expired',
])
@ -30,8 +30,7 @@ from eventlet.green import subprocess
from eventlet import Timeout, sleep
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
mocked_http_conn, mock_check_drive, skip_if_no_xattrs,
SkipTest)
mocked_http_conn, mock_check_drive, skip_if_no_xattrs)
from swift.common import utils
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
storage_directory)
@ -134,20 +133,28 @@ def _mock_process(ret):
class MockHungProcess(object):
def __init__(self, *args, **kwargs):
def __init__(self, waits_needed=1, *args, **kwargs):
class MockStdout(object):
def read(self):
pass
self.stdout = MockStdout()
self._state = 'running'
self._calls = []
self._waits = 0
self._waits_needed = waits_needed
def wait(self):
def wait(self, timeout=None):
self._calls.append(('wait', self._state))
if self._state == 'running':
# Sleep so we trip either the lockup detector or the rsync timeout
# Sleep so we trip the rsync timeout
sleep(1)
raise BaseException('You need to mock out some timeouts')
elif self._state == 'killed':
self._waits += 1
if self._waits >= self._waits_needed:
return
else:
raise subprocess.TimeoutExpired('some cmd', timeout)
def terminate(self):
self._calls.append(('terminate', self._state))
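MockHungProcess models the flow the replicator is now expected to follow when rsync overruns its timeout: wait with a timeout, kill on expiry, then keep reaping until the child is gone (a wedged child can make even the post-kill wait time out). A sketch of that pattern, assuming Python 3's plain subprocess rather than eventlet's green variant:

    import subprocess

    def run_with_timeout(cmd, timeout, reap_attempts=10):
        proc = subprocess.Popen(cmd)
        try:
            return proc.wait(timeout=timeout)          # ('wait', 'running')
        except subprocess.TimeoutExpired:
            proc.kill()                                # ('kill', 'running')
            for _ in range(reap_attempts):
                try:
                    return proc.wait(timeout=timeout)  # ('wait', 'killed')
                except subprocess.TimeoutExpired:
                    continue                           # wedged; wait again
            raise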
@ -2036,38 +2043,6 @@ class TestObjectReplicator(unittest.TestCase):
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
def test_replicate_lockup_detector(self):
raise SkipTest("this is not a reliable test and must be fixed")
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES[0])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
mock_procs = []
def new_mock(*a, **kw):
proc = MockHungProcess()
mock_procs.append(proc)
return proc
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'lockup_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.replicate()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
('terminate', 'running'),
('wait', 'terminating'),
])
self.assertEqual(len(mock_procs), 1)
def test_replicate_rsync_timeout(self):
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
@ -2090,7 +2065,7 @@ class TestObjectReplicator(unittest.TestCase):
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.replicate()
self.replicator.run_once()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
@ -2099,5 +2074,38 @@ class TestObjectReplicator(unittest.TestCase):
])
self.assertEqual(len(mock_procs), 2)
def test_replicate_rsync_timeout_wedged(self):
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy=POLICIES[0])
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
mock_procs = []
def new_mock(*a, **kw):
proc = MockHungProcess(waits_needed=2)
mock_procs.append(proc)
return proc
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)), \
mock.patch.object(self.replicator, 'rsync_timeout', 0.01), \
mock.patch('eventlet.green.subprocess.Popen', new_mock):
self.replicator.run_once()
for proc in mock_procs:
self.assertEqual(proc._calls, [
('wait', 'running'),
('kill', 'running'),
('wait', 'killed'),
('wait', 'killed'),
])
self.assertEqual(len(mock_procs), 2)
if __name__ == '__main__':
unittest.main()
@ -30,6 +30,7 @@ setenv = VIRTUAL_ENV={envdir}
commands =
nosetests \
test/unit/cli/test_dispersion_report.py \
test/unit/cli/test_form_signature.py \
test/unit/cli/test_info.py \
test/unit/cli/test_relinker.py \
test/unit/cli/test_ring_builder_analyzer.py \