diff --git a/bin/swift-recon b/bin/swift-recon
new file mode 100755
index 0000000000..4aa38bc7e9
--- /dev/null
+++ b/bin/swift-recon
@@ -0,0 +1,312 @@
+#! /usr/bin/env python
+"""
+    cmdline utility to perform cluster reconnaissance
+"""
+
+
+from eventlet.green import urllib2
+from swift.common.ring import Ring
+import simplejson as json
+from hashlib import md5
+import datetime
+import eventlet
+import optparse
+import os
+
+VERBOSE = False
+SUPPRESS_ERRORS = False
+
+
+def getdevices():
+    """Return the set of (ip, port) pairs for all devices in the ring."""
+    # TODO: filter by zone[s]
+    ring_file = "/etc/swift/object.ring.gz"
+    ring_data = Ring(ring_file)
+    ips = set((n['ip'], n['port']) for n in ring_data.devs)
+    return ips
+
+
+def scout(base_url, recon_type):
+    """GET base_url + recon_type and return (url, parsed json, status)."""
+    global VERBOSE, SUPPRESS_ERRORS
+    url = base_url + recon_type
+    try:
+        body = urllib2.urlopen(url).read()
+        content = json.loads(body)
+        if VERBOSE:
+            print "-> %s: %s" % (url, content)
+        status = 200
+    except urllib2.HTTPError as e:
+        if not SUPPRESS_ERRORS or VERBOSE:
+            print "-> %s: %s" % (url, e)
+        content = e
+        status = e.code
+    except urllib2.URLError as e:
+        if not SUPPRESS_ERRORS or VERBOSE:
+            print "-> %s: %s" % (url, e)
+        content = e
+        status = -1
+    return url, content, status
+
+
+def scout_md5(host):
+    """Scout the ringmd5 info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "ringmd5")
+    return url, content, status
+
+
+def scout_async(host):
+    """Scout the async pending info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "async")
+    return url, content, status
+
+
+def scout_replication(host):
+    """Scout the replication info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "replication")
+    return url, content, status
+
+
+def scout_load(host):
+    """Scout the load average info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "load")
+    return url, content, status
+
+
+def scout_du(host):
+    """Scout the disk usage info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "diskusage")
+    return url, content, status
+
+
+def scout_umount(host):
+    """Scout the unmounted device info of a single (ip, port) host."""
+    base_url = "http://%s:%s/recon/" % (host[0], host[1])
+    url, content, status = scout(base_url, "unmounted")
+    return url, content, status
+
+
+def get_ringmd5(ringfile):
+    """Compare the local ring md5sum with the one reported by each host."""
+    stats = {}
+    matches = 0
+    errors = 0
+    hosts = getdevices()
+    md5sum = md5()
+    with open(ringfile, 'rb') as f:
+        block = f.read(4096)
+        while block:
+            md5sum.update(block)
+            block = f.read(4096)
+    ring_sum = md5sum.hexdigest()
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Checking ring md5sum's on %s hosts..." % (now, len(hosts))
+    if VERBOSE:
+        print "-> On disk md5sum: %s" % ring_sum
+    for url, response, status in pool.imap(scout_md5, hosts):
+        if status == 200:
+            #fixme - need to grab from config
+            stats[url] = response[ringfile]
+            if response[ringfile] != ring_sum:
+                # mismatch is reported immediately; not counted as a match
+                print "!! %s (%s) doesn't match on disk md5sum" % \
+                    (url, response[ringfile])
+            else:
+                matches = matches + 1
+                if VERBOSE:
+                    print "-> %s matches." % url
+        else:
+            errors = errors + 1
+    print "%s/%s hosts matched, %s error[s] while checking hosts." % \
+        (matches, len(hosts), errors)
+    print "=" * 79
+
+
+def async_check():
+    """Report low/high/avg/total async pendings across the cluster."""
+    stats = {}
+    hosts = getdevices()
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Checking async pendings on %s hosts..." % (now, len(hosts))
+    for url, response, status in pool.imap(scout_async, hosts):
+        if status == 200:
+            stats[url] = response['async_pending']
+    if len(stats) > 0:
+        low = min(stats.values())
+        high = max(stats.values())
+        total = sum(stats.values())
+        average = total / len(stats)
+        print "Async stats: low: %d, high: %d, avg: %d, total: %d" % (low,
+            high, average, total)
+    else:
+        print "Error: No hosts were available or returned valid information."
+    print "=" * 79
+
+
+def umount_check():
+    """List every unmounted drive reported by each host."""
+    stats = {}
+    hosts = getdevices()
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Getting unmounted drives from %s hosts..." % (now, len(hosts))
+    for url, response, status in pool.imap(scout_umount, hosts):
+        if status == 200:
+            # collect ALL unmounted devices per host, not just the last one
+            for i in response:
+                stats.setdefault(url, []).append(i['device'])
+    for host in stats:
+        print "Not mounted: %s on %s" % (", ".join(stats[host]), host)
+    print "=" * 79
+
+
+def replication_check():
+    """Report shortest/longest/avg object replication times."""
+    stats = {}
+    hosts = getdevices()
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Checking replication times on %s hosts..." % (now, len(hosts))
+    for url, response, status in pool.imap(scout_replication, hosts):
+        if status == 200:
+            stats[url] = response['object_replication_time']
+    if len(stats) > 0:
+        low = min(stats.values())
+        high = max(stats.values())
+        total = sum(stats.values())
+        average = total / len(stats)
+        print "[Replication Times] shortest: %s, longest: %s, avg: %s" % \
+            (low, high, average)
+    else:
+        print "Error: No hosts were available or returned valid information."
+    print "=" * 79
+
+
+def load_check():
+    """Report lowest/highest/avg 1m, 5m and 15m load averages."""
+    load1 = {}
+    load5 = {}
+    load15 = {}
+    hosts = getdevices()
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Checking load avg's on %s hosts..." % (now, len(hosts))
+    for url, response, status in pool.imap(scout_load, hosts):
+        if status == 200:
+            load1[url] = response['1m']
+            load5[url] = response['5m']
+            load15[url] = response['15m']
+    stats = {"1m": load1, "5m": load5, "15m": load15}
+    for item in stats:
+        if len(stats[item]) > 0:
+            low = min(stats[item].values())
+            high = max(stats[item].values())
+            total = sum(stats[item].values())
+            average = total / len(stats[item])
+            print "[%s load average] lowest: %s, highest: %s, avg: %s" % \
+                (item, low, high, average)
+        else:
+            print "Error: No hosts were available or returned valid " \
+                "information."
+    print "=" * 79
+
+
+def disk_usage():
+    """Report per-cluster disk usage stats and a distribution graph."""
+    hosts = getdevices()
+    stats = {}
+    highs = []
+    lows = []
+    averages = []
+    percents = {}
+    pool = eventlet.GreenPool(20)
+    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    print "[%s] Checking disk usage on %s hosts..." % (now, len(hosts))
+    for url, response, status in pool.imap(scout_du, hosts):
+        if status == 200:
+            hostusage = []
+            for entry in response:
+                if entry['mounted']:
+                    used = float(entry['used']) / float(entry['size']) * 100.0
+                    hostusage.append(round(used, 2))
+            stats[url] = hostusage
+
+    for url in stats:
+        if len(stats[url]) > 0:
+            #get per host hi/los for another day
+            low = min(stats[url])
+            high = max(stats[url])
+            total = sum(stats[url])
+            average = total / len(stats[url])
+            highs.append(high)
+            lows.append(low)
+            averages.append(average)
+            for percent in stats[url]:
+                percents[percent] = percents.get(percent, 0) + 1
+        else:
+            print "-> %s: Error. No drive info available." % url
+
+    if len(lows) > 0:
+        low = min(lows)
+        high = max(highs)
+        average = sum(averages) / len(averages)
+        #distrib graph shamelessly stolen from https://github.com/gholt/tcod
+        print "Distribution Graph:"
+        mul = 69.0 / max(percents.values())
+        for percent in sorted(percents):
+            print '% 3d%% % 4d %s' % (percent, percents[percent], \
+                '*' * int(percents[percent] * mul))
+
+        print "Disk usage: lowest: %s%%, highest: %s%%, avg: %s%%" % \
+            (low, high, average)
+    else:
+        print "Error: No hosts were available or returned valid information."
+    print "=" * 79
+
+
+def main():
+    """Parse cmdline options and run the requested recon checks."""
+    global VERBOSE, SUPPRESS_ERRORS, swift_dir, pool
+    print "=" * 79
+    usage = '''
+    usage: %prog [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [-c] [--objmd5]
+    '''
+    args = optparse.OptionParser(usage)
+    args.add_option('--verbose', '-v', action="store_true",
+        help="Print verbose info")
+    args.add_option('--suppress', action="store_true",
+        help="Suppress most connection related errors")
+    args.add_option('--async', '-a', action="store_true",
+        help="Get async stats")
+    args.add_option('--replication', '-r', action="store_true",
+        help="Get replication stats")
+    args.add_option('--unmounted', '-u', action="store_true",
+        help="Check cluster for unmounted devices")
+    args.add_option('--diskusage', '-d', action="store_true",
+        help="Get disk usage stats")
+    args.add_option('--loadstats', '-l', action="store_true",
+        help="Get cluster load average stats")
+    args.add_option('--connstats', '-c', action="store_true",
+        help="Get connection stats")
+    args.add_option('--objmd5', action="store_true",
+        help="Get md5sums of object.ring.gz and compare to local copy")
+    args.add_option('--swiftdir', default="/etc/swift",
+        help="Default = /etc/swift")
+    options, arguments = args.parse_args()
+
+    swift_dir = options.swiftdir
+
+    VERBOSE = options.verbose
+    SUPPRESS_ERRORS = options.suppress
+
+    # NOTE: --connstats is accepted but not implemented yet
+    if options.async:
+        async_check()
+    if options.unmounted:
+        umount_check()
+    if options.replication:
+        replication_check()
+    if options.loadstats:
+        load_check()
+    if options.diskusage:
+        disk_usage()
+    if options.objmd5:
+        get_ringmd5(os.path.join(swift_dir, 'object.ring.gz'))
+
+
+if __name__ == '__main__':
+    try:
+        main()
+    except KeyboardInterrupt:
+        print '\n'
diff --git a/bin/swift-recon-cron b/bin/swift-recon-cron
new file mode 100755
index 0000000000..a8d1c05353
--- /dev/null
+++ b/bin/swift-recon-cron
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+#ghetto temporary cronjob to pull some of the stats for swift-recon
+#usage: swift-recon-cron /var/log/swift/storage.log
+# run it as frequently as you like, will skip runs during periods
+# of high async pendings when the find takes a while.
+#todo: everything.
+
+SYSLOG_FACILITY="local0.error"
+ASYNC_PATH="/srv/node/sd[a-z]/async_pending/"
+RECON_CACHE_PATH="/var/cache/swift/object.recon"
+
+LOCKFILE="/var/lock/swift-recon-object.lock"
+if [ -e $LOCKFILE ]; then
+    echo "NOTICE - $0 lock present - cron jobs overlapping ?"
+    echo "$0 lock file present" | /usr/bin/logger -p $SYSLOG_FACILITY
+    exit 1
+else
+    touch $LOCKFILE
+fi
+
+if [ -z "$1" ]; then
+    LOGFILE="/var/log/swift/storage.log"
+else
+    LOGFILE=$1
+fi
+
+if [ ! -r "$LOGFILE" ]; then
+    echo "$0: error $LOGFILE not readable" | /usr/bin/logger -p $SYSLOG_FACILITY
+    rm $LOCKFILE
+    exit 1
+fi
+
+TMPF=`/bin/mktemp`
+
+asyncs=$(find $ASYNC_PATH -type f 2> /dev/null| wc -l)
+#asyncs=$(find /srv/[1-4]/node/sd[a-z]1/async_pending/ -type f 2> /dev/null| wc -l) #saio
+objrep=$(grep "Object replication complete." $LOGFILE | tail -n 1 | awk '{print $9}' | sed -e 's/(//g')
+objincoming=$(netstat -aln | egrep "tcp.*:6000.*:.*ESTABLISHED" -c)
+#objtw=$(netstat -aln | egrep "tcp.*:6000.*:.*TIME_WAIT" -c)
+
+# default objrep to -1 so the JSON stays valid when the log has no match
+echo "{\"async_pending\":$asyncs, \"object_replication_time\":${objrep:--1}, \"object_established_conns\":$objincoming}" > $TMPF
+
+mv $TMPF $RECON_CACHE_PATH
+rm -f $TMPF $LOCKFILE
+exit 0
diff --git a/swift/common/middleware/recon.py b/swift/common/middleware/recon.py
new file mode 100644
index 0000000000..5dacc779fc
--- /dev/null
+++ b/swift/common/middleware/recon.py
@@ -0,0 +1,214 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from webob import Request, Response
+from swift.common.utils import split_path, cache_from_env, get_logger
+from swift.common.constraints import check_mount
+from hashlib import md5
+import simplejson as json
+import os
+
+
+class ReconMiddleware(object):
+    """
+    Recon middleware used for monitoring.
+
+    /recon/load|mem|async... will return various system metrics.
+    """
+
+    def __init__(self, app, conf, *args, **kwargs):
+        self.app = app
+        self.devices = conf.get('devices', '/srv/node/')
+        swift_dir = conf.get('swift_dir', '/etc/swift')
+        self.logger = get_logger(conf, log_route='recon')
+        self.recon_cache_path = conf.get('recon_cache_path', \
+            '/var/cache/swift')
+        self.object_recon_cache = "%s/object.recon" % self.recon_cache_path
+        self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
+        self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
+        self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
+        self.rings = [self.account_ring_path, self.container_ring_path, \
+            self.object_ring_path]
+        self.mount_check = conf.get('mount_check', 'true').lower() in \
+            ('true', 't', '1', 'on', 'yes', 'y')
+
+    def get_mounted(self):
+        """get ALL mounted fs from /proc/mounts"""
+        mounts = []
+        with open('/proc/mounts', 'r') as procmounts:
+            for line in procmounts:
+                mount = {}
+                mount['device'], mount['path'], opt1, opt2, opt3, \
+                    opt4 = line.rstrip().split()
+                mounts.append(mount)
+        return mounts
+
+    def get_load(self):
+        """get info from /proc/loadavg"""
+        loadavg = {}
+        with open('/proc/loadavg', 'r') as f:
+            onemin, fivemin, ftmin, tasks, procs = f.readline().split()
+        loadavg['1m'] = float(onemin)
+        loadavg['5m'] = float(fivemin)
+        loadavg['15m'] = float(ftmin)
+        loadavg['tasks'] = tasks
+        loadavg['processes'] = int(procs)
+        return loadavg
+
+    def get_mem(self):
+        """get info from /proc/meminfo"""
+        meminfo = {}
+        with open('/proc/meminfo', 'r') as memlines:
+            for i in memlines:
+                entry = i.rstrip().split(":")
+                meminfo[entry[0]] = entry[1].strip()
+        return meminfo
+
+    def get_async_info(self):
+        """get # of async pendings"""
+        asyncinfo = {}
+        with open(self.object_recon_cache, 'r') as f:
+            recondata = json.load(f)
+            if 'async_pending' in recondata:
+                asyncinfo['async_pending'] = recondata['async_pending']
+            else:
+                self.logger.notice( \
+                    _('NOTICE: Async pendings not in recon data.'))
+                asyncinfo['async_pending'] = -1
+        return asyncinfo
+
+    def get_replication_info(self):
+        """grab last object replication time"""
+        repinfo = {}
+        with open(self.object_recon_cache, 'r') as f:
+            recondata = json.load(f)
+            if 'object_replication_time' in recondata:
+                repinfo['object_replication_time'] = \
+                    recondata['object_replication_time']
+            else:
+                self.logger.notice( \
+                    _('NOTICE: obj replication time not in recon data'))
+                repinfo['object_replication_time'] = -1
+        return repinfo
+
+    def get_device_info(self):
+        """place holder, grab dev info"""
+        return self.devices
+
+    def get_unmounted(self):
+        """list unmounted (failed?) devices"""
+        mountlist = []
+        for entry in os.listdir(self.devices):
+            mpoint = {'device': entry, \
+                "mounted": check_mount(self.devices, entry)}
+            if not mpoint['mounted']:
+                mountlist.append(mpoint)
+        return mountlist
+
+    def get_diskusage(self):
+        """get disk utilization statistics"""
+        devices = []
+        for entry in os.listdir(self.devices):
+            if check_mount(self.devices, entry):
+                path = "%s/%s" % (self.devices, entry)
+                disk = os.statvfs(path)
+                capacity = disk.f_bsize * disk.f_blocks
+                available = disk.f_bsize * disk.f_bavail
+                used = disk.f_bsize * (disk.f_blocks - disk.f_bavail)
+                devices.append({'device': entry, 'mounted': True, \
+                    'size': capacity, 'used': used, 'avail': available})
+            else:
+                devices.append({'device': entry, 'mounted': False, \
+                    'size': '', 'used': '', 'avail': ''})
+        return devices
+
+    def get_ring_md5(self):
+        """get all ring md5sum's"""
+        sums = {}
+        for ringfile in self.rings:
+            md5sum = md5()
+            with open(ringfile, 'rb') as f:
+                block = f.read(4096)
+                while block:
+                    md5sum.update(block)
+                    block = f.read(4096)
+            sums[ringfile] = md5sum.hexdigest()
+        return sums
+
+    def GET(self, req):
+        error = False
+        root, type = split_path(req.path, 1, 2, False)
+        try:
+            if type == "mem":
+                content = json.dumps(self.get_mem())
+            elif type == "load":
+                try:
+                    content = json.dumps(self.get_load(), sort_keys=True)
+                except IOError as e:
+                    error = True
+                    content = "load - %s" % e
+            elif type == "async":
+                try:
+                    content = json.dumps(self.get_async_info())
+                except IOError as e:
+                    error = True
+                    content = "async - %s" % e
+            elif type == "replication":
+                try:
+                    content = json.dumps(self.get_replication_info())
+                except IOError as e:
+                    error = True
+                    content = "replication - %s" % e
+            elif type == "mounted":
+                content = json.dumps(self.get_mounted())
+            elif type == "unmounted":
+                content = json.dumps(self.get_unmounted())
+            elif type == "diskusage":
+                content = json.dumps(self.get_diskusage())
+            elif type == "ringmd5":
+                content = json.dumps(self.get_ring_md5())
+            else:
+                content = "Invalid path: %s" % req.path
+                return Response(request=req, status="400 Bad Request", \
+                    body=content, content_type="text/plain")
+        except ValueError as e:
+            error = True
+            content = "ValueError: %s" % e
+
+        if not error:
+            return Response(request=req, body=content, \
+                content_type="application/json")
+        else:
+            msg = 'CRITICAL recon - %s' % str(content)
+            self.logger.critical(msg)
+            body = "Internal server error."
+            return Response(request=req, status="500 Server Error", \
+                body=body, content_type="text/plain")
+
+    def __call__(self, env, start_response):
+        req = Request(env)
+        if req.path.startswith('/recon/'):
+            return self.GET(req)(env, start_response)
+        else:
+            return self.app(env, start_response)
+
+
+def filter_factory(global_conf, **local_conf):
+    conf = global_conf.copy()
+    conf.update(local_conf)
+
+    def recon_filter(app):
+        return ReconMiddleware(app, conf)
+    return recon_filter