obj replicator can now log replication stats for recon directly

In object-server.conf:

    [object-replicator]
    vm_test_mode = yes
    recon_enable = yes
    recon_cache_path = /var/cache/swift

Also replaced the swift-recon bash cronjob with a friendlier/cleaner
Python version that now only obtains async stats. Basic usage:

    $ bin/swift-recon-cron
    Usage: swift-recon-cron CONF_FILE
    # CONF_FILE = path to your object-server.conf
    $ bin/swift-recon-cron /etc/swift/object-server.conf
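
The cron job skips a run when its lock directory is already present, so it
is safe to schedule it frequently; a hypothetical system crontab entry
(schedule and paths illustrative):

    */5 * * * * swift /usr/bin/swift-recon-cron /etc/swift/object-server.conf

After a cron run and a replication pass with recon_enable = yes, the cache
file holds a single JSON line, e.g. (values illustrative):

    $ cat /var/cache/swift/object.recon
    {"async_pending": 5, "object_replication_time": 1.42}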
@@ -1,56 +1,64 @@
-#!/bin/bash
-#ghetto temporary cronjob to pull some of the stats for swift-recon
-#usage: swift-recon-cron /var/log/swift/storage.log
-# run it as frequently as you like, will skip runs during periods
-# of high async pendings when the find takes a while.
-#todo: everything.
-
-SYSLOG_FACILITY="local2"
-ASYNC_PATH="/srv/node/sd[a-z]/async_pending/"
-RECON_CACHE_PATH="/var/cache/swift"
-
-LOCKFILE="/var/lock/swift-recon-object.lock"
-if [ -e $LOCKFILE ]; then
-    echo "NOTICE - $0 lock present - cron jobs overlapping ?"
-    echo "$0 lock file present" | /usr/bin/logger -p $SYSLOG_FACILITY.err
-    exit 1
-else
-    touch $LOCKFILE
-fi
-
-if [ -z "$1" ]; then
-    LOGFILE="/var/log/swift/storage.log"
-else
-    LOGFILE=$1
-fi
-
-if [ ! -r "$LOGFILE" ]; then
-    echo "$0: error $LOGFILE not readable" | /usr/bin/logger -p $SYSLOG_FACILITY.err
-    rm $LOCKFILE
-    exit 1
-fi
-
-if [ ! -d "$RECON_CACHE_PATH" ]; then
-    mkdir $RECON_CACHE_PATH
-fi
-
-TMPF=`/bin/mktemp`
-
-asyncs=$(find $ASYNC_PATH -type f 2> /dev/null| wc -l)
-#asyncs=$(find /srv/[1-4]/node/sd[a-z]1/async_pending/ -type f 2> /dev/null| wc -l) #saio
-objrep=$(grep "Object replication complete." $LOGFILE | tail -n 1 | awk '{print $9}' | sed -e 's/(//g')
-objincoming=$(netstat -aln | egrep "tcp.*:6000.*:.*ESTABLISHED" -c)
-#objtw=$(netstat -aln | egrep "tcp.*:6000.*:.*TIME_WAIT" -c)
-
-echo "{\"async_pending\":$asyncs, \"object_replication_time\":$objrep, \"object_established_conns\":$objincoming}" > $TMPF
-
-mv $TMPF $RECON_CACHE_PATH/object.recon
-if [ $? -ne 0 ]; then
-    echo "$0: $TMPF rename failed" | /usr/bin/logger -p $SYSLOG_FACILITY.err
-    rm -f $TMPF $LOCKFILE
-    exit 1
-fi
-rm -f $TMPF $LOCKFILE
-exit 0
+#!/usr/bin/env python
+"""
+swift-recon-cron.py
+"""
+
+import os
+import sys
+import optparse
+from tempfile import NamedTemporaryFile
+try:
+    import simplejson as json
+except ImportError:
+    import json
+from ConfigParser import ConfigParser
+from swift.common.utils import get_logger, dump_recon_cache
+
+
+def async_count(device_dir, logger):
+    async_count = 0
+    for i in os.listdir(device_dir):
+        asyncdir = os.path.join(device_dir, i, "async_pending")
+        if os.path.isdir(asyncdir):
+            for entry in os.listdir(asyncdir):
+                if os.path.isdir(os.path.join(asyncdir, entry)):
+                    async_hdir = os.path.join(asyncdir, entry)
+                    async_count += len(os.listdir(async_hdir))
+    return async_count
+
+
+def main():
+    c = ConfigParser()
+    try:
+        conf_path = sys.argv[1]
+    except Exception:
+        print "Usage: %s CONF_FILE" % sys.argv[0].split('/')[-1]
+        print "ex: swift-recon-cron /etc/swift/object-server.conf"
+        sys.exit(1)
+    if not c.read(conf_path):
+        print "Unable to read config file %s" % conf_path
+        sys.exit(1)
+    conf = dict(c.items('filter:recon'))
+    device_dir = conf.get('devices', '/srv/node')
+    recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift')
+    cache_file = os.path.join(recon_cache_path, "object.recon")
+    conf['log_name'] = conf.get('log_name', 'recon-cron')
+    logger = get_logger(conf, log_route='recon-cron')
+    try:
+        os.mkdir("/var/lock/swift-recon-object-cron")
+    except OSError as e:
+        logger.critical(_(str(e)))
+        print str(e)
+        sys.exit(1)
+    asyncs = async_count(device_dir, logger)
+    try:
+        dump_recon_cache('async_pending', asyncs, cache_file)
+    except Exception:
+        logger.exception(_('Exception dumping recon cache'))
+    try:
+        os.rmdir("/var/lock/swift-recon-object-cron")
+    except Exception:
+        logger.exception(_('Exception removing cronjob lock'))
+
+
+if __name__ == '__main__':
+    main()
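
For reference, async_count() walks a layout like the following, counting the
pending-update files two levels below each device's async_pending directory
(device, suffix, and file names hypothetical):

    /srv/node/
        sdb1/
            async_pending/
                a83/
                    06fbf0b514e5199dfc4e00f42eb5ea83-1310683801.68526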
@@ -17,7 +17,10 @@ from webob import Request, Response
 from swift.common.utils import split_path, cache_from_env, get_logger
 from swift.common.constraints import check_mount
 from hashlib import md5
-import simplejson as json
+try:
+    import simplejson as json
+except ImportError:
+    import json
 import os
@@ -33,7 +33,11 @@ import struct
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError, \
     RawConfigParser
 from optparse import OptionParser
-from tempfile import mkstemp
+from tempfile import mkstemp, NamedTemporaryFile
+try:
+    import simplejson as json
+except ImportError:
+    import json
 import cPickle as pickle
 import glob
 from urlparse import urlparse as stdlib_urlparse, ParseResult
@@ -634,6 +638,46 @@ def lock_path(directory, timeout=10):
         os.close(fd)
 
 
+@contextmanager
+def lock_file(filename, timeout=10, append=False, unlink=True):
+    """
+    Context manager that acquires a lock on a file. This will block until
+    the lock can be acquired, or the timeout time has expired (whichever
+    occurs first).
+
+    :param filename: file to be locked
+    :param timeout: timeout (in seconds)
+    :param append: True if file should be opened in append mode
+    :param unlink: True if the file should be unlinked at the end
+    """
+    flags = os.O_CREAT | os.O_RDWR
+    if append:
+        flags |= os.O_APPEND
+    fd = os.open(filename, flags)
+    try:
+        with LockTimeout(timeout, filename):
+            while True:
+                try:
+                    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    break
+                except IOError, err:
+                    if err.errno != errno.EAGAIN:
+                        raise
+                sleep(0.01)
+        mode = 'r+'
+        if append:
+            mode = 'a+'
+        file_obj = os.fdopen(fd, mode)
+        yield file_obj
+    finally:
+        try:
+            file_obj.close()
+        except UnboundLocalError:
+            pass  # may have not actually opened the file
+        if unlink:
+            os.unlink(filename)
+
+
 def lock_parent_directory(filename, timeout=10):
     """
     Context manager that acquires a lock on the parent directory of the given
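
dump_recon_cache() below is the first consumer of this helper; a minimal
usage sketch, assuming a state file that should survive the lock (hence
unlink=False):

    from swift.common.utils import lock_file

    # Hold an exclusive flock on the file for the duration of the block;
    # raises LockTimeout if the lock cannot be acquired within 2 seconds.
    with lock_file('/var/cache/swift/object.recon', timeout=2,
                   unlink=False) as f:
        print f.readline()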
@@ -1030,3 +1074,33 @@ def human_readable(value):
     if index == -1:
         return '%d' % value
     return '%d%si' % (round(value), suffixes[index])
+
+
+def dump_recon_cache(cache_key, cache_value, cache_file, lock_timeout=2):
+    """Update recon cache values
+
+    :param cache_key: key to update
+    :param cache_value: value you want to set the key to
+    :param cache_file: cache file to update
+    :param lock_timeout: timeout (in seconds)
+    """
+    with lock_file(cache_file, lock_timeout, unlink=False) as cf:
+        cache_entry = {}
+        try:
+            existing_entry = cf.readline()
+            if existing_entry:
+                cache_entry = json.loads(existing_entry)
+        except ValueError:
+            # file doesn't have a valid entry, we'll recreate it
+            pass
+        cache_entry[cache_key] = cache_value
+        try:
+            with NamedTemporaryFile(delete=False) as tf:
+                tf.write(json.dumps(cache_entry) + '\n')
+            os.rename(tf.name, cache_file)
+        finally:
+            try:
+                os.unlink(tf.name)
+            except OSError, err:
+                if err.errno != errno.ENOENT:
+                    raise
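
A quick sketch of how successive dump_recon_cache() calls merge keys into
the same cache file (paths and values illustrative):

    from swift.common.utils import dump_recon_cache

    cache = '/var/cache/swift/object.recon'
    dump_recon_cache('async_pending', 5, cache)
    dump_recon_cache('object_replication_time', 1.42, cache)
    # the cache file now holds one JSON line:
    # {"async_pending": 5, "object_replication_time": 1.42}

Each call re-reads the existing JSON line under the lock, updates only its
own key, and swaps the file in atomically via NamedTemporaryFile plus
os.rename, so the cron job and the replicator can write different keys
without clobbering each other.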
@@ -32,7 +32,8 @@ from eventlet.support.greenlets import GreenletExit
 
 from swift.common.ring import Ring
 from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
-    compute_eta, get_logger, write_pickle, renamer
+    compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
+    TRUE_VALUES
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
@@ -243,6 +244,11 @@ class ObjectReplicator(Daemon):
         self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
         self.http_timeout = int(conf.get('http_timeout', 60))
         self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
+        self.recon_enable = conf.get(
+            'recon_enable', 'no').lower() in TRUE_VALUES
+        self.recon_cache_path = conf.get(
+            'recon_cache_path', '/var/cache/swift')
+        self.recon_object = os.path.join(self.recon_cache_path, "object.recon")
 
     def _rsync(self, args):
         """
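
recon_enable is parsed against TRUE_VALUES from swift.common.utils, so the
usual truthy config spellings all work; a small illustration (the set shown
is an assumption mirroring the utils module of this era):

    TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))  # assumed

    'Yes'.lower() in TRUE_VALUES  # True  -> replication stats get dumped
    'no'.lower() in TRUE_VALUES   # False -> the default; recon disabled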
@@ -578,6 +584,12 @@ class ObjectReplicator(Daemon):
         total = (time.time() - start) / 60
         self.logger.info(
             _("Object replication complete. (%.02f minutes)"), total)
+        if self.recon_enable:
+            try:
+                dump_recon_cache('object_replication_time', total, \
+                    self.recon_object)
+            except Exception:
+                self.logger.exception(_('Exception dumping recon cache'))
 
     def run_forever(self, *args, **kwargs):
         self.logger.info(_("Starting object replicator in daemon mode."))
@@ -590,6 +602,12 @@ class ObjectReplicator(Daemon):
             total = (time.time() - start) / 60
             self.logger.info(
                 _("Object replication complete. (%.02f minutes)"), total)
+            if self.recon_enable:
+                try:
+                    dump_recon_cache('object_replication_time', total, \
+                        self.recon_object)
+                except Exception:
+                    self.logger.exception(_('Exception dumping recon cache'))
             self.logger.debug(_('Replication sleeping for %s seconds.'),
                               self.run_pause)
             sleep(self.run_pause)