Expand recon middleware support

Expand recon middleware to include support for account and container
servers in addition to the existing object servers. Also add support
for retrieving recent information from auditors, replicators,
updaters, and expirers. For certain checks (such as container auditors)
the stats returned are only for the most recent path processed.
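
For example, a container server with the recon filter in its pipeline
will answer the new per-type auditor check (host, port, and values
below are illustrative):

    $ curl http://127.0.0.1:6011/recon/auditor/container
    {"container_audits_passed": 2, "container_audits_failed": 0,
     "container_audits_since": 1336950000,
     "container_auditor_pass_completed": 0.02}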

The middleware has also been refactored and now handles errors more
gracefully in cases where stats are unavailable.

While new checks have been added, the output from pre-existing
checks has not changed. This should allow existing third-party
utilities such as the Swift ZenPack to continue to function.

Change-Id: Ib9893a77b9b8a2f03179f2a73639bc4a6e264df7
Florian Hines 2012-05-14 18:01:48 -05:00
parent a74cd3b01b
commit ccb6334c17
19 changed files with 1193 additions and 437 deletions

View File

@ -12,9 +12,9 @@ try:
except ImportError:
import json
from hashlib import md5
import datetime
import eventlet
import optparse
import time
import sys
import os
@ -26,12 +26,7 @@ class Scout(object):
def __init__(self, recon_type, verbose=False, suppress_errors=False,
timeout=5):
recon_uri = ["ringmd5", "async", "replication", "load", "diskusage",
"unmounted", "quarantined", "sockstat"]
if recon_type not in recon_uri:
raise Exception("Invalid scout type requested")
else:
self.recon_type = recon_type
self.recon_type = recon_type
self.verbose = verbose
self.suppress_errors = suppress_errors
self.timeout = timeout
@ -87,6 +82,44 @@ class SwiftRecon(object):
self.timeout = 5
self.pool_size = 30
self.pool = eventlet.GreenPool(self.pool_size)
self.check_types = ['account', 'container', 'object']
self.server_type = 'object'
def _gen_stats(self, stats, name=None):
""" compute various stats from a list of values """
cstats = [x for x in stats if x is not None]
if len(cstats) > 0:
ret_dict = {'low': min(cstats), 'high': max(cstats),
'total': sum(cstats), 'reported': len(cstats),
'number_none': len(stats) - len(cstats), 'name': name}
ret_dict['average'] = \
ret_dict['total'] / float(len(cstats))
ret_dict['perc_none'] = \
ret_dict['number_none'] * 100.0 / len(stats)
else:
ret_dict = {'reported': 0}
return ret_dict
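# Illustrative only (hypothetical input, not part of this change): for
# stats=[1, 3, None], _gen_stats yields low=1, high=3, total=4,
# reported=2, number_none=1, average=2.0 and perc_none=33.3.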
def _print_stats(self, stats):
"""
print out formatted stats to console
:param stats: dict of stats generated by _gen_stats
"""
print '[%(name)s] low: %(low)d, high: %(high)d, avg: ' \
'%(average).1f, total: %(total)d, ' \
'Failed: %(perc_none).1f%%, no_result: %(number_none)d, ' \
'reported: %(reported)d' % stats
def _ptime(self, timev=None):
"""
:param timev: a unix timestamp or None
:returns: a pretty string of the current time or provided time
"""
if timev:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timev))
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def get_devices(self, zone_filter, swift_dir, ring_name):
"""
@ -125,10 +158,9 @@ class SwiftRecon(object):
ring_sum = md5sum.hexdigest()
recon = Scout("ringmd5", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking ring md5sum's on %s hosts..." % (now, len(hosts))
print "[%s] Checking ring md5sums" % self._ptime()
if self.verbose:
print "-> On disk md5sum: %s" % ring_sum
print "-> On disk %s md5sum: %s" % (ringfile, ring_sum)
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response[ringfile]
@ -152,23 +184,18 @@ class SwiftRecon(object):
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {}
scan = {}
recon = Scout("async", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking async pendings on %s hosts..." % (now, len(hosts))
print "[%s] Checking async pendings" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response['async_pending']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "Async stats: low: %d, high: %d, avg: %d, total: %d" % (low,
high, average, total)
scan[url] = response['async_pending']
stats = self._gen_stats(scan.values(), 'async_pending')
if stats['reported'] > 0:
self._print_stats(stats)
else:
print "Error: No hosts available or returned valid information."
print "[async_pending] - No hosts returned valid data."
print "=" * 79
def umount_check(self, hosts):
@ -181,9 +208,8 @@ class SwiftRecon(object):
stats = {}
recon = Scout("unmounted", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Getting unmounted drives from %s hosts..." % \
(now, len(hosts))
(self._ptime(), len(hosts))
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
for i in response:
@ -193,31 +219,231 @@ class SwiftRecon(object):
print "Not mounted: %s on %s" % (stats[host], node)
print "=" * 79
def expirer_check(self, hosts):
"""
Obtain and print expirer statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {'object_expiration_pass': [], 'expired_last_pass': []}
recon = Scout("expirer/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking on expirers" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats['object_expiration_pass'].append(
response.get('object_expiration_pass'))
stats['expired_last_pass'].append(
response.get('expired_last_pass'))
for k in stats:
if stats[k]:
computed = self._gen_stats(stats[k], name=k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[%s] - No hosts returned valid data." % k
else:
print "[%s] - No hosts returned valid data." % k
print "=" * 79
def replication_check(self, hosts):
"""
Obtain and print replication statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {'replication_time': [], 'failure': [], 'success': [],
'attempted': []}
recon = Scout("replication/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking on replication" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats['replication_time'].append(
response.get('replication_time'))
repl_stats = response['replication_stats']
if repl_stats:
for stat_key in ['attempted', 'failure', 'success']:
stats[stat_key].append(repl_stats.get(stat_key))
for k in stats:
if stats[k]:
if k != 'replication_time':
computed = self._gen_stats(stats[k],
name='replication_%s' % k)
else:
computed = self._gen_stats(stats[k], name=k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[%s] - No hosts returned valid data." % k
else:
print "[%s] - No hosts returned valid data." % k
print "=" * 79
def object_replication_check(self, hosts):
"""
Obtain and print replication statistics from object servers
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {}
recon = Scout("replication", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking replication times on %s hosts..." % \
(now, len(hosts))
print "[%s] Checking on replication" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response['object_replication_time']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "[Replication Times] shortest: %s, longest: %s, avg: %s" % \
(low, high, average)
times = [x for x in stats.values() if x is not None]
if len(stats) > 0 and len(times) > 0:
computed = self._gen_stats(times, 'replication_time')
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[replication_time] - No hosts returned valid data."
else:
print "Error: No hosts available or returned valid information."
print "[replication_time] - No hosts returned valid data."
print "=" * 79
def updater_check(self, hosts):
"""
Obtain and print updater statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = []
recon = Scout("updater/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking updater times" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
if response['%s_updater_sweep' % self.server_type]:
stats.append(response['%s_updater_sweep' %
self.server_type])
if len(stats) > 0:
computed = self._gen_stats(stats, name='updater_last_sweep')
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[updater_last_sweep] - No hosts returned valid data."
else:
print "[updater_last_sweep] - No hosts returned valid data."
print "=" * 79
def auditor_check(self, hosts):
"""
Obtain and print obj auditor statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
scan = {}
adone = '%s_auditor_pass_completed' % self.server_type
afail = '%s_audits_failed' % self.server_type
apass = '%s_audits_passed' % self.server_type
asince = '%s_audits_since' % self.server_type
recon = Scout("auditor/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking auditor stats" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
scan[url] = response
if len(scan) < 1:
print "Error: No hosts available"
return
stats = {}
stats[adone] = [scan[i][adone] for i in scan
if scan[i][adone] is not None]
stats[afail] = [scan[i][afail] for i in scan
if scan[i][afail] is not None]
stats[apass] = [scan[i][apass] for i in scan
if scan[i][apass] is not None]
stats[asince] = [scan[i][asince] for i in scan
if scan[i][asince] is not None]
for k in stats:
if len(stats[k]) < 1:
print "[%s] - No hosts returned valid data." % k
else:
if k != asince:
computed = self._gen_stats(stats[k], k)
if computed['reported'] > 0:
self._print_stats(computed)
if len(stats[asince]) >= 1:
low = min(stats[asince])
high = max(stats[asince])
total = sum(stats[asince])
average = total / len(stats[asince])
print '[last_pass] oldest: %s, newest: %s, avg: %s' % \
(self._ptime(low), self._ptime(high), self._ptime(average))
print "=" * 79
def object_auditor_check(self, hosts):
"""
Obtain and print obj auditor statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
all_scan = {}
zbf_scan = {}
atime = 'audit_time'
bprocessed = 'bytes_processed'
passes = 'passes'
errors = 'errors'
quarantined = 'quarantined'
recon = Scout("auditor/object", self.verbose, self.suppress_errors,
self.timeout)
print "[%s] Checking auditor stats " % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
if response['object_auditor_stats_ALL']:
all_scan[url] = response['object_auditor_stats_ALL']
if response['object_auditor_stats_ZBF']:
zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0:
stats = {}
stats[atime] = [all_scan[i][atime] for i in all_scan]
stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan]
stats[passes] = [all_scan[i][passes] for i in all_scan]
stats[errors] = [all_scan[i][errors] for i in all_scan]
stats[quarantined] = [all_scan[i][quarantined] for i in all_scan]
for k in stats:
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
if len(stats[k]) < 1:
print "[Auditor %s] - No hosts returned valid data." % k
else:
computed = self._gen_stats(stats[k],
name='ALL_%s_last_path' % k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[ALL_auditor] - No hosts returned valid data."
else:
print "[ALL_auditor] - No hosts returned valid data."
if len(zbf_scan) > 0:
stats = {}
stats[atime] = [zbf_scan[i][atime] for i in zbf_scan]
stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan]
stats[errors] = [zbf_scan[i][errors] for i in zbf_scan]
stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan]
for k in stats:
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
if len(stats[k]) < 1:
print "[Auditor %s] - No hosts returned valid data." % k
else:
computed = self._gen_stats(stats[k],
name='ZBF_%s_last_path' % k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[ZBF_auditor] - No hosts returned valid data."
else:
print "[ZBF_auditor] - No hosts returned valid data."
print "=" * 79
def load_check(self, hosts):
@ -232,8 +458,7 @@ class SwiftRecon(object):
load15 = {}
recon = Scout("load", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking load avg's on %s hosts..." % (now, len(hosts))
print "[%s] Checking load averages" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
load1[url] = response['1m']
@ -242,14 +467,11 @@ class SwiftRecon(object):
stats = {"1m": load1, "5m": load5, "15m": load15}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print "[%s load average] lowest: %s, highest: %s, avg: %s" % \
(item, low, high, average)
computed = self._gen_stats(stats[item].values(),
name='%s_load_avg' % item)
self._print_stats(computed)
else:
print "Error: No hosts available or returned valid info."
print "[%s_load_avg] - No hosts returned valid data." % item
print "=" * 79
def quarantine_check(self, hosts):
@ -264,8 +486,7 @@ class SwiftRecon(object):
acctq = {}
recon = Scout("quarantined", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking quarantine on %s hosts..." % (now, len(hosts))
print "[%s] Checking quarantine" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
objq[url] = response['objects']
@ -274,14 +495,11 @@ class SwiftRecon(object):
stats = {"objects": objq, "containers": conq, "accounts": acctq}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print ("[Quarantined %s] low: %d, high: %d, avg: %d, total: %d"
% (item, low, high, average, total))
computed = self._gen_stats(stats[item].values(),
name='quarantined_%s' % item)
self._print_stats(computed)
else:
print "Error: No hosts available or returned valid info."
print "No hosts returned valid data."
print "=" * 79
def socket_usage(self, hosts):
@ -298,8 +516,7 @@ class SwiftRecon(object):
orphan = {}
recon = Scout("sockstat", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking socket usage on %s hosts..." % (now, len(hosts))
print "[%s] Checking socket usage" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
inuse4[url] = response['tcp_in_use']
@ -312,14 +529,10 @@ class SwiftRecon(object):
"orphan": orphan}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print "[%s] low: %d, high: %d, avg: %d, total: %d" % \
(item, low, high, average, total)
computed = self._gen_stats(stats[item].values(), item)
self._print_stats(computed)
else:
print "Error: No hosts or info available."
print "No hosts returned valid data."
print "=" * 79
def disk_usage(self, hosts):
@ -334,12 +547,10 @@ class SwiftRecon(object):
lows = []
raw_total_used = []
raw_total_avail = []
averages = []
percents = {}
recon = Scout("diskusage", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking disk usage on %s hosts..." % (now, len(hosts))
print "[%s] Checking disk usage now" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
hostusage = []
@ -357,11 +568,8 @@ class SwiftRecon(object):
#get per host hi/los for another day
low = min(stats[url])
high = max(stats[url])
total = sum(stats[url])
average = total / len(stats[url])
highs.append(high)
lows.append(low)
averages.append(average)
for percent in stats[url]:
percents[int(percent)] = percents.get(int(percent), 0) + 1
else:
@ -370,7 +578,6 @@ class SwiftRecon(object):
if len(lows) > 0:
low = min(lows)
high = max(highs)
average = sum(averages) / len(averages)
#dist graph shamelessly stolen from https://github.com/gholt/tcod
print "Distribution Graph:"
mul = 69.0 / max(percents.values())
@ -380,12 +587,13 @@ class SwiftRecon(object):
raw_used = sum(raw_total_used)
raw_avail = sum(raw_total_avail)
raw_total = raw_used + raw_avail
avg_used = 100.0 * raw_used / raw_total
print "Disk usage: space used: %s of %s" % (raw_used, raw_total)
print "Disk usage: space free: %s of %s" % (raw_avail, raw_total)
print "Disk usage: lowest: %s%%, highest: %s%%, avg: %s%%" % \
(low, high, average)
(low, high, avg_used)
else:
print "Error: No hosts available or returned valid information."
print "No hosts returned valid data."
print "=" * 79
def main(self):
@ -394,7 +602,13 @@ class SwiftRecon(object):
"""
print "=" * 79
usage = '''
usage: %prog [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
usage: %prog <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d]
[-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
<server_type>\taccount|container|object
Defaults to object server.
ex: %prog container -l --auditor
'''
args = optparse.OptionParser(usage)
args.add_option('--verbose', '-v', action="store_true",
@ -405,6 +619,12 @@ class SwiftRecon(object):
help="Get async stats")
args.add_option('--replication', '-r', action="store_true",
help="Get replication stats")
args.add_option('--auditor', action="store_true",
help="Get auditor stats")
args.add_option('--updater', action="store_true",
help="Get updater stats")
args.add_option('--expirer', action="store_true",
help="Get expirer stats")
args.add_option('--unmounted', '-u', action="store_true",
help="Check cluster for unmounted devices")
args.add_option('--diskusage', '-d', action="store_true",
@ -413,12 +633,12 @@ class SwiftRecon(object):
help="Get cluster load average stats")
args.add_option('--quarantined', '-q', action="store_true",
help="Get cluster quarantine stats")
args.add_option('--objmd5', action="store_true",
help="Get md5sums of object.ring.gz and compare to local copy")
args.add_option('--md5', action="store_true",
help="Get md5sum of servers ring and compare to local copy")
args.add_option('--sockstat', action="store_true",
help="Get cluster socket usage stats")
args.add_option('--all', action="store_true",
help="Perform all checks. Equal to -arudlq --objmd5 --sockstat")
help="Perform all checks. Equal to -arudlq --md5 --sockstat")
args.add_option('--zone', '-z', type="int",
help="Only query servers in specified zone")
args.add_option('--timeout', '-t', type="int", metavar="SECONDS",
@ -427,44 +647,88 @@ class SwiftRecon(object):
help="Default = /etc/swift")
options, arguments = args.parse_args()
if len(sys.argv) <= 1:
if len(sys.argv) <= 1 or len(arguments) > 1:
args.print_help()
sys.exit(0)
swift_dir = options.swiftdir
obj_ring = os.path.join(swift_dir, 'object.ring.gz')
if arguments:
if arguments[0] in self.check_types:
self.server_type = arguments[0]
else:
print "Invalid Server Type"
args.print_help()
sys.exit(1)
else:
self.server_type = 'object'
swift_dir = options.swiftdir
ring_file = os.path.join(swift_dir, '%s.ring.gz' % self.server_type)
self.verbose = options.verbose
self.suppress_errors = options.suppress
self.timeout = options.timeout
if options.zone:
hosts = self.get_devices(options.zone, swift_dir, 'object')
hosts = self.get_devices(options.zone, swift_dir, self.server_type)
else:
hosts = self.get_devices(None, swift_dir, 'object')
hosts = self.get_devices(None, swift_dir, self.server_type)
print "--> Starting reconnaissance on %s hosts" % len(hosts)
print "=" * 79
if options.all:
self.async_check(hosts)
if self.server_type == 'object':
self.async_check(hosts)
self.object_replication_check(hosts)
self.object_auditor_check(hosts)
self.updater_check(hosts)
self.expirer_check(hosts)
elif self.server_type == 'container':
self.replication_check(hosts)
self.auditor_check(hosts)
self.updater_check(hosts)
elif self.server_type == 'account':
self.replication_check(hosts)
self.auditor_check(hosts)
self.umount_check(hosts)
self.replication_check(hosts)
self.load_check(hosts)
self.disk_usage(hosts)
self.get_ringmd5(hosts, obj_ring)
self.get_ringmd5(hosts, ring_file)
self.quarantine_check(hosts)
self.socket_usage(hosts)
else:
if options.async:
self.async_check(hosts)
if self.server_type == 'object':
self.async_check(hosts)
else:
print "Error: Can't check async's on non object servers."
if options.unmounted:
self.umount_check(hosts)
if options.replication:
self.replication_check(hosts)
if self.server_type == 'object':
self.object_replication_check(hosts)
else:
self.replication_check(hosts)
if options.auditor:
if self.server_type == 'object':
self.object_auditor_check(hosts)
else:
self.auditor_check(hosts)
if options.updater:
if self.server_type == 'account':
print "Error: Can't check updaters on account servers."
else:
self.updater_check(hosts)
if options.expirer:
if self.server_type == 'object':
self.expirer_check(hosts)
else:
print "Error: Can't check expired on non object servers."
if options.loadstats:
self.load_check(hosts)
if options.diskusage:
self.disk_usage(hosts)
if options.objmd5:
self.get_ringmd5(hosts, obj_ring)
if options.md5:
self.get_ringmd5(hosts, ring_file)
if options.quarantined:
self.quarantine_check(hosts)
if options.sockstat:

View File

@ -5,17 +5,11 @@ swift-recon-cron.py
import os
import sys
import optparse
from tempfile import NamedTemporaryFile
try:
import simplejson as json
except ImportError:
import json
from ConfigParser import ConfigParser
from swift.common.utils import get_logger, dump_recon_cache
def async_count(device_dir, logger):
def get_async_count(device_dir, logger):
async_count = 0
for i in os.listdir(device_dir):
asyncdir = os.path.join(device_dir, i, "async_pending")
@ -53,14 +47,13 @@ def main():
print str(e)
sys.exit(1)
try:
asyncs = async_count(device_dir, logger)
asyncs = get_async_count(device_dir, logger)
except Exception:
logger.exception(
_('Exception during recon-cron while accessing devices'))
try:
dump_recon_cache('async_pending', asyncs, cache_file)
except Exception:
logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'async_pending': asyncs}, cache_file, logger)
try:
os.rmdir(lock_dir)
except Exception:

View File

@ -25,7 +25,7 @@
.SH SYNOPSIS
.LP
.B swift-recon
\ [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
\ <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
.SH DESCRIPTION
.PP
@ -40,6 +40,8 @@ more information in the example section below.
.SH OPTIONS
.RS 0
.PD 1
.IP "\fB<server_type>\fR"
account|container|object - Defaults to object server.
.IP "\fB-h, --help\fR"
show this help message and exit
.IP "\fB-v, --verbose\fR"
@ -48,6 +50,12 @@ Print verbose information
Suppress most connection related errors
.IP "\fB-a, --async\fR"
Get async stats
.IP "\fB--auditor\fR"
Get auditor stats
.IP "\fB--updater\fR"
Get updater stats
.IP "\fB--expirer\fR"
Get expirer stats
.IP "\fB-r, --replication\fR"
Get replication stats
.IP "\fB-u, --unmounted\fR"
@ -58,10 +66,10 @@ Get disk usage stats
Get cluster load average stats
.IP "\fB-q, --quarantined\fR"
Get cluster quarantine stats
.IP "\fB--objmd5\fR"
Get md5sums of object.ring.gz and compare to local copy
.IP "\fB--md5\fR"
Get md5sum of server's ring and compare to local copy
.IP "\fB--all\fR"
Perform all checks. Equivalent to -arudlq --objmd5
Perform all checks. Equivalent to -arudlq --md5 --sockstat
.IP "\fB-z ZONE, --zone=ZONE\fR"
Only query servers in specified zone
.IP "\fB--swiftdir=PATH\fR"

View File

@ -248,37 +248,50 @@ allows it to be more easily consumed by third party utilities::
Cluster Telemetry and Monitoring
--------------------------------
Various metrics and telemetry can be obtained from the object servers using
the recon server middleware and the swift-recon cli. To do so update your
object-server.conf to enable the recon middleware by adding a pipeline entry
and setting its one option::
Various metrics and telemetry can be obtained from the account, container, and
object servers using the recon server middleware and the swift-recon CLI. To do
so, update your account, container, or object server pipelines to include recon
and add the associated filter config.
object-server.conf sample::
[pipeline:main]
pipeline = recon object-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
container-server.conf sample::
[pipeline:main]
pipeline = recon container-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
account-server.conf sample::
[pipeline:main]
pipeline = recon account-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
The recon_cache_path simply sets the directory where stats for a few items will
be stored. Depending on the method of deployment you may need to create this
directory manually and ensure that swift has read/write.
directory manually and ensure that swift has read/write access.
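For example, assuming Swift runs as the swift user and group, the cache
directory could be created with::
mkdir -p /var/cache/swift
chown swift:swift /var/cache/swift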
If you wish to enable reporting of replication times you can enable recon
support in the object-replicator section of the object-server.conf::
[object-replicator]
...
recon_enable = yes
recon_cache_path = /var/cache/swift
Finally if you also wish to track asynchronous pending's you will need to setup
a cronjob to run the swift-recon-cron script periodically::
Finally, if you also wish to track asynchronous pendings on your object
servers, you will need to set up a cronjob to run the swift-recon-cron
script periodically::
*/5 * * * * swift /usr/bin/swift-recon-cron /etc/swift/object-server.conf
Once enabled a GET request for "/recon/<metric>" to the object server will
return a json formatted response::
Once the recon middleware is enabled, a GET request for "/recon/<metric>" to
the server will return a JSON-formatted response::
fhines@ubuntu:~$ curl -i http://localhost:6030/recon/async
HTTP/1.1 200 OK
@ -288,30 +301,39 @@ return a json formatted response::
{"async_pending": 0}
The following metrics and telemetry are currently exposed:
The following metrics and telemetry are currently exposed::
================== ====================================================
Request URI        Description
------------------ ----------------------------------------------------
/recon/load        returns 1, 5, and 15 minute load average
/recon/async       returns count of async pending
/recon/mem         returns /proc/meminfo
/recon/replication returns last logged object replication time
/recon/mounted     returns *ALL* currently mounted filesystems
/recon/unmounted   returns all unmounted drives if mount_check = True
/recon/diskusage   returns disk utilization for storage devices
/recon/ringmd5     returns object/container/account ring md5sums
/recon/quarantined returns # of quarantined objects/accounts/containers
/recon/sockstat    returns consumable info from /proc/net/sockstat|6
================== ====================================================
========================= ========================================================================================
Request URI               Description
------------------------- ----------------------------------------------------------------------------------------
/recon/load               returns 1, 5, and 15 minute load average
/recon/mem                returns /proc/meminfo
/recon/mounted            returns *ALL* currently mounted filesystems
/recon/unmounted          returns all unmounted drives if mount_check = True
/recon/diskusage          returns disk utilization for storage devices
/recon/ringmd5            returns object/container/account ring md5sums
/recon/quarantined        returns # of quarantined objects/accounts/containers
/recon/sockstat           returns consumable info from /proc/net/sockstat|6
/recon/devices            returns the devices dir (i.e. /srv/node) and the devices it contains
/recon/async              returns count of async pending
/recon/replication        returns object replication times (for backward compatibility)
/recon/replication/<type> returns replication info for given type (account, container, object)
/recon/auditor/<type>     returns auditor stats on last reported scan for given type (account, container, object)
/recon/updater/<type>     returns last updater sweep times for given type (container, object)
========================= ========================================================================================
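For example, the new per-type replication check on a container server returns
the cached replicator stats (port and values below are illustrative, output
abridged)::
fhines@ubuntu:~$ curl http://localhost:6011/recon/replication/container
{"replication_time": 20.85, "replication_stats": {"attempted": 243, "success": 486, "failure": 0}}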
This information can also be queried via the swift-recon command line utility::
fhines@ubuntu:~$ swift-recon -h
===============================================================================
Usage:
usage: swift-recon [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
usage: swift-recon <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d]
[-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
<server_type> account|container|object
Defaults to object server.
ex: swift-recon container -l --auditor
Options:
-h, --help show this help message and exit
@ -319,28 +341,32 @@ This information can also be queried via the swift-recon command line utility::
--suppress Suppress most connection related errors
-a, --async Get async stats
-r, --replication Get replication stats
--auditor Get auditor stats
--updater Get updater stats
--expirer Get expirer stats
-u, --unmounted Check cluster for unmounted devices
-d, --diskusage Get disk usage stats
-l, --loadstats Get cluster load average stats
-q, --quarantined Get cluster quarantine stats
--objmd5 Get md5sums of object.ring.gz and compare to local
copy
--md5 Get md5sum of server's ring and compare to local copy
--sockstat Get cluster socket usage stats
--all Perform all checks. Equivalent to -arudlq --objmd5
--socketstat
--all Perform all checks. Equal to -arudlq --md5 --sockstat
-z ZONE, --zone=ZONE Only query servers in specified zone
-t SECONDS, --timeout=SECONDS
Time to wait for a response from a server
--swiftdir=SWIFTDIR Default = /etc/swift
For example, to obtain quarantine stats from all hosts in zone "3"::
For example, to obtain container replication info from all hosts in zone "3"::
fhines@ubuntu:~$ swift-recon -q --zone 3
fhines@ubuntu:~$ swift-recon container -r --zone 3
===============================================================================
[2011-10-18 19:36:00] Checking quarantine dirs on 1 hosts...
[Quarantined objects] low: 4, high: 4, avg: 4, total: 4
[Quarantined accounts] low: 0, high: 0, avg: 0, total: 0
[Quarantined containers] low: 0, high: 0, avg: 0, total: 0
--> Starting reconnaissance on 1 hosts
===============================================================================
[2012-04-02 02:45:48] Checking on replication
[failure] low: 0.000, high: 0.000, avg: 0.000, reported: 1
[success] low: 486.000, high: 486.000, avg: 486.000, reported: 1
[replication_time] low: 20.853, high: 20.853, avg: 20.853, reported: 1
[attempted] low: 243.000, high: 243.000, avg: 243.000, reported: 1
---------------------------
Reporting Metrics to StatsD

View File

@ -22,7 +22,7 @@
# db_preallocation = off
[pipeline:main]
pipeline = account-server
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
@ -33,6 +33,10 @@ use = egg:swift#account
# set log_requests = True
# auto_create_account_prefix = .
[filter:recon]
use = egg:swift#recon
# recon_cache_path = /var/cache/swift
[account-replicator]
# You can override the default log routing for this app here (don't use set!):
# log_name = account-replicator
@ -54,6 +58,7 @@ use = egg:swift#account
# reclaim_age = 604800
# Time in seconds to wait between replication passes
# run_pause = 30
# recon_cache_path = /var/cache/swift
[account-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -62,6 +67,9 @@ use = egg:swift#account
# log_level = INFO
# Will audit, at most, 1 account per device per interval
# interval = 1800
# recon_cache_path = /var/cache/swift
[account-reaper]
# You can override the default log routing for this app here (don't use set!):

View File

@ -25,7 +25,7 @@
# db_preallocation = off
[pipeline:main]
pipeline = container-server
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
@ -39,6 +39,10 @@ use = egg:swift#container
# allow_versions = False
# auto_create_account_prefix = .
[filter:recon]
use = egg:swift#recon
#recon_cache_path = /var/cache/swift
[container-replicator]
# You can override the default log routing for this app here (don't use set!):
# log_name = container-replicator
@ -55,7 +59,7 @@ use = egg:swift#container
# reclaim_age = 604800
# Time in seconds to wait between replication passes
# run_pause = 30
# recon_cache_path = /var/cache/swift
[container-updater]
# You can override the default log routing for this app here (don't use set!):
@ -70,6 +74,7 @@ use = egg:swift#container
# slowdown = 0.01
# Seconds to suppress updating an account that has generated an error
# account_suppression_time = 60
# recon_cache_path = /var/cache/swift
[container-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -78,6 +83,7 @@ use = egg:swift#container
# log_level = INFO
# Will audit, at most, 1 container per device per interval
# interval = 1800
# recon_cache_path = /var/cache/swift
[container-sync]
# You can override the default log routing for this app here (don't use set!):

View File

@ -45,8 +45,8 @@ use = egg:swift#object
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
recon_lock_path = /var/lock
#recon_cache_path = /var/cache/swift
#recon_lock_path = /var/lock
[object-replicator]
# You can override the default log routing for this app here (don't use set!):
@ -68,10 +68,8 @@ recon_lock_path = /var/lock
# lockup_timeout = 1800
# The replicator also performs reclamation
# reclaim_age = 604800
# enable logging of replication stats for recon
# recon_enable = no
# recon_cache_path = /var/cache/swift
# ring_check_interval = 15
# recon_cache_path = /var/cache/swift
[object-updater]
# You can override the default log routing for this app here (don't use set!):
@ -84,6 +82,7 @@ recon_lock_path = /var/lock
# conn_timeout = 0.5
# slowdown will sleep that amount between objects
# slowdown = 0.01
# recon_cache_path = /var/cache/swift
[object-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -94,4 +93,4 @@ recon_lock_path = /var/lock
# bytes_per_second = 10000000
# log_time = 3600
# zero_byte_files_per_second = 50
# recon_cache_path = /var/cache/swift

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from random import random
@ -20,7 +21,7 @@ import swift.common.db
from swift.account import server as account_server
from swift.common.db import AccountBroker
from swift.common.utils import get_logger, audit_location_generator, \
TRUE_VALUES
TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
from eventlet import Timeout
@ -40,6 +41,9 @@ class AccountAuditor(Daemon):
self.account_failures = 0
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "account.recon")
def _one_audit_pass(self, reported):
all_locs = audit_location_generator(self.devices,
@ -53,6 +57,12 @@ class AccountAuditor(Daemon):
{'time': time.ctime(reported),
'passed': self.account_passes,
'failed': self.account_failures})
self.account_audit(path)
dump_recon_cache({'account_audits_since': reported,
'account_audits_passed': self.account_passes,
'account_audits_failed':
self.account_failures},
self.rcache, self.logger)
reported = time.time()
self.account_passes = 0
self.account_failures = 0
@ -75,6 +85,8 @@ class AccountAuditor(Daemon):
time.sleep(self.interval - elapsed)
self.logger.info(
_('Account audit pass completed: %.02fs'), elapsed)
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def run_once(self, *args, **kwargs):
"""Run the account audit once."""
@ -84,6 +96,8 @@ class AccountAuditor(Daemon):
elapsed = time.time() - begin
self.logger.info(
_('Account audit "once" mode completed: %.02fs'), elapsed)
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def account_audit(self, path):
"""

View File

@ -31,7 +31,8 @@ from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \
import swift.common.db
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, TRUE_VALUES, unlink_older_than
renamer, mkdirs, lock_parent_directory, TRUE_VALUES, unlink_older_than, \
dump_recon_cache
from swift.common import ring
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
@ -124,6 +125,11 @@ class Replicator(Daemon):
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self._zero_stats()
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.recon_replicator = '%s.recon' % self.server_type
self.rcache = os.path.join(self.recon_cache_path,
self.recon_replicator)
def _zero_stats(self):
"""Zero out the stats."""
@ -144,6 +150,9 @@ class Replicator(Daemon):
self.logger.info(_('Removed %(remove)d dbs') % self.stats)
self.logger.info(_('%(success)s successes, %(failure)s failures')
% self.stats)
dump_recon_cache({'replication_stats': self.stats,
'replication_time': time.time() - self.stats['start']
}, self.rcache, self.logger)
self.logger.info(' '.join(['%s:%s' % item for item in
self.stats.items() if item[0] in
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty',

View File

@ -17,7 +17,7 @@ import errno
import os
from webob import Request, Response
from swift.common.utils import split_path, get_logger
from swift.common.utils import split_path, get_logger, TRUE_VALUES
from swift.common.constraints import check_mount
from resource import getpagesize
from hashlib import md5
@ -46,16 +46,41 @@ class ReconMiddleware(object):
self.devices = conf.get('devices', '/srv/node/')
swift_dir = conf.get('swift_dir', '/etc/swift')
self.logger = get_logger(conf, log_route='recon')
self.recon_cache_path = conf.get('recon_cache_path', \
'/var/cache/swift')
self.object_recon_cache = "%s/object.recon" % self.recon_cache_path
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.object_recon_cache = os.path.join(self.recon_cache_path,
'object.recon')
self.container_recon_cache = os.path.join(self.recon_cache_path,
'container.recon')
self.account_recon_cache = os.path.join(self.recon_cache_path,
'account.recon')
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
self.rings = [self.account_ring_path, self.container_ring_path, \
self.object_ring_path]
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.rings = [self.account_ring_path, self.container_ring_path,
self.object_ring_path]
self.mount_check = conf.get('mount_check', 'true').lower() \
in TRUE_VALUES
def _from_recon_cache(self, cache_keys, cache_file, openr=open):
"""retrieve values from a recon cache file
:param cache_keys: list of cache items to retrieve
:param cache_file: cache file to retrieve items from.
:param openr: open to use [for unittests]
:return: dict of cache items and their value, or None if not found
"""
try:
with openr(cache_file, 'r') as f:
recondata = json.load(f)
return dict((key, recondata.get(key)) for key in cache_keys)
except IOError:
self.logger.exception(_('Error reading recon cache file'))
except ValueError:
self.logger.exception(_('Error parsing recon cache file'))
except Exception:
self.logger.exception(_('Error retrieving recon data'))
return dict((key, None) for key in cache_keys)
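# Illustrative behavior (hypothetical cache contents): if object.recon
# holds {"async_pending": 2}, then
# _from_recon_cache(['async_pending', 'other'], self.object_recon_cache)
# returns {'async_pending': 2, 'other': None}; a missing or corrupt cache
# file is logged and yields all-None values instead of an exception.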
def get_mounted(self, openr=open):
"""get ALL mounted fs from /proc/mounts"""
@ -89,36 +114,73 @@ class ReconMiddleware(object):
meminfo[entry[0]] = entry[1].strip()
return meminfo
def get_async_info(self, openr=open):
def get_async_info(self):
"""get # of async pendings"""
asyncinfo = {}
with openr(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'async_pending' in recondata:
asyncinfo['async_pending'] = recondata['async_pending']
else:
self.logger.notice( \
_('NOTICE: Async pendings not in recon data.'))
asyncinfo['async_pending'] = -1
return asyncinfo
return self._from_recon_cache(['async_pending'],
self.object_recon_cache)
def get_replication_info(self, openr=open):
"""grab last object replication time"""
repinfo = {}
with openr(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'object_replication_time' in recondata:
repinfo['object_replication_time'] = \
recondata['object_replication_time']
else:
self.logger.notice( \
_('NOTICE: obj replication time not in recon data'))
repinfo['object_replication_time'] = -1
return repinfo
def get_replication_info(self, recon_type):
"""get replication info"""
if recon_type == 'account':
return self._from_recon_cache(['replication_time',
'replication_stats'],
self.account_recon_cache)
elif recon_type == 'container':
return self._from_recon_cache(['replication_time',
'replication_stats'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_replication_time'],
self.object_recon_cache)
else:
return None
def get_device_info(self):
"""place holder, grab dev info"""
return self.devices
"""get devices"""
try:
return {self.devices: os.listdir(self.devices)}
except Exception:
self.logger.exception(_('Error listing devices'))
return {self.devices: None}
def get_updater_info(self, recon_type):
"""get updater info"""
if recon_type == 'container':
return self._from_recon_cache(['container_updater_sweep'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_updater_sweep'],
self.object_recon_cache)
else:
return None
def get_expirer_info(self, recon_type):
"""get expirer info"""
if recon_type == 'object':
return self._from_recon_cache(['object_expiration_pass',
'expired_last_pass'],
self.object_recon_cache)
def get_auditor_info(self, recon_type):
"""get auditor info"""
if recon_type == 'account':
return self._from_recon_cache(['account_audits_passed',
'account_auditor_pass_completed',
'account_audits_since',
'account_audits_failed'],
self.account_recon_cache)
elif recon_type == 'container':
return self._from_recon_cache(['container_audits_passed',
'container_auditor_pass_completed',
'container_audits_since',
'container_audits_failed'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_auditor_stats_ALL',
'object_auditor_stats_ZBF'],
self.object_recon_cache)
else:
return None
def get_unmounted(self):
"""list unmounted (failed?) devices"""
@ -152,12 +214,18 @@ class ReconMiddleware(object):
sums = {}
for ringfile in self.rings:
md5sum = md5()
with openr(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
sums[ringfile] = md5sum.hexdigest()
if os.path.exists(ringfile):
try:
with openr(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
sums[ringfile] = md5sum.hexdigest()
except IOError, err:
sums[ringfile] = None
if err.errno != errno.ENOENT:
self.logger.exception(_('Error reading ringfile'))
return sums
def get_quarantine_count(self):
@ -193,7 +261,7 @@ class ReconMiddleware(object):
int(tcpstats[10]) * getpagesize()
except IOError as e:
if e.errno != errno.ENOENT:
raise
raise
try:
with openr('/proc/net/sockstat6', 'r') as proc_sockstat6:
for entry in proc_sockstat6:
@ -205,54 +273,50 @@ class ReconMiddleware(object):
return sockstat
def GET(self, req):
error = False
root, type = split_path(req.path, 1, 2, False)
try:
if type == "mem":
content = json.dumps(self.get_mem())
elif type == "load":
content = json.dumps(self.get_load(), sort_keys=True)
elif type == "async":
try:
content = json.dumps(self.get_async_info())
except IOError as e:
error = True
content = "async - %s" % e
elif type == "replication":
try:
content = json.dumps(self.get_replication_info())
except IOError as e:
error = True
content = "replication - %s" % e
elif type == "mounted":
content = json.dumps(self.get_mounted())
elif type == "unmounted":
content = json.dumps(self.get_unmounted())
elif type == "diskusage":
content = json.dumps(self.get_diskusage())
elif type == "ringmd5":
content = json.dumps(self.get_ring_md5())
elif type == "quarantined":
content = json.dumps(self.get_quarantine_count())
elif type == "sockstat":
content = json.dumps(self.get_socket_info())
else:
content = "Invalid path: %s" % req.path
return Response(request=req, status="400 Bad Request", \
body=content, content_type="text/plain")
except ValueError as e:
error = True
content = "ValueError: %s" % e
if not error:
return Response(request=req, body=content, \
content_type="application/json")
root, rcheck, rtype = split_path(req.path, 1, 3, True)
all_rtypes = ['account', 'container', 'object']
if rcheck == "mem":
content = self.get_mem()
elif rcheck == "load":
content = self.get_load()
elif rcheck == "async":
content = self.get_async_info()
elif rcheck == 'replication' and rtype in all_rtypes:
content = self.get_replication_info(rtype)
elif rcheck == 'replication' and rtype is None:
#handle old style object replication requests
content = self.get_replication_info('object')
elif rcheck == "devices":
content = self.get_device_info()
elif rcheck == "updater" and rtype in ['container', 'object']:
content = self.get_updater_info(rtype)
elif rcheck == "auditor" and rtype in all_rtypes:
content = self.get_auditor_info(rtype)
elif rcheck == "expirer" and rtype == 'object':
content = self.get_expirer_info(rtype)
elif rcheck == "mounted":
content = self.get_mounted()
elif rcheck == "unmounted":
content = self.get_unmounted()
elif rcheck == "diskusage":
content = self.get_diskusage()
elif rcheck == "ringmd5":
content = self.get_ring_md5()
elif rcheck == "quarantined":
content = self.get_quarantine_count()
elif rcheck == "sockstat":
content = self.get_socket_info()
else:
msg = 'CRITICAL recon - %s' % str(content)
self.logger.critical(msg)
body = "Internal server error."
return Response(request=req, status="500 Server Error", \
body=body, content_type="text/plain")
content = "Invalid path: %s" % req.path
return Response(request=req, status="404 Not Found",
body=content, content_type="text/plain")
if content:
return Response(request=req, body=json.dumps(content),
content_type="application/json")
else:
return Response(request=req, status="500 Server Error",
body="Internal server error.",
content_type="text/plain")
def __call__(self, env, start_response):
req = Request(env)

View File

@ -41,7 +41,7 @@ import glob
from urlparse import urlparse as stdlib_urlparse, ParseResult
import eventlet
from eventlet import GreenPool, sleep
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import socket, threading
import netifaces
@ -1184,35 +1184,39 @@ def human_readable(value):
return '%d%si' % (round(value), suffixes[index])
def dump_recon_cache(cache_key, cache_value, cache_file, lock_timeout=2):
def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
"""Update recon cache values
:param cache_key: key to update
:param cache_value: value you want to set key too
:param cache_dict: Dictionary of cache key/value pairs to write out
:param cache_file: cache file to update
:param logger: the logger to use to log an encountered error
:param lock_timeout: timeout (in seconds)
"""
with lock_file(cache_file, lock_timeout, unlink=False) as cf:
cache_entry = {}
try:
existing_entry = cf.readline()
if existing_entry:
cache_entry = json.loads(existing_entry)
except ValueError:
#file doesn't have a valid entry, we'll recreate it
pass
cache_entry[cache_key] = cache_value
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
tf.write(json.dumps(cache_entry) + '\n')
os.rename(tf.name, cache_file)
finally:
try:
with lock_file(cache_file, lock_timeout, unlink=False) as cf:
cache_entry = {}
try:
os.unlink(tf.name)
except OSError, err:
if err.errno != errno.ENOENT:
raise
existing_entry = cf.readline()
if existing_entry:
cache_entry = json.loads(existing_entry)
except ValueError:
#file doesn't have a valid entry, we'll recreate it
pass
for cache_key, cache_value in cache_dict.items():
cache_entry[cache_key] = cache_value
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
tf.write(json.dumps(cache_entry) + '\n')
os.rename(tf.name, cache_file)
finally:
try:
os.unlink(tf.name)
except OSError, err:
if err.errno != errno.ENOENT:
raise
except (Exception, Timeout):
logger.exception(_('Exception dumping recon cache'))
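# Illustrative usage under the new signature (path and logger are
# hypothetical): dump_recon_cache({'object_replication_time': 1.2},
# '/var/cache/swift/object.recon', logger) merges that pair into any
# JSON entry already present in the cache file.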
def listdir(path):

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from random import random
@ -22,7 +23,7 @@ import swift.common.db
from swift.container import server as container_server
from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, audit_location_generator, \
TRUE_VALUES
TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
@ -36,11 +37,13 @@ class ContainerAuditor(Daemon):
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.interval = int(conf.get('interval', 1800))
swift_dir = conf.get('swift_dir', '/etc/swift')
self.container_passes = 0
self.container_failures = 0
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
def _one_audit_pass(self, reported):
all_locs = audit_location_generator(self.devices,
@ -56,6 +59,12 @@ class ContainerAuditor(Daemon):
{'time': time.ctime(reported),
'pass': self.container_passes,
'fail': self.container_failures})
dump_recon_cache({'container_audits_since': reported,
'container_audits_passed':
self.container_passes,
'container_audits_failed':
self.container_failures},
self.rcache, self.logger)
reported = time.time()
self.container_passes = 0
self.container_failures = 0
@ -78,6 +87,8 @@ class ContainerAuditor(Daemon):
time.sleep(self.interval - elapsed)
self.logger.info(
_('Container audit pass completed: %.02fs'), elapsed)
dump_recon_cache({'container_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def run_once(self, *args, **kwargs):
"""Run the container audit once."""
@ -87,6 +98,8 @@ class ContainerAuditor(Daemon):
elapsed = time.time() - begin
self.logger.info(
_('Container audit "once" mode completed: %.02fs'), elapsed)
dump_recon_cache({'container_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def container_audit(self, path):
"""

View File

@ -29,7 +29,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.db import ContainerBroker
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, TRUE_VALUES
from swift.common.utils import get_logger, TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
@ -59,6 +59,9 @@ class ContainerUpdater(Daemon):
self.new_account_suppressions = None
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
def get_account_ring(self):
"""Get the account ring. Load it if it hasn't been yet."""
@ -154,6 +157,8 @@ class ContainerUpdater(Daemon):
elapsed = time.time() - begin
self.logger.info(_('Container update sweep completed: %.02fs'),
elapsed)
dump_recon_cache({'container_updater_sweep': elapsed},
self.rcache, self.logger)
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
@ -175,6 +180,8 @@ class ContainerUpdater(Daemon):
'%(no_change)s with no changes'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures, 'no_change': self.no_changes})
dump_recon_cache({'container_updater_sweep': elapsed},
self.rcache, self.logger)
def container_sweep(self, path):
"""

View File

@ -20,7 +20,7 @@ from eventlet import Timeout
from swift.obj import server as object_server
from swift.common.utils import get_logger, audit_location_generator, \
ratelimit_sleep, TRUE_VALUES
ratelimit_sleep, TRUE_VALUES, dump_recon_cache
from swift.common.exceptions import AuditException, DiskFileError, \
DiskFileNotExist
from swift.common.daemon import Daemon
@ -54,6 +54,9 @@ class AuditorWorker(object):
self.passes = 0
self.quarantines = 0
self.errors = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def audit_all_objects(self, mode='once'):
self.logger.info(_('Begin object audit "%s" mode (%s)' %
@ -63,7 +66,6 @@ class AuditorWorker(object):
self.total_files_processed = 0
total_quarantines = 0
total_errors = 0
files_running_time = 0
time_auditing = 0
all_locs = audit_location_generator(self.devices,
object_server.DATADIR,
@ -93,6 +95,16 @@ class AuditorWorker(object):
'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing,
'audit_rate': time_auditing / (now - begin)})
dump_recon_cache({'object_auditor_stats_%s' %
self.auditor_type: {
'errors': self.errors,
'passes': self.passes,
'quarantined': self.quarantines,
'bytes_processed':
self.bytes_processed,
'start_time': reported,
'audit_time': time_auditing}
}, self.rcache, self.logger)
reported = now
total_quarantines += self.quarantines
total_errors += self.errors

View File

@ -14,16 +14,15 @@
# limitations under the License.
from random import random
from sys import exc_info
from time import time
from urllib import quote
from os.path import join
from eventlet import sleep, Timeout
from webob import Request
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient
from swift.common.utils import get_logger
from swift.common.utils import get_logger, dump_recon_cache
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED
@ -55,6 +54,9 @@ class ObjectExpirer(Daemon):
self.report_interval = int(conf.get('report_interval') or 300)
self.report_first_time = self.report_last_time = time()
self.report_objects = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = join(self.recon_cache_path, 'object.recon')
def report(self, final=False):
"""
@ -68,6 +70,9 @@ class ObjectExpirer(Daemon):
elapsed = time() - self.report_first_time
self.logger.info(_('Pass completed in %ds; %d objects expired') %
(elapsed, self.report_objects))
dump_recon_cache({'object_expiration_pass': elapsed,
'expired_last_pass': self.report_objects},
self.rcache, self.logger)
elif time() - self.report_last_time >= self.report_interval:
elapsed = time() - self.report_first_time
self.logger.info(_('Pass so far %ds; %d objects expired') %

View File

@ -32,8 +32,7 @@ from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
TRUE_VALUES
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -247,11 +246,9 @@ class ObjectReplicator(Daemon):
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_enable = conf.get(
'recon_enable', 'no').lower() in TRUE_VALUES
self.recon_cache_path = conf.get(
'recon_cache_path', '/var/cache/swift')
self.recon_object = os.path.join(self.recon_cache_path, "object.recon")
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def _rsync(self, args):
"""
@ -598,12 +595,8 @@ class ObjectReplicator(Daemon):
total = (time.time() - start) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
if self.recon_enable:
try:
dump_recon_cache('object_replication_time', total, \
self.recon_object)
except (Exception, Timeout):
self.logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'object_replication_time': total},
self.rcache, self.logger)
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object replicator in daemon mode."))
@ -616,12 +609,8 @@ class ObjectReplicator(Daemon):
total = (time.time() - start) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
if self.recon_enable:
try:
dump_recon_cache('object_replication_time', total, \
self.recon_object)
except (Exception, Timeout):
self.logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'object_replication_time': total},
self.rcache, self.logger)
self.logger.debug(_('Replication sleeping for %s seconds.'),
self.run_pause)
sleep(self.run_pause)
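With the recon_enable flag and the per-call try/except gone, both call sites rely on the helper itself being safe to call unconditionally. A minimal sketch of the merge-then-write behavior the new dump_recon_cache(stats_dict, cache_file, logger) signature implies; the real helper in swift.common.utils also locks the cache file, which this sketch omits:

    # Sketch only: merge new stats into the existing JSON cache and log
    # (rather than raise) on failure, so a recon hiccup never kills the
    # daemon's replication loop.
    import json
    import os

    def dump_recon_cache_sketch(cache_dict, cache_file, logger):
        try:
            cache_entry = {}
            if os.path.exists(cache_file):
                with open(cache_file) as f:
                    cache_entry = json.load(f)
            cache_entry.update(cache_dict)
            with open(cache_file, 'w') as f:
                json.dump(cache_entry, f)
        except Exception:
            logger.exception('Exception dumping recon cache')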

View File

@ -25,7 +25,8 @@ from eventlet import patcher, Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache
from swift.common.daemon import Daemon
from swift.obj.server import ASYNCDIR
from swift.common.http import is_success, HTTP_NOT_FOUND, \
@ -50,6 +51,9 @@ class ObjectUpdater(Daemon):
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.successes = 0
self.failures = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
def get_container_ring(self):
"""Get the container ring. Load it, if it hasn't been yet."""
@ -97,6 +101,8 @@ class ObjectUpdater(Daemon):
elapsed = time.time() - begin
self.logger.info(_('Object update sweep completed: %.02fs'),
elapsed)
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
@ -119,6 +125,8 @@ class ObjectUpdater(Daemon):
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures})
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
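Both the daemon loop and single sweeps record the same key. Judging from the middleware tests later in this commit, the read side is a simple per-type dispatch; a sketch of that shape (names beyond the tested keys and paths are assumptions, not the middleware's actual internals):

    # Sketch of the updater lookup implied by the tests below; account
    # is excluded because Swift has no account updater.
    import os

    RECON_CACHE_PATH = '/var/cache/swift'  # default from this commit

    def get_updater_info(recon_type, from_recon_cache):
        if recon_type not in ('container', 'object'):
            return None
        cache_file = os.path.join(RECON_CACHE_PATH,
                                  '%s.recon' % recon_type)
        return from_recon_cache(['%s_updater_sweep' % recon_type],
                                cache_file)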
def object_sweep(self, device):
"""

View File

@ -30,6 +30,16 @@ class FakeApp(object):
def start_response(*args):
pass
class FakeFromCache(object):
def __init__(self, out=None):
self.fakeout = out
self.fakeout_calls = []
def fake_from_recon_cache(self, *args, **kwargs):
self.fakeout_calls.append((args, kwargs))
return self.fakeout
class OpenAndReadTester(object):
def __init__(self, output_iter):
@ -93,12 +103,78 @@ class MockOS(object):
return stat_result(self.lstat_output_tuple)
class FakeRecon(object):
def __init__(self):
self.fake_replication_rtype = None
self.fake_updater_rtype = None
self.fake_auditor_rtype = None
self.fake_expirer_rtype = None
def fake_mem(self):
return {'memtest': "1"}
def fake_load(self):
return {'loadtest': "1"}
def fake_async(self):
return {'asynctest': "1"}
def fake_get_device_info(self):
return {"/srv/1/node": ["sdb1"]}
def fake_replication(self, recon_type):
self.fake_replication_rtype = recon_type
return {'replicationtest': "1"}
def fake_updater(self, recon_type):
self.fake_updater_rtype = recon_type
return {'updatertest': "1"}
def fake_auditor(self, recon_type):
self.fake_auditor_rtype = recon_type
return {'auditortest': "1"}
def fake_expirer(self, recon_type):
self.fake_expirer_rtype = recon_type
return {'expirertest': "1"}
def fake_mounted(self):
return {'mountedtest': "1"}
def fake_unmounted(self):
return {'unmountedtest': "1"}
def fake_diskusage(self):
return {'diskusagetest': "1"}
def fake_ringmd5(self):
return {'ringmd5test': "1"}
def fake_quarantined(self):
return {'quarantinedtest': "1"}
def fake_sockstat(self):
return {'sockstattest': "1"}
def nocontent(self):
return None
def raise_IOError(self, *args, **kwargs):
raise IOError
def raise_ValueError(self, *args, **kwargs):
raise ValueError
def raise_Exception(self, *args, **kwargs):
raise Exception
class TestReconSuccess(TestCase):
def setUp(self):
self.app = recon.ReconMiddleware(FakeApp(), {})
self.mockos = MockOS()
self.fakecache = FakeFromCache()
self.real_listdir = os.listdir
self.real_path_exists = os.path.exists
self.real_lstat = os.lstat
@ -107,6 +183,9 @@ class TestReconSuccess(TestCase):
os.path.exists = self.mockos.fake_path_exists
os.lstat = self.mockos.fake_lstat
os.statvfs = self.mockos.fake_statvfs
self.real_from_cache = self.app._from_recon_cache
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
self.frecon = FakeRecon()
def tearDown(self):
os.listdir = self.real_listdir
@ -114,6 +193,42 @@ class TestReconSuccess(TestCase):
os.lstat = self.real_lstat
os.statvfs = self.real_statvfs
del self.mockos
self.app._from_recon_cache = self.real_from_cache
del self.fakecache
def test_from_recon_cache(self):
oart = OpenAndReadTester(['{"notneeded": 5, "testkey1": "canhazio"}'])
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('test.cache', 'r'), {})])
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': 'canhazio'})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_ioerror(self):
oart = self.frecon.raise_IOError
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_valueerror(self):
oart = self.frecon.raise_ValueError
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_exception(self):
oart = self.frecon.raise_Exception
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
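All four cases pin down the same contract: keys missing from the cache come back as None, and any open, read, or parse failure degrades to an all-None result instead of an exception. A standalone sketch of that behavior (the real method presumably also logs the failure):

    import json

    def from_recon_cache(cache_keys, cache_file, openr=open):
        # Sketch: fetch cache_keys from a JSON recon cache, never raise.
        try:
            with openr(cache_file, 'r') as f:
                recondata = json.load(f)
            return dict((key, recondata.get(key)) for key in cache_keys)
        except Exception:
            # the real middleware logs the error here
            return dict((key, None) for key in cache_keys)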
def test_get_mounted(self):
mounts_content = ['rootfs / rootfs rw 0 0',
@ -255,40 +370,166 @@ class TestReconSuccess(TestCase):
self.assertEquals(rv, meminfo_resp)
def test_get_async_info(self):
obj_recon_content = """{"object_replication_time": 200.0, "async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_async_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
from_cache_response = {'async_pending': 5}
self.fakecache.fakeout = from_cache_response
rv = self.app.get_async_info()
self.assertEquals(rv, {'async_pending': 5})
def test_get_async_info_empty_file(self):
obj_recon_content = """{"object_replication_time": 200.0}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_async_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
self.assertEquals(rv, {'async_pending': -1})
def test_get_replication_info_account(self):
from_cache_response = {"replication_stats": {
"attempted": 1, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"no_change": 2, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 1333044050.855202,
"success": 2, "ts_repl": 0 },
"replication_time": 0.2615511417388916}
self.fakecache.fakeout = from_cache_response
rv = self.app.get_replication_info('account')
self.assertEquals(self.fakecache.fakeout_calls,
[((['replication_time', 'replication_stats'],
'/var/cache/swift/account.recon'), {})])
self.assertEquals(rv, {"replication_stats": {
"attempted": 1, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"no_change": 2, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 1333044050.855202,
"success": 2, "ts_repl": 0 },
"replication_time": 0.2615511417388916})
def test_get_replication_info(self):
obj_recon_content = """{"object_replication_time": 200.0, "async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_replication_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
def test_get_replication_info_container(self):
from_cache_response = {"replication_time": 200.0,
"replication_stats": {
"attempted": 179, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"no_change": 358, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 5.5, "success": 358,
"ts_repl": 0}}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_replication_info('container')
self.assertEquals(self.fakecache.fakeout_calls,
[((['replication_time', 'replication_stats'],
'/var/cache/swift/container.recon'), {})])
self.assertEquals(rv, {"replication_time": 200.0,
"replication_stats": {
"attempted": 179, "diff": 0,
"diff_capped": 0, "empty": 0,
"failure": 0, "hashmatch": 0,
"no_change": 358, "remote_merge": 0,
"remove": 0, "rsync": 0,
"start": 5.5, "success": 358,
"ts_repl": 0}})
def test_get_replication_object(self):
from_cache_response = {"object_replication_time": 200.0}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_replication_info('object')
self.assertEquals(self.fakecache.fakeout_calls,
[((['object_replication_time'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {'object_replication_time': 200.0})
def test_get_replication_info_empty_file(self):
obj_recon_content = """{"async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_replication_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
self.assertEquals(rv, {'object_replication_time': -1})
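Taken together, the expected _from_recon_cache calls encode a simple type-to-cache mapping; account and container share the new key names while object keeps its older flat key:

    # Summary of the lookups asserted above (not middleware code):
    REPLICATION_RECON = {
        'account': (['replication_time', 'replication_stats'],
                    '/var/cache/swift/account.recon'),
        'container': (['replication_time', 'replication_stats'],
                      '/var/cache/swift/container.recon'),
        'object': (['object_replication_time'],
                   '/var/cache/swift/object.recon'),
    }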
def test_get_updater_info_container(self):
from_cache_response = {"container_updater_sweep": 18.476239919662476}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_updater_info('container')
self.assertEquals(self.fakecache.fakeout_calls,
[((['container_updater_sweep'],
'/var/cache/swift/container.recon'), {})])
self.assertEquals(rv, {"container_updater_sweep": 18.476239919662476})
def test_get_device_info(self):
rv = self.app.get_device_info()
self.assertEquals(rv, '/srv/node/')
def test_get_updater_info_object(self):
from_cache_response = {"object_updater_sweep": 0.79848217964172363}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_updater_info('object')
self.assertEquals(self.fakecache.fakeout_calls,
[((['object_updater_sweep'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {"object_updater_sweep": 0.79848217964172363})
def test_get_auditor_info_account(self):
from_cache_response = {"account_auditor_pass_completed": 0.24,
"account_audits_failed": 0,
"account_audits_passed": 6,
"account_audits_since": "1333145374.1373529"}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_auditor_info('account')
self.assertEquals(self.fakecache.fakeout_calls,
[((['account_audits_passed',
'account_auditor_pass_completed',
'account_audits_since',
'account_audits_failed'],
'/var/cache/swift/account.recon'), {})])
self.assertEquals(rv, {"account_auditor_pass_completed": 0.24,
"account_audits_failed": 0,
"account_audits_passed": 6,
"account_audits_since": "1333145374.1373529"})
def test_get_auditor_info_container(self):
from_cache_response = {"container_auditor_pass_completed": 0.24,
"container_audits_failed": 0,
"container_audits_passed": 6,
"container_audits_since": "1333145374.1373529"}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_auditor_info('container')
self.assertEquals(self.fakecache.fakeout_calls,
[((['container_audits_passed',
'container_auditor_pass_completed',
'container_audits_since',
'container_audits_failed'],
'/var/cache/swift/container.recon'), {})])
self.assertEquals(rv, {"container_auditor_pass_completed": 0.24,
"container_audits_failed": 0,
"container_audits_passed": 6,
"container_audits_since": "1333145374.1373529"})
def test_get_auditor_info_object(self):
from_cache_response = {"object_auditor_stats_ALL": {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0 },
"object_auditor_stats_ZBF": {
"audit_time": 45.877294063568115,
"bytes_processed": 0,
"completed": 46.181446075439453,
"errors": 0,
"files_processed": 2310,
"quarantined": 0 }}
self.fakecache.fakeout_calls = []
self.fakecache.fakeout = from_cache_response
rv = self.app.get_auditor_info('object')
self.assertEquals(self.fakecache.fakeout_calls,
[((['object_auditor_stats_ALL',
'object_auditor_stats_ZBF'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {"object_auditor_stats_ALL": {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0 },
"object_auditor_stats_ZBF": {
"audit_time": 45.877294063568115,
"bytes_processed": 0,
"completed": 46.181446075439453,
"errors": 0,
"files_processed": 2310,
"quarantined": 0 }})
def test_get_unmounted(self):
@ -319,7 +560,8 @@ class TestReconSuccess(TestCase):
self.mockos.statvfs_output=statvfs_content
self.mockos.path_exists_output=True
rv = self.app.get_diskusage()
self.assertEquals(self.mockos.statvfs_calls,[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(self.mockos.statvfs_calls,
[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(rv, du_resp)
def test_get_diskusage_checkmount_fail(self):
@ -329,11 +571,12 @@ class TestReconSuccess(TestCase):
self.mockos.path_exists_output=False
rv = self.app.get_diskusage()
self.assertEquals(self.mockos.listdir_calls,[(('/srv/node/',), {})])
self.assertEquals(self.mockos.path_exists_calls,[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(self.mockos.path_exists_calls,
[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(rv, du_resp)
def test_get_quarantine_count(self):
#posix.lstat_result(st_mode=1, st_ino=2, st_dev=3, st_nlink=4,
# st_uid=5, st_gid=6, st_size=7, st_atime=8,
# st_mtime=9, st_ctime=10)
lstat_content = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
@ -352,63 +595,28 @@ class TestReconSuccess(TestCase):
sockstat6_content = ['TCP6: inuse 1',
'UDP6: inuse 3',
'UDPLITE6: inuse 0',
'RAW6: inuse 0',
'FRAG6: inuse 0 memory 0',
'']
oart = OpenAndReadTester(sockstat_content)
rv = self.app.get_socket_info(openr=oart.open)
self.assertEquals(oart.open_calls, [(('/proc/net/sockstat', 'r'), {}),
(('/proc/net/sockstat6', 'r'), {})])
#todo verify parsed result of sockstat6
#self.assertEquals(rv, {'time_wait': 0, 'tcp_in_use': 30, 'orphan': 0, 'tcp_mem_allocated_bytes': 0})
class FakeRecon(object):
def fake_mem(self):
return {'memtest': "1"}
def fake_load(self):
return {'loadtest': "1"}
def fake_async(self):
return {'asynctest': "1"}
def fake_replication(self):
return {'replicationtest': "1"}
def fake_mounted(self):
return {'mountedtest': "1"}
def fake_unmounted(self):
return {'unmountedtest': "1"}
def fake_diskusage(self):
return {'diskusagetest': "1"}
def fake_ringmd5(self):
return {'ringmd5test': "1"}
def fake_quarantined(self):
return {'quarantinedtest': "1"}
def fake_sockstat(self):
return {'sockstattest': "1"}
def raise_IOError(self):
raise IOError
def raise_ValueError(self):
raise ValueError
class TestHealthCheck(unittest.TestCase):
class TestReconMiddleware(unittest.TestCase):
def setUp(self):
self.frecon = FakeRecon()
self.app = recon.ReconMiddleware(FakeApp(), {})
self.app = recon.ReconMiddleware(FakeApp(), {'object_recon': "true"})
#self.app.object_recon = True
self.app.get_mem = self.frecon.fake_mem
self.app.get_load = self.frecon.fake_load
self.app.get_async_info = self.frecon.fake_async
self.app.get_device_info = self.frecon.fake_get_device_info
self.app.get_replication_info = self.frecon.fake_replication
self.app.get_auditor_info = self.frecon.fake_auditor
self.app.get_updater_info = self.frecon.fake_updater
self.app.get_expirer_info = self.frecon.fake_expirer
self.app.get_mounted = self.frecon.fake_mounted
self.app.get_unmounted = self.frecon.fake_unmounted
self.app.get_diskusage = self.frecon.fake_diskusage
@ -434,75 +642,185 @@ class TestHealthCheck(unittest.TestCase):
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_async_resp)
def test_recon_get_async_ioerror(self):
orig = self.app.get_async_info
self.app.get_async_info = self.frecon.raise_IOError
req = Request.blank('/recon/async', environ={'REQUEST_METHOD': 'GET'})
def test_get_device_info(self):
get_device_resp = ['{"/srv/1/node": ["sdb1"]}']
req = Request.blank('/recon/devices',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
self.assertEquals(resp, get_device_resp)
def test_recon_get_replication(self):
def test_recon_get_replication_notype(self):
get_replication_resp = ['{"replicationtest": "1"}']
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/replication',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'object')
self.frecon.fake_replication_rtype = None
def test_recon_get_replication_ioerror(self):
orig = self.app.get_replication_info
self.app.get_replication_info = self.frecon.raise_IOError
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
def test_recon_get_replication_all(self):
get_replication_resp = ['{"replicationtest": "1"}']
#test account
req = Request.blank('/recon/replication/account',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'account')
self.frecon.fake_replication_rtype = None
#test container
req = Request.blank('/recon/replication/container',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'container')
self.frecon.fake_replication_rtype = None
#test object
req = Request.blank('/recon/replication/object',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'object')
self.frecon.fake_replication_rtype = None
def test_recon_get_auditor_invalid(self):
get_auditor_resp = ['Invalid path: /recon/auditor/invalid']
req = Request.blank('/recon/auditor/invalid',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_auditor_resp)
def test_recon_get_auditor_notype(self):
get_auditor_resp = ['Invalid path: /recon/auditor']
req = Request.blank('/recon/auditor',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_auditor_resp)
def test_recon_get_auditor_all(self):
get_auditor_resp = ['{"auditortest": "1"}']
req = Request.blank('/recon/auditor/account',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_auditor_resp)
self.assertEquals(self.frecon.fake_auditor_rtype, 'account')
self.frecon.fake_auditor_rtype = None
req = Request.blank('/recon/auditor/container',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_auditor_resp)
self.assertEquals(self.frecon.fake_auditor_rtype, 'container')
self.frecon.fake_auditor_rtype = None
req = Request.blank('/recon/auditor/object',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_auditor_resp)
self.assertEquals(self.frecon.fake_auditor_rtype, 'object')
self.frecon.fake_auditor_rtype = None
def test_recon_get_updater_invalid(self):
get_updater_resp = ['Invalid path: /recon/updater/invalid']
req = Request.blank('/recon/updater/invalid',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_updater_resp)
def test_recon_get_updater_notype(self):
get_updater_resp = ['Invalid path: /recon/updater']
req = Request.blank('/recon/updater',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_updater_resp)
def test_recon_get_updater(self):
get_updater_resp = ['{"updatertest": "1"}']
req = Request.blank('/recon/updater/container',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(self.frecon.fake_updater_rtype, 'container')
self.frecon.fake_updater_rtype = None
self.assertEquals(resp, get_updater_resp)
req = Request.blank('/recon/updater/object',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_updater_resp)
self.assertEquals(self.frecon.fake_updater_rtype, 'object')
self.frecon.fake_updater_rtype = None
def test_recon_get_expirer_invalid(self):
get_expirer_resp = ['Invalid path: /recon/expirer/invalid']
req = Request.blank('/recon/expirer/invalid',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_expirer_resp)
def test_recon_get_expirer_notype(self):
get_expirer_resp = ['Invalid path: /recon/expirer']
req = Request.blank('/recon/expirer',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_expirer_resp)
def test_recon_get_expirer_object(self):
get_expirer_resp = ['{"expirertest": "1"}']
req = Request.blank('/recon/expirer/object',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_expirer_resp)
self.assertEquals(self.frecon.fake_expirer_rtype, 'object')
self.frecon.fake_expirer_rtype = None
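Between the replication, auditor, updater, and expirer cases, the typed routes these tests pin down are summarized below; anything else under those prefixes is expected to produce an 'Invalid path: ...' response:

    # Typed recon routes covered by the tests above:
    TYPED_ROUTES = [
        '/recon/replication',            # no type defaults to object
        '/recon/replication/account',
        '/recon/replication/container',
        '/recon/replication/object',
        '/recon/auditor/account',
        '/recon/auditor/container',
        '/recon/auditor/object',
        '/recon/updater/container',
        '/recon/updater/object',
        '/recon/expirer/object',
    ]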
def test_recon_get_mounted(self):
get_mounted_resp = ['{"mountedtest": "1"}']
req = Request.blank('/recon/mounted', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/mounted',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_mounted_resp)
def test_recon_get_unmounted(self):
get_unmounted_resp = ['{"unmountedtest": "1"}']
req = Request.blank('/recon/unmounted', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/unmounted',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_unmounted_resp)
def test_recon_get_diskusage(self):
get_diskusage_resp = ['{"diskusagetest": "1"}']
req = Request.blank('/recon/diskusage', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/diskusage',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_diskusage_resp)
def test_recon_get_ringmd5(self):
get_ringmd5_resp = ['{"ringmd5test": "1"}']
req = Request.blank('/recon/ringmd5', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/ringmd5',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_ringmd5_resp)
def test_recon_get_quarantined(self):
get_quarantined_resp = ['{"quarantinedtest": "1"}']
req = Request.blank('/recon/quarantined', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/quarantined',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_quarantined_resp)
def test_recon_get_sockstat(self):
get_sockstat_resp = ['{"sockstattest": "1"}']
req = Request.blank('/recon/sockstat', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/sockstat',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_sockstat_resp)
def test_recon_invalid_path(self):
req = Request.blank('/recon/invalid', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/invalid',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, ['Invalid path: /recon/invalid'])
def test_recon_failed_json_dumps(self):
orig = self.app.get_replication_info
self.app.get_replication_info = self.frecon.raise_ValueError
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
def test_no_content(self):
self.app.get_load = self.frecon.nocontent
req = Request.blank('/recon/load', environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
def test_recon_pass(self):

View File

@ -96,7 +96,6 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient()
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
self.assertEquals(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
@ -121,7 +120,9 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient([{'name': str(int(time() + 86400))}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
@ -163,7 +164,9 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() + 86400)}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -177,10 +180,13 @@ class TestObjectExpirer(TestCase):
[{'name': '%d-actual-obj' % ts}])
x.delete_actual_object = should_not_be_called
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj '
'This should not have been called' % (ts, ts),), {},
'This should not have been called')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'This should not have been called' % (ts, ts)])
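The same filter-then-assert idiom recurs in the tests below, presumably because run_once can now log additional exceptions of its own (for instance, the recon cache dump failing in a test environment), so only the deletion errors are compared. Pulled out as a sketch:

    # Helper sketch: keep only the deletion failures from a
    # FakeLogger-style log_dict, ignoring unrelated exception entries.
    def deletion_exceptions(logger):
        return [exccall[0][0]
                for exccall in logger.log_dict['exception']
                if exccall[0][0].startswith('Exception while deleting ')]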
def test_failed_delete_keeps_entry(self):
class InternalClient(object):
@ -217,10 +223,13 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj '
'failed to delete actual object' % (ts, ts),), {},
'failed to delete actual object')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'failed to delete actual object' % (ts, ts)])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -234,10 +243,13 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj This should '
'not have been called' % (ts, ts),), {},
'This should not have been called')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj This should '
'not have been called' % (ts, ts)])
def test_success_gets_counted(self):
class InternalClient(object):
@ -268,7 +280,6 @@ class TestObjectExpirer(TestCase):
[{'name': '%d-actual-obj' % int(time() - 86400)}])
x.run_once()
self.assertEquals(x.report_objects, 1)
self.assertEquals(x.logger.log_dict['exception'], [])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -317,25 +328,23 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient(containers, objects)
x.delete_actual_object = fail_delete_actual_object
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [
(('Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts, ots),), {},
'failed to delete actual object'),
(('Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts, ots),), {},
'failed to delete actual object'),
(('Exception while deleting container %d failed to delete '
'container' % (cts,),), {},
'failed to delete container'),
(('Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),), {},
'failed to delete actual object'),
(('Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),), {},
'failed to delete actual object'),
(('Exception while deleting container %d failed to delete '
'container' % (cts + 1,),), {},
'failed to delete container')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting, [
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts,),
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts + 1,)])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),