From 4e9e2b65a9353e3a0a84d4e90d9813b9bf0c7c58 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Wed, 15 Jun 2011 15:19:36 -0500 Subject: [PATCH] initial code copy --- .unittests | 0 AUTHORS | 4 + CHANGELOG | 4 + LICENSE | 202 +++++ bin/swift-account-stats-logger | 27 + bin/swift-container-stats-logger | 27 + bin/swift-log-stats-collector | 35 + bin/swift-log-uploader | 49 + etc/log-processor.conf-sample | 57 ++ setup.cfg | 23 + setup.py | 65 ++ slogging/__init__.py | 0 slogging/access_processor.py | 250 ++++++ slogging/compressing_file_reader.py | 73 ++ slogging/db_stats_collector.py | 177 ++++ slogging/internal_proxy.py | 210 +++++ slogging/log_processor.py | 581 ++++++++++++ slogging/log_uploader.py | 188 ++++ slogging/stats_processor.py | 69 ++ test_slogging/__init__.py | 0 test_slogging/test_access_processor.py | 94 ++ test_slogging/test_compressing_file_reader.py | 34 + test_slogging/test_db_stats_collector.py | 238 +++++ test_slogging/test_internal_proxy.py | 192 ++++ test_slogging/test_log_processor.py | 834 ++++++++++++++++++ test_slogging/test_log_uploader.py | 240 +++++ test_slogging/test_stats_processor.py | 29 + 27 files changed, 3702 insertions(+) create mode 100755 .unittests create mode 100644 AUTHORS create mode 100644 CHANGELOG create mode 100644 LICENSE create mode 100755 bin/swift-account-stats-logger create mode 100755 bin/swift-container-stats-logger create mode 100755 bin/swift-log-stats-collector create mode 100755 bin/swift-log-uploader create mode 100644 etc/log-processor.conf-sample create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 slogging/__init__.py create mode 100644 slogging/access_processor.py create mode 100644 slogging/compressing_file_reader.py create mode 100644 slogging/db_stats_collector.py create mode 100644 slogging/internal_proxy.py create mode 100644 slogging/log_processor.py create mode 100644 slogging/log_uploader.py create mode 100644 slogging/stats_processor.py create mode 100644 test_slogging/__init__.py create mode 100644 test_slogging/test_access_processor.py create mode 100644 test_slogging/test_compressing_file_reader.py create mode 100644 test_slogging/test_db_stats_collector.py create mode 100644 test_slogging/test_internal_proxy.py create mode 100644 test_slogging/test_log_processor.py create mode 100644 test_slogging/test_log_uploader.py create mode 100644 test_slogging/test_stats_processor.py diff --git a/.unittests b/.unittests new file mode 100755 index 0000000..e69de29 diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..1006053 --- /dev/null +++ b/AUTHORS @@ -0,0 +1,4 @@ +John Dickinson +Clay Gerrard +David Goetz +Greg Lange \ No newline at end of file diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000..8657219 --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,4 @@ +slogging (1.0) + + - initial release since separation from swift project + (http://swift.openstack.org) \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..75b5248 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/bin/swift-account-stats-logger b/bin/swift-account-stats-logger
new file mode 100755
index 0000000..b018ab5
--- /dev/null
+++ b/bin/swift-account-stats-logger
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from slogging.db_stats_collector import AccountStatsCollector
+from swift.common.utils import parse_options
+from swift.common.daemon import run_daemon
+
+if __name__ == '__main__':
+    conf_file, options = parse_options()
+    # currently AccountStatsCollector only supports run_once
+    options['once'] = True
+    run_daemon(AccountStatsCollector, conf_file,
+               section_name='log-processor-stats',
+               log_name="account-stats", **options)
diff --git a/bin/swift-container-stats-logger b/bin/swift-container-stats-logger
new file mode 100755
index 0000000..3b93c20
--- /dev/null
+++ b/bin/swift-container-stats-logger
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from slogging.db_stats_collector import ContainerStatsCollector
+from swift.common.utils import parse_options
+from swift.common.daemon import run_daemon
+
+if __name__ == '__main__':
+    conf_file, options = parse_options()
+    # currently ContainerStatsCollector only supports run_once
+    options['once'] = True
+    run_daemon(ContainerStatsCollector, conf_file,
+               section_name='log-processor-container-stats',
+               log_name="container-stats", **options)
diff --git a/bin/swift-log-stats-collector b/bin/swift-log-stats-collector
new file mode 100755
index 0000000..f2d5011
--- /dev/null
+++ b/bin/swift-log-stats-collector
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from optparse import OptionParser
+
+from slogging.log_processor import LogProcessorDaemon
+from swift.common.utils import parse_options
+from swift.common.daemon import run_daemon
+
+if __name__ == '__main__':
+    parser = OptionParser(usage='Usage: %prog [options] <conf_file>')
+
+    parser.add_option('--lookback_hours', type='int', dest='lookback_hours',
+        help='Hours in the past to start looking for log files')
+    parser.add_option('--lookback_window', type='int', dest='lookback_window',
+        help='Hours past lookback_hours to stop looking for log files')
+
+    conf_file, options = parse_options(parser)
+    # currently the LogProcessorDaemon only supports run_once
+    options['once'] = True
+    run_daemon(LogProcessorDaemon, conf_file, section_name=None,
+               log_name='log-stats-collector', **options)
diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader
new file mode 100755
index 0000000..3639dff
--- /dev/null
+++ b/bin/swift-log-uploader
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+from optparse import OptionParser
+from slogging.log_uploader import LogUploader
+from swift.common.utils import parse_options
+from swift.common import utils
+
+if __name__ == '__main__':
+    parser = OptionParser("Usage: %prog CONFIG_FILE PLUGIN")
+    parser.add_option('-c', '--log_cutoff',
+                      help='Override new_log_cutoff.')
+    parser.add_option('-x', '--regex',
+                      help='Override source_filename_pattern regex.')
+    conf_file, options = parse_options(parser=parser)
+    try:
+        plugin = options['extra_args'][0]
+    except (IndexError, KeyError):
+        print "Error: missing plugin name"
+        sys.exit(1)
+
+    uploader_conf = utils.readconf(conf_file, 'log-processor')
+    section_name = 'log-processor-%s' % plugin
+    plugin_conf = utils.readconf(conf_file, section_name)
+    uploader_conf.update(plugin_conf)
+
+    # pre-configure logger
+    logger = utils.get_logger(uploader_conf, log_route='log-uploader',
+                              log_to_console=options.get('verbose', False))
+    # currently LogUploader only supports run_once
+    options['once'] = True
+    regex = options.get('regex')
+    cutoff = options.get('log_cutoff')
+    LogUploader(uploader_conf, plugin,
+                regex=regex, cutoff=cutoff).run(**options)
diff --git a/etc/log-processor.conf-sample b/etc/log-processor.conf-sample
new file mode 100644
index 0000000..350ae73
--- /dev/null
+++ b/etc/log-processor.conf-sample
@@ -0,0 +1,57 @@
+# plugin section format is named "log-processor-<plugin_name>"
+
+[log-processor]
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+# container_name = log_processing_data
+# proxy_server_conf = /etc/swift/proxy-server.conf
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# lookback_hours = 120
+# lookback_window = 120
+# user = swift
+
+[log-processor-access]
+# log_dir = /var/log/swift/
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+container_name = log_data
+source_filename_pattern = ^
+    (?P<year>[0-9]{4})
+    (?P<month>[0-1][0-9])
+    (?P<day>[0-3][0-9])
+    (?P<hour>[0-2][0-9])
+    .*$
+# new_log_cutoff = 7200
+# unlink_log = True
+class_path = slogging.access_processor.AccessLogProcessor
+# service ips is for client ip addresses that should be counted as servicenet
+# service_ips =
+# load balancer private ips is for load balancer ip addresses that should be
+# counted as servicenet
+# lb_private_ips =
+# server_name = proxy-server
+# user = swift
+# warn_percent = 0.8
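+#
+# For illustration (not part of the shipped defaults): the
+# source_filename_pattern above matches hour-keyed log names, so a file
+# named "2011061500.access.log" yields the groups year=2011, month=06,
+# day=15 and hour=00, which the uploader uses to build an object path of
+# the form "2011/06/15/00/<md5>.gz".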
+
+[log-processor-stats]
+# log_dir = /var/log/swift/
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+container_name = account_stats
+# new_log_cutoff = 7200
+# unlink_log = True
+class_path = slogging.stats_processor.StatsLogProcessor
+# devices = /srv/node
+# mount_check = true
+# user = swift
+
+[log-processor-container-stats]
+# log_dir = /var/log/swift/
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+container_name = container_stats
+# new_log_cutoff = 7200
+# unlink_log = True
+class_path = slogging.stats_processor.StatsLogProcessor
+processable = false
+# devices = /srv/node
+# mount_check = true
+# user = swift
+# metadata_keys = comma separated list of user metadata keys to be collected
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..86db59d
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,23 @@
+[build_sphinx]
+all_files = 1
+build-dir = doc/build
+source-dir = doc/source
+
+[egg_info]
+tag_build =
+tag_date = 0
+tag_svn_revision = 0
+
+[compile_catalog]
+directory = locale
+domain = slogging
+
+[update_catalog]
+domain = slogging
+output_dir = locale
+input_file = locale/slogging.pot
+
+[extract_messages]
+keywords = _ l_ lazy_gettext
+mapping_file = babel.cfg
+output_file = locale/slogging.pot
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..394af37
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup, find_packages
+from setuptools.command.sdist import sdist
+import os
+import subprocess
+try:
+    from babel.messages import frontend
+except ImportError:
+    frontend = None
+
+from slogging import __version__ as version
+
+
+name = 'slogging'
+
+
+class local_sdist(sdist):
+    """Customized sdist hook - builds the ChangeLog file from VC first"""
+
+    def run(self):
+        if os.path.isdir('.bzr'):
+            # We're in a bzr branch
+            log_cmd = subprocess.Popen(["bzr", "log", "--gnu"],
+                                       stdout=subprocess.PIPE)
+            changelog = log_cmd.communicate()[0]
+            with open("ChangeLog", "w") as changelog_file:
+                changelog_file.write(changelog)
+        sdist.run(self)
+
+
+cmdclass = {'sdist': local_sdist}
+
+
+if frontend:
+    cmdclass.update({
+        'compile_catalog': frontend.compile_catalog,
+        'extract_messages': frontend.extract_messages,
+        'init_catalog': frontend.init_catalog,
+        'update_catalog': frontend.update_catalog,
+    })
+
+
+setup(
+    name=name,
+    version=version,
+    description='Slogging',
+    license='Apache License (2.0)',
+    author='OpenStack, LLC.',
+    author_email='me@not.mn',
+    url='https://github.com/notmyname/slogging',
+    packages=find_packages(exclude=['test_slogging', 'bin']),
+    test_suite='nose.collector',
+    cmdclass=cmdclass,
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'License :: OSI Approved :: Apache Software License',
+        'Operating System :: POSIX :: Linux',
+        'Programming Language :: Python :: 2.6',
+        'Environment :: No Input/Output (Daemon)',
+        ],
+    install_requires=[],  # removed for better compat
+    scripts=[
+        ],
+    )
diff --git a/slogging/__init__.py b/slogging/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/slogging/access_processor.py b/slogging/access_processor.py
new file mode 100644
index 0000000..897c238
--- /dev/null
+++ b/slogging/access_processor.py
@@ -0,0 +1,250 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+from urllib import unquote
+import copy
+
+from swift.common.utils import split_path, get_logger
+
+month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
+LISTING_PARAMS = set(
+    'path limit format delimiter marker end_marker prefix'.split())
+
+
+class AccessLogProcessor(object):
+    """Transform proxy server access logs"""
+
+    def __init__(self, conf):
+        self.server_name = conf.get('server_name', 'proxy-server')
+        self.lb_private_ips = [x.strip() for x in \
+                               conf.get('lb_private_ips', '').split(',')\
+                               if x.strip()]
+        self.service_ips = [x.strip() for x in \
+                            conf.get('service_ips', '').split(',')\
+                            if x.strip()]
+        self.warn_percent = float(conf.get('warn_percent', '0.8'))
+        self.logger = get_logger(conf, log_route='access-processor')
+
+    def log_line_parser(self, raw_log):
+        '''given a raw access log line, return a dict of the good parts'''
+        d = {}
+        try:
+            # the first 16 characters are the syslog timestamp prefix
+            (unused,
+            server,
+            client_ip,
+            lb_ip,
+            timestamp,
+            method,
+            request,
+            http_version,
+            code,
+            referrer,
+            user_agent,
+            auth_token,
+            bytes_in,
+            bytes_out,
+            etag,
+            trans_id,
+            headers,
+            processing_time) = (unquote(x) for x in
+                                raw_log[16:].split(' ')[:18])
+        except ValueError:
+            self.logger.debug(_('Bad line data: %s') % repr(raw_log))
+            return {}
+        if server != self.server_name:
+            # incorrect server name in log line
+            self.logger.debug(_('Bad server name: found "%(found)s" ' \
+                    'expected "%(expected)s"') %
+                    {'found': server, 'expected': self.server_name})
+            return {}
+        try:
+            (version, account, container_name, object_name) = \
+                split_path(request, 2, 4, True)
+        except ValueError, e:
+            self.logger.debug(_('Invalid path: %(error)s from data: %(log)s') %
+                    {'error': e, 'log': repr(raw_log)})
+            return {}
+        if container_name is not None:
+            container_name = container_name.split('?', 1)[0]
+        if object_name is not None:
+            object_name = object_name.split('?', 1)[0]
+        account = account.split('?', 1)[0]
+        query = None
+        if '?' in request:
+            request, query = request.split('?', 1)
+            args = query.split('&')
+            # Count each query argument. This is used later to aggregate
+            # the number of format, prefix, etc. queries.
+            for q in args:
+                if '=' in q:
+                    k, v = q.split('=', 1)
+                else:
+                    k = q
+                # Certain keys will get summed in stats reporting
+                # (format, path, delimiter, etc.). Save a "1" here
+                # to indicate that this request is 1 request for
+                # its respective key.
+                if k in LISTING_PARAMS:
+                    d[k] = 1
+        d['client_ip'] = client_ip
+        d['lb_ip'] = lb_ip
+        d['method'] = method
+        d['request'] = request
+        if query:
+            d['query'] = query
+        d['http_version'] = http_version
+        d['code'] = code
+        d['referrer'] = referrer
+        d['user_agent'] = user_agent
+        d['auth_token'] = auth_token
+        d['bytes_in'] = bytes_in
+        d['bytes_out'] = bytes_out
+        d['etag'] = etag
+        d['trans_id'] = trans_id
+        d['processing_time'] = processing_time
+        day, month, year, hour, minute, second = timestamp.split('/')
+        d['day'] = day
+        month = ('%02s' % month_map.index(month)).replace(' ', '0')
+        d['month'] = month
+        d['year'] = year
+        d['hour'] = hour
+        d['minute'] = minute
+        d['second'] = second
+        d['tz'] = '+0000'
+        d['account'] = account
+        d['container_name'] = container_name
+        d['object_name'] = object_name
+        d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
+        d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
+        d['code'] = int(d['code'])
+        return d
+
+    def process(self, obj_stream, data_object_account, data_object_container,
+                data_object_name):
+        '''generate hourly groupings of data from one access log file'''
+        hourly_aggr_info = {}
+        total_lines = 0
+        bad_lines = 0
+        for line in obj_stream:
+            line_data = self.log_line_parser(line)
+            total_lines += 1
+            if not line_data:
+                bad_lines += 1
+                continue
+            account = line_data['account']
+            container_name = line_data['container_name']
+            year = line_data['year']
+            month = line_data['month']
+            day = line_data['day']
+            hour = line_data['hour']
+            bytes_out = line_data['bytes_out']
+            bytes_in = line_data['bytes_in']
+            method = line_data['method']
+            code = int(line_data['code'])
+            object_name = line_data['object_name']
+            client_ip = line_data['client_ip']
+
+            op_level = None
+            if not container_name:
+                op_level = 'account'
+            elif container_name and not object_name:
+                op_level = 'container'
+            elif object_name:
+                op_level = 'object'
+
+            aggr_key = (account, year, month, day, hour)
+            d = hourly_aggr_info.get(aggr_key, {})
+            if line_data['lb_ip'] in self.lb_private_ips:
+                source = 'service'
+            else:
+                source = 'public'
+
+            if line_data['client_ip'] in self.service_ips:
+                source = 'service'
+
+            d[(source, 'bytes_out')] = d.setdefault((
+                source, 'bytes_out'), 0) + bytes_out
+            d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
+                bytes_in
+
+            d['format_query'] = d.setdefault('format_query', 0) + \
+                line_data.get('format', 0)
+            d['marker_query'] = d.setdefault('marker_query', 0) + \
+                line_data.get('marker', 0)
+            d['prefix_query'] = d.setdefault('prefix_query', 0) + \
+                line_data.get('prefix', 0)
+            d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
+                line_data.get('delimiter', 0)
+            path = line_data.get('path', 0)
+            d['path_query'] = d.setdefault('path_query', 0) + path
+
+            code = '%dxx' % (code / 100)
+            key = (source, op_level, method, code)
+            d[key] = d.setdefault(key, 0) + 1
+
+            hourly_aggr_info[aggr_key] = d
+        if bad_lines > (total_lines * self.warn_percent):
+            name = '/'.join([data_object_account, data_object_container,
+                             data_object_name])
+            self.logger.warning(_('I found a bunch of bad lines in %(name)s '\
+                    '(%(bad)d bad, %(total)d total)') %
+                    {'name': name, 'bad': bad_lines, 'total': total_lines})
+        return hourly_aggr_info
+
+    def keylist_mapping(self):
+        source_keys = 'service public'.split()
+        level_keys = 'account container object'.split()
+        verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
+        code_keys = '2xx 4xx 5xx'.split()
+
+        keylist_mapping = {
+            # <csv column> : <aggregate key> or <set of aggregate keys>
+            'service_bw_in': ('service', 'bytes_in'),
'service_bw_out': ('service', 'bytes_out'), + 'public_bw_in': ('public', 'bytes_in'), + 'public_bw_out': ('public', 'bytes_out'), + 'account_requests': set(), + 'container_requests': set(), + 'object_requests': set(), + 'service_request': set(), + 'public_request': set(), + 'ops_count': set(), + } + for verb in verb_keys: + keylist_mapping[verb] = set() + for code in code_keys: + keylist_mapping[code] = set() + for source in source_keys: + for level in level_keys: + for verb in verb_keys: + for code in code_keys: + keylist_mapping['account_requests'].add( + (source, 'account', verb, code)) + keylist_mapping['container_requests'].add( + (source, 'container', verb, code)) + keylist_mapping['object_requests'].add( + (source, 'object', verb, code)) + keylist_mapping['service_request'].add( + ('service', level, verb, code)) + keylist_mapping['public_request'].add( + ('public', level, verb, code)) + keylist_mapping[verb].add( + (source, level, verb, code)) + keylist_mapping[code].add( + (source, level, verb, code)) + keylist_mapping['ops_count'].add( + (source, level, verb, code)) + return keylist_mapping diff --git a/slogging/compressing_file_reader.py b/slogging/compressing_file_reader.py new file mode 100644 index 0000000..c581bdd --- /dev/null +++ b/slogging/compressing_file_reader.py @@ -0,0 +1,73 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zlib +import struct + + +class CompressingFileReader(object): + ''' + Wraps a file object and provides a read method that returns gzip'd data. + + One warning: if read is called with a small value, the data returned may + be bigger than the value. In this case, the "compressed" data will be + bigger than the original data. To solve this, use a bigger read buffer. + + An example use case: + Given an uncompressed file on disk, provide a way to read compressed data + without buffering the entire file data in memory. Using this class, an + uncompressed log file could be uploaded as compressed data with chunked + transfer encoding. 
+
+    gzip header and footer code taken from the python stdlib gzip module
+
+    :param file_obj: File object to read from
+    :param compresslevel: compression level
+    '''
+
+    def __init__(self, file_obj, compresslevel=9):
+        self._f = file_obj
+        self._compressor = zlib.compressobj(compresslevel,
+                                            zlib.DEFLATED,
+                                            -zlib.MAX_WBITS,
+                                            zlib.DEF_MEM_LEVEL,
+                                            0)
+        self.done = False
+        self.first = True
+        self.crc32 = 0
+        self.total_size = 0
+
+    def read(self, *a, **kw):
+        if self.done:
+            return ''
+        x = self._f.read(*a, **kw)
+        if x:
+            self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
+            self.total_size += len(x)
+            compressed = self._compressor.compress(x)
+            if not compressed:
+                compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
+        else:
+            compressed = self._compressor.flush(zlib.Z_FINISH)
+            crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
+            size = struct.pack("<L", self.total_size & 0xffffffffL)
+            # gzip footer is the crc32 followed by the uncompressed size
+            compressed += crc32 + size
+            self.done = True
+        if self.first:
+            # prepend the standard gzip header on the first read
+            self.first = False
+            header = '\037\213\010\000\000\000\000\000\002\377'
+            compressed = header + compressed
+        return compressed
diff --git a/slogging/internal_proxy.py b/slogging/internal_proxy.py
new file mode 100644
--- /dev/null
+++ b/slogging/internal_proxy.py
@@ -0,0 +1,210 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import webob
+from urllib import quote
+from json import loads as json_loads
+
+from slogging.compressing_file_reader import CompressingFileReader
+from swift.proxy.server import BaseApplication
+
+
+class MemcacheStub(object):
+
+    def get(self, *a, **kw):
+        return None
+
+    def set(self, *a, **kw):
+        return None
+
+    def get_multi(self, *a, **kw):
+        return []
+
+    def set_multi(self, *a, **kw):
+        return None
+
+    def delete(self, *a, **kw):
+        return None
+
+
+def webob_request_copy(orig_req, source_file=None, compress=True):
+    '''
+    Copy a webob request, optionally replacing its body with a (possibly
+    compressing) reader over source_file.
+    '''
+    req_copy = orig_req.copy()
+    if source_file:
+        if hasattr(source_file, 'seek'):
+            source_file.seek(0)
+        else:
+            source_file = open(source_file, 'rb')
+        if compress:
+            source_file = CompressingFileReader(source_file)
+        req_copy.body_file = source_file
+    req_copy.content_length = orig_req.content_length
+    return req_copy
+
+
+class InternalProxy(object):
+    """
+    Set up a private instance of a proxy server that allows normal requests
+    to be made to the cluster without an actual socket, e.g. for stats
+    gathering.
+
+    :param proxy_server_conf: proxy server configuration dictionary
+    :param logger: logger to log requests to
+    :param retries: number of times to retry each request
+    """
+
+    def __init__(self, proxy_server_conf=None, logger=None, retries=0):
+        self.upload_app = BaseApplication(proxy_server_conf,
+                                          memcache=MemcacheStub(),
+                                          logger=logger)
+        self.retries = retries
+
+    def _handle_request(self, req, source_file=None, compress=True):
+        req = webob_request_copy(req, source_file=source_file,
+                                 compress=compress)
+        resp = self.upload_app.handle_request(req)
+        tries = 1
+        while (resp.status_int < 200 or resp.status_int > 299) \
+                and tries < self.retries:
+            req_copy = webob_request_copy(req, source_file=source_file,
+                                          compress=compress)
+            resp = self.upload_app.handle_request(req_copy)
+            tries += 1
+        return resp
+
+    def upload_file(self, source_file, account, container, object_name,
+                    compress=True, content_type='application/x-gzip',
+                    etag=None):
+        """
+        Upload a file to cloud files.
+
+        :param source_file: path to or file like object to upload
+        :param account: account to upload to
+        :param container: container to upload to
+        :param object_name: name of object being uploaded
+        :param compress: if True, compresses object as it is uploaded
+        :param content_type: content-type of object
+        :param etag: etag for object to check successful upload
+        :returns: True if successful, False otherwise
+        """
+        target_name = '/v1/%s/%s/%s' % (account, container, object_name)
+
+        # create the container
+        if not self.create_container(account, container):
+            return False
+
+        # upload the file to the account
+        req = webob.Request.blank(target_name, content_type=content_type,
+                            environ={'REQUEST_METHOD': 'PUT'},
+                            headers={'Transfer-Encoding': 'chunked'})
+        req.content_length = None   # to make sure we send chunked data
+        if etag:
+            req.headers['etag'] = etag
+        resp = self._handle_request(req, source_file=source_file,
+                                    compress=compress)
+        if not (200 <= resp.status_int < 300):
+            return False
+        return True
+
+    def get_object(self, account, container, object_name):
+        """
+        Get object.
+
+        :param account: account name object is in
+        :param container: container name object is in
+        :param object_name: name of object to get
+        :returns: iterator for object data
+        """
+        req = webob.Request.blank('/v1/%s/%s/%s' %
+                            (account, container, object_name),
+                            environ={'REQUEST_METHOD': 'GET'})
+        resp = self._handle_request(req)
+        return resp.status_int, resp.app_iter
+
+    def create_container(self, account, container):
+        """
+        Create container.
+
+        :param account: account name to put the container in
+        :param container: container name to create
+        :returns: True if successful, otherwise False
+        """
+        req = webob.Request.blank('/v1/%s/%s' % (account, container),
+                                  environ={'REQUEST_METHOD': 'PUT'})
+        resp = self._handle_request(req)
+        return 200 <= resp.status_int < 300
+
+    def get_container_list(self, account, container, marker=None,
+                           end_marker=None, limit=None, prefix=None,
+                           delimiter=None, full_listing=True):
+        """
+        Get a listing of objects for the container.
+
+        :param account: account name for the container
+        :param container: container name to get a listing for
+        :param marker: marker query
+        :param end_marker: end marker query
+        :param limit: limit query
+        :param prefix: prefix query
+        :param delimiter: string to delimit the queries on
+        :param full_listing: if True, return a full listing, else returns a
+                             max of 10000 listings
+        :returns: list of objects
+        """
+        if full_listing:
+            rv = []
+            listing = self.get_container_list(account, container, marker,
+                                              end_marker, limit, prefix,
+                                              delimiter, full_listing=False)
+            while listing:
+                rv.extend(listing)
+                if not delimiter:
+                    marker = listing[-1]['name']
+                else:
+                    marker = listing[-1].get('name', listing[-1].get('subdir'))
+                listing = self.get_container_list(account, container, marker,
+                                                  end_marker, limit, prefix,
+                                                  delimiter,
+                                                  full_listing=False)
+            return rv
+        path = '/v1/%s/%s' % (account, quote(container))
+        qs = 'format=json'
+        if marker:
+            qs += '&marker=%s' % quote(marker)
+        if end_marker:
+            qs += '&end_marker=%s' % quote(end_marker)
+        if limit:
+            qs += '&limit=%d' % limit
+        if prefix:
+            qs += '&prefix=%s' % quote(prefix)
+        if delimiter:
+            qs += '&delimiter=%s' % quote(delimiter)
+        path += '?%s' % qs
+        req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
+        resp = self._handle_request(req)
+        if resp.status_int < 200 or resp.status_int >= 300:
+            return []   # TODO: distinguish between 404 and empty container
+        if resp.status_int == 204:
+            return []
+        return json_loads(resp.body)
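+
+# A minimal usage sketch (the proxy config path, account, and object names
+# below are illustrative only, not part of this module):
+#
+#   from paste.deploy import appconfig
+#   conf = appconfig('config:/etc/swift/proxy-server.conf',
+#                    name='proxy-server')
+#   proxy = InternalProxy(conf, retries=3)
+#   proxy.upload_file('/var/log/swift/access.log', 'AUTH_test',
+#                     'log_data', '2011/06/15/00/example.gz')
+#   status, body_iter = proxy.get_object('AUTH_test', 'log_data',
+#                                        '2011/06/15/00/example.gz')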
diff --git a/slogging/log_processor.py b/slogging/log_processor.py
new file mode 100644
index 0000000..7f76305
--- /dev/null
+++ b/slogging/log_processor.py
@@ -0,0 +1,581 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ConfigParser import ConfigParser
+import zlib
+import time
+import datetime
+import cStringIO
+import collections
+from paste.deploy import appconfig
+import multiprocessing
+import Queue
+import cPickle
+import hashlib
+
+from slogging.internal_proxy import InternalProxy
+from swift.common.exceptions import ChunkReadTimeout
+from swift.common.utils import get_logger, readconf, TRUE_VALUES
+from swift.common.daemon import Daemon
+
+now = datetime.datetime.now
+
+
+class BadFileDownload(Exception):
+
+    def __init__(self, status_code=None):
+        self.status_code = status_code
+
+
+class LogProcessor(object):
+    """Load plugins, process logs"""
+
+    def __init__(self, conf, logger):
+        if isinstance(logger, tuple):
+            self.logger = get_logger(*logger, log_route='log-processor')
+        else:
+            self.logger = logger
+
+        self.conf = conf
+        self._internal_proxy = None
+
+        # load the processing plugins
+        self.plugins = {}
+        plugin_prefix = 'log-processor-'
+        for section in (x for x in conf if x.startswith(plugin_prefix)):
+            plugin_name = section[len(plugin_prefix):]
+            plugin_conf = conf.get(section, {})
+            if plugin_conf.get('processable', 'true').lower() not in \
+                    TRUE_VALUES:
+                continue
+            self.plugins[plugin_name] = plugin_conf
+            class_path = self.plugins[plugin_name]['class_path']
+            import_target, class_name = class_path.rsplit('.', 1)
+            module = __import__(import_target, fromlist=[import_target])
+            klass = getattr(module, class_name)
+            self.plugins[plugin_name]['instance'] = klass(plugin_conf)
+            self.logger.debug(_('Loaded plugin "%s"') % plugin_name)
+
+    @property
+    def internal_proxy(self):
+        if self._internal_proxy is None:
+            stats_conf = self.conf.get('log-processor', {})
+            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
+                                            '/etc/swift/proxy-server.conf')
+            proxy_server_conf = appconfig(
+                    'config:%s' % proxy_server_conf_loc,
+                    name='proxy-server')
+            self._internal_proxy = InternalProxy(proxy_server_conf,
+                                                 self.logger,
+                                                 retries=3)
+        return self._internal_proxy
+
+    def process_one_file(self, plugin_name, account, container, object_name):
+        self.logger.info(_('Processing %(obj)s with plugin "%(plugin)s"') %
+                    {'obj': '/'.join((account, container, object_name)),
+                     'plugin': plugin_name})
+        # get an iter of the object data
+        compressed = object_name.endswith('.gz')
+        stream = self.get_object_data(account, container, object_name,
+                                      compressed=compressed)
+        # look up the correct plugin and send the stream to it
+        return self.plugins[plugin_name]['instance'].process(stream,
+                                                             account,
+                                                             container,
+                                                             object_name)
+
+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
+        total_list = []
+        for plugin_name, data in self.plugins.items():
+            account = data['swift_account']
+            container = data['container_name']
+            listing = self.get_container_listing(account,
+                                                 container,
+                                                 start_date,
+                                                 end_date)
+            for object_name in listing:
+                # The items in this list end up being passed as positional
+                # parameters to process_one_file.
+                x = (plugin_name, account, container, object_name)
+                if x not in listing_filter:
+                    total_list.append(x)
+        return total_list
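+
+    # For example (names illustrative only), with an "access" plugin
+    # configured, get_data_list() returns tuples that process_one_file()
+    # later receives as positional arguments:
+    #   [('access', 'AUTH_test', 'log_data', '2011/06/15/00/a1b2.gz'),
+    #    ('access', 'AUTH_test', 'log_data', '2011/06/15/01/c3d4.gz')]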
+
+    def get_container_listing(self, swift_account, container_name,
+                              start_date=None, end_date=None,
+                              listing_filter=None):
+        '''
+        Get a container listing, filtered by start_date, end_date, and
+        listing_filter. Dates, if given, must be in YYYYMMDDHH format.
+        '''
+        search_key = None
+        if start_date is not None:
+            try:
+                parsed_date = time.strptime(start_date, '%Y%m%d%H')
+            except ValueError:
+                pass
+            else:
+                year = '%04d' % parsed_date.tm_year
+                month = '%02d' % parsed_date.tm_mon
+                day = '%02d' % parsed_date.tm_mday
+                hour = '%02d' % parsed_date.tm_hour
+                search_key = '/'.join([year, month, day, hour])
+        end_key = None
+        if end_date is not None:
+            try:
+                parsed_date = time.strptime(end_date, '%Y%m%d%H')
+            except ValueError:
+                pass
+            else:
+                year = '%04d' % parsed_date.tm_year
+                month = '%02d' % parsed_date.tm_mon
+                day = '%02d' % parsed_date.tm_mday
+                # Since the end_marker filters by <, we need to add something
+                # to make sure we get all the data under the last hour. Adding
+                # one to the hour should be all-inclusive.
+                hour = '%02d' % (parsed_date.tm_hour + 1)
+                end_key = '/'.join([year, month, day, hour])
+        container_listing = self.internal_proxy.get_container_list(
+                                swift_account,
+                                container_name,
+                                marker=search_key,
+                                end_marker=end_key)
+        results = []
+        if listing_filter is None:
+            listing_filter = set()
+        for item in container_listing:
+            name = item['name']
+            if name not in listing_filter:
+                results.append(name)
+        return results
+
+    def get_object_data(self, swift_account, container_name, object_name,
+                        compressed=False):
+        '''reads an object and yields its lines'''
+        code, o = self.internal_proxy.get_object(swift_account,
+                                                 container_name,
+                                                 object_name)
+        if code < 200 or code >= 300:
+            raise BadFileDownload(code)
+        last_part = ''
+        last_compressed_part = ''
+        # magic in the following zlib.decompressobj argument is courtesy of
+        # Python decompressing gzip chunk-by-chunk
+        # http://stackoverflow.com/questions/2423866
+        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
+        try:
+            for chunk in o:
+                if compressed:
+                    try:
+                        chunk = d.decompress(chunk)
+                    except zlib.error:
+                        self.logger.debug(_('Bad compressed data for %s')
+                            % '/'.join((swift_account, container_name,
+                                        object_name)))
+                        raise BadFileDownload()   # bad compressed data
+                parts = chunk.split('\n')
+                parts[0] = last_part + parts[0]
+                for part in parts[:-1]:
+                    yield part
+                last_part = parts[-1]
+            if last_part:
+                yield last_part
+        except ChunkReadTimeout:
+            raise BadFileDownload()
+
+    def generate_keylist_mapping(self):
+        keylist = {}
+        for plugin in self.plugins:
+            plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
+            if not plugin_keylist:
+                continue
+            for k, v in plugin_keylist.items():
+                o = keylist.get(k)
+                if o:
+                    if isinstance(o, set):
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                    else:
+                        o = set([o])
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                else:
+                    o = v
+                keylist[k] = o
+        return keylist
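+
+
+# A plugin named in a "log-processor-<name>" config section only needs a
+# class (referenced by class_path) with process() and keylist_mapping()
+# methods. A minimal sketch of such a plugin (hypothetical, for
+# illustration only; the hard-coded hour key stands in for real parsing):
+#
+#   class LineCountProcessor(object):
+#       def __init__(self, conf):
+#           pass
+#
+#       def process(self, obj_stream, account, container, object_name):
+#           # must return a dict keyed by (account, year, month, day, hour)
+#           count = sum(1 for _line in obj_stream)
+#           return {(account, '2011', '06', '15', '00'): {'lines': count}}
+#
+#       def keylist_mapping(self):
+#           return {'line_count': 'lines'}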
+ """ + + def __init__(self, conf): + c = conf.get('log-processor') + super(LogProcessorDaemon, self).__init__(c) + self.total_conf = conf + self.logger = get_logger(c, log_route='log-processor') + self.log_processor = LogProcessor(conf, self.logger) + self.lookback_hours = int(c.get('lookback_hours', '120')) + self.lookback_window = int(c.get('lookback_window', + str(self.lookback_hours))) + self.log_processor_account = c['swift_account'] + self.log_processor_container = c.get('container_name', + 'log_processing_data') + self.worker_count = int(c.get('worker_count', '1')) + self._keylist_mapping = None + self.processed_files_filename = 'processed_files.pickle.gz' + + def get_lookback_interval(self): + """ + :returns: lookback_start, lookback_end. + + Both or just lookback_end can be None. Otherwise, returns strings + of the form 'YYYYMMDDHH'. The interval returned is used as bounds + when looking for logs to processes. + + A returned None means don't limit the log files examined on that + side of the interval. + """ + + if self.lookback_hours == 0: + lookback_start = None + lookback_end = None + else: + delta_hours = datetime.timedelta(hours=self.lookback_hours) + lookback_start = now() - delta_hours + lookback_start = lookback_start.strftime('%Y%m%d%H') + if self.lookback_window == 0: + lookback_end = None + else: + delta_window = datetime.timedelta(hours=self.lookback_window) + lookback_end = now() - \ + delta_hours + \ + delta_window + lookback_end = lookback_end.strftime('%Y%m%d%H') + return lookback_start, lookback_end + + def get_processed_files_list(self): + """ + :returns: a set of files that have already been processed or returns + None on error. + + Downloads the set from the stats account. Creates an empty set if + the an existing file cannot be found. + """ + try: + # Note: this file (or data set) will grow without bound. + # In practice, if it becomes a problem (say, after many months of + # running), one could manually prune the file to remove older + # entries. Automatically pruning on each run could be dangerous. + # There is not a good way to determine when an old entry should be + # pruned (lookback_hours could be set to anything and could change) + stream = self.log_processor.get_object_data( + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename, + compressed=True) + buf = '\n'.join(x for x in stream) + if buf: + files = cPickle.loads(buf) + else: + return None + except BadFileDownload, err: + if err.status_code == 404: + files = set() + else: + return None + return files + + def get_aggregate_data(self, processed_files, input_data): + """ + Aggregates stats data by account/hour, summing as needed. + + :param processed_files: set of processed files + :param input_data: is the output from multiprocess_collate/the plugins. + + :returns: A dict containing data aggregated from the input_data + passed in. + + The dict returned has tuple keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + key:field_value + - key corresponds to something in one of the plugin's keylist + mapping, something like the tuple (source, level, verb, code) + - field_value is the sum of the field_values for the + corresponding values in the input + + Both input_data and the dict returned are hourly aggregations of + stats. + + Multiple values for the same (account, hour, tuple key) found in + input_data are summed in the dict returned. 
+ """ + + aggr_data = {} + for item, data in input_data: + # since item contains the plugin and the log name, new plugins will + # "reprocess" the file and the results will be in the final csv. + processed_files.add(item) + for k, d in data.items(): + existing_data = aggr_data.get(k, {}) + for i, j in d.items(): + current = existing_data.get(i, 0) + # merging strategy for key collisions is addition + # processing plugins need to realize this + existing_data[i] = current + j + aggr_data[k] = existing_data + return aggr_data + + def get_final_info(self, aggr_data): + """ + Aggregates data from aggr_data based on the keylist mapping. + + :param aggr_data: The results of the get_aggregate_data function. + :returns: a dict of further aggregated data + + The dict returned has keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + 'field_name': field_value (int) + + Data is aggregated as specified by the keylist mapping. The + keylist mapping specifies which keys to combine in aggr_data + and the final field_names for these combined keys in the dict + returned. Fields combined are summed. + """ + + final_info = collections.defaultdict(dict) + for account, data in aggr_data.items(): + for key, mapping in self.keylist_mapping.items(): + if isinstance(mapping, (list, set)): + value = 0 + for k in mapping: + try: + value += data[k] + except KeyError: + pass + else: + try: + value = data[mapping] + except KeyError: + value = 0 + final_info[account][key] = value + return final_info + + def store_processed_files_list(self, processed_files): + """ + Stores the proccessed files list in the stats account. + + :param processed_files: set of processed files + """ + + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename) + + def get_output(self, final_info): + """ + :returns: a list of rows to appear in the csv file. + + The first row contains the column headers for the rest of the + rows in the returned list. + + Each row after the first row corresponds to an account's data + for that hour. + """ + + sorted_keylist_mapping = sorted(self.keylist_mapping) + columns = ['data_ts', 'account'] + sorted_keylist_mapping + output = [columns] + for (account, year, month, day, hour), d in final_info.items(): + data_ts = '%04d/%02d/%02d %02d:00:00' % \ + (int(year), int(month), int(day), int(hour)) + row = [data_ts, '%s' % (account)] + for k in sorted_keylist_mapping: + row.append(str(d[k])) + output.append(row) + return output + + def store_output(self, output): + """ + Takes the a list of rows and stores a csv file of the values in the + stats account. + + :param output: list of rows to appear in the csv file + + This csv file is final product of this script. + """ + + out_buf = '\n'.join([','.join(row) for row in output]) + h = hashlib.md5(out_buf).hexdigest() + upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h + f = cStringIO.StringIO(out_buf) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + upload_name) + + @property + def keylist_mapping(self): + """ + :returns: the keylist mapping. + + The keylist mapping determines how the stats fields are aggregated + in the final aggregation step. 
+ """ + + if self._keylist_mapping == None: + self._keylist_mapping = \ + self.log_processor.generate_keylist_mapping() + return self._keylist_mapping + + def process_logs(self, logs_to_process, processed_files): + """ + :param logs_to_process: list of logs to process + :param processed_files: set of processed files + + :returns: returns a list of rows of processed data. + + The first row is the column headers. The rest of the rows contain + hourly aggregate data for the account specified in the row. + + Files processed are added to the processed_files set. + + When a large data structure is no longer needed, it is deleted in + an effort to conserve memory. + """ + + # map + processor_args = (self.total_conf, self.logger) + results = multiprocess_collate(processor_args, logs_to_process, + self.worker_count) + + # reduce + aggr_data = self.get_aggregate_data(processed_files, results) + del results + + # group + # reduce a large number of keys in aggr_data[k] to a small + # number of output keys + final_info = self.get_final_info(aggr_data) + del aggr_data + + # output + return self.get_output(final_info) + + def run_once(self, *args, **kwargs): + """ + Process log files that fall within the lookback interval. + + Upload resulting csv file to stats account. + + Update processed files list and upload to stats account. + """ + + for k in 'lookback_hours lookback_window'.split(): + if k in kwargs and kwargs[k] is not None: + setattr(self, k, kwargs[k]) + + start = time.time() + self.logger.info(_("Beginning log processing")) + + lookback_start, lookback_end = self.get_lookback_interval() + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + + processed_files = self.get_processed_files_list() + if processed_files == None: + self.logger.error(_('Log processing unable to load list of ' + 'already processed log files')) + return + self.logger.debug(_('found %d processed files') % + len(processed_files)) + + logs_to_process = self.log_processor.get_data_list(lookback_start, + lookback_end, processed_files) + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + + if logs_to_process: + output = self.process_logs(logs_to_process, processed_files) + self.store_output(output) + del output + + self.store_processed_files_list(processed_files) + + self.logger.info(_("Log processing done (%0.2f minutes)") % + ((time.time() - start) / 60)) + + +def multiprocess_collate(processor_args, logs_to_process, worker_count): + ''' + yield hourly data from logs_to_process + Every item that this function yields will be added to the processed files + list. 
+ ''' + results = [] + in_queue = multiprocessing.Queue() + out_queue = multiprocessing.Queue() + for _junk in range(worker_count): + p = multiprocessing.Process(target=collate_worker, + args=(processor_args, + in_queue, + out_queue)) + p.start() + results.append(p) + for x in logs_to_process: + in_queue.put(x) + for _junk in range(worker_count): + in_queue.put(None) # tell the worker to end + while True: + try: + item, data = out_queue.get_nowait() + except Queue.Empty: + time.sleep(.01) + else: + if not isinstance(data, Exception): + yield item, data + if not any(r.is_alive() for r in results) and out_queue.empty(): + # all the workers are done and nothing is in the queue + break + + +def collate_worker(processor_args, in_queue, out_queue): + '''worker process for multiprocess_collate''' + p = LogProcessor(*processor_args) + while True: + item = in_queue.get() + if item is None: + # no more work to process + break + try: + ret = p.process_one_file(*item) + except Exception, err: + item_string = '/'.join(item[1:]) + p.logger.exception("Unable to process file '%s'" % (item_string)) + ret = err + out_queue.put((item, ret)) diff --git a/slogging/log_uploader.py b/slogging/log_uploader.py new file mode 100644 index 0000000..ea51061 --- /dev/null +++ b/slogging/log_uploader.py @@ -0,0 +1,188 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement +import os +import hashlib +import time +import gzip +import re +import sys +from paste.deploy import appconfig + +from swift.common.internal_proxy import InternalProxy +from swift.common.daemon import Daemon +from swift.common import utils + + +class LogUploader(Daemon): + ''' + Given a local directory, a swift account, and a container name, LogParser + will upload all files in the local directory to the given account/ + container. All but the newest files will be uploaded, and the files' md5 + sum will be computed. The hash is used to prevent duplicate data from + being uploaded multiple times in different files (ex: log lines). Since + the hash is computed, it is also used as the uploaded object's etag to + ensure data integrity. + + Note that after the file is successfully uploaded, it will be unlinked. + + The given proxy server config is used to instantiate a proxy server for + the object uploads. + + The default log file format is: plugin_name-%Y%m%d%H* . Any other format + of log file names must supply a regular expression that defines groups + for year, month, day, and hour. The regular expression will be evaluated + with re.VERBOSE. 
+    A common example may be:
+        source_filename_pattern = ^cdn_logger-
+            (?P<year>[0-9]{4})
+            (?P<month>[0-1][0-9])
+            (?P<day>[0-3][0-9])
+            (?P<hour>[0-2][0-9])
+            .*$
+    '''
+
+    def __init__(self, uploader_conf, plugin_name, regex=None, cutoff=None):
+        super(LogUploader, self).__init__(uploader_conf)
+        log_name = '%s-log-uploader' % plugin_name
+        self.logger = utils.get_logger(uploader_conf, log_name,
+                                       log_route=plugin_name)
+        self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
+        self.swift_account = uploader_conf['swift_account']
+        self.container_name = uploader_conf['container_name']
+        proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
+                                        '/etc/swift/proxy-server.conf')
+        proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
+                                      name='proxy-server')
+        self.internal_proxy = InternalProxy(proxy_server_conf)
+        self.new_log_cutoff = int(cutoff or
+                                  uploader_conf.get('new_log_cutoff', '7200'))
+        self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \
+            utils.TRUE_VALUES
+        self.filename_pattern = regex or \
+            uploader_conf.get('source_filename_pattern',
+                              '''
+                              ^%s-
+                              (?P<year>[0-9]{4})
+                              (?P<month>[0-1][0-9])
+                              (?P<day>[0-3][0-9])
+                              (?P<hour>[0-2][0-9])
+                              .*$''' % plugin_name)
+
+    def run_once(self, *args, **kwargs):
+        self.logger.info(_("Uploading logs"))
+        start = time.time()
+        self.upload_all_logs()
+        self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
+                         ((time.time() - start) / 60))
+
+    def get_relpath_to_files_under_log_dir(self):
+        """
+        Look under log_dir recursively and return all filenames as relpaths
+
+        :returns: list of strs, the relpath to all filenames under log_dir
+        """
+        all_files = []
+        for path, dirs, files in os.walk(self.log_dir):
+            all_files.extend(os.path.join(path, f) for f in files)
+        return [os.path.relpath(f, start=self.log_dir) for f in all_files]
+
+    def filter_files(self, all_files):
+        """
+        Filter files based on the source filename pattern
+
+        :param all_files: list of strs, relpath of the filenames under log_dir
+
+        :returns: dict mapping full path of file to match group dict
+        """
+        filename2match = {}
+        found_match = False
+        for filename in all_files:
+            match = re.match(self.filename_pattern, filename, re.VERBOSE)
+            if match:
+                found_match = True
+                full_path = os.path.join(self.log_dir, filename)
+                filename2match[full_path] = match.groupdict()
+            else:
+                self.logger.debug(_('%(filename)s does not match '
+                                    '%(pattern)s') % {'filename': filename,
+                                    'pattern': self.filename_pattern})
+        return filename2match
+
+    def upload_all_logs(self):
+        """
+        Match files under log_dir to source_filename_pattern and upload to
+        swift
+        """
+        all_files = self.get_relpath_to_files_under_log_dir()
+        filename2match = self.filter_files(all_files)
+        if not filename2match:
+            self.logger.error(_('No files in %(log_dir)s match %(pattern)s') %
+                              {'log_dir': self.log_dir,
+                               'pattern': self.filename_pattern})
+            sys.exit(1)
+        if not self.internal_proxy.create_container(self.swift_account,
+                                                    self.container_name):
+            self.logger.error(_('Unable to create container for '
+                                '%(account)s/%(container)s') % {
+                                    'account': self.swift_account,
+                                    'container': self.container_name})
+            return
+        for filename, match in filename2match.items():
+            # don't process very new logs
+            seconds_since_mtime = time.time() - os.stat(filename).st_mtime
+            if seconds_since_mtime < self.new_log_cutoff:
+                self.logger.debug(_("Skipping log: %(file)s "
+                                    "(< %(cutoff)d seconds old)") % {
+                                        'file': filename,
+                                        'cutoff': self.new_log_cutoff})
+                continue
+            # old enough to be stable: upload using the date groups parsed
+            # from the filename
+            self.upload_one_log(filename, **match)
+
+    def upload_one_log(self, filename, year, month, day, hour):
+        """
+        Upload one file to swift
+        """
+        if os.path.getsize(filename) == 0:
+            self.logger.debug(_("Log %s is 0 length, skipping") % filename)
+            return
+        self.logger.debug(_("Processing log: %s") % filename)
+        filehash = hashlib.md5()
+        already_compressed = filename.endswith('.gz')
+        opener = gzip.open if already_compressed else open
+        f = opener(filename, 'rb')
+        try:
+            for line in f:
+                # filter out bad lines here?
+                filehash.update(line)
+        finally:
+            f.close()
+        filehash = filehash.hexdigest()
+        # By adding a hash to the filename, we ensure that uploaded files
+        # have unique filenames and protect against uploading one file
+        # more than one time. By using md5, we get an etag for free.
+        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
+        if self.internal_proxy.upload_file(filename,
+                                           self.swift_account,
+                                           self.container_name,
+                                           target_filename,
+                                           compress=(not already_compressed)):
+            self.logger.debug(_("Uploaded log %(file)s to %(target)s") %
+                              {'file': filename, 'target': target_filename})
+            if self.unlink_log:
+                os.unlink(filename)
+        else:
+            self.logger.error(_("ERROR: Upload of log %s failed!") % filename)
diff --git a/slogging/stats_processor.py b/slogging/stats_processor.py
new file mode 100644
index 0000000..f9496c1
--- /dev/null
+++ b/slogging/stats_processor.py
@@ -0,0 +1,69 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
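+
+# A hypothetical sample of the input handled by StatsLogProcessor.process()
+# below, one CSV row per account from the account stats logs (values made
+# up for illustration):
+#
+#     "AUTH_test",3,25,2048
+#
+# which parses as account, container_count, object_count, bytes_used;
+# surrounding double quotes are stripped.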
+
+from swift.common.utils import get_logger
+
+
+class StatsLogProcessor(object):
+    """Transform account storage stat logs"""
+
+    def __init__(self, conf):
+        self.logger = get_logger(conf, log_route='stats-processor')
+
+    def process(self, obj_stream, data_object_account, data_object_container,
+                data_object_name):
+        '''Generate hourly groupings of data from one stats log file'''
+        account_totals = {}
+        year, month, day, hour, _junk = data_object_name.split('/')
+        for line in obj_stream:
+            if not line:
+                continue
+            try:
+                (account,
+                 container_count,
+                 object_count,
+                 bytes_used) = line.split(',')
+            except (IndexError, ValueError):
+                # bad line data
+                self.logger.debug(_('Bad line data: %s') % repr(line))
+                continue
+            account = account.strip('"')
+            container_count = int(container_count.strip('"'))
+            object_count = int(object_count.strip('"'))
+            bytes_used = int(bytes_used.strip('"'))
+            aggr_key = (account, year, month, day, hour)
+            d = account_totals.get(aggr_key, {})
+            d['replica_count'] = d.setdefault('replica_count', 0) + 1
+            d['container_count'] = d.setdefault('container_count', 0) + \
+                container_count
+            d['object_count'] = d.setdefault('object_count', 0) + \
+                object_count
+            d['bytes_used'] = d.setdefault('bytes_used', 0) + \
+                bytes_used
+            account_totals[aggr_key] = d
+        return account_totals
+
+    def keylist_mapping(self):
+        '''
+        returns a dictionary of final keys mapped to source keys
+        '''
+        keylist_mapping = {
+            # <final key>: <source key> or <set of source keys>
+            'bytes_used': 'bytes_used',
+            'container_count': 'container_count',
+            'object_count': 'object_count',
+            'replica_count': 'replica_count',
+        }
+        return keylist_mapping
diff --git a/test_slogging/__init__.py b/test_slogging/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/test_slogging/test_access_processor.py b/test_slogging/test_access_processor.py
new file mode 100644
index 0000000..18e1880
--- /dev/null
+++ b/test_slogging/test_access_processor.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
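+
+# Note on the fixtures below: log_line_parser() consumes a syslog-style
+# access line, so each test builds one from a 16 character timestamp
+# prefix ('x' * 16) followed by 18 space-separated fields.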
+ +# TODO: Tests + +import unittest +from swift.stats import access_processor + + +class TestAccessProcessor(unittest.TestCase): + + def test_log_line_parser_query_args(self): + p = access_processor.AccessLogProcessor({}) + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + query = 'foo' + for param in access_processor.LISTING_PARAMS: + query += '&%s=blah' % param + log_line[6] = '/v1/a/c/o?%s' % query + log_line = 'x'*16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + for param in access_processor.LISTING_PARAMS: + expected[param] = 1 + expected['query'] = query + self.assertEquals(res, expected) + + def test_log_line_parser_field_count(self): + p = access_processor.AccessLogProcessor({}) + # too few fields + log_line = [str(x) for x in range(17)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x'*16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {} + self.assertEquals(res, expected) + # right amount of fields + log_line = [str(x) for x in range(18)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x'*16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + self.assertEquals(res, expected) + # too many fields + log_line = [str(x) for x in range(19)] + log_line[1] = 'proxy-server' + log_line[4] = '1/Jan/3/4/5/6' + log_line[6] = '/v1/a/c/o' + log_line = 'x'*16 + ' '.join(log_line) + res = p.log_line_parser(log_line) + expected = {'code': 8, 'processing_time': '17', 'auth_token': '11', + 'month': '01', 'second': '6', 'year': '3', 'tz': '+0000', + 'http_version': '7', 'object_name': 'o', 'etag': '14', + 'method': '5', 'trans_id': '15', 'client_ip': '2', + 'bytes_out': 13, 'container_name': 'c', 'day': '1', + 'minute': '5', 'account': 'a', 'hour': '4', + 'referrer': '9', 'request': '/v1/a/c/o', + 'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'} + self.assertEquals(res, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/test_slogging/test_compressing_file_reader.py b/test_slogging/test_compressing_file_reader.py new file mode 100644 index 0000000..65c2955 --- /dev/null +++ b/test_slogging/test_compressing_file_reader.py @@ -0,0 +1,34 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Tests for swift.common.compressing_file_reader """ + +import unittest +import cStringIO + +from swift.common.compressing_file_reader import CompressingFileReader + +class TestCompressingFileReader(unittest.TestCase): + + def test_read(self): + plain = 'obj\ndata' + s = cStringIO.StringIO(plain) + expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\ + 'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\ + '\x00' + x = CompressingFileReader(s) + compressed = ''.join(iter(lambda: x.read(), '')) + self.assertEquals(compressed, expected) + self.assertEquals(x.read(), '') diff --git a/test_slogging/test_db_stats_collector.py b/test_slogging/test_db_stats_collector.py new file mode 100644 index 0000000..3c4949a --- /dev/null +++ b/test_slogging/test_db_stats_collector.py @@ -0,0 +1,238 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
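+
+# The tests below drive the collectors against real AccountBroker and
+# ContainerBroker sqlite files created under a temp directory. Expected
+# stats rows (taken from the fixtures) look like:
+#
+#     "test_acc",1,10,1000        account: name, containers, objects, bytes
+#     "test_acc","test_con",1,10  container: account, name, objects, bytes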
+ + +import unittest +import os +import time +import uuid +from shutil import rmtree +from swift.stats import db_stats_collector +from tempfile import mkdtemp +from test.unit import FakeLogger +from swift.common.db import AccountBroker, ContainerBroker +from swift.common.utils import mkdirs + + +class TestDbStats(unittest.TestCase): + + def setUp(self): + self._was_logger = db_stats_collector.get_logger + db_stats_collector.get_logger = FakeLogger + self.testdir = os.path.join(mkdtemp(), 'tmp_test_db_stats') + self.devices = os.path.join(self.testdir, 'node') + rmtree(self.testdir, ignore_errors=1) + mkdirs(os.path.join(self.devices, 'sda')) + self.accounts = os.path.join(self.devices, 'sda', 'accounts') + self.containers = os.path.join(self.devices, 'sda', 'containers') + self.log_dir = '%s/log' % self.testdir + + self.conf = dict(devices=self.devices, + log_dir=self.log_dir, + mount_check='false') + + def tearDown(self): + db_stats_collector.get_logger = self._was_logger + rmtree(self.testdir) + + def test_account_stat_get_data(self): + stat = db_stats_collector.AccountStatsCollector(self.conf) + account_db = AccountBroker("%s/acc.db" % self.accounts, + account='test_acc') + account_db.initialize() + account_db.put_container('test_container', time.time(), + None, 10, 1000) + info = stat.get_data("%s/acc.db" % self.accounts) + self.assertEquals('''"test_acc",1,10,1000\n''', info) + + def test_container_stat_get_data(self): + stat = db_stats_collector.ContainerStatsCollector(self.conf) + container_db = ContainerBroker("%s/con.db" % self.containers, + account='test_acc', container='test_con') + container_db.initialize() + container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') + info = stat.get_data("%s/con.db" % self.containers) + self.assertEquals('''"test_acc","test_con",1,10\n''', info) + + def test_container_stat_get_metadata(self): + stat = db_stats_collector.ContainerStatsCollector(self.conf) + container_db = ContainerBroker("%s/con.db" % self.containers, + account='test_acc', container='test_con') + container_db.initialize() + container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') + info = stat.get_data("%s/con.db" % self.containers) + self.assertEquals('''"test_acc","test_con",1,10\n''', info) + container_db.update_metadata({'test1': ('val', 1000)}) + + def _gen_account_stat(self): + stat = db_stats_collector.AccountStatsCollector(self.conf) + output_data = set() + for i in range(10): + account_db = AccountBroker("%s/stats-201001010%s-%s.db" % + (self.accounts, i, uuid.uuid4().hex), + account='test_acc_%s' % i) + account_db.initialize() + account_db.put_container('test_container', time.time(), + None, 10, 1000) + # this will "commit" the data + account_db.get_info() + output_data.add('''"test_acc_%s",1,10,1000''' % i), + + self.assertEqual(len(output_data), 10) + return stat, output_data + + def _drop_metadata_col(self, broker, acc_name): + broker.conn.execute('''drop table container_stat''') + broker.conn.executescript(""" + CREATE TABLE container_stat ( + account TEXT DEFAULT '%s', + container TEXT DEFAULT 'test_con', + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + object_count INTEGER, + bytes_used INTEGER, + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT 
'0' + ); + + INSERT INTO container_stat (object_count, bytes_used) + VALUES (1, 10); + """ % acc_name) + + def _gen_container_stat(self, set_metadata=False, drop_metadata=False): + if set_metadata: + self.conf['metadata_keys'] = 'test1,test2' + # webob runs title on all headers + stat = db_stats_collector.ContainerStatsCollector(self.conf) + output_data = set() + for i in range(10): + cont_db = ContainerBroker( + "%s/container-stats-201001010%s-%s.db" % (self.containers, i, + uuid.uuid4().hex), + account='test_acc_%s' % i, container='test_con') + cont_db.initialize() + cont_db.put_object('test_obj', time.time(), 10, 'text', 'faketag') + metadata_output = '' + if set_metadata: + if i % 2: + cont_db.update_metadata({'X-Container-Meta-Test1': (5, 1)}) + metadata_output = ',1,' + else: + cont_db.update_metadata({'X-Container-Meta-Test2': (7, 2)}) + metadata_output = ',,1' + # this will "commit" the data + cont_db.get_info() + if drop_metadata: + output_data.add('''"test_acc_%s","test_con",1,10,,''' % i) + else: + output_data.add('''"test_acc_%s","test_con",1,10%s''' % + (i, metadata_output)) + if drop_metadata: + self._drop_metadata_col(cont_db, 'test_acc_%s' % i) + + self.assertEqual(len(output_data), 10) + return stat, output_data + + def test_account_stat_run_once_account(self): + stat, output_data = self._gen_account_stat() + stat.run_once() + stat_file = os.listdir(self.log_dir)[0] + with open(os.path.join(self.log_dir, stat_file)) as stat_handle: + for i in range(10): + data = stat_handle.readline() + output_data.discard(data.strip()) + + self.assertEqual(len(output_data), 0) + + def test_account_stat_run_once_container_metadata(self): + + stat, output_data = self._gen_container_stat(set_metadata=True) + stat.run_once() + stat_file = os.listdir(self.log_dir)[0] + with open(os.path.join(self.log_dir, stat_file)) as stat_handle: + headers = stat_handle.readline() + self.assert_(headers.startswith('Account Hash,Container Name,')) + for i in range(10): + data = stat_handle.readline() + output_data.discard(data.strip()) + + self.assertEqual(len(output_data), 0) + + def test_account_stat_run_once_container_no_metadata(self): + + stat, output_data = self._gen_container_stat(set_metadata=True, + drop_metadata=True) + stat.run_once() + stat_file = os.listdir(self.log_dir)[0] + with open(os.path.join(self.log_dir, stat_file)) as stat_handle: + headers = stat_handle.readline() + self.assert_(headers.startswith('Account Hash,Container Name,')) + for i in range(10): + data = stat_handle.readline() + output_data.discard(data.strip()) + + self.assertEqual(len(output_data), 0) + + def test_account_stat_run_once_both(self): + acc_stat, acc_output_data = self._gen_account_stat() + con_stat, con_output_data = self._gen_container_stat() + + acc_stat.run_once() + stat_file = os.listdir(self.log_dir)[0] + with open(os.path.join(self.log_dir, stat_file)) as stat_handle: + for i in range(10): + data = stat_handle.readline() + acc_output_data.discard(data.strip()) + + self.assertEqual(len(acc_output_data), 0) + + con_stat.run_once() + stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0] + with open(os.path.join(self.log_dir, stat_file)) as stat_handle: + headers = stat_handle.readline() + self.assert_(headers.startswith('Account Hash,Container Name,')) + for i in range(10): + data = stat_handle.readline() + con_output_data.discard(data.strip()) + + self.assertEqual(len(con_output_data), 0) + + def test_account_stat_run_once_fail(self): + stat, output_data = self._gen_account_stat() + 
rmtree(self.accounts) + stat.run_once() + self.assertEquals(len(stat.logger.log_dict['debug']), 1) + + def test_not_implemented(self): + db_stat = db_stats_collector.DatabaseStatsCollector(self.conf, + 'account', 'test_dir', 'stats-%Y%m%d%H_') + self.assertRaises(NotImplementedError, db_stat.get_data) + self.assertRaises(NotImplementedError, db_stat.get_header) + + def test_not_not_mounted(self): + self.conf['mount_check'] = 'true' + stat, output_data = self._gen_account_stat() + stat.run_once() + self.assertEquals(len(stat.logger.log_dict['error']), 1) + +if __name__ == '__main__': + unittest.main() diff --git a/test_slogging/test_internal_proxy.py b/test_slogging/test_internal_proxy.py new file mode 100644 index 0000000..a2e82f8 --- /dev/null +++ b/test_slogging/test_internal_proxy.py @@ -0,0 +1,192 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +import webob +import tempfile +import json + +from swift.common import internal_proxy + +class DumbBaseApplicationFactory(object): + + def __init__(self, status_codes, body=''): + self.status_codes = status_codes[:] + self.body = body + + def __call__(self, *a, **kw): + app = DumbBaseApplication(*a, **kw) + app.status_codes = self.status_codes + try: + app.default_status_code = self.status_codes[-1] + except IndexError: + app.default_status_code = 200 + app.body = self.body + return app + +class DumbBaseApplication(object): + + def __init__(self, *a, **kw): + self.status_codes = [] + self.default_status_code = 200 + self.call_count = 0 + self.body = '' + + def handle_request(self, req): + self.call_count += 1 + req.path_info_pop() + if isinstance(self.body, list): + try: + body = self.body.pop(0) + except IndexError: + body = '' + else: + body = self.body + resp = webob.Response(request=req, body=body, + conditional_response=True) + try: + resp.status_int = self.status_codes.pop(0) + except IndexError: + resp.status_int = self.default_status_code + return resp + + def update_request(self, req): + return req + + +class TestInternalProxy(unittest.TestCase): + + def test_webob_request_copy(self): + req = webob.Request.blank('/') + req2 = internal_proxy.webob_request_copy(req) + self.assertEquals(req.path, req2.path) + self.assertEquals(req.path_info, req2.path_info) + self.assertFalse(req is req2) + + def test_handle_request(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + self.assertEquals(req.path_info, orig_req.path_info) + + def test_handle_request_with_retries(self): + status_codes = [500, 200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + 
self.assertEquals(req.path_info, orig_req.path_info) + self.assertEquals(p.upload_app.call_count, 2) + self.assertEquals(resp.status_int, 200) + + def test_get_object(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + code, body = p.get_object('a', 'c', 'o') + body = ''.join(body) + self.assertEquals(code, 200) + self.assertEquals(body, '') + + def test_create_container(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + resp = p.create_container('a', 'c') + self.assertTrue(resp) + + def test_handle_request_with_retries_all_error(self): + status_codes = [500, 500, 500, 500, 500] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + self.assertEquals(req.path_info, orig_req.path_info) + self.assertEquals(p.upload_app.call_count, 3) + self.assertEquals(resp.status_int, 500) + + def test_get_container_list_empty(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='[]') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + self.assertEquals(resp, []) + + def test_get_container_list_no_body(self): + status_codes = [204] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + self.assertEquals(resp, []) + + def test_get_container_list_full_listing(self): + status_codes = [200, 200] + obj_a = dict(name='foo', hash='foo', bytes=3, + content_type='text/plain', last_modified='2011/01/01') + obj_b = dict(name='bar', hash='bar', bytes=3, + content_type='text/plain', last_modified='2011/01/01') + body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body=body) + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + expected = ['foo', 'bar'] + self.assertEquals([x['name'] for x in resp], expected) + + def test_get_container_list_full(self): + status_codes = [204] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c', marker='a', end_marker='b', + limit=100, prefix='/', delimiter='.') + self.assertEquals(resp, []) + + def test_upload_file(self): + status_codes = [200, 200] # container PUT + object PUT + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + with tempfile.NamedTemporaryFile() as file_obj: + resp = p.upload_file(file_obj.name, 'a', 'c', 'o') + self.assertTrue(resp) + + def test_upload_file_with_retries(self): + status_codes = [200, 500, 200] # container PUT + error + object PUT + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + with tempfile.NamedTemporaryFile() as file_obj: + resp = p.upload_file(file_obj, 'a', 'c', 'o') + self.assertTrue(resp) + self.assertEquals(p.upload_app.call_count, 3) + + +if __name__ == '__main__': + unittest.main() diff --git a/test_slogging/test_log_processor.py b/test_slogging/test_log_processor.py new file mode 100644 index 
0000000..c1b3b68 --- /dev/null +++ b/test_slogging/test_log_processor.py @@ -0,0 +1,834 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from test.unit import tmpfile +import Queue +import datetime +import hashlib +import pickle +import time + +from swift.common import internal_proxy +from swift.stats import log_processor +from swift.common.exceptions import ChunkReadTimeout + + +class FakeUploadApp(object): + def __init__(self, *args, **kwargs): + pass + +class DumbLogger(object): + def __getattr__(self, n): + return self.foo + + def foo(self, *a, **kw): + pass + +class DumbInternalProxy(object): + def __init__(self, code=200, timeout=False, bad_compressed=False): + self.code = code + self.timeout = timeout + self.bad_compressed = bad_compressed + + def get_container_list(self, account, container, marker=None, + end_marker=None): + n = '2010/03/14/13/obj1' + if marker is None or n > marker: + if end_marker: + if n <= end_marker: + return [{'name': n}] + else: + return [] + return [{'name': n}] + return [] + + def get_object(self, account, container, object_name): + if object_name.endswith('.gz'): + if self.bad_compressed: + # invalid compressed data + def data(): + yield '\xff\xff\xff\xff\xff\xff\xff' + else: + # 'obj\ndata', compressed with gzip -9 + def data(): + yield '\x1f\x8b\x08' + yield '\x08"\xd79L' + yield '\x02\x03te' + yield 'st\x00\xcbO' + yield '\xca\xe2JI,I' + yield '\xe4\x02\x00O\xff' + yield '\xa3Y\t\x00\x00\x00' + else: + def data(): + yield 'obj\n' + if self.timeout: + raise ChunkReadTimeout + yield 'data' + return self.code, data() + +class TestLogProcessor(unittest.TestCase): + + access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ + '09/Jul/2010/04/14/30 GET '\ + '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ + 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ + '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' + stats_test_line = 'account,1,2,3' + proxy_config = {'log-processor': { + + } + } + + def test_lazy_load_internal_proxy(self): + # stub out internal_proxy's upload_app + internal_proxy.BaseApplication = FakeUploadApp + dummy_proxy_config = """[app:proxy-server] +use = egg:swift#proxy +""" + with tmpfile(dummy_proxy_config) as proxy_config_file: + conf = {'log-processor': { + 'proxy_server_conf': proxy_config_file, + } + } + p = log_processor.LogProcessor(conf, DumbLogger()) + self.assert_(isinstance(p._internal_proxy, + None.__class__)) + self.assert_(isinstance(p.internal_proxy, + log_processor.InternalProxy)) + self.assertEquals(p.internal_proxy, p._internal_proxy) + + # reset FakeUploadApp + reload(internal_proxy) + + def test_access_log_line_parser(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + result = 
p.plugins['access']['instance'].log_line_parser(self.access_test_line) + self.assertEquals(result, {'code': 200, + 'processing_time': '0.0262', + 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd', + 'month': '07', + 'second': '30', + 'year': '2010', + 'query': 'format=json&foo', + 'tz': '+0000', + 'http_version': 'HTTP/1.0', + 'object_name': 'bar', + 'etag': '-', + 'method': 'GET', + 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90', + 'client_ip': '1.2.3.4', + 'format': 1, + 'bytes_out': 95, + 'container_name': 'foo', + 'day': '09', + 'minute': '14', + 'account': 'acct', + 'hour': '04', + 'referrer': '-', + 'request': '/v1/acct/foo/bar', + 'user_agent': 'curl', + 'bytes_in': 6, + 'lb_ip': '4.5.6.7'}) + + def test_process_one_access_file(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + def get_object_data(*a, **kw): + return [self.access_test_line] + p.get_object_data = get_object_data + result = p.process_one_file('access', 'a', 'c', 'o') + expected = {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}} + self.assertEquals(result, expected) + + def test_process_one_access_file_error(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy(code=500) + self.assertRaises(log_processor.BadFileDownload, p.process_one_file, + 'access', 'a', 'c', 'o') + + def test_get_container_listing(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + result = p.get_container_listing('a', 'foo') + expected = ['2010/03/14/13/obj1'] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', listing_filter=expected) + expected = [] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031412', + end_date='2010031414') + expected = ['2010/03/14/13/obj1'] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031414') + expected = [] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031410', + end_date='2010031412') + expected = [] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031412', + end_date='2010031413') + expected = ['2010/03/14/13/obj1'] + self.assertEquals(result, expected) + + def test_get_object_data(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + result = list(p.get_object_data('a', 'c', 'o', False)) + expected = ['obj','data'] + self.assertEquals(result, expected) + result = list(p.get_object_data('a', 'c', 'o.gz', True)) + self.assertEquals(result, expected) + + def test_get_object_data_errors(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy(code=500) + result = 
p.get_object_data('a', 'c', 'o') + self.assertRaises(log_processor.BadFileDownload, list, result) + p._internal_proxy = DumbInternalProxy(bad_compressed=True) + result = p.get_object_data('a', 'c', 'o.gz', True) + self.assertRaises(log_processor.BadFileDownload, list, result) + p._internal_proxy = DumbInternalProxy(timeout=True) + result = p.get_object_data('a', 'c', 'o') + self.assertRaises(log_processor.BadFileDownload, list, result) + + def test_get_stat_totals(self): + stats_proxy_config = self.proxy_config.copy() + stats_proxy_config.update({ + 'log-processor-stats': { + 'class_path': + 'swift.stats.stats_processor.StatsLogProcessor' + }}) + p = log_processor.LogProcessor(stats_proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.stats_test_line] + p.get_object_data = get_object_data + result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o') + expected = {('account', 'y', 'm', 'd', 'h'): + {'replica_count': 1, + 'object_count': 2, + 'container_count': 1, + 'bytes_used': 3}} + self.assertEquals(result, expected) + + def test_generate_keylist_mapping(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + result = p.generate_keylist_mapping() + expected = {} + self.assertEquals(result, expected) + + def test_generate_keylist_mapping_with_dummy_plugins(self): + class Plugin1(object): + def keylist_mapping(self): + return {'a': 'b', 'c': 'd', 'e': ['f', 'g']} + class Plugin2(object): + def keylist_mapping(self): + return {'a': '1', 'e': '2', 'h': '3'} + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p.plugins['plugin1'] = {'instance': Plugin1()} + p.plugins['plugin2'] = {'instance': Plugin2()} + result = p.generate_keylist_mapping() + expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']), + 'h': '3'} + self.assertEquals(result, expected) + + def test_access_keylist_mapping_format(self): + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(proxy_config, DumbLogger()) + mapping = p.generate_keylist_mapping() + for k, v in mapping.items(): + # these only work for Py2.7+ + #self.assertIsInstance(k, str) + self.assertTrue(isinstance(k, str), type(k)) + + def test_stats_keylist_mapping_format(self): + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-stats': { + 'class_path': + 'swift.stats.stats_processor.StatsLogProcessor' + }}) + p = log_processor.LogProcessor(proxy_config, DumbLogger()) + mapping = p.generate_keylist_mapping() + for k, v in mapping.items(): + # these only work for Py2.7+ + #self.assertIsInstance(k, str) + self.assertTrue(isinstance(k, str), type(k)) + + def test_collate_worker(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_out = Queue.Queue() + work_request = ('access', 'a','c','o') + q_in.put(work_request) + q_in.put(None) + 
log_processor.collate_worker(processor_args, q_in, q_out) + item, ret = q_out.get() + self.assertEquals(item, work_request) + expected = {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}} + self.assertEquals(ret, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_collate_worker_error(self): + def get_object_data(*a,**kw): + raise Exception() + orig_get_object_data = log_processor.LogProcessor.get_object_data + try: + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_out = Queue.Queue() + work_request = ('access', 'a','c','o') + q_in.put(work_request) + q_in.put(None) + log_processor.collate_worker(processor_args, q_in, q_out) + item, ret = q_out.get() + self.assertEquals(item, work_request) + # these only work for Py2.7+ + #self.assertIsInstance(ret, log_processor.BadFileDownload) + self.assertTrue(isinstance(ret, Exception)) + finally: + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_multiprocess_collate(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a','c','o') + logs_to_process = [item] + results = log_processor.multiprocess_collate(processor_args, + logs_to_process, + 1) + results = list(results) + expected = [(item, {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}})] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_multiprocess_collate_errors(self): + def get_object_data(*a,**kw): + raise log_processor.BadFileDownload() + orig_get_object_data = log_processor.LogProcessor.get_object_data + try: + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a','c','o') + logs_to_process = [item] + results = log_processor.multiprocess_collate(processor_args, + logs_to_process, + 1) + results = list(results) + expected = [] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = 
orig_get_object_data + +class TestLogProcessorDaemon(unittest.TestCase): + + def test_get_lookback_interval(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, lookback_hours, lookback_window): + self.lookback_hours = lookback_hours + self.lookback_window = lookback_window + + try: + d = datetime.datetime + + for x in [ + [d(2011, 1, 1), 0, 0, None, None], + [d(2011, 1, 1), 120, 0, '2010122700', None], + [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], + [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], + [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], + [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], + ]: + + log_processor.now = lambda: x[0] + + d = MockLogProcessorDaemon(x[1], x[2]) + self.assertEquals((x[3], x[4]), d.get_lookback_interval()) + finally: + log_processor.now = datetime.datetime.now + + def test_get_processed_files_list(self): + class MockLogProcessor(): + def __init__(self, stream): + self.stream = stream + + def get_object_data(self, *args, **kwargs): + return self.stream + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, stream): + self.log_processor = MockLogProcessor(stream) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + file_list = set(['a', 'b', 'c']) + + for s, l in [['', None], + [pickle.dumps(set()).split('\n'), set()], + [pickle.dumps(file_list).split('\n'), file_list], + ]: + + self.assertEquals(l, + MockLogProcessorDaemon(s).get_processed_files_list()) + + def test_get_processed_files_list_bad_file_downloads(self): + class MockLogProcessor(): + def __init__(self, status_code): + self.err = log_processor.BadFileDownload(status_code) + + def get_object_data(self, *a, **k): + raise self.err + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, status_code): + self.log_processor = MockLogProcessor(status_code) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + for c, l in [[404, set()], [503, None], [None, None]]: + self.assertEquals(l, + MockLogProcessorDaemon(c).get_processed_files_list()) + + def test_get_aggregate_data(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can use keys that are easier to work with + + processed_files = set() + + data_in = [ + ['file1', { + 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + ], + ['file2', {'acct1_time1': {'field1': 10}}], + ] + + expected_data_out = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + pass + + d = MockLogProcessorDaemon() + data_out = d.get_aggregate_data(processed_files, data_in) + + for k, v in expected_data_out.items(): + self.assertEquals(v, data_out[k]) + + self.assertEquals(set(['file1', 'file2']), processed_files) + + def test_get_final_info(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can 
use keys/values that are easier to work with + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = { + 'out_field1':['field1', 'field2', 'field3'], + 'out_field2':['field2', 'field3'], + 'out_field3':['field3'], + 'out_field4':'field4', + 'out_field5':['field6', 'field7', 'field8'], + 'out_field6':['field6'], + 'out_field7':'field7', + } + + data_in = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, + 'field4': 8, 'field5': 11}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + expected_data_out = { + 'acct1_time1': {'out_field1': 16, 'out_field2': 5, + 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct1_time2': {'out_field1': 9, 'out_field2': 5, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct2_time1': {'out_field1': 13, 'out_field2': 7, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct3_time3': {'out_field1': 17, 'out_field2': 9, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + } + + self.assertEquals(expected_data_out, + MockLogProcessorDaemon().get_final_info(data_in)) + + def test_store_processed_files_list(self): + class MockInternalProxy: + def __init__(self, test, daemon, processed_files): + self.test = test + self.daemon = daemon + self.processed_files = processed_files + + def upload_file(self, f, account, container, filename): + self.test.assertEquals(self.processed_files, + pickle.loads(f.getvalue())) + self.test.assertEquals(self.daemon.log_processor_account, + account) + self.test.assertEquals(self.daemon.log_processor_container, + container) + self.test.assertEquals(self.daemon.processed_files_filename, + filename) + + class MockLogProcessor: + def __init__(self, test, daemon, processed_files): + self.internal_proxy = MockInternalProxy(test, daemon, + processed_files) + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test, processed_files): + self.log_processor = \ + MockLogProcessor(test, self, processed_files) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + processed_files = set(['a', 'b', 'c']) + MockLogProcessorDaemon(self, processed_files).\ + store_processed_files_list(processed_files) + + def test_get_output(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = {'a':None, 'b':None, 'c':None} + + data_in = { + ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, + ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, + ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, + ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, + } + + expected_data_out = [ + ['data_ts', 'account', 'a', 'b', 'c'], + ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], + ] + + data_out = MockLogProcessorDaemon().get_output(data_in) + self.assertEquals(expected_data_out[0], data_out[0]) + + for row in data_out[1:]: + self.assert_(row in expected_data_out) + + for row in expected_data_out[1:]: + self.assert_(row in data_out) + + def test_store_output(self): + try: + real_strftime = time.strftime + 
mock_strftime_return = '2010/03/02/01/'
+            def mock_strftime(format):
+                self.assertEquals('%Y/%m/%d/%H/', format)
+                return mock_strftime_return
+            log_processor.time.strftime = mock_strftime
+
+            data_in = [
+                ['data_ts', 'account', 'a', 'b', 'c'],
+                ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
+                ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
+                ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
+                ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
+            ]
+
+            expected_output = '\n'.join([','.join(row) for row in data_in])
+            h = hashlib.md5(expected_output).hexdigest()
+            expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
+
+            class MockInternalProxy:
+                def __init__(self, test, daemon, expected_filename,
+                             expected_output):
+                    self.test = test
+                    self.daemon = daemon
+                    self.expected_filename = expected_filename
+                    self.expected_output = expected_output
+
+                def upload_file(self, f, account, container, filename):
+                    self.test.assertEquals(self.daemon.log_processor_account,
+                                           account)
+                    self.test.assertEquals(
+                        self.daemon.log_processor_container, container)
+                    self.test.assertEquals(self.expected_filename, filename)
+                    self.test.assertEquals(self.expected_output, f.getvalue())
+
+            class MockLogProcessor:
+                def __init__(self, test, daemon, expected_filename,
+                             expected_output):
+                    self.internal_proxy = MockInternalProxy(test, daemon,
+                        expected_filename, expected_output)
+
+            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+                def __init__(self, test, expected_filename, expected_output):
+                    self.log_processor = MockLogProcessor(test, self,
+                        expected_filename, expected_output)
+                    self.log_processor_account = 'account'
+                    self.log_processor_container = 'container'
+                    self.processed_files_filename = 'filename'
+
+            MockLogProcessorDaemon(self, expected_filename, expected_output).\
+                store_output(data_in)
+        finally:
+            log_processor.time.strftime = real_strftime
+
+    def test_keylist_mapping(self):
+        # Kind of lame test to see if the property is both
+        # generated by a particular method and cached properly.
+        # The method that actually generates the mapping is
+        # tested elsewhere.
+ + value_return = 'keylist_mapping' + class MockLogProcessor: + def __init__(self): + self.call_count = 0 + + def generate_keylist_mapping(self): + self.call_count += 1 + return value_return + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.log_processor = MockLogProcessor() + self._keylist_mapping = None + + d = MockLogProcessorDaemon() + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(1, d.log_processor.call_count) + + def test_process_logs(self): + try: + mock_logs_to_process = 'logs_to_process' + mock_processed_files = 'processed_files' + + real_multiprocess_collate = log_processor.multiprocess_collate + multiprocess_collate_return = 'multiprocess_collate_return' + + get_aggregate_data_return = 'get_aggregate_data_return' + get_final_info_return = 'get_final_info_return' + get_output_return = 'get_output_return' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test): + self.test = test + self.total_conf = 'total_conf' + self.logger = 'logger' + self.worker_count = 'worker_count' + + def get_aggregate_data(self, processed_files, results): + self.test.assertEquals(mock_processed_files, processed_files) + self.test.assertEquals(multiprocess_collate_return, results) + return get_aggregate_data_return + + def get_final_info(self, aggr_data): + self.test.assertEquals(get_aggregate_data_return, aggr_data) + return get_final_info_return + + def get_output(self, final_info): + self.test.assertEquals(get_final_info_return, final_info) + return get_output_return + + d = MockLogProcessorDaemon(self) + + def mock_multiprocess_collate(processor_args, logs_to_process, + worker_count): + self.assertEquals(d.total_conf, processor_args[0]) + self.assertEquals(d.logger, processor_args[1]) + + self.assertEquals(mock_logs_to_process, logs_to_process) + self.assertEquals(d.worker_count, worker_count) + + return multiprocess_collate_return + + log_processor.multiprocess_collate = mock_multiprocess_collate + + output = d.process_logs(mock_logs_to_process, mock_processed_files) + self.assertEquals(get_output_return, output) + finally: + log_processor.multiprocess_collate = real_multiprocess_collate + + def test_run_once_get_processed_files_list_returns_none(self): + class MockLogProcessor: + def get_data_list(self, lookback_start, lookback_end, + processed_files): + raise unittest.TestCase.failureException, \ + 'Method should not be called' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.logger = DumbLogger() + self.log_processor = MockLogProcessor() + + def get_lookback_interval(self): + return None, None + + def get_processed_files_list(self): + return None + + MockLogProcessorDaemon().run_once() + + def test_run_once_no_logs_to_process(self): + class MockLogProcessor(): + def __init__(self, daemon, test): + self.daemon = daemon + self.test = test + + def get_data_list(self, lookback_start, lookback_end, + processed_files): + self.test.assertEquals(self.daemon.lookback_start, + lookback_start) + self.test.assertEquals(self.daemon.lookback_end, + lookback_end) + self.test.assertEquals(self.daemon.processed_files, + processed_files) + return [] + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test): + self.logger = DumbLogger() + self.log_processor = MockLogProcessor(self, test) + self.lookback_start = 'lookback_start' + self.lookback_end = 'lookback_end' + 
self.processed_files = ['a', 'b', 'c'] + + def get_lookback_interval(self): + return self.lookback_start, self.lookback_end + + def get_processed_files_list(self): + return self.processed_files + + def process_logs(logs_to_process, processed_files): + raise unittest.TestCase.failureException, \ + 'Method should not be called' + + MockLogProcessorDaemon(self).run_once() diff --git a/test_slogging/test_log_uploader.py b/test_slogging/test_log_uploader.py new file mode 100644 index 0000000..01bb00c --- /dev/null +++ b/test_slogging/test_log_uploader.py @@ -0,0 +1,240 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import os +from datetime import datetime +from tempfile import mkdtemp +from shutil import rmtree +from functools import partial +from collections import defaultdict +import random +import string + +from test.unit import temptree +from swift.stats import log_uploader + +import logging +logging.basicConfig(level=logging.DEBUG) +LOGGER = logging.getLogger() + +COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \ + '\xf3\xf3\xad\x04\x00\x00\x00' + +access_regex = ''' + ^ + (?P[0-9]{4}) + (?P[0-1][0-9]) + (?P[0-3][0-9]) + (?P[0-2][0-9]) + .*$ + ''' + + +def mock_appconfig(*args, **kwargs): + pass + + +class MockInternalProxy(): + + def __init__(self, *args, **kwargs): + pass + + def create_container(self, *args, **kwargs): + return True + + def upload_file(self, *args, **kwargs): + return True + + +_orig_LogUploader = log_uploader.LogUploader + + +class MockLogUploader(_orig_LogUploader): + + def __init__(self, conf, logger=LOGGER): + conf['swift_account'] = conf.get('swift_account', '') + conf['container_name'] = conf.get('container_name', '') + conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0') + conf['source_filename_format'] = conf.get( + 'source_filename_format', conf.get('filename_format')) + log_uploader.LogUploader.__init__(self, conf, 'plugin') + self.logger = logger + self.uploaded_files = [] + + def upload_one_log(self, filename, year, month, day, hour): + d = {'year': year, 'month': month, 'day': day, 'hour': hour} + self.uploaded_files.append((filename, d)) + _orig_LogUploader.upload_one_log(self, filename, year, month, + day, hour) + + +class TestLogUploader(unittest.TestCase): + + def setUp(self): + # mock internal proxy + self._orig_InternalProxy = log_uploader.InternalProxy + self._orig_appconfig = log_uploader.appconfig + log_uploader.InternalProxy = MockInternalProxy + log_uploader.appconfig = mock_appconfig + + def tearDown(self): + log_uploader.appconfig = self._orig_appconfig + log_uploader.InternalProxy = self._orig_InternalProxy + + def test_bad_pattern_in_config(self): + files = [datetime.now().strftime('%Y%m%d%H')] + with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t: + # invalid pattern + conf = {'log_dir': t, + 'source_filename_pattern': '%Y%m%d%h'} # should be %H + uploader = MockLogUploader(conf) + self.assertRaises(SystemExit, 
+            conf = {'log_dir': t, 'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+            uploader.upload_all_logs()
+            self.assertEquals(len(uploader.uploaded_files), 1)
+
+    def test_pattern_upload_all_logs(self):
+        # test empty dir
+        with temptree([]) as t:
+            conf = {'log_dir': t}
+            uploader = MockLogUploader(conf)
+            self.assertRaises(SystemExit, uploader.run_once)
+
+        def get_random_length_str(max_len=10, chars=string.ascii_letters):
+            return ''.join(random.choice(chars) for x in
+                           range(random.randint(1, max_len)))
+
+        template = 'prefix_%(random)s_%(digits)s.blah.' \
+            '%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz'
+        pattern = '''prefix_.*_[0-9]+\.blah\.
+            (?P<year>[0-9]{4})
+            (?P<month>[0-1][0-9])
+            (?P<day>[0-3][0-9])
+            (?P<hour>[0-2][0-9])00-[0-9]{2}00
+            -[0-9]?[0-9]\.gz'''
+        files_that_should_match = []
+        # add some files that match
+        for i in range(24):
+            fname = template % {
+                'random': get_random_length_str(),
+                'digits': get_random_length_str(16, string.digits),
+                'datestr': datetime.now().strftime('%Y%m%d'),
+                'hour': i,
+                'next_hour': i + 1,
+                'number': random.randint(0, 20),
+            }
+            files_that_should_match.append(fname)
+
+        # add some files that don't match (date is missing the day part)
+        files = list(files_that_should_match)
+        for i in range(24):
+            fname = template % {
+                'random': get_random_length_str(),
+                'digits': get_random_length_str(16, string.digits),
+                'datestr': datetime.now().strftime('%Y%m'),
+                'hour': i,
+                'next_hour': i + 1,
+                'number': random.randint(0, 20),
+            }
+            files.append(fname)
+
+        with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
+            self.assertEquals(len(os.listdir(t)), 48)
+            conf = {'source_filename_pattern': pattern, 'log_dir': t}
+            uploader = MockLogUploader(conf)
+            uploader.run_once()
+            self.assertEquals(len(os.listdir(t)), 24)
+            self.assertEquals(len(uploader.uploaded_files), 24)
+            files_that_were_uploaded = set(x[0] for x in
+                                           uploader.uploaded_files)
+            for f in files_that_should_match:
+                self.assert_(os.path.join(t, f) in files_that_were_uploaded)
+
+    def test_log_cutoff(self):
+        files = [datetime.now().strftime('%Y%m%d%H')]
+        with temptree(files) as t:
+            conf = {'log_dir': t, 'new_log_cutoff': '7200',
+                    'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 0)
+            conf = {'log_dir': t, 'new_log_cutoff': '0',
+                    'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 1)
+
+    def test_create_container_fail(self):
+        files = [datetime.now().strftime('%Y%m%d%H')]
+        conf = {'source_filename_pattern': access_regex}
+        with temptree(files) as t:
+            conf['log_dir'] = t
+            uploader = MockLogUploader(conf)
+            # control case: container creation succeeds, file is uploaded
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 1)
+
+        with temptree(files) as t:
+            conf['log_dir'] = t
+            uploader = MockLogUploader(conf)
+            # mock create_container to fail
+            uploader.internal_proxy.create_container = lambda *args: False
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 0)
+
+    def test_unlink_log(self):
+        files = [datetime.now().strftime('%Y%m%d%H')]
+        with temptree(files, contents=[COMPRESSED_DATA]) as t:
+            conf = {'log_dir': t, 'unlink_log': 'false',
+                    'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 1)
+            # file still there
+            self.assertEquals(len(os.listdir(t)), 1)
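+
+            # same tree, but with unlink_log enabled the file should be
+            # removed from log_dir after a successful upload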
+            conf = {'log_dir': t, 'unlink_log': 'true',
+                    'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+            uploader.run_once()
+            self.assertEquals(len(uploader.uploaded_files), 1)
+            # file gone
+            self.assertEquals(len(os.listdir(t)), 0)
+
+    def test_upload_file_failed(self):
+        files = ['plugin-%s' % datetime.now().strftime('%Y%m%d%H')]
+        with temptree(files, contents=[COMPRESSED_DATA]) as t:
+            conf = {'log_dir': t, 'unlink_log': 'true',
+                    'source_filename_pattern': access_regex}
+            uploader = MockLogUploader(conf)
+
+            # mock upload_file to fail and drop the recorded upload;
+            # assigned to the instance, so it is called unbound (no self)
+            def mock_upload_file(*args, **kwargs):
+                uploader.uploaded_files.pop()
+                return False
+            uploader.internal_proxy.upload_file = mock_upload_file
+            self.assertRaises(SystemExit, uploader.run_once)
+            # file still there
+            self.assertEquals(len(os.listdir(t)), 1)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test_slogging/test_stats_processor.py b/test_slogging/test_stats_processor.py
new file mode 100644
index 0000000..c3af1c1
--- /dev/null
+++ b/test_slogging/test_stats_processor.py
@@ -0,0 +1,29 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from slogging import stats_processor
+
+
+class TestStatsProcessor(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()