From 2579cf54c20fc3096d334b9e83f31b0cb8c4e42a Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Tue, 12 Oct 2010 10:47:56 -0500 Subject: [PATCH] updated container auditor to only do local work and updated auditor configs --- etc/account-server.conf-sample | 4 - etc/container-server.conf-sample | 4 - etc/object-server.conf-sample | 4 - swift/container/auditor.py | 274 ++++++++----------------------- 4 files changed, 64 insertions(+), 222 deletions(-) diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample index d7cbf2d656..ec477091e1 100644 --- a/etc/account-server.conf-sample +++ b/etc/account-server.conf-sample @@ -46,10 +46,6 @@ use = egg:swift#account # log_name = account-auditor # Will audit, at most, 1 account per device per interval # interval = 1800 -# Maximum containers randomly picked for a given account audit -# max_container_count = 100 -# node_timeout = 10 -# conn_timeout = 0.5 # log_facility = LOG_LOCAL0 # log_level = INFO diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index e17ccd30be..65beb18d6b 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -42,7 +42,3 @@ use = egg:swift#container # log_name = container-auditor # Will audit, at most, 1 container per device per interval # interval = 1800 -# Maximum objects randomly picked for a given container audit -# max_object_count = 100 -# node_timeout = 10 -# conn_timeout = 0.5 diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index a72ef879d7..e2b7744557 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -45,7 +45,3 @@ use = egg:swift#object [object-auditor] # log_name = object-auditor -# Will audit, at most, 1 object per device per interval -# interval = 1800 -# node_timeout = 10 -# conn_timeout = 0.5 diff --git a/swift/container/auditor.py b/swift/container/auditor.py index 592f4b7cd6..ed59708605 100644 --- a/swift/container/auditor.py +++ b/swift/container/auditor.py @@ -16,24 +16,17 @@ import os import socket import time -from random import choice, random +from random import random from urllib import quote from eventlet import Timeout from swift.container import server as container_server from swift.common.db import ContainerBroker -from swift.common.bufferedhttp import http_connect -from swift.common.exceptions import ConnectionTimeout -from swift.common.ring import Ring from swift.common.utils import get_logger from swift.common.daemon import Daemon -class AuditException(Exception): - pass - - class ContainerAuditor(Daemon): """Audit containers.""" @@ -45,227 +38,88 @@ class ContainerAuditor(Daemon): ('true', 't', '1', 'on', 'yes', 'y') self.interval = int(conf.get('interval', 1800)) swift_dir = conf.get('swift_dir', '/etc/swift') - self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz') - self.account_ring = None - self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz') - self.object_ring = None - self.node_timeout = int(conf.get('node_timeout', 10)) - self.conn_timeout = float(conf.get('conn_timeout', 0.5)) - self.max_object_count = int(conf.get('max_object_count', 100)) - self.account_passes = 0 - self.account_failures = 0 - self.account_errors = 0 - self.object_passes = 0 - self.object_failures = 0 - self.object_errors = 0 + self.container_passes = 0 + self.container_failures = 0 - def get_account_ring(self): - """ - Get the account ring. Loads the ring if neccesary. - - :returns: account ring - """ - if not self.account_ring: - self.logger.debug( - 'Loading account ring from %s' % self.account_ring_path) - self.account_ring = Ring(self.account_ring_path) - return self.account_ring - - def get_object_ring(self): - """ - Get the object ring. Loads the ring if neccesary. - - :returns: object ring - """ - if not self.object_ring: - self.logger.debug( - 'Loading object ring from %s' % self.object_ring_path) - self.object_ring = Ring(self.object_ring_path) - return self.object_ring + def broker_generator(self): + for device in os.listdir(self.devices): + if self.mount_check and not\ + os.path.ismount(os.path.join(self.devices, device)): + self.logger.debug( + 'Skipping %s as it is not mounted' % device) + continue + datadir = os.path.join(self.devices, device, + container_server.DATADIR) + if not os.path.exists(datadir): + continue + partitions = os.listdir(datadir) + for partition in partitions: + part_path = os.path.join(datadir, partition) + if not os.path.isdir(part_path): + continue + suffixes = os.listdir(part_path) + for suffix in suffixes: + suff_path = os.path.join(part_path, suffix) + if not os.path.isdir(suff_path): + continue + hashes = os.listdir(suff_path) + for hsh in hashes: + hash_path = os.path.join(suff_path, hsh) + if not os.path.isdir(hash_path): + continue + for fname in sorted(os.listdir(hash_path), + reverse=True): + if fname.endswith('.db'): + broker = ContainerBroker(os.path.join(fpath, + fname)) + if not broker.is_deleted(): + yield broker def run_forever(self): # pragma: no cover """Run the container audit until stopped.""" reported = time.time() time.sleep(random() * self.interval) + all_brokers = self.broker_generator() while True: begin = time.time() - for device in os.listdir(self.devices): - if self.mount_check and not\ - os.path.ismount(os.path.join(self.devices, device)): - self.logger.debug( - 'Skipping %s as it is not mounted' % device) - continue - self.container_audit(device) - if time.time() - reported >= 3600: # once an hour - self.logger.info( - 'Since %s: Remote audits with accounts: %s passed audit, ' - '%s failed audit, %s errors Remote audits with objects: ' - '%s passed audit, %s failed audit, %s errors' % - (time.ctime(reported), self.account_passes, - self.account_failures, self.account_errors, - self.object_passes, self.object_failures, - self.object_errors)) - reported = time.time() - self.account_passes = 0 - self.account_failures = 0 - self.account_errors = 0 - self.object_passes = 0 - self.object_failures = 0 - self.object_errors = 0 - elapsed = time.time() - begin - if elapsed < self.interval: - time.sleep(self.interval - elapsed) + for broker in all_brokers: + self.container_audit(broker) + if time.time() - reported >= 3600: # once an hour + self.logger.info( + 'Since %s: Container audits: %s passed audit, ' + '%s failed audit' % (time.ctime(reported), + self.container_passes, + self.container_failures)) + reported = time.time() + self.container_passes = 0 + self.container_failures = 0 + elapsed = time.time() - begin + if elapsed < self.interval: + time.sleep(self.interval - elapsed) + # reset all_brokers so we loop forever + all_brokers = self.broker_generator() def run_once(self): """Run the container audit once.""" self.logger.info('Begin container audit "once" mode') begin = time.time() - for device in os.listdir(self.devices): - if self.mount_check and \ - not os.path.ismount(os.path.join(self.devices, device)): - self.logger.debug( - 'Skipping %s as it is not mounted' % device) - continue - self.container_audit(device) + self.container_audit(self.broker_generator().next()) elapsed = time.time() - begin self.logger.info( 'Container audit "once" mode completed: %.02fs' % elapsed) - def container_audit(self, device): + def container_audit(self, broker): """ Audit any containers found on the device - :param device: device to audit + :param broker: a container broker """ - datadir = os.path.join(self.devices, device, container_server.DATADIR) - if not os.path.exists(datadir): - return - broker = None - partition = None - attempts = 100 - while not broker and attempts: - attempts -= 1 - try: - partition = choice(os.listdir(datadir)) - fpath = os.path.join(datadir, partition) - if not os.path.isdir(fpath): - continue - suffix = choice(os.listdir(fpath)) - fpath = os.path.join(fpath, suffix) - if not os.path.isdir(fpath): - continue - hsh = choice(os.listdir(fpath)) - fpath = os.path.join(fpath, hsh) - if not os.path.isdir(fpath): - continue - except IndexError: - continue - for fname in sorted(os.listdir(fpath), reverse=True): - if fname.endswith('.db'): - broker = ContainerBroker(os.path.join(fpath, fname)) - if broker.is_deleted(): - broker = None - break - if not broker: - return - info = broker.get_info() - found = False - good_response = False - results = [] - part, nodes = self.get_account_ring().get_nodes(info['account']) - for node in nodes: - try: - with ConnectionTimeout(self.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, 'GET', - '/%s' % info['account'], - query_string='prefix=%s' % - quote(info['container'])) - with Timeout(self.node_timeout): - resp = conn.getresponse() - body = resp.read() - if 200 <= resp.status <= 299: - for cname in body.split('\n'): - if cname == info['container']: - found = True - break - if found: - break - else: - results.append('%s:%s/%s %s %s = %s' % (node['ip'], - node['port'], node['device'], resp.status, - resp.reason, repr(body))) - else: - results.append('%s:%s/%s %s %s' % - (node['ip'], node['port'], node['device'], - resp.status, resp.reason)) - except socket.error, err: - results.append('%s:%s/%s Socket Error: %s' % (node['ip'], - node['port'], node['device'], err)) - except ConnectionTimeout: - results.append('%(ip)s:%(port)s/%(device)s ConnectionTimeout' % - node) - except Timeout: - results.append('%(ip)s:%(port)s/%(device)s Timeout' % node) - except Exception, err: - self.logger.exception('ERROR With remote server ' - '%(ip)s:%(port)s/%(device)s' % node) - results.append('%s:%s/%s Exception: %s' % (node['ip'], - node['port'], node['device'], err)) - if found: - self.account_passes += 1 - self.logger.debug('Audit passed for /%s/%s %s' % (info['account'], - info['container'], broker.db_file)) + try: + info = broker.get_info() + except: + self.container_failures += 1 + self.logger.error('ERROR Could not get container info %s' % + (broker.db_file)) else: - if good_response: - self.account_failures += 1 - else: - self.account_errors += 1 - self.logger.error('ERROR Could not find container /%s/%s %s on ' - 'any of the primary account servers it should be on: %s' % - (info['account'], info['container'], broker.db_file, results)) - for obj in broker.get_random_objects(max_count=self.max_object_count): - found = False - results = [] - part, nodes = self.get_object_ring().get_nodes(info['account'], - info['container'], obj) - for node in nodes: - try: - with ConnectionTimeout(self.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, 'HEAD', - '/%s/%s/%s' % - (info['account'], info['container'], obj)) - with Timeout(self.node_timeout): - resp = conn.getresponse() - body = resp.read() - if 200 <= resp.status <= 299: - found = True - break - else: - results.append('%s:%s/%s %s %s' % (node['ip'], - node['port'], node['device'], resp.status, - resp.reason)) - except socket.error, err: - results.append('%s:%s/%s Socket Error: %s' % (node['ip'], - node['port'], node['device'], err)) - except ConnectionTimeout: - results.append( - '%(ip)s:%(port)s/%(device)s ConnectionTimeout' % node) - except Timeout: - results.append('%(ip)s:%(port)s/%(device)s Timeout' % node) - except Exception, err: - self.logger.exception('ERROR With remote server ' - '%(ip)s:%(port)s/%(device)s' % node) - results.append('%s:%s/%s Exception: %s' % (node['ip'], - node['port'], node['device'], err)) - if found: - self.object_passes += 1 - self.logger.debug('Audit passed for /%s/%s %s object %s' % - (info['account'], info['container'], broker.db_file, obj)) - else: - self.object_errors += 1 - self.logger.error('ERROR Could not find object /%s/%s/%s ' - 'referenced by %s on any of the primary object ' - 'servers it should be on: %s' % (info['account'], - info['container'], obj, broker.db_file, results)) + self.container_passes += 1 + self.logger.debug('Audit passed for %s' % broker.db_file)