initial code copy

John Dickinson 2011-06-15 15:19:36 -05:00
parent ad355e87d5
commit 4e9e2b65a9
27 changed files with 3702 additions and 0 deletions

0
.unittests Executable file

4
AUTHORS Normal file

@@ -0,0 +1,4 @@
John Dickinson
Clay Gerrard
David Goetz
Greg Lange

4
CHANGELOG Normal file

@@ -0,0 +1,4 @@
slogging (1.0)
- initial release since separation from swift project
(http://swift.openstack.org)

202
LICENSE Normal file

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

27
bin/swift-account-stats-logger Executable file

@@ -0,0 +1,27 @@
#!/usr/bin/env python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.stats.db_stats_collector import AccountStatsCollector
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options()
# currently AccountStatsCollector only supports run_once
options['once'] = True
run_daemon(AccountStatsCollector, conf_file,
section_name='log-processor-stats',
log_name="account-stats", **options)


@@ -0,0 +1,27 @@
#!/usr/bin/env python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.stats.db_stats_collector import ContainerStatsCollector
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options()
# currently ContainerStatsCollector only supports run_once
options['once'] = True
run_daemon(ContainerStatsCollector, conf_file,
section_name='log-processor-container-stats',
log_name="container-stats", **options)

35
bin/swift-log-stats-collector Executable file

@@ -0,0 +1,35 @@
#!/usr/bin/env python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from optparse import OptionParser
from swift.stats.log_processor import LogProcessorDaemon
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
parser = OptionParser(usage='Usage: %prog [options] <conf_file>')
parser.add_option('--lookback_hours', type='int', dest='lookback_hours',
help='Hours in the past to start looking for log files')
parser.add_option('--lookback_window', type='int', dest='lookback_window',
help='Hours past lookback_hours to stop looking for log files')
conf_file, options = parse_options(parser)
# currently the LogProcessorDaemon only supports run_once
options['once'] = True
run_daemon(LogProcessorDaemon, conf_file, section_name=None,
log_name='log-stats-collector', **options)

49
bin/swift-log-uploader Executable file

@@ -0,0 +1,49 @@
#!/usr/bin/env python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from optparse import OptionParser
from swift.stats.log_uploader import LogUploader
from swift.common.utils import parse_options
from swift.common import utils
if __name__ == '__main__':
parser = OptionParser("Usage: %prog CONFIG_FILE PLUGIN")
parser.add_option('-c', '--log_cutoff',
help='Override new_log_cutoff.')
parser.add_option('-x', '--regex',
help='Override source_filename_pattern regex.')
conf_file, options = parse_options(parser=parser)
try:
plugin = options['extra_args'][0]
except (IndexError, KeyError):
print "Error: missing plugin name"
sys.exit(1)
uploader_conf = utils.readconf(conf_file, 'log-processor')
section_name = 'log-processor-%s' % plugin
plugin_conf = utils.readconf(conf_file, section_name)
uploader_conf.update(plugin_conf)
# pre-configure logger
logger = utils.get_logger(uploader_conf, log_route='log-uploader',
log_to_console=options.get('verbose', False))
# currently LogUploader only supports run_once
options['once'] = True
regex = options.get('regex')
cutoff = options.get('log_cutoff')
uploader = LogUploader(uploader_conf, plugin,
regex=regex, cutoff=cutoff).run(**options)


@@ -0,0 +1,57 @@
# plugin section format is named "log-processor-<plugin>"
[log-processor]
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# container_name = log_processing_data
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
# lookback_hours = 120
# lookback_window = 120
# user = swift
[log-processor-access]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_pattern = ^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.access_processor.AccessLogProcessor
# service ips is for client ip addresses that should be counted as servicenet
# service_ips =
# load balancer private ips is for load balancer ip addresses that should be
# counted as servicenet
# lb_private_ips =
# server_name = proxy-server
# user = swift
# warn_percent = 0.8
[log-processor-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
# devices = /srv/node
# mount_check = true
# user = swift
[log-processor-container-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = container_stats
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
processable = false
# devices = /srv/node
# mount_check = true
# user = swift
# metadata_keys = comma separated list of user metadata keys to be collected
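
The source_filename_pattern above is compiled with re.VERBOSE and must expose year, month, day, and hour as named groups (see the LogUploader docstring later in this commit). A minimal sketch of how the access-log pattern above resolves, using a hypothetical uploaded log name:

import re

# pattern copied from the [log-processor-access] section above
pattern = r"""^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$"""

# '2011061520-proxy.log' is a made-up file name in YYYYMMDDHH-prefixed form
match = re.match(pattern, '2011061520-proxy.log', re.VERBOSE)
if match:
    print match.group('year'), match.group('month'), \
        match.group('day'), match.group('hour')  # 2011 06 15 20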

23
setup.cfg Normal file

@@ -0,0 +1,23 @@
[build_sphinx]
all_files = 1
build-dir = doc/build
source-dir = doc/source
[egg_info]
tag_build =
tag_date = 0
tag_svn_revision = 0
[compile_catalog]
directory = locale
domain = slogging
[update_catalog]
domain = slogging
output_dir = locale
input_file = locale/slogging.pot
[extract_messages]
keywords = _ l_ lazy_gettext
mapping_file = babel.cfg
output_file = locale/slogging.pot

65
setup.py Normal file

@@ -0,0 +1,65 @@
#!/usr/bin/python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup, find_packages
from setuptools.command.sdist import sdist
import os
import subprocess
try:
from babel.messages import frontend
except ImportError:
frontend = None
from slogging import __version__ as version
name = 'slogging'
cmdclass = {'sdist': local_sdist}
if frontend:
cmdclass.update({
'compile_catalog': frontend.compile_catalog,
'extract_messages': frontend.extract_messages,
'init_catalog': frontend.init_catalog,
'update_catalog': frontend.update_catalog,
})
setup(
name=name,
version=version,
description='Slogging',
license='Apache License (2.0)',
author='OpenStack, LLC.',
author_email='me@not.mn',
url='https://github.com/notmyname/slogging',
packages=find_packages(exclude=['test_slogging', 'bin']),
test_suite='nose.collector',
cmdclass=cmdclass,
classifiers=[
'Development Status :: 4 - Beta',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 2.6',
'Environment :: No Input/Output (Daemon)',
],
install_requires=[], # removed for better compat
scripts=[
],
)

0
slogging/__init__.py Normal file


@@ -0,0 +1,250 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from urllib import unquote
import copy
from swift.common.utils import split_path, get_logger
month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
LISTING_PARAMS = set(
'path limit format delimiter marker end_marker prefix'.split())
class AccessLogProcessor(object):
"""Transform proxy server access logs"""
def __init__(self, conf):
self.server_name = conf.get('server_name', 'proxy-server')
self.lb_private_ips = [x.strip() for x in \
conf.get('lb_private_ips', '').split(',')\
if x.strip()]
self.service_ips = [x.strip() for x in \
conf.get('service_ips', '').split(',')\
if x.strip()]
self.warn_percent = float(conf.get('warn_percent', '0.8'))
self.logger = get_logger(conf, log_route='access-processor')
def log_line_parser(self, raw_log):
'''given a raw access log line, return a dict of the good parts'''
d = {}
try:
(unused,
server,
client_ip,
lb_ip,
timestamp,
method,
request,
http_version,
code,
referrer,
user_agent,
auth_token,
bytes_in,
bytes_out,
etag,
trans_id,
headers,
processing_time) = (unquote(x) for x in
raw_log[16:].split(' ')[:18])
except ValueError:
self.logger.debug(_('Bad line data: %s') % repr(raw_log))
return {}
if server != self.server_name:
# incorrect server name in log line
self.logger.debug(_('Bad server name: found "%(found)s" ' \
'expected "%(expected)s"') %
{'found': server, 'expected': self.server_name})
return {}
try:
(version, account, container_name, object_name) = \
split_path(request, 2, 4, True)
except ValueError, e:
self.logger.debug(_('Invalid path: %(error)s from data: %(log)s') %
{'error': e, 'log': repr(raw_log)})
return {}
if container_name is not None:
container_name = container_name.split('?', 1)[0]
if object_name is not None:
object_name = object_name.split('?', 1)[0]
account = account.split('?', 1)[0]
query = None
if '?' in request:
request, query = request.split('?', 1)
args = query.split('&')
# Count each query argument. This is used later to aggregate
# the number of format, prefix, etc. queries.
for q in args:
if '=' in q:
k, v = q.split('=', 1)
else:
k = q
# Certain keys will get summed in stats reporting
# (format, path, delimiter, etc.). Save a "1" here
# to indicate that this request is 1 request for
# its respective key.
if k in LISTING_PARAMS:
d[k] = 1
d['client_ip'] = client_ip
d['lb_ip'] = lb_ip
d['method'] = method
d['request'] = request
if query:
d['query'] = query
d['http_version'] = http_version
d['code'] = code
d['referrer'] = referrer
d['user_agent'] = user_agent
d['auth_token'] = auth_token
d['bytes_in'] = bytes_in
d['bytes_out'] = bytes_out
d['etag'] = etag
d['trans_id'] = trans_id
d['processing_time'] = processing_time
day, month, year, hour, minute, second = timestamp.split('/')
d['day'] = day
month = ('%02s' % month_map.index(month)).replace(' ', '0')
d['month'] = month
d['year'] = year
d['hour'] = hour
d['minute'] = minute
d['second'] = second
d['tz'] = '+0000'
d['account'] = account
d['container_name'] = container_name
d['object_name'] = object_name
d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
d['code'] = int(d['code'])
return d
def process(self, obj_stream, data_object_account, data_object_container,
data_object_name):
'''generate hourly groupings of data from one access log file'''
hourly_aggr_info = {}
total_lines = 0
bad_lines = 0
for line in obj_stream:
line_data = self.log_line_parser(line)
total_lines += 1
if not line_data:
bad_lines += 1
continue
account = line_data['account']
container_name = line_data['container_name']
year = line_data['year']
month = line_data['month']
day = line_data['day']
hour = line_data['hour']
bytes_out = line_data['bytes_out']
bytes_in = line_data['bytes_in']
method = line_data['method']
code = int(line_data['code'])
object_name = line_data['object_name']
client_ip = line_data['client_ip']
op_level = None
if not container_name:
op_level = 'account'
elif container_name and not object_name:
op_level = 'container'
elif object_name:
op_level = 'object'
aggr_key = (account, year, month, day, hour)
d = hourly_aggr_info.get(aggr_key, {})
if line_data['lb_ip'] in self.lb_private_ips:
source = 'service'
else:
source = 'public'
if line_data['client_ip'] in self.service_ips:
source = 'service'
d[(source, 'bytes_out')] = d.setdefault((
source, 'bytes_out'), 0) + bytes_out
d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
bytes_in
d['format_query'] = d.setdefault('format_query', 0) + \
line_data.get('format', 0)
d['marker_query'] = d.setdefault('marker_query', 0) + \
line_data.get('marker', 0)
d['prefix_query'] = d.setdefault('prefix_query', 0) + \
line_data.get('prefix', 0)
d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
line_data.get('delimiter', 0)
path = line_data.get('path', 0)
d['path_query'] = d.setdefault('path_query', 0) + path
code = '%dxx' % (code / 100)
key = (source, op_level, method, code)
d[key] = d.setdefault(key, 0) + 1
hourly_aggr_info[aggr_key] = d
if bad_lines > (total_lines * self.warn_percent):
name = '/'.join([data_object_account, data_object_container,
data_object_name])
self.logger.warning(_('I found a bunch of bad lines in %(name)s '\
'(%(bad)d bad, %(total)d total)') %
{'name': name, 'bad': bad_lines, 'total': total_lines})
return hourly_aggr_info
def keylist_mapping(self):
source_keys = 'service public'.split()
level_keys = 'account container object'.split()
verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
code_keys = '2xx 4xx 5xx'.split()
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'service_bw_in': ('service', 'bytes_in'),
'service_bw_out': ('service', 'bytes_out'),
'public_bw_in': ('public', 'bytes_in'),
'public_bw_out': ('public', 'bytes_out'),
'account_requests': set(),
'container_requests': set(),
'object_requests': set(),
'service_request': set(),
'public_request': set(),
'ops_count': set(),
}
for verb in verb_keys:
keylist_mapping[verb] = set()
for code in code_keys:
keylist_mapping[code] = set()
for source in source_keys:
for level in level_keys:
for verb in verb_keys:
for code in code_keys:
keylist_mapping['account_requests'].add(
(source, 'account', verb, code))
keylist_mapping['container_requests'].add(
(source, 'container', verb, code))
keylist_mapping['object_requests'].add(
(source, 'object', verb, code))
keylist_mapping['service_request'].add(
('service', level, verb, code))
keylist_mapping['public_request'].add(
('public', level, verb, code))
keylist_mapping[verb].add(
(source, level, verb, code))
keylist_mapping[code].add(
(source, level, verb, code))
keylist_mapping['ops_count'].add(
(source, level, verb, code))
return keylist_mapping
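
For reference, a minimal sketch of what log_line_parser above expects and returns. The log line below is fabricated (a 16-character syslog timestamp prefix followed by 18 space-separated fields), and the example assumes the swift modules imported at the top of this file are importable (Python 2):

# a fabricated proxy access-log line; field 2 must match server_name
raw = ('Jun 15 20:00:00 host1 proxy-server 10.0.0.1 - '
       '15/Jun/2011/20/00/05 GET /v1/AUTH_test/cont/obj HTTP/1.0 200 - '
       'curl - - 42 - txn123 - 0.0200')
parser = AccessLogProcessor({})  # defaults: server_name = 'proxy-server'
parsed = parser.log_line_parser(raw)
# parsed['account'] == 'AUTH_test', parsed['container_name'] == 'cont',
# parsed['object_name'] == 'obj', parsed['code'] == 200,
# parsed['bytes_out'] == 42, parsed['month'] == '06'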


@@ -0,0 +1,73 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import zlib
import struct
class CompressingFileReader(object):
'''
Wraps a file object and provides a read method that returns gzip'd data.
One warning: if read is called with a small value, the data returned may
be bigger than the value. In this case, the "compressed" data will be
bigger than the original data. To solve this, use a bigger read buffer.
An example use case:
Given an uncompressed file on disk, provide a way to read compressed data
without buffering the entire file data in memory. Using this class, an
uncompressed log file could be uploaded as compressed data with chunked
transfer encoding.
gzip header and footer code taken from the python stdlib gzip module
:param file_obj: File object to read from
:param compresslevel: compression level
'''
def __init__(self, file_obj, compresslevel=9):
self._f = file_obj
self._compressor = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self.done = False
self.first = True
self.crc32 = 0
self.total_size = 0
def read(self, *a, **kw):
if self.done:
return ''
x = self._f.read(*a, **kw)
if x:
self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
self.total_size += len(x)
compressed = self._compressor.compress(x)
if not compressed:
compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
compressed = self._compressor.flush(zlib.Z_FINISH)
crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
size = struct.pack("<L", self.total_size & 0xffffffffL)
footer = crc32 + size
compressed += footer
self.done = True
if self.first:
self.first = False
header = '\037\213\010\000\000\000\000\000\002\377'
compressed = header + compressed
return compressed
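
A minimal sketch of the round trip described in the docstring above: compress an in-memory "file" with CompressingFileReader and check that the resulting gzip stream decompresses back to the original data (Python 2, matching the rest of this commit):

import cStringIO
import zlib

source = cStringIO.StringIO('GET /v1/AUTH_test/cont/obj 200\n' * 100)
reader = CompressingFileReader(source)
compressed = ''
while True:
    chunk = reader.read(65536)  # use a large buffer, per the warning above
    if not chunk:
        break
    compressed += chunk
# 16 + MAX_WBITS tells zlib to expect a gzip header and trailer
assert zlib.decompress(compressed, 16 + zlib.MAX_WBITS) == source.getvalue()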


@@ -0,0 +1,177 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from paste.deploy import appconfig
import shutil
import hashlib
import urllib
from swift.account.server import DATADIR as account_server_data_dir
from swift.container.server import DATADIR as container_server_data_dir
from swift.common.db import AccountBroker, ContainerBroker
from swift.common.utils import renamer, get_logger, readconf, mkdirs, \
TRUE_VALUES, remove_file
from swift.common.constraints import check_mount
from swift.common.daemon import Daemon
class DatabaseStatsCollector(Daemon):
"""
Extract storage stats from account databases on the account
storage nodes
Any subclasses must define the function get_data.
"""
def __init__(self, stats_conf, stats_type, data_dir, filename_format):
super(DatabaseStatsCollector, self).__init__(stats_conf)
self.stats_type = stats_type
self.data_dir = data_dir
self.filename_format = filename_format
self.devices = stats_conf.get('devices', '/srv/node')
self.mount_check = stats_conf.get('mount_check',
'true').lower() in TRUE_VALUES
self.target_dir = stats_conf.get('log_dir', '/var/log/swift')
mkdirs(self.target_dir)
self.logger = get_logger(stats_conf,
log_route='%s-stats' % stats_type)
def run_once(self, *args, **kwargs):
self.logger.info(_("Gathering %s stats") % self.stats_type)
start = time.time()
self.find_and_process()
self.logger.info(_("Gathering %s stats complete (%0.2f minutes)") %
(self.stats_type, (time.time() - start) / 60))
def get_data(self):
raise NotImplementedError('Subclasses must override')
def get_header(self):
raise NotImplementedError('Subclasses must override')
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
working_dir = os.path.join(self.target_dir,
'.%s-stats_tmp' % self.stats_type)
shutil.rmtree(working_dir, ignore_errors=True)
mkdirs(working_dir)
tmp_filename = os.path.join(working_dir, src_filename)
hasher = hashlib.md5()
try:
with open(tmp_filename, 'wb') as statfile:
statfile.write(self.get_header())
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices,
device):
self.logger.error(
_("Device %s is not mounted, skipping.") % device)
continue
db_dir = os.path.join(self.devices, device, self.data_dir)
if not os.path.exists(db_dir):
self.logger.debug(
_("Path %s does not exist, skipping.") % db_dir)
continue
for root, dirs, files in os.walk(db_dir, topdown=False):
for filename in files:
if filename.endswith('.db'):
db_path = os.path.join(root, filename)
line_data = self.get_data(db_path)
if line_data:
statfile.write(line_data)
hasher.update(line_data)
src_filename += hasher.hexdigest()
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
finally:
shutil.rmtree(working_dir, ignore_errors=True)
class AccountStatsCollector(DatabaseStatsCollector):
"""
Extract storage stats from account databases on the account
storage nodes
"""
def __init__(self, stats_conf):
super(AccountStatsCollector, self).__init__(stats_conf, 'account',
account_server_data_dir,
'stats-%Y%m%d%H_')
def get_data(self, db_path):
"""
Data for generated csv has the following columns:
Account Hash, Container Count, Object Count, Bytes Used
"""
line_data = None
broker = AccountBroker(db_path)
if not broker.is_deleted():
info = broker.get_info()
line_data = '"%s",%d,%d,%d\n' % (info['account'],
info['container_count'],
info['object_count'],
info['bytes_used'])
return line_data
def get_header(self):
return ''
class ContainerStatsCollector(DatabaseStatsCollector):
"""
Extract storage stats from container databases on the container
storage nodes
"""
def __init__(self, stats_conf):
super(ContainerStatsCollector, self).__init__(stats_conf, 'container',
container_server_data_dir,
'container-stats-%Y%m%d%H_')
# webob calls title on all the header keys
self.metadata_keys = ['X-Container-Meta-%s' % mkey.strip().title()
for mkey in stats_conf.get('metadata_keys', '').split(',')
if mkey.strip()]
def get_header(self):
header = 'Account Hash,Container Name,Object Count,Bytes Used'
if self.metadata_keys:
xtra_headers = ','.join(self.metadata_keys)
header += ',%s' % xtra_headers
header += '\n'
return header
def get_data(self, db_path):
"""
Data for generated csv has the following columns:
Account Hash, Container Name, Object Count, Bytes Used
This will just collect whether or not the metadata is set
using a 1 or ''.
"""
line_data = None
broker = ContainerBroker(db_path)
if not broker.is_deleted():
info = broker.get_info(include_metadata=bool(self.metadata_keys))
encoded_container_name = urllib.quote(info['container'])
line_data = '"%s","%s",%d,%d' % (
info['account'], encoded_container_name,
info['object_count'], info['bytes_used'])
if self.metadata_keys:
metadata_results = ','.join(
[info['metadata'].get(mkey) and '1' or ''
for mkey in self.metadata_keys])
line_data += ',%s' % metadata_results
line_data += '\n'
return line_data
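
A minimal sketch of how the collectors above are driven, mirroring bin/swift-account-stats-logger; the paths below are hypothetical, and run_once assumes account databases exist under the devices directory:

# hypothetical configuration; in production this comes from the
# [log-processor-stats] section of the sample config in this commit
stats_conf = {'devices': '/srv/node',
              'mount_check': 'false',
              'log_dir': '/var/log/swift/'}
collector = AccountStatsCollector(stats_conf)
# writes a stats-%Y%m%d%H_<md5> csv into log_dir, one row per account db
collector.run_once()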

210
slogging/internal_proxy.py Normal file

@@ -0,0 +1,210 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import webob
from urllib import quote, unquote
from json import loads as json_loads
from swift.common.compressing_file_reader import CompressingFileReader
from swift.proxy.server import BaseApplication
class MemcacheStub(object):
def get(self, *a, **kw): # pragma: no cover
return None
def set(self, *a, **kw): # pragma: no cover
return None
def incr(self, *a, **kw): # pragma: no cover
return 0
def delete(self, *a, **kw): # pragma: no cover
return None
def set_multi(self, *a, **kw): # pragma: no cover
return None
def get_multi(self, *a, **kw): # pragma: no cover
return []
def make_request_body_file(source_file, compress=True):
if hasattr(source_file, 'seek'):
source_file.seek(0)
else:
source_file = open(source_file, 'rb')
if compress:
compressed_file = CompressingFileReader(source_file)
return compressed_file
return source_file
def webob_request_copy(orig_req, source_file=None, compress=True):
req_copy = orig_req.copy()
if source_file:
req_copy.body_file = make_request_body_file(source_file,
compress=compress)
req_copy.content_length = orig_req.content_length
return req_copy
class InternalProxy(object):
"""
Set up a private instance of a proxy server that allows normal requests
to be made without having to actually send the request to the proxy.
This also doesn't log the requests to the normal proxy logs.
:param proxy_server_conf: proxy server configuration dictionary
:param logger: logger to log requests to
:param retries: number of times to retry each request
"""
def __init__(self, proxy_server_conf=None, logger=None, retries=0):
self.upload_app = BaseApplication(proxy_server_conf,
memcache=MemcacheStub(),
logger=logger)
self.retries = retries
def _handle_request(self, req, source_file=None, compress=True):
req = self.upload_app.update_request(req)
req_copy = webob_request_copy(req, source_file=source_file,
compress=compress)
resp = self.upload_app.handle_request(req_copy)
tries = 1
while (resp.status_int < 200 or resp.status_int > 299) \
and tries < self.retries:
req_copy = webob_request_copy(req, source_file=source_file,
compress=compress)
resp = self.upload_app.handle_request(req_copy)
tries += 1
return resp
def upload_file(self, source_file, account, container, object_name,
compress=True, content_type='application/x-gzip',
etag=None):
"""
Upload a file to cloud files.
:param source_file: path to or file like object to upload
:param account: account to upload to
:param container: container to upload to
:param object_name: name of object being uploaded
:param compress: if True, compresses object as it is uploaded
:param content_type: content-type of object
:param etag: etag for object to check successful upload
:returns: True if successful, False otherwise
"""
target_name = '/v1/%s/%s/%s' % (account, container, object_name)
# create the container
if not self.create_container(account, container):
return False
# upload the file to the account
req = webob.Request.blank(target_name, content_type=content_type,
environ={'REQUEST_METHOD': 'PUT'},
headers={'Transfer-Encoding': 'chunked'})
req.content_length = None # to make sure we send chunked data
if etag:
req.headers['etag'] = etag
resp = self._handle_request(req, source_file=source_file,
compress=compress)
if not (200 <= resp.status_int < 300):
return False
return True
def get_object(self, account, container, object_name):
"""
Get object.
:param account: account name object is in
:param container: container name object is in
:param object_name: name of object to get
:returns: iterator for object data
"""
req = webob.Request.blank('/v1/%s/%s/%s' %
(account, container, object_name),
environ={'REQUEST_METHOD': 'GET'})
resp = self._handle_request(req)
return resp.status_int, resp.app_iter
def create_container(self, account, container):
"""
Create container.
:param account: account name to put the container in
:param container: container name to create
:returns: True if successful, otherwise False
"""
req = webob.Request.blank('/v1/%s/%s' % (account, container),
environ={'REQUEST_METHOD': 'PUT'})
resp = self._handle_request(req)
return 200 <= resp.status_int < 300
def get_container_list(self, account, container, marker=None,
end_marker=None, limit=None, prefix=None,
delimiter=None, full_listing=True):
"""
Get a listing of objects for the container.
:param account: account name for the container
:param container: container name to get a listing for
:param marker: marker query
:param end_marker: end marker query
:param limit: limit query
:param prefix: prefix query
:param delimiter: string to delimit the queries on
:param full_listing: if True, return a full listing, else returns a max
of 10000 listings
:returns: list of objects
"""
if full_listing:
rv = []
listing = self.get_container_list(account, container, marker,
end_marker, limit, prefix,
delimiter, full_listing=False)
while listing:
rv.extend(listing)
if not delimiter:
marker = listing[-1]['name']
else:
marker = listing[-1].get('name', listing[-1].get('subdir'))
listing = self.get_container_list(account, container, marker,
end_marker, limit, prefix,
delimiter,
full_listing=False)
return rv
path = '/v1/%s/%s' % (account, quote(container))
qs = 'format=json'
if marker:
qs += '&marker=%s' % quote(marker)
if end_marker:
qs += '&end_marker=%s' % quote(end_marker)
if limit:
qs += '&limit=%d' % limit
if prefix:
qs += '&prefix=%s' % quote(prefix)
if delimiter:
qs += '&delimiter=%s' % quote(delimiter)
path += '?%s' % qs
req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
resp = self._handle_request(req)
if resp.status_int < 200 or resp.status_int >= 300:
return [] # TODO: distinguish between 404 and empty container
if resp.status_int == 204:
return []
return json_loads(resp.body)
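
A minimal sketch of driving InternalProxy directly; the proxy-server config location, account, container, and file names below are hypothetical, and the upload assumes the source file exists:

from paste.deploy import appconfig

proxy_conf = appconfig('config:/etc/swift/proxy-server.conf',
                       name='proxy-server')
proxy = InternalProxy(proxy_conf)
# upload an already-gzipped log, then list what is in the container
if proxy.upload_file('/var/log/swift/access.log.gz', 'AUTH_stats',
                     'log_data', '2011/06/15/20/access.log.gz',
                     compress=False):
    for obj in proxy.get_container_list('AUTH_stats', 'log_data',
                                        prefix='2011/06/15/'):
        print obj['name']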

581
slogging/log_processor.py Normal file

@@ -0,0 +1,581 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import zlib
import time
import datetime
import cStringIO
import collections
from paste.deploy import appconfig
import multiprocessing
import Queue
import cPickle
import hashlib
from swift.common.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf, TRUE_VALUES
from swift.common.daemon import Daemon
now = datetime.datetime.now
class BadFileDownload(Exception):
def __init__(self, status_code=None):
self.status_code = status_code
class LogProcessor(object):
"""Load plugins, process logs"""
def __init__(self, conf, logger):
if isinstance(logger, tuple):
self.logger = get_logger(*logger, log_route='log-processor')
else:
self.logger = logger
self.conf = conf
self._internal_proxy = None
# load the processing plugins
self.plugins = {}
plugin_prefix = 'log-processor-'
for section in (x for x in conf if x.startswith(plugin_prefix)):
plugin_name = section[len(plugin_prefix):]
plugin_conf = conf.get(section, {})
if plugin_conf.get('processable', 'true').lower() not in \
TRUE_VALUES:
continue
self.plugins[plugin_name] = plugin_conf
class_path = self.plugins[plugin_name]['class_path']
import_target, class_name = class_path.rsplit('.', 1)
module = __import__(import_target, fromlist=[import_target])
klass = getattr(module, class_name)
self.plugins[plugin_name]['instance'] = klass(plugin_conf)
self.logger.debug(_('Loaded plugin "%s"') % plugin_name)
@property
def internal_proxy(self):
if self._internal_proxy is None:
stats_conf = self.conf.get('log-processor', {})
proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig(
'config:%s' % proxy_server_conf_loc,
name='proxy-server')
self._internal_proxy = InternalProxy(proxy_server_conf,
self.logger,
retries=3)
return self._internal_proxy
def process_one_file(self, plugin_name, account, container, object_name):
self.logger.info(_('Processing %(obj)s with plugin "%(plugin)s"') %
{'obj': '/'.join((account, container, object_name)),
'plugin': plugin_name})
# get an iter of the object data
compressed = object_name.endswith('.gz')
stream = self.get_object_data(account, container, object_name,
compressed=compressed)
# look up the correct plugin and send the stream to it
return self.plugins[plugin_name]['instance'].process(stream,
account,
container,
object_name)
def get_data_list(self, start_date=None, end_date=None,
listing_filter=None):
total_list = []
for plugin_name, data in self.plugins.items():
account = data['swift_account']
container = data['container_name']
listing = self.get_container_listing(account,
container,
start_date,
end_date)
for object_name in listing:
# The items in this list end up being passed as positional
# parameters to process_one_file.
x = (plugin_name, account, container, object_name)
if x not in listing_filter:
total_list.append(x)
return total_list
def get_container_listing(self, swift_account, container_name,
start_date=None, end_date=None,
listing_filter=None):
'''
Get a container listing, filtered by start_date, end_date, and
listing_filter. Dates, if given, must be in YYYYMMDDHH format
'''
search_key = None
if start_date is not None:
try:
parsed_date = time.strptime(start_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
hour = '%02d' % parsed_date.tm_hour
search_key = '/'.join([year, month, day, hour])
end_key = None
if end_date is not None:
try:
parsed_date = time.strptime(end_date, '%Y%m%d%H')
except ValueError:
pass
else:
year = '%04d' % parsed_date.tm_year
month = '%02d' % parsed_date.tm_mon
day = '%02d' % parsed_date.tm_mday
# Since the end_marker filters by <, we need to add something
# to make sure we get all the data under the last hour. Adding
# one to the hour should be all-inclusive.
hour = '%02d' % (parsed_date.tm_hour + 1)
end_key = '/'.join([year, month, day, hour])
container_listing = self.internal_proxy.get_container_list(
swift_account,
container_name,
marker=search_key,
end_marker=end_key)
results = []
if listing_filter is None:
listing_filter = set()
for item in container_listing:
name = item['name']
if name not in listing_filter:
results.append(name)
return results
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
raise BadFileDownload(code)
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
# Python decompressing gzip chunk-by-chunk
# http://stackoverflow.com/questions/2423866
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
try:
for chunk in o:
if compressed:
try:
chunk = d.decompress(chunk)
except zlib.error:
self.logger.debug(_('Bad compressed data for %s')
% '/'.join((swift_account, container_name,
object_name)))
raise BadFileDownload() # bad compressed data
parts = chunk.split('\n')
parts[0] = last_part + parts[0]
for part in parts[:-1]:
yield part
last_part = parts[-1]
if last_part:
yield last_part
except ChunkReadTimeout:
raise BadFileDownload()
def generate_keylist_mapping(self):
keylist = {}
for plugin in self.plugins:
plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
if not plugin_keylist:
continue
for k, v in plugin_keylist.items():
o = keylist.get(k)
if o:
if isinstance(o, set):
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = set(o)
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = v
keylist[k] = o
return keylist
class LogProcessorDaemon(Daemon):
"""
Gather raw log data and farm out the processing to generate a csv that is
uploaded to swift.
"""
def __init__(self, conf):
c = conf.get('log-processor')
super(LogProcessorDaemon, self).__init__(c)
self.total_conf = conf
self.logger = get_logger(c, log_route='log-processor')
self.log_processor = LogProcessor(conf, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',
str(self.lookback_hours)))
self.log_processor_account = c['swift_account']
self.log_processor_container = c.get('container_name',
'log_processing_data')
self.worker_count = int(c.get('worker_count', '1'))
self._keylist_mapping = None
self.processed_files_filename = 'processed_files.pickle.gz'
def get_lookback_interval(self):
"""
:returns: lookback_start, lookback_end.
Both or just lookback_end can be None. Otherwise, returns strings
of the form 'YYYYMMDDHH'. The interval returned is used as bounds
when looking for logs to process.
A returned None means don't limit the log files examined on that
side of the interval.
"""
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
lookback_start = now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
lookback_end = now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
return lookback_start, lookback_end
def get_processed_files_list(self):
"""
:returns: a set of files that have already been processed, or None on error.
Downloads the set from the stats account. Creates an empty set if
an existing file cannot be found.
"""
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
# running), one could manually prune the file to remove older
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
stream = self.log_processor.get_object_data(
self.log_processor_account,
self.log_processor_container,
self.processed_files_filename,
compressed=True)
buf = '\n'.join(x for x in stream)
if buf:
files = cPickle.loads(buf)
else:
return None
except BadFileDownload, err:
if err.status_code == 404:
files = set()
else:
return None
return files
def get_aggregate_data(self, processed_files, input_data):
"""
Aggregates stats data by account/hour, summing as needed.
:param processed_files: set of processed files
:param input_data: the output from multiprocess_collate/the plugins.
:returns: A dict containing data aggregated from the input_data
passed in.
The dict returned has tuple keys of the form:
(account, year, month, day, hour)
The dict returned has values that are dicts with items of this
form:
key:field_value
- key corresponds to something in one of the plugin's keylist
mapping, something like the tuple (source, level, verb, code)
- field_value is the sum of the field_values for the
corresponding values in the input
Both input_data and the dict returned are hourly aggregations of
stats.
Multiple values for the same (account, hour, tuple key) found in
input_data are summed in the dict returned.
"""
aggr_data = {}
for item, data in input_data:
# since item contains the plugin and the log name, new plugins will
# "reprocess" the file and the results will be in the final csv.
processed_files.add(item)
for k, d in data.items():
existing_data = aggr_data.get(k, {})
for i, j in d.items():
current = existing_data.get(i, 0)
# merging strategy for key collisions is addition
# processing plugins need to realize this
existing_data[i] = current + j
aggr_data[k] = existing_data
return aggr_data
def get_final_info(self, aggr_data):
"""
Aggregates data from aggr_data based on the keylist mapping.
:param aggr_data: The results of the get_aggregate_data function.
:returns: a dict of further aggregated data
The dict returned has keys of the form:
(account, year, month, day, hour)
The dict returned has values that are dicts with items of this
form:
'field_name': field_value (int)
Data is aggregated as specified by the keylist mapping. The
keylist mapping specifies which keys to combine in aggr_data
and the final field_names for these combined keys in the dict
returned. Fields combined are summed.
"""
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
for key, mapping in self.keylist_mapping.items():
if isinstance(mapping, (list, set)):
value = 0
for k in mapping:
try:
value += data[k]
except KeyError:
pass
else:
try:
value = data[mapping]
except KeyError:
value = 0
final_info[account][key] = value
return final_info
def store_processed_files_list(self, processed_files):
"""
Stores the processed files list in the stats account.
:param processed_files: set of processed files
"""
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
self.processed_files_filename)
def get_output(self, final_info):
"""
:returns: a list of rows to appear in the csv file.
The first row contains the column headers for the rest of the
rows in the returned list.
Each row after the first row corresponds to an account's data
for that hour.
"""
sorted_keylist_mapping = sorted(self.keylist_mapping)
columns = ['data_ts', 'account'] + sorted_keylist_mapping
output = [columns]
for (account, year, month, day, hour), d in final_info.items():
data_ts = '%04d/%02d/%02d %02d:00:00' % \
(int(year), int(month), int(day), int(hour))
row = [data_ts, '%s' % (account)]
for k in sorted_keylist_mapping:
row.append(str(d[k]))
output.append(row)
return output
def store_output(self, output):
"""
Takes a list of rows and stores a csv file of the values in the
stats account.
:param output: list of rows to appear in the csv file
This csv file is final product of this script.
"""
out_buf = '\n'.join([','.join(row) for row in output])
h = hashlib.md5(out_buf).hexdigest()
upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
f = cStringIO.StringIO(out_buf)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
upload_name)
@property
def keylist_mapping(self):
"""
:returns: the keylist mapping.
The keylist mapping determines how the stats fields are aggregated
in the final aggregation step.
"""
if self._keylist_mapping is None:
self._keylist_mapping = \
self.log_processor.generate_keylist_mapping()
return self._keylist_mapping
def process_logs(self, logs_to_process, processed_files):
"""
:param logs_to_process: list of logs to process
:param processed_files: set of processed files
:returns: returns a list of rows of processed data.
The first row is the column headers. The rest of the rows contain
hourly aggregate data for the account specified in the row.
Files processed are added to the processed_files set.
When a large data structure is no longer needed, it is deleted in
an effort to conserve memory.
"""
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
self.worker_count)
# reduce
aggr_data = self.get_aggregate_data(processed_files, results)
del results
# group
# reduce a large number of keys in aggr_data[k] to a small
# number of output keys
final_info = self.get_final_info(aggr_data)
del aggr_data
# output
return self.get_output(final_info)
def run_once(self, *args, **kwargs):
"""
Process log files that fall within the lookback interval.
Upload resulting csv file to stats account.
Update processed files list and upload to stats account.
"""
for k in 'lookback_hours lookback_window'.split():
if k in kwargs and kwargs[k] is not None:
setattr(self, k, kwargs[k])
start = time.time()
self.logger.info(_("Beginning log processing"))
lookback_start, lookback_end = self.get_lookback_interval()
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
processed_files = self.get_processed_files_list()
if processed_files is None:
self.logger.error(_('Log processing unable to load list of '
'already processed log files'))
return
self.logger.debug(_('found %d processed files') %
len(processed_files))
logs_to_process = self.log_processor.get_data_list(lookback_start,
lookback_end, processed_files)
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
if logs_to_process:
output = self.process_logs(logs_to_process, processed_files)
self.store_output(output)
del output
self.store_processed_files_list(processed_files)
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
'''
yield hourly data from logs_to_process
Every item that this function yields will be added to the processed files
list.
'''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
for _junk in range(worker_count):
p = multiprocessing.Process(target=collate_worker,
args=(processor_args,
in_queue,
out_queue))
p.start()
results.append(p)
for x in logs_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None) # tell the worker to end
while True:
try:
item, data = out_queue.get_nowait()
except Queue.Empty:
time.sleep(.01)
else:
if not isinstance(data, Exception):
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
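# Minimal usage sketch (hypothetical values): each entry in logs_to_process
# is a (plugin_name, account, container, object_name) tuple, e.g.
#   ('access', 'AUTH_logs', 'log_data', '2011/06/15/13/abc.gz')
# and the generator yields (item, parsed_data) pairs for items that parsed
# without error; items whose worker raised an exception are not yielded,
# so they are never added to the processed-files list.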
def collate_worker(processor_args, in_queue, out_queue):
'''worker process for multiprocess_collate'''
p = LogProcessor(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
ret = p.process_one_file(*item)
except Exception, err:
item_string = '/'.join(item[1:])
p.logger.exception("Unable to process file '%s'" % (item_string))
ret = err
out_queue.put((item, ret))
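# Worker protocol sketch: each collate_worker builds its own LogProcessor
# from processor_args, pulls (plugin, account, container, object) items off
# in_queue until it sees the None sentinel, and puts (item, result) on
# out_queue, where result is either the parsed per-hour data or the caught
# Exception for that file.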

188
slogging/log_uploader.py Normal file
View File

@ -0,0 +1,188 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import hashlib
import time
import gzip
import re
import sys
from paste.deploy import appconfig
from swift.common.internal_proxy import InternalProxy
from swift.common.daemon import Daemon
from swift.common import utils
class LogUploader(Daemon):
'''
Given a local directory, a swift account, and a container name, LogUploader
will upload all files in the local directory to the given account/
container. All but the newest files will be uploaded, and the files' md5
sum will be computed. The hash is used to prevent duplicate data from
being uploaded multiple times in different files (ex: log lines). Since
the hash is computed, it is also used as the uploaded object's etag to
ensure data integrity.
Note that after a file is successfully uploaded, it will be unlinked
(if unlink_log is enabled, which it is by default).
The given proxy server config is used to instantiate a proxy server for
the object uploads.
The default log file name format is: plugin_name-%Y%m%d%H* . Log files with
any other name format must be matched by a regular expression
(source_filename_pattern) that defines groups for year, month, day, and
hour. The regular expression will be evaluated with re.VERBOSE. A common
example may be:
source_filename_pattern = ^cdn_logger-
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
'''
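# Example (hypothetical filename): with the pattern shown above, a file named
# 'cdn_logger-2011061513.access.log' would match with group dict
# {'year': '2011', 'month': '06', 'day': '15', 'hour': '13'}, which is what
# upload_one_log() uses to build the object path in swift.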
def __init__(self, uploader_conf, plugin_name, regex=None, cutoff=None):
super(LogUploader, self).__init__(uploader_conf)
log_name = '%s-log-uploader' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name,
log_route=plugin_name)
self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
self.swift_account = uploader_conf['swift_account']
self.container_name = uploader_conf['container_name']
proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
name='proxy-server')
self.internal_proxy = InternalProxy(proxy_server_conf)
self.new_log_cutoff = int(cutoff or
uploader_conf.get('new_log_cutoff', '7200'))
self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \
utils.TRUE_VALUES
self.filename_pattern = regex or \
uploader_conf.get('source_filename_pattern',
'''
^%s-
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$''' % plugin_name)
def run_once(self, *args, **kwargs):
self.logger.info(_("Uploading logs"))
start = time.time()
self.upload_all_logs()
self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
((time.time() - start) / 60))
def get_relpath_to_files_under_log_dir(self):
"""
Look under log_dir recursively and return all filenames as relpaths
:returns: list of strs, the relpaths of all filenames under log_dir
"""
all_files = []
for path, dirs, files in os.walk(self.log_dir):
all_files.extend(os.path.join(path, f) for f in files)
return [os.path.relpath(f, start=self.log_dir) for f in all_files]
def filter_files(self, all_files):
"""
Filter files against the source_filename_pattern regex
:param all_files: list of strs, relpaths of the filenames under log_dir
:returns: dict mapping the full path of each matching file to its regex
match group dict
"""
filename2match = {}
found_match = False
for filename in all_files:
match = re.match(self.filename_pattern, filename, re.VERBOSE)
if match:
found_match = True
full_path = os.path.join(self.log_dir, filename)
filename2match[full_path] = match.groupdict()
else:
self.logger.debug(_('%(filename)s does not match '
'%(pattern)s') % {'filename': filename,
'pattern': self.filename_pattern})
return filename2match
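# Example (hypothetical plugin name 'access'): with log_dir '/var/log/swift/'
# and the default pattern, a file 'access-2011061513.log' would produce
#   {'/var/log/swift/access-2011061513.log':
#       {'year': '2011', 'month': '06', 'day': '15', 'hour': '13'}}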
def upload_all_logs(self):
"""
Match files under log_dir to source_filename_pattern and upload to
swift
"""
all_files = self.get_relpath_to_files_under_log_dir()
filename2match = self.filter_files(all_files)
if not filename2match:
self.logger.error(_('No files in %(log_dir)s match %(pattern)s') %
{'log_dir': self.log_dir,
'pattern': self.filename_pattern})
sys.exit(1)
if not self.internal_proxy.create_container(self.swift_account,
self.container_name):
self.logger.error(_('Unable to create container for '
'%(account)s/%(container)s') % {
'account': self.swift_account,
'container': self.container_name})
return
for filename, match in filename2match.items():
# don't process very new logs
seconds_since_mtime = time.time() - os.stat(filename).st_mtime
if seconds_since_mtime < self.new_log_cutoff:
self.logger.debug(_("Skipping log: %(file)s "
"(< %(cutoff)d seconds old)") % {
'file': filename,
'cutoff': self.new_log_cutoff})
continue
self.upload_one_log(filename, **match)
def upload_one_log(self, filename, year, month, day, hour):
"""
Upload one file to swift
"""
if os.path.getsize(filename) == 0:
self.logger.debug(_("Log %s is 0 length, skipping") % filename)
return
self.logger.debug(_("Processing log: %s") % filename)
filehash = hashlib.md5()
already_compressed = filename.endswith('.gz')
opener = gzip.open if already_compressed else open
f = opener(filename, 'rb')
try:
for line in f:
# filter out bad lines here?
filehash.update(line)
finally:
f.close()
filehash = filehash.hexdigest()
# By adding a hash to the filename, we ensure that uploaded files
# have unique filenames and protect against uploading one file
# more than one time. By using md5, we get an etag for free.
target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
if self.internal_proxy.upload_file(filename,
self.swift_account,
self.container_name,
target_filename,
compress=(not already_compressed)):
self.logger.debug(_("Uploaded log %(file)s to %(target)s") %
{'file': filename, 'target': target_filename})
if self.unlink_log:
os.unlink(filename)
else:
self.logger.error(_("ERROR: Upload of log %s failed!") % filename)
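# Naming sketch (dates invented for this comment): a log for 2011/06/15
# hour 13 whose contents hash to '<md5 hex>' is stored as
# '2011/06/15/13/<md5 hex>.gz'; re-uploading the same data therefore maps to
# the same object name instead of creating a duplicate, and uncompressed
# sources are gzipped on the way up (compress=True).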

View File

@ -0,0 +1,69 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.common.utils import get_logger
class StatsLogProcessor(object):
"""Transform account storage stat logs"""
def __init__(self, conf):
self.logger = get_logger(conf, log_route='stats-processor')
def process(self, obj_stream, data_object_account, data_object_container,
data_object_name):
'''generate hourly groupings of data from one stats log file'''
account_totals = {}
year, month, day, hour, _junk = data_object_name.split('/')
for line in obj_stream:
if not line:
continue
try:
(account,
container_count,
object_count,
bytes_used) = line.split(',')
except (IndexError, ValueError):
# bad line data
self.logger.debug(_('Bad line data: %s') % repr(line))
continue
account = account.strip('"')
container_count = int(container_count.strip('"'))
object_count = int(object_count.strip('"'))
bytes_used = int(bytes_used.strip('"'))
aggr_key = (account, year, month, day, hour)
d = account_totals.get(aggr_key, {})
d['replica_count'] = d.setdefault('replica_count', 0) + 1
d['container_count'] = d.setdefault('container_count', 0) + \
container_count
d['object_count'] = d.setdefault('object_count', 0) + \
object_count
d['bytes_used'] = d.setdefault('bytes_used', 0) + \
bytes_used
account_totals[aggr_key] = d
return account_totals
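# Example (values for illustration only): an input line of
#   '"AUTH_test",1,2,3'
# in an object named '2011/06/15/13/stats.csv' adds
#   {'replica_count': 1, 'container_count': 1, 'object_count': 2,
#    'bytes_used': 3}
# under the key ('AUTH_test', '2011', '06', '15', '13'); additional lines
# for the same account in the same hour accumulate into the same totals.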
def keylist_mapping(self):
'''
returns a dictionary of final keys mapped to source keys
'''
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'bytes_used': 'bytes_used',
'container_count': 'container_count',
'object_count': 'object_count',
'replica_count': 'replica_count',
}
return keylist_mapping

View File

View File

@ -0,0 +1,94 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
from swift.stats import access_processor
class TestAccessProcessor(unittest.TestCase):
def test_log_line_parser_query_args(self):
p = access_processor.AccessLogProcessor({})
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
query = 'foo'
for param in access_processor.LISTING_PARAMS:
query += '&%s=blah' % param
log_line[6] = '/v1/a/c/o?%s' % query
log_line = 'x'*16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
for param in access_processor.LISTING_PARAMS:
expected[param] = 1
expected['query'] = query
self.assertEquals(res, expected)
def test_log_line_parser_field_count(self):
p = access_processor.AccessLogProcessor({})
# too few fields
log_line = [str(x) for x in range(17)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x'*16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {}
self.assertEquals(res, expected)
# right amount of fields
log_line = [str(x) for x in range(18)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x'*16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
self.assertEquals(res, expected)
# too many fields
log_line = [str(x) for x in range(19)]
log_line[1] = 'proxy-server'
log_line[4] = '1/Jan/3/4/5/6'
log_line[6] = '/v1/a/c/o'
log_line = 'x'*16 + ' '.join(log_line)
res = p.log_line_parser(log_line)
expected = {'code': 8, 'processing_time': '17', 'auth_token': '11',
'month': '01', 'second': '6', 'year': '3', 'tz': '+0000',
'http_version': '7', 'object_name': 'o', 'etag': '14',
'method': '5', 'trans_id': '15', 'client_ip': '2',
'bytes_out': 13, 'container_name': 'c', 'day': '1',
'minute': '5', 'account': 'a', 'hour': '4',
'referrer': '9', 'request': '/v1/a/c/o',
'user_agent': '10', 'bytes_in': 12, 'lb_ip': '3'}
self.assertEquals(res, expected)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,34 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Tests for swift.common.compressing_file_reader """
import unittest
import cStringIO
from swift.common.compressing_file_reader import CompressingFileReader
class TestCompressingFileReader(unittest.TestCase):
def test_read(self):
plain = 'obj\ndata'
s = cStringIO.StringIO(plain)
expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\
'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\
'\x00'
x = CompressingFileReader(s)
compressed = ''.join(iter(lambda: x.read(), ''))
self.assertEquals(compressed, expected)
self.assertEquals(x.read(), '')

View File

@ -0,0 +1,238 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import time
import uuid
from shutil import rmtree
from swift.stats import db_stats_collector
from tempfile import mkdtemp
from test.unit import FakeLogger
from swift.common.db import AccountBroker, ContainerBroker
from swift.common.utils import mkdirs
class TestDbStats(unittest.TestCase):
def setUp(self):
self._was_logger = db_stats_collector.get_logger
db_stats_collector.get_logger = FakeLogger
self.testdir = os.path.join(mkdtemp(), 'tmp_test_db_stats')
self.devices = os.path.join(self.testdir, 'node')
rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, 'sda'))
self.accounts = os.path.join(self.devices, 'sda', 'accounts')
self.containers = os.path.join(self.devices, 'sda', 'containers')
self.log_dir = '%s/log' % self.testdir
self.conf = dict(devices=self.devices,
log_dir=self.log_dir,
mount_check='false')
def tearDown(self):
db_stats_collector.get_logger = self._was_logger
rmtree(self.testdir)
def test_account_stat_get_data(self):
stat = db_stats_collector.AccountStatsCollector(self.conf)
account_db = AccountBroker("%s/acc.db" % self.accounts,
account='test_acc')
account_db.initialize()
account_db.put_container('test_container', time.time(),
None, 10, 1000)
info = stat.get_data("%s/acc.db" % self.accounts)
self.assertEquals('''"test_acc",1,10,1000\n''', info)
def test_container_stat_get_data(self):
stat = db_stats_collector.ContainerStatsCollector(self.conf)
container_db = ContainerBroker("%s/con.db" % self.containers,
account='test_acc', container='test_con')
container_db.initialize()
container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
info = stat.get_data("%s/con.db" % self.containers)
self.assertEquals('''"test_acc","test_con",1,10\n''', info)
def test_container_stat_get_metadata(self):
stat = db_stats_collector.ContainerStatsCollector(self.conf)
container_db = ContainerBroker("%s/con.db" % self.containers,
account='test_acc', container='test_con')
container_db.initialize()
container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
info = stat.get_data("%s/con.db" % self.containers)
self.assertEquals('''"test_acc","test_con",1,10\n''', info)
container_db.update_metadata({'test1': ('val', 1000)})
def _gen_account_stat(self):
stat = db_stats_collector.AccountStatsCollector(self.conf)
output_data = set()
for i in range(10):
account_db = AccountBroker("%s/stats-201001010%s-%s.db" %
(self.accounts, i, uuid.uuid4().hex),
account='test_acc_%s' % i)
account_db.initialize()
account_db.put_container('test_container', time.time(),
None, 10, 1000)
# this will "commit" the data
account_db.get_info()
output_data.add('''"test_acc_%s",1,10,1000''' % i),
self.assertEqual(len(output_data), 10)
return stat, output_data
def _drop_metadata_col(self, broker, acc_name):
broker.conn.execute('''drop table container_stat''')
broker.conn.executescript("""
CREATE TABLE container_stat (
account TEXT DEFAULT '%s',
container TEXT DEFAULT 'test_con',
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (1, 10);
""" % acc_name)
def _gen_container_stat(self, set_metadata=False, drop_metadata=False):
if set_metadata:
self.conf['metadata_keys'] = 'test1,test2'
# webob runs title on all headers
stat = db_stats_collector.ContainerStatsCollector(self.conf)
output_data = set()
for i in range(10):
cont_db = ContainerBroker(
"%s/container-stats-201001010%s-%s.db" % (self.containers, i,
uuid.uuid4().hex),
account='test_acc_%s' % i, container='test_con')
cont_db.initialize()
cont_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
metadata_output = ''
if set_metadata:
if i % 2:
cont_db.update_metadata({'X-Container-Meta-Test1': (5, 1)})
metadata_output = ',1,'
else:
cont_db.update_metadata({'X-Container-Meta-Test2': (7, 2)})
metadata_output = ',,1'
# this will "commit" the data
cont_db.get_info()
if drop_metadata:
output_data.add('''"test_acc_%s","test_con",1,10,,''' % i)
else:
output_data.add('''"test_acc_%s","test_con",1,10%s''' %
(i, metadata_output))
if drop_metadata:
self._drop_metadata_col(cont_db, 'test_acc_%s' % i)
self.assertEqual(len(output_data), 10)
return stat, output_data
def test_account_stat_run_once_account(self):
stat, output_data = self._gen_account_stat()
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_container_metadata(self):
stat, output_data = self._gen_container_stat(set_metadata=True)
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_container_no_metadata(self):
stat, output_data = self._gen_container_stat(set_metadata=True,
drop_metadata=True)
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_both(self):
acc_stat, acc_output_data = self._gen_account_stat()
con_stat, con_output_data = self._gen_container_stat()
acc_stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
for i in range(10):
data = stat_handle.readline()
acc_output_data.discard(data.strip())
self.assertEqual(len(acc_output_data), 0)
con_stat.run_once()
stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
con_output_data.discard(data.strip())
self.assertEqual(len(con_output_data), 0)
def test_account_stat_run_once_fail(self):
stat, output_data = self._gen_account_stat()
rmtree(self.accounts)
stat.run_once()
self.assertEquals(len(stat.logger.log_dict['debug']), 1)
def test_not_implemented(self):
db_stat = db_stats_collector.DatabaseStatsCollector(self.conf,
'account', 'test_dir', 'stats-%Y%m%d%H_')
self.assertRaises(NotImplementedError, db_stat.get_data)
self.assertRaises(NotImplementedError, db_stat.get_header)
def test_not_not_mounted(self):
self.conf['mount_check'] = 'true'
stat, output_data = self._gen_account_stat()
stat.run_once()
self.assertEquals(len(stat.logger.log_dict['error']), 1)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,192 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
import webob
import tempfile
import json
from swift.common import internal_proxy
class DumbBaseApplicationFactory(object):
def __init__(self, status_codes, body=''):
self.status_codes = status_codes[:]
self.body = body
def __call__(self, *a, **kw):
app = DumbBaseApplication(*a, **kw)
app.status_codes = self.status_codes
try:
app.default_status_code = self.status_codes[-1]
except IndexError:
app.default_status_code = 200
app.body = self.body
return app
class DumbBaseApplication(object):
def __init__(self, *a, **kw):
self.status_codes = []
self.default_status_code = 200
self.call_count = 0
self.body = ''
def handle_request(self, req):
self.call_count += 1
req.path_info_pop()
if isinstance(self.body, list):
try:
body = self.body.pop(0)
except IndexError:
body = ''
else:
body = self.body
resp = webob.Response(request=req, body=body,
conditional_response=True)
try:
resp.status_int = self.status_codes.pop(0)
except IndexError:
resp.status_int = self.default_status_code
return resp
def update_request(self, req):
return req
class TestInternalProxy(unittest.TestCase):
def test_webob_request_copy(self):
req = webob.Request.blank('/')
req2 = internal_proxy.webob_request_copy(req)
self.assertEquals(req.path, req2.path)
self.assertEquals(req.path_info, req2.path_info)
self.assertFalse(req is req2)
def test_handle_request(self):
status_codes = [200]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy()
req = webob.Request.blank('/')
orig_req = internal_proxy.webob_request_copy(req)
resp = p._handle_request(req)
self.assertEquals(req.path_info, orig_req.path_info)
def test_handle_request_with_retries(self):
status_codes = [500, 200]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy(retries=3)
req = webob.Request.blank('/')
orig_req = internal_proxy.webob_request_copy(req)
resp = p._handle_request(req)
self.assertEquals(req.path_info, orig_req.path_info)
self.assertEquals(p.upload_app.call_count, 2)
self.assertEquals(resp.status_int, 200)
def test_get_object(self):
status_codes = [200]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy()
code, body = p.get_object('a', 'c', 'o')
body = ''.join(body)
self.assertEquals(code, 200)
self.assertEquals(body, '')
def test_create_container(self):
status_codes = [200]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy()
resp = p.create_container('a', 'c')
self.assertTrue(resp)
def test_handle_request_with_retries_all_error(self):
status_codes = [500, 500, 500, 500, 500]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy(retries=3)
req = webob.Request.blank('/')
orig_req = internal_proxy.webob_request_copy(req)
resp = p._handle_request(req)
self.assertEquals(req.path_info, orig_req.path_info)
self.assertEquals(p.upload_app.call_count, 3)
self.assertEquals(resp.status_int, 500)
def test_get_container_list_empty(self):
status_codes = [200]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes, body='[]')
p = internal_proxy.InternalProxy()
resp = p.get_container_list('a', 'c')
self.assertEquals(resp, [])
def test_get_container_list_no_body(self):
status_codes = [204]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes, body='')
p = internal_proxy.InternalProxy()
resp = p.get_container_list('a', 'c')
self.assertEquals(resp, [])
def test_get_container_list_full_listing(self):
status_codes = [200, 200]
obj_a = dict(name='foo', hash='foo', bytes=3,
content_type='text/plain', last_modified='2011/01/01')
obj_b = dict(name='bar', hash='bar', bytes=3,
content_type='text/plain', last_modified='2011/01/01')
body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes, body=body)
p = internal_proxy.InternalProxy()
resp = p.get_container_list('a', 'c')
expected = ['foo', 'bar']
self.assertEquals([x['name'] for x in resp], expected)
def test_get_container_list_full(self):
status_codes = [204]
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes, body='')
p = internal_proxy.InternalProxy()
resp = p.get_container_list('a', 'c', marker='a', end_marker='b',
limit=100, prefix='/', delimiter='.')
self.assertEquals(resp, [])
def test_upload_file(self):
status_codes = [200, 200] # container PUT + object PUT
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy()
with tempfile.NamedTemporaryFile() as file_obj:
resp = p.upload_file(file_obj.name, 'a', 'c', 'o')
self.assertTrue(resp)
def test_upload_file_with_retries(self):
status_codes = [200, 500, 200] # container PUT + error + object PUT
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
status_codes)
p = internal_proxy.InternalProxy(retries=3)
with tempfile.NamedTemporaryFile() as file_obj:
resp = p.upload_file(file_obj, 'a', 'c', 'o')
self.assertTrue(resp)
self.assertEquals(p.upload_app.call_count, 3)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,834 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test.unit import tmpfile
import Queue
import datetime
import hashlib
import pickle
import time
from swift.common import internal_proxy
from swift.stats import log_processor
from swift.common.exceptions import ChunkReadTimeout
class FakeUploadApp(object):
def __init__(self, *args, **kwargs):
pass
class DumbLogger(object):
def __getattr__(self, n):
return self.foo
def foo(self, *a, **kw):
pass
class DumbInternalProxy(object):
def __init__(self, code=200, timeout=False, bad_compressed=False):
self.code = code
self.timeout = timeout
self.bad_compressed = bad_compressed
def get_container_list(self, account, container, marker=None,
end_marker=None):
n = '2010/03/14/13/obj1'
if marker is None or n > marker:
if end_marker:
if n <= end_marker:
return [{'name': n}]
else:
return []
return [{'name': n}]
return []
def get_object(self, account, container, object_name):
if object_name.endswith('.gz'):
if self.bad_compressed:
# invalid compressed data
def data():
yield '\xff\xff\xff\xff\xff\xff\xff'
else:
# 'obj\ndata', compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
else:
def data():
yield 'obj\n'
if self.timeout:
raise ChunkReadTimeout
yield 'data'
return self.code, data()
class TestLogProcessor(unittest.TestCase):
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
'09/Jul/2010/04/14/30 GET '\
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
stats_test_line = 'account,1,2,3'
proxy_config = {'log-processor': {
}
}
def test_lazy_load_internal_proxy(self):
# stub out internal_proxy's upload_app
internal_proxy.BaseApplication = FakeUploadApp
dummy_proxy_config = """[app:proxy-server]
use = egg:swift#proxy
"""
with tmpfile(dummy_proxy_config) as proxy_config_file:
conf = {'log-processor': {
'proxy_server_conf': proxy_config_file,
}
}
p = log_processor.LogProcessor(conf, DumbLogger())
self.assert_(isinstance(p._internal_proxy,
None.__class__))
self.assert_(isinstance(p.internal_proxy,
log_processor.InternalProxy))
self.assertEquals(p.internal_proxy, p._internal_proxy)
# reset FakeUploadApp
reload(internal_proxy)
def test_access_log_line_parser(self):
access_proxy_config = self.proxy_config.copy()
access_proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
self.assertEquals(result, {'code': 200,
'processing_time': '0.0262',
'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
'month': '07',
'second': '30',
'year': '2010',
'query': 'format=json&foo',
'tz': '+0000',
'http_version': 'HTTP/1.0',
'object_name': 'bar',
'etag': '-',
'method': 'GET',
'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
'client_ip': '1.2.3.4',
'format': 1,
'bytes_out': 95,
'container_name': 'foo',
'day': '09',
'minute': '14',
'account': 'acct',
'hour': '04',
'referrer': '-',
'request': '/v1/acct/foo/bar',
'user_agent': 'curl',
'bytes_in': 6,
'lb_ip': '4.5.6.7'})
def test_process_one_access_file(self):
access_proxy_config = self.proxy_config.copy()
access_proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
def get_object_data(*a, **kw):
return [self.access_test_line]
p.get_object_data = get_object_data
result = p.process_one_file('access', 'a', 'c', 'o')
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(result, expected)
def test_process_one_access_file_error(self):
access_proxy_config = self.proxy_config.copy()
access_proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
'access', 'a', 'c', 'o')
def test_get_container_listing(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy()
result = p.get_container_listing('a', 'foo')
expected = ['2010/03/14/13/obj1']
self.assertEquals(result, expected)
result = p.get_container_listing('a', 'foo', listing_filter=expected)
expected = []
self.assertEquals(result, expected)
result = p.get_container_listing('a', 'foo', start_date='2010031412',
end_date='2010031414')
expected = ['2010/03/14/13/obj1']
self.assertEquals(result, expected)
result = p.get_container_listing('a', 'foo', start_date='2010031414')
expected = []
self.assertEquals(result, expected)
result = p.get_container_listing('a', 'foo', start_date='2010031410',
end_date='2010031412')
expected = []
self.assertEquals(result, expected)
result = p.get_container_listing('a', 'foo', start_date='2010031412',
end_date='2010031413')
expected = ['2010/03/14/13/obj1']
self.assertEquals(result, expected)
def test_get_object_data(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy()
result = list(p.get_object_data('a', 'c', 'o', False))
expected = ['obj','data']
self.assertEquals(result, expected)
result = list(p.get_object_data('a', 'c', 'o.gz', True))
self.assertEquals(result, expected)
def test_get_object_data_errors(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(bad_compressed=True)
result = p.get_object_data('a', 'c', 'o.gz', True)
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(timeout=True)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
def test_get_stat_totals(self):
stats_proxy_config = self.proxy_config.copy()
stats_proxy_config.update({
'log-processor-stats': {
'class_path':
'swift.stats.stats_processor.StatsLogProcessor'
}})
p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.stats_test_line]
p.get_object_data = get_object_data
result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
expected = {('account', 'y', 'm', 'd', 'h'):
{'replica_count': 1,
'object_count': 2,
'container_count': 1,
'bytes_used': 3}}
self.assertEquals(result, expected)
def test_generate_keylist_mapping(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
result = p.generate_keylist_mapping()
expected = {}
self.assertEquals(result, expected)
def test_generate_keylist_mapping_with_dummy_plugins(self):
class Plugin1(object):
def keylist_mapping(self):
return {'a': 'b', 'c': 'd', 'e': ['f', 'g']}
class Plugin2(object):
def keylist_mapping(self):
return {'a': '1', 'e': '2', 'h': '3'}
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p.plugins['plugin1'] = {'instance': Plugin1()}
p.plugins['plugin2'] = {'instance': Plugin2()}
result = p.generate_keylist_mapping()
expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']),
'h': '3'}
self.assertEquals(result, expected)
def test_access_keylist_mapping_format(self):
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(proxy_config, DumbLogger())
mapping = p.generate_keylist_mapping()
for k, v in mapping.items():
# these only work for Py2.7+
#self.assertIsInstance(k, str)
self.assertTrue(isinstance(k, str), type(k))
def test_stats_keylist_mapping_format(self):
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-stats': {
'class_path':
'swift.stats.stats_processor.StatsLogProcessor'
}})
p = log_processor.LogProcessor(proxy_config, DumbLogger())
mapping = p.generate_keylist_mapping()
for k, v in mapping.items():
# these only work for Py2.7+
#self.assertIsInstance(k, str)
self.assertTrue(isinstance(k, str), type(k))
def test_collate_worker(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(ret, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_collate_worker_error(self):
def get_object_data(*a,**kw):
raise Exception()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
self.assertTrue(isinstance(ret, Exception))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}})]
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate_errors(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = []
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
class TestLogProcessorDaemon(unittest.TestCase):
def test_get_lookback_interval(self):
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, lookback_hours, lookback_window):
self.lookback_hours = lookback_hours
self.lookback_window = lookback_window
try:
d = datetime.datetime
for x in [
[d(2011, 1, 1), 0, 0, None, None],
[d(2011, 1, 1), 120, 0, '2010122700', None],
[d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
[d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
[d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
[d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
]:
log_processor.now = lambda: x[0]
d = MockLogProcessorDaemon(x[1], x[2])
self.assertEquals((x[3], x[4]), d.get_lookback_interval())
finally:
log_processor.now = datetime.datetime.now
def test_get_processed_files_list(self):
class MockLogProcessor():
def __init__(self, stream):
self.stream = stream
def get_object_data(self, *args, **kwargs):
return self.stream
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, stream):
self.log_processor = MockLogProcessor(stream)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
file_list = set(['a', 'b', 'c'])
for s, l in [['', None],
[pickle.dumps(set()).split('\n'), set()],
[pickle.dumps(file_list).split('\n'), file_list],
]:
self.assertEquals(l,
MockLogProcessorDaemon(s).get_processed_files_list())
def test_get_processed_files_list_bad_file_downloads(self):
class MockLogProcessor():
def __init__(self, status_code):
self.err = log_processor.BadFileDownload(status_code)
def get_object_data(self, *a, **k):
raise self.err
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, status_code):
self.log_processor = MockLogProcessor(status_code)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
for c, l in [[404, set()], [503, None], [None, None]]:
self.assertEquals(l,
MockLogProcessorDaemon(c).get_processed_files_list())
def test_get_aggregate_data(self):
# when run "for real"
# the various keys/values in the input and output
# dictionaries are often not simple strings
# for testing we can use keys that are easier to work with
processed_files = set()
data_in = [
['file1', {
'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
],
['file2', {'acct1_time1': {'field1': 10}}],
]
expected_data_out = {
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
pass
d = MockLogProcessorDaemon()
data_out = d.get_aggregate_data(processed_files, data_in)
for k, v in expected_data_out.items():
self.assertEquals(v, data_out[k])
self.assertEquals(set(['file1', 'file2']), processed_files)
def test_get_final_info(self):
# when run "for real"
# the various keys/values in the input and output
# dictionaries are often not simple strings
# for testing we can use keys/values that are easier to work with
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self._keylist_mapping = {
'out_field1':['field1', 'field2', 'field3'],
'out_field2':['field2', 'field3'],
'out_field3':['field3'],
'out_field4':'field4',
'out_field5':['field6', 'field7', 'field8'],
'out_field6':['field6'],
'out_field7':'field7',
}
data_in = {
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
'field4': 8, 'field5': 11},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
expected_data_out = {
'acct1_time1': {'out_field1': 16, 'out_field2': 5,
'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct1_time2': {'out_field1': 9, 'out_field2': 5,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct2_time1': {'out_field1': 13, 'out_field2': 7,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct3_time3': {'out_field1': 17, 'out_field2': 9,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
}
self.assertEquals(expected_data_out,
MockLogProcessorDaemon().get_final_info(data_in))
def test_store_processed_files_list(self):
class MockInternalProxy:
def __init__(self, test, daemon, processed_files):
self.test = test
self.daemon = daemon
self.processed_files = processed_files
def upload_file(self, f, account, container, filename):
self.test.assertEquals(self.processed_files,
pickle.loads(f.getvalue()))
self.test.assertEquals(self.daemon.log_processor_account,
account)
self.test.assertEquals(self.daemon.log_processor_container,
container)
self.test.assertEquals(self.daemon.processed_files_filename,
filename)
class MockLogProcessor:
def __init__(self, test, daemon, processed_files):
self.internal_proxy = MockInternalProxy(test, daemon,
processed_files)
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test, processed_files):
self.log_processor = \
MockLogProcessor(test, self, processed_files)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
processed_files = set(['a', 'b', 'c'])
MockLogProcessorDaemon(self, processed_files).\
store_processed_files_list(processed_files)
def test_get_output(self):
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self._keylist_mapping = {'a':None, 'b':None, 'c':None}
data_in = {
('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
}
expected_data_out = [
['data_ts', 'account', 'a', 'b', 'c'],
['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
]
data_out = MockLogProcessorDaemon().get_output(data_in)
self.assertEquals(expected_data_out[0], data_out[0])
for row in data_out[1:]:
self.assert_(row in expected_data_out)
for row in expected_data_out[1:]:
self.assert_(row in data_out)
def test_store_output(self):
try:
real_strftime = time.strftime
mock_strftime_return = '2010/03/02/01/'
def mock_strftime(format):
self.assertEquals('%Y/%m/%d/%H/', format)
return mock_strftime_return
log_processor.time.strftime = mock_strftime
data_in = [
['data_ts', 'account', 'a', 'b', 'c'],
['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
]
expected_output = '\n'.join([','.join(row) for row in data_in])
h = hashlib.md5(expected_output).hexdigest()
expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
class MockInternalProxy:
def __init__(self, test, daemon, expected_filename,
expected_output):
self.test = test
self.daemon = daemon
self.expected_filename = expected_filename
self.expected_output = expected_output
def upload_file(self, f, account, container, filename):
self.test.assertEquals(self.daemon.log_processor_account,
account)
self.test.assertEquals(self.daemon.log_processor_container,
container)
self.test.assertEquals(self.expected_filename, filename)
self.test.assertEquals(self.expected_output, f.getvalue())
class MockLogProcessor:
def __init__(self, test, daemon, expected_filename,
expected_output):
self.internal_proxy = MockInternalProxy(test, daemon,
expected_filename, expected_output)
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test, expected_filename, expected_output):
self.log_processor = MockLogProcessor(test, self,
expected_filename, expected_output)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
MockLogProcessorDaemon(self, expected_filename, expected_output).\
store_output(data_in)
finally:
log_processor.time.strftime = real_strftime
def test_keylist_mapping(self):
# Kind of lame test to see if the property is both
# generated by a particular method and cached properly.
# The method that actually generates the mapping is
# tested elsewhere.
value_return = 'keylist_mapping'
class MockLogProcessor:
def __init__(self):
self.call_count = 0
def generate_keylist_mapping(self):
self.call_count += 1
return value_return
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self.log_processor = MockLogProcessor()
self._keylist_mapping = None
d = MockLogProcessorDaemon()
self.assertEquals(value_return, d.keylist_mapping)
self.assertEquals(value_return, d.keylist_mapping)
self.assertEquals(1, d.log_processor.call_count)
def test_process_logs(self):
try:
mock_logs_to_process = 'logs_to_process'
mock_processed_files = 'processed_files'
real_multiprocess_collate = log_processor.multiprocess_collate
multiprocess_collate_return = 'multiprocess_collate_return'
get_aggregate_data_return = 'get_aggregate_data_return'
get_final_info_return = 'get_final_info_return'
get_output_return = 'get_output_return'
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test):
self.test = test
self.total_conf = 'total_conf'
self.logger = 'logger'
self.worker_count = 'worker_count'
def get_aggregate_data(self, processed_files, results):
self.test.assertEquals(mock_processed_files, processed_files)
self.test.assertEquals(multiprocess_collate_return, results)
return get_aggregate_data_return
def get_final_info(self, aggr_data):
self.test.assertEquals(get_aggregate_data_return, aggr_data)
return get_final_info_return
def get_output(self, final_info):
self.test.assertEquals(get_final_info_return, final_info)
return get_output_return
d = MockLogProcessorDaemon(self)
def mock_multiprocess_collate(processor_args, logs_to_process,
worker_count):
self.assertEquals(d.total_conf, processor_args[0])
self.assertEquals(d.logger, processor_args[1])
self.assertEquals(mock_logs_to_process, logs_to_process)
self.assertEquals(d.worker_count, worker_count)
return multiprocess_collate_return
log_processor.multiprocess_collate = mock_multiprocess_collate
output = d.process_logs(mock_logs_to_process, mock_processed_files)
self.assertEquals(get_output_return, output)
finally:
log_processor.multiprocess_collate = real_multiprocess_collate
def test_run_once_get_processed_files_list_returns_none(self):
class MockLogProcessor:
def get_data_list(self, lookback_start, lookback_end,
processed_files):
raise unittest.TestCase.failureException, \
'Method should not be called'
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self.logger = DumbLogger()
self.log_processor = MockLogProcessor()
def get_lookback_interval(self):
return None, None
def get_processed_files_list(self):
return None
MockLogProcessorDaemon().run_once()
def test_run_once_no_logs_to_process(self):
class MockLogProcessor():
def __init__(self, daemon, test):
self.daemon = daemon
self.test = test
def get_data_list(self, lookback_start, lookback_end,
processed_files):
self.test.assertEquals(self.daemon.lookback_start,
lookback_start)
self.test.assertEquals(self.daemon.lookback_end,
lookback_end)
self.test.assertEquals(self.daemon.processed_files,
processed_files)
return []
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test):
self.logger = DumbLogger()
self.log_processor = MockLogProcessor(self, test)
self.lookback_start = 'lookback_start'
self.lookback_end = 'lookback_end'
self.processed_files = ['a', 'b', 'c']
def get_lookback_interval(self):
return self.lookback_start, self.lookback_end
def get_processed_files_list(self):
return self.processed_files
def process_logs(self, logs_to_process, processed_files):
raise unittest.TestCase.failureException, \
'Method should not be called'
MockLogProcessorDaemon(self).run_once()

View File

@ -0,0 +1,240 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from datetime import datetime
from tempfile import mkdtemp
from shutil import rmtree
from functools import partial
from collections import defaultdict
import random
import string
from test.unit import temptree
from swift.stats import log_uploader
import logging
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \
'\xf3\xf3\xad\x04\x00\x00\x00'
access_regex = '''
^
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])
.*$
'''
def mock_appconfig(*args, **kwargs):
pass
class MockInternalProxy():
def __init__(self, *args, **kwargs):
pass
def create_container(self, *args, **kwargs):
return True
def upload_file(self, *args, **kwargs):
return True
_orig_LogUploader = log_uploader.LogUploader
class MockLogUploader(_orig_LogUploader):
def __init__(self, conf, logger=LOGGER):
conf['swift_account'] = conf.get('swift_account', '')
conf['container_name'] = conf.get('container_name', '')
conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0')
conf['source_filename_format'] = conf.get(
'source_filename_format', conf.get('filename_format'))
log_uploader.LogUploader.__init__(self, conf, 'plugin')
self.logger = logger
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
_orig_LogUploader.upload_one_log(self, filename, year, month,
day, hour)
class TestLogUploader(unittest.TestCase):
def setUp(self):
# mock internal proxy
self._orig_InternalProxy = log_uploader.InternalProxy
self._orig_appconfig = log_uploader.appconfig
log_uploader.InternalProxy = MockInternalProxy
log_uploader.appconfig = mock_appconfig
def tearDown(self):
log_uploader.appconfig = self._orig_appconfig
log_uploader.InternalProxy = self._orig_InternalProxy
def test_bad_pattern_in_config(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
# invalid pattern
conf = {'log_dir': t,
'source_filename_pattern': '%Y%m%d%h'} # should be %H
uploader = MockLogUploader(conf)
self.assertRaises(SystemExit, uploader.upload_all_logs)
conf = {'log_dir': t, 'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 1)
def test_pattern_upload_all_logs(self):
# test empty dir
with temptree([]) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
self.assertRaises(SystemExit, uploader.run_once)
def get_random_length_str(max_len=10, chars=string.ascii_letters):
return ''.join(random.choice(chars) for x in
range(random.randint(1, max_len)))
template = 'prefix_%(random)s_%(digits)s.blah.' \
'%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz'
pattern = '''prefix_.*_[0-9]+\.blah\.
(?P<year>[0-9]{4})
(?P<month>[0-1][0-9])
(?P<day>[0-3][0-9])
(?P<hour>[0-2][0-9])00-[0-9]{2}00
-[0-9]?[0-9]\.gz'''
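# template produces names like 'prefix_aB_0123.blah.201106151400-1500-7.gz';
# pattern is the verbose-style regex that should match exactly those names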
files_that_should_match = []
# add some files that match
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m%d'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files_that_should_match.append(fname)
# add some files that don't match
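# (these use a YYYYMM datestr, leaving the names two digits short of the
# YYYYMMDDHH date block the pattern expects, so they should be skipped)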
files = list(files_that_should_match)
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files.append(fname)
for fname in files:
print fname
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
self.assertEquals(len(os.listdir(t)), 48)
conf = {'source_filename_pattern': pattern, 'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(os.listdir(t)), 24)
self.assertEquals(len(uploader.uploaded_files), 24)
files_that_were_uploaded = set(x[0] for x in
uploader.uploaded_files)
for f in files_that_should_match:
self.assert_(os.path.join(t, f) in files_that_were_uploaded)
def test_log_cutoff(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
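# a 7200 second (two hour) new_log_cutoff should treat the file created
# just above as too new to upload; a cutoff of 0 should pick it up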
conf = {'log_dir': t, 'new_log_cutoff': '7200',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
conf = {'log_dir': t, 'new_log_cutoff': '0',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
def test_create_container_fail(self):
files = [datetime.now().strftime('%Y%m%d%H')]
conf = {'source_filename_pattern': access_regex}
with temptree(files) as t:
conf['log_dir'] = t
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
with temptree(files) as t:
conf['log_dir'] = t
uploader = MockLogUploader(conf)
# mock create_container to fail
uploader.internal_proxy.create_container = lambda *args: False
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
def test_unlink_log(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'false',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
conf = {'log_dir': t, 'unlink_log': 'true',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file gone
self.assertEquals(len(os.listdir(t)), 0)
def test_upload_file_failed(self):
files = ['plugin-%s' % datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'true',
'source_filename_pattern': access_regex}
uploader = MockLogUploader(conf)
# mock upload_file to fail and roll back the uploaded_files bookkeeping
def mock_upload_file(self, *args, **kwargs):
uploader.uploaded_files.pop()
return False
uploader.internal_proxy.upload_file = mock_upload_file
self.assertRaises(SystemExit, uploader.run_once)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,29 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
import unittest
from swift.stats import stats_processor
class TestStatsProcessor(unittest.TestCase):
def test_placeholder(self):
pass
if __name__ == '__main__':
unittest.main()
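
The uploader tests above all revolve around one mechanism: a verbose-style regex whose named year/month/day/hour groups decide which files under log_dir get uploaded and which hour they cover. The sketch below is a minimal, standalone illustration of that idea using only the standard library; the pattern, filenames, and helper name are hypothetical examples, not part of slogging itself.

import re

# Same shape as the patterns used in the tests: named date groups, written in
# re.VERBOSE style so the whitespace inside the pattern is ignored.
EXAMPLE_PATTERN = r'''
    ^access-
    (?P<year>[0-9]{4})
    (?P<month>[0-1][0-9])
    (?P<day>[0-3][0-9])
    (?P<hour>[0-2][0-9])
    \.gz$
'''

def extract_log_hour(filename, pattern=EXAMPLE_PATTERN):
    # return (year, month, day, hour) for a matching filename, or None
    match = re.match(pattern, filename, re.VERBOSE)
    if not match:
        return None
    return (match.group('year'), match.group('month'),
            match.group('day'), match.group('hour'))

print(extract_log_hour('access-2011061514.gz'))  # ('2011', '06', '15', '14')
print(extract_log_hour('notes.txt'))             # None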