diff --git a/etc/log-processor.conf-sample b/etc/log-processor.conf-sample index 7619d0599a..4e33e70c48 100644 --- a/etc/log-processor.conf-sample +++ b/etc/log-processor.conf-sample @@ -14,7 +14,7 @@ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 # log_dir = /var/log/swift/ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 container_name = log_data -source_filename_format = access-%Y%m%d%H +source_filename_pattern = access-%Y%m%d%H # new_log_cutoff = 7200 # unlink_log = True class_path = swift.stats.access_processor.AccessLogProcessor @@ -31,9 +31,9 @@ class_path = swift.stats.access_processor.AccessLogProcessor # log_dir = /var/log/swift/ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 container_name = account_stats -source_filename_format = stats-%Y%m%d%H_* +source_filename_pattern = stats-%Y%m%d%H_.* # new_log_cutoff = 7200 # unlink_log = True class_path = swift.stats.stats_processor.StatsLogProcessor # account_server_conf = /etc/swift/account-server.conf -# user = swift \ No newline at end of file +# user = swift diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py index 64a1801af2..cdbedf34e6 100644 --- a/swift/stats/log_uploader.py +++ b/swift/stats/log_uploader.py @@ -18,7 +18,7 @@ import os import hashlib import time import gzip -import glob +import re from paste.deploy import appconfig from swift.common.internal_proxy import InternalProxy @@ -44,29 +44,30 @@ class LogUploader(Daemon): def __init__(self, uploader_conf, plugin_name): super(LogUploader, self).__init__(uploader_conf) - log_dir = uploader_conf.get('log_dir', '/var/log/swift/') - swift_account = uploader_conf['swift_account'] - container_name = uploader_conf['container_name'] - source_filename_format = uploader_conf['source_filename_format'] + log_name = '%s-log-uploader' % plugin_name + self.logger = utils.get_logger(uploader_conf, log_name, + log_route=plugin_name) + self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/') + 
self.swift_account = uploader_conf['swift_account'] + self.container_name = uploader_conf['container_name'] proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', '/etc/swift/proxy-server.conf') proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, name='proxy-server') - new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200')) - unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \ - ('true', 'on', '1', 'yes') - self.unlink_log = unlink_log - self.new_log_cutoff = new_log_cutoff - if not log_dir.endswith('/'): - log_dir = log_dir + '/' - self.log_dir = log_dir - self.swift_account = swift_account - self.container_name = container_name - self.filename_format = source_filename_format self.internal_proxy = InternalProxy(proxy_server_conf) - log_name = '%s-log-uploader' % plugin_name - self.logger = utils.get_logger(uploader_conf, log_name, - log_route=plugin_name) + self.new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200')) + self.unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \ + utils.TRUE_VALUES + + # source_filename_format is deprecated + source_filename_format = uploader_conf.get('source_filename_format') + source_filename_pattern = uploader_conf.get('source_filename_pattern') + if source_filename_format and not source_filename_pattern: + self.logger.warning(_('source_filename_format is unreliable and ' + 'deprecated; use source_filename_pattern')) + self.pattern = self.convert_glob_to_regex(source_filename_format) + else: + self.pattern = source_filename_pattern or '%Y%m%d%H' def run_once(self, *args, **kwargs): self.logger.info(_("Uploading logs")) @@ -75,70 +76,114 @@ class LogUploader(Daemon): self.logger.info(_("Uploading logs complete (%0.2f minutes)") % ((time.time() - start) / 60)) + def convert_glob_to_regex(self, glob): + """ + Make a best effort to support old style config globs + + :param : old style config source_filename_format + + :returns : new style config 
source_filename_pattern + """ + pattern = glob + pattern = pattern.replace('.', r'\.') + pattern = pattern.replace('*', r'.*') + pattern = pattern.replace('?', r'.?') + return pattern + + def validate_filename_pattern(self): + """ + Validate source_filename_pattern + + :returns : valid regex pattern based on source_filename_pattern with + group matches substituted for date fmt markers + """ + pattern = self.pattern + markers = { + '%Y': ('year', '(?P<year>[0-9]{4})'), + '%m': ('month', '(?P<month>[0-1][0-9])'), + '%d': ('day', '(?P<day>[0-3][0-9])'), + '%H': ('hour', '(?P<hour>[0-2][0-9])'), + } + for marker, (type, group) in markers.items(): + if marker not in self.pattern: + self.logger.error(_('source_filename_pattern must contain a ' + 'marker %(marker)s to match the ' + '%(type)s') % {'marker': marker, + 'type': type}) + return + pattern = pattern.replace(marker, group) + return pattern + + def get_relpath_to_files_under_log_dir(self): + """ + Look under log_dir recursively and return all filenames as relpaths + + :returns : list of strs, the relpath to all filenames under log_dir + """ + all_files = [] + for path, dirs, files in os.walk(self.log_dir): + all_files.extend(os.path.join(path, f) for f in files) + return [os.path.relpath(f, start=self.log_dir) for f in all_files] + + def filter_files(self, all_files, pattern): + """ + Filter files based on regex pattern + + :param all_files: list of strs, relpath of the filenames under log_dir + :param pattern: regex pattern to match against filenames + + :returns : dict mapping full path of file to match group dict + """ + filename2match = {} + found_match = False + for filename in all_files: + match = re.match(pattern, filename) + if match: + found_match = True + full_path = os.path.join(self.log_dir, filename) + filename2match[full_path] = match.groupdict() + else: + self.logger.debug(_('%(filename)s does not match ' + '%(pattern)s') % {'filename': filename, + 'pattern': pattern}) + return filename2match + def upload_all_logs(self): - 
i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()] - i.sort() - year_offset = month_offset = day_offset = hour_offset = None - base_offset = len(self.log_dir.rstrip('/')) + 1 - for start, c in i: - offset = base_offset + start - if c == '%Y': - year_offset = offset, offset + 4 - # Add in the difference between len(%Y) and the expanded - # version of %Y (????). This makes sure the codes after this - # one will align properly in the final filename. - base_offset += 2 - elif c == '%m': - month_offset = offset, offset + 2 - elif c == '%d': - day_offset = offset, offset + 2 - elif c == '%H': - hour_offset = offset, offset + 2 - if not (year_offset and month_offset and day_offset and hour_offset): - # don't have all the parts, can't upload anything + """ + Match files under log_dir to source_filename_pattern and upload to swift + """ + pattern = self.validate_filename_pattern() + if not pattern: + self.logger.error(_('Invalid filename_format')) return - glob_pattern = self.filename_format - glob_pattern = glob_pattern.replace('%Y', '????', 1) - glob_pattern = glob_pattern.replace('%m', '??', 1) - glob_pattern = glob_pattern.replace('%d', '??', 1) - glob_pattern = glob_pattern.replace('%H', '??', 1) - filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) - current_hour = int(time.strftime('%H')) - today = int(time.strftime('%Y%m%d')) - self.internal_proxy.create_container(self.swift_account, - self.container_name) - for filename in filelist: - try: - # From the filename, we need to derive the year, month, day, - # and hour for the file. These values are used in the uploaded - # object's name, so they should be a reasonably accurate - # representation of the time for which the data in the file was - # collected. The file's last modified time is not a reliable - # representation of the data in the file. For example, an old - # log file (from hour A) may be uploaded or moved into the - # log_dir in hour Z. 
The file's modified time will be for hour - # Z, and therefore the object's name in the system will not - # represent the data in it. - # If the filename doesn't match the format, it shouldn't be - # uploaded. - year = filename[slice(*year_offset)] - month = filename[slice(*month_offset)] - day = filename[slice(*day_offset)] - hour = filename[slice(*hour_offset)] - except IndexError: - # unexpected filename format, move on - self.logger.error(_("Unexpected log: %s") % filename) + all_files = self.get_relpath_to_files_under_log_dir() + filename2match = self.filter_files(all_files, pattern) + if not filename2match: + self.logger.info(_('No files in %(log_dir)s match %(pattern)s') % + {'log_dir': self.log_dir, 'pattern': pattern}) + return + if not self.internal_proxy.create_container(self.swift_account, + self.container_name): + self.logger.error(_('Unable to create container for ' + '%(account)s/%(container)s') % { + 'account': self.swift_account, + 'container': self.container_name}) + return + for filename, match in filename2match.items(): + # don't process very new logs + seconds_since_mtime = time.time() - os.stat(filename).st_mtime + if seconds_since_mtime < self.new_log_cutoff: + self.logger.debug(_("Skipping log: %(file)s " + "(< %(cutoff)d seconds old)") % { + 'file': filename, + 'cutoff': self.new_log_cutoff}) continue - if ((time.time() - os.stat(filename).st_mtime) < - self.new_log_cutoff): - # don't process very new logs - self.logger.debug( - _("Skipping log: %(file)s (< %(cutoff)d seconds old)") % - {'file': filename, 'cutoff': self.new_log_cutoff}) - continue - self.upload_one_log(filename, year, month, day, hour) + self.upload_one_log(filename, **match) def upload_one_log(self, filename, year, month, day, hour): + """ + Upload one file to swift + """ if os.path.getsize(filename) == 0: self.logger.debug(_("Log %s is 0 length, skipping") % filename) return diff --git a/test/unit/stats/test_log_uploader.py b/test/unit/stats/test_log_uploader.py index 
b82e0ce02c..9b733624c6 100644 --- a/test/unit/stats/test_log_uploader.py +++ b/test/unit/stats/test_log_uploader.py @@ -13,14 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: More tests - import unittest import os from datetime import datetime from tempfile import mkdtemp from shutil import rmtree +from functools import partial +from collections import defaultdict +import random +import string +from test.unit import temptree from swift.stats import log_uploader import logging @@ -29,34 +32,62 @@ LOGGER = logging.getLogger() DEFAULT_GLOB = '%Y%m%d%H' +COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \ + '\xf3\xf3\xad\x04\x00\x00\x00' + + +def mock_appconfig(*args, **kwargs): + pass + + +class MockInternalProxy(): + + def __init__(self, *args, **kwargs): + pass + + def create_container(self, *args, **kwargs): + return True + + def upload_file(self, *args, **kwargs): + return True + + +_orig_LogUploader = log_uploader.LogUploader + + +class MockLogUploader(_orig_LogUploader): + + def __init__(self, conf, logger=LOGGER): + conf['swift_account'] = conf.get('swift_account', '') + conf['container_name'] = conf.get('container_name', '') + conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0') + conf['source_filename_format'] = conf.get( + 'source_filename_format', conf.get('filename_format')) + log_uploader.LogUploader.__init__(self, conf, 'plugin') + self.logger = logger + self.uploaded_files = [] + + def upload_one_log(self, filename, year, month, day, hour): + d = {'year': year, 'month': month, 'day': day, 'hour': hour} + self.uploaded_files.append((filename, d)) + _orig_LogUploader.upload_one_log(self, filename, year, month, + day, hour) + class TestLogUploader(unittest.TestCase): - def test_upload_all_logs(self): + def setUp(self): + # mock internal proxy + self._orig_InternalProxy = log_uploader.InternalProxy + self._orig_appconfig = log_uploader.appconfig + 
log_uploader.InternalProxy = MockInternalProxy + log_uploader.appconfig = mock_appconfig - class MockInternalProxy(): - - def create_container(self, *args, **kwargs): - pass - - class MonkeyLogUploader(log_uploader.LogUploader): - - def __init__(self, conf, logger=LOGGER): - self.log_dir = conf['log_dir'] - self.filename_format = conf.get('filename_format', - DEFAULT_GLOB) - self.new_log_cutoff = 0 - self.logger = logger - self.internal_proxy = MockInternalProxy() - self.swift_account = '' - self.container_name = '' - - self.uploaded_files = [] - - def upload_one_log(self, filename, year, month, day, hour): - d = {'year': year, 'month': month, 'day': day, 'hour': hour} - self.uploaded_files.append((filename, d)) + def tearDown(self): + log_uploader.appconfig = self._orig_appconfig + log_uploader.InternalProxy = self._orig_InternalProxy + def test_deprecated_glob_style_upload_all_logs(self): tmpdir = mkdtemp() try: today = datetime.now() @@ -72,7 +103,7 @@ class TestLogUploader(unittest.TestCase): open(os.path.join(tmpdir, ts), 'w').close() conf = {'log_dir': tmpdir} - uploader = MonkeyLogUploader(conf) + uploader = MockLogUploader(conf) uploader.upload_all_logs() self.assertEquals(len(uploader.uploaded_files), 24) for i, file_date in enumerate(sorted(uploader.uploaded_files)): @@ -112,7 +143,7 @@ class TestLogUploader(unittest.TestCase): 'log_dir': '%s/' % tmpdir, 'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz', } - uploader = MonkeyLogUploader(conf) + uploader = MockLogUploader(conf) uploader.upload_all_logs() self.assertEquals(len(uploader.uploaded_files), 24) for i, file_date in enumerate(sorted(uploader.uploaded_files)): @@ -146,22 +177,201 @@ class TestLogUploader(unittest.TestCase): 'log_dir': tmpdir, 'filename_format': '*.%Y%m%d%H.log', } - uploader = MonkeyLogUploader(conf) + uploader = MockLogUploader(conf) uploader.upload_all_logs() self.assertEquals(len(uploader.uploaded_files), 24) - for i, file_date in 
enumerate(sorted(uploader.uploaded_files)): + fname_to_int = lambda x: int(os.path.basename(x[0]).split('.')[0]) + numerically = lambda x, y: cmp(fname_to_int(x), + fname_to_int(y)) + for i, file_date in enumerate(sorted(uploader.uploaded_files, + cmp=numerically)): d = {'year': year, 'month': month, 'day': day, 'hour': i} for k, v in d.items(): d[k] = '%0.2d' % v expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' % (i, today_str, i)), d) - # TODO: support wildcards before the date pattern - # (i.e. relative offsets) - #print file_date - #self.assertEquals(file_date, expected) + self.assertEquals(file_date, expected) finally: rmtree(tmpdir) + def test_bad_pattern_in_config(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t: + # invalid pattern + conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%h'} # should be %H + uploader = MockLogUploader(conf) + self.assertFalse(uploader.validate_filename_pattern()) + uploader.upload_all_logs() + self.assertEquals(uploader.uploaded_files, []) + + conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%H'} + uploader = MockLogUploader(conf) + self.assert_(uploader.validate_filename_pattern()) + uploader.upload_all_logs() + self.assertEquals(len(uploader.uploaded_files), 1) + + + # deprecated warning on source_filename_format + class MockLogger(): + + def __init__(self): + self.msgs = defaultdict(list) + + def log(self, level, msg): + self.msgs[level].append(msg) + + def __getattr__(self, attr): + return partial(self.log, attr) + + logger = MockLogger.logger = MockLogger() + + def mock_get_logger(*args, **kwargs): + return MockLogger.logger + + _orig_get_logger = log_uploader.utils.get_logger + try: + log_uploader.utils.get_logger = mock_get_logger + conf = {'source_filename_format': '%Y%m%d%H'} + uploader = MockLogUploader(conf, logger=logger) + self.assert_([m for m in logger.msgs['warning'] + if 'deprecated' in m]) + finally: + 
log_uploader.utils.get_logger = _orig_get_logger + + # convert source_filename_format to regex + conf = {'source_filename_format': 'pattern-*.%Y%m%d%H.*.gz'} + uploader = MockLogUploader(conf) + expected = r'pattern-.*\.%Y%m%d%H\..*\.gz' + self.assertEquals(uploader.pattern, expected) + + # use source_filename_pattern if we have the choice! + conf = { + 'source_filename_format': 'bad', + 'source_filename_pattern': 'good', + } + uploader = MockLogUploader(conf) + self.assertEquals(uploader.pattern, 'good') + + def test_pattern_upload_all_logs(self): + + # test empty dir + with temptree([]) as t: + conf = {'log_dir': t} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 0) + + def get_random_length_str(max_len=10, chars=string.ascii_letters): + return ''.join(random.choice(chars) for x in + range(random.randint(1, max_len))) + + template = 'prefix_%(random)s_%(digits)s.blah.' \ + '%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz' + pattern = r'prefix_.*_[0-9]+\.blah\.%Y%m%d%H00-[0-9]{2}00' \ + '-[0-9]?[0-9]\.gz' + files_that_should_match = [] + # add some files that match + for i in range(24): + fname = template % { + 'random': get_random_length_str(), + 'digits': get_random_length_str(16, string.digits), + 'datestr': datetime.now().strftime('%Y%m%d'), + 'hour': i, + 'next_hour': i + 1, + 'number': random.randint(0, 20), + } + files_that_should_match.append(fname) + + # add some files that don't match + files = list(files_that_should_match) + for i in range(24): + fname = template % { + 'random': get_random_length_str(), + 'digits': get_random_length_str(16, string.digits), + 'datestr': datetime.now().strftime('%Y%m'), + 'hour': i, + 'next_hour': i + 1, + 'number': random.randint(0, 20), + } + files.append(fname) + + for fname in files: + print fname + + with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t: + self.assertEquals(len(os.listdir(t)), 48) + conf = {'source_filename_pattern': 
pattern, 'log_dir': t} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(os.listdir(t)), 24) + self.assertEquals(len(uploader.uploaded_files), 24) + files_that_were_uploaded = set(x[0] for x in + uploader.uploaded_files) + for f in files_that_should_match: + self.assert_(os.path.join(t, f) in files_that_were_uploaded) + + def test_log_cutoff(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files) as t: + conf = {'log_dir': t, 'new_log_cutoff': '7200'} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 0) + conf = {'log_dir': t, 'new_log_cutoff': '0'} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 1) + + def test_create_container_fail(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files) as t: + conf = {'log_dir': t} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 1) + + with temptree(files) as t: + conf = {'log_dir': t} + uploader = MockLogUploader(conf) + # mock create_container to fail + uploader.internal_proxy.create_container = lambda *args: False + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 0) + + def test_unlink_log(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files, contents=[COMPRESSED_DATA]) as t: + conf = {'log_dir': t, 'unlink_log': 'false'} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 1) + # file still there + self.assertEquals(len(os.listdir(t)), 1) + + conf = {'log_dir': t, 'unlink_log': 'true'} + uploader = MockLogUploader(conf) + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 1) + # file gone + self.assertEquals(len(os.listdir(t)), 0) + + def test_upload_file_failed(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files, 
contents=[COMPRESSED_DATA]) as t: + conf = {'log_dir': t, 'unlink_log': 'true'} + uploader = MockLogUploader(conf) + + # mock upload_file to fail, and clean up mock + def mock_upload_file(self, *args, **kwargs): + uploader.uploaded_files.pop() + return False + uploader.internal_proxy.upload_file = mock_upload_file + uploader.run_once() + self.assertEquals(len(uploader.uploaded_files), 0) + # file still there + self.assertEquals(len(os.listdir(t)), 1) + + if __name__ == '__main__': unittest.main()