diff --git a/bin/swift-account-stats-logger b/bin/swift-account-stats-logger new file mode 100755 index 0000000000..c42554de82 --- /dev/null +++ b/bin/swift-account-stats-logger @@ -0,0 +1,27 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from swift.stats.account_stats import AccountStat +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 2: + print "Usage: swift-account-stats-logger CONFIG_FILE" + sys.exit() + stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats') + stats = AccountStat(stats_conf).run(once=True) diff --git a/bin/swift-log-stats-collector b/bin/swift-log-stats-collector new file mode 100755 index 0000000000..d21135b35c --- /dev/null +++ b/bin/swift-log-stats-collector @@ -0,0 +1,27 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from swift.stats.log_processor import LogProcessorDaemon +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 2: + print "Usage: swift-log-stats-collector CONFIG_FILE" + sys.exit() + conf = utils.readconf(sys.argv[1], log_name='log-stats-collector') + stats = LogProcessorDaemon(conf).run(once=True) diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader new file mode 100755 index 0000000000..b557e4c167 --- /dev/null +++ b/bin/swift-log-uploader @@ -0,0 +1,31 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys + +from swift.stats.log_uploader import LogUploader +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 3: + print "Usage: swift-log-uploader CONFIG_FILE plugin" + sys.exit() + uploader_conf = utils.readconf(sys.argv[1], 'log-processor') + plugin = sys.argv[2] + section_name = 'log-processor-%s' % plugin + plugin_conf = utils.readconf(sys.argv[1], section_name) + uploader_conf.update(plugin_conf) + uploader = LogUploader(uploader_conf, plugin).run(once=True) diff --git a/doc/source/index.rst b/doc/source/index.rst index 8760852f13..ee10ae67f0 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -24,6 +24,7 @@ Overview: overview_reaper overview_auth overview_replication + overview_stats Development: diff --git a/doc/source/overview_stats.rst b/doc/source/overview_stats.rst new file mode 100644 index 0000000000..6364de4611 --- /dev/null +++ b/doc/source/overview_stats.rst @@ -0,0 +1,184 @@ +================== +Swift stats system +================== + +The swift stats system is composed of three parts: log creation, log +uploading, and log processing. The system handles two types of logs (access +and account stats), but it can be extended to handle other types of logs. + +--------- +Log Types +--------- + +*********** +Access logs +*********** + +Access logs are the proxy server logs. Rackspace uses syslog-ng to redirect +the proxy log output to an hourly log file. For example, a proxy request that +is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412. +This allows easy log rotation and easy per-hour log processing. + +****************** +Account stats logs +****************** + +Account stats logs are generated by a stats system process. +swift-account-stats-logger runs on each account server (via cron) and walks +the filesystem looking for account databases. When an account database is +found, the logger selects the account hash, bytes_used, container_count, and +object_count. These values are then written out as one line in a csv file. One +csv file is produced for every run of swift-account-stats-logger. This means +that, system wide, one csv file is produced for every storage node. Rackspace +runs the account stats logger every hour. Therefore, in a cluster of ten +account servers, ten csv files are produced every hour. Also, every account +will have one entry for every replica in the system. On average, there will be +three copies of each account in the aggregate of all account stat csv files +created in one system-wide run. + +---------------------- +Log Processing plugins +---------------------- + +The swift stats system is written to allow a plugin to be defined for every +log type. Swift includes plugins for both access logs and storage stats logs. +Each plugin is responsible for defining, in a config section, where the logs +are stored on disk, where the logs will be stored in swift (account and +container), the filename format of the logs on disk, the location of the +plugin class definition, and any plugin-specific config values. + +The plugin class must define three methods. The constructor must accept +one argument (the dict representation of the plugin's config section). The +process method must accept an iterator, and the account, container, and object +name of the log. The keylist_mapping method accepts no parameters.
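As an illustration of that interface, a minimal and purely hypothetical plugin that only counts log lines might look like the sketch below. The class name and the line_count key are invented for this example, and the date parts are taken from the uploaded object's name (the uploader stores logs as year/month/day/hour/<hash>.gz); a real plugin, such as swift.stats.stats_processor.StatsLogProcessor, would normally derive the account and its counters from each parsed line instead::

    class ExampleLogProcessor(object):
        """Hypothetical plugin sketch showing the required interface."""

        def __init__(self, conf):
            # conf is the dict form of this plugin's config section
            self.server_name = conf.get('server_name', 'proxy')

        def process(self, obj_stream, account, container, object_name):
            # obj_stream yields one log line at a time; uploaded log objects
            # are named year/month/day/hour/<hash>.gz, so the date parts can
            # be pulled from the object name
            year, month, day, hour, _unused = object_name.split('/')
            aggr = {}
            for line in obj_stream:
                key = (account, year, month, day, hour)
                totals = aggr.setdefault(key, {})
                totals['line_count'] = totals.get('line_count', 0) + 1
            return aggr

        def keylist_mapping(self):
            # map final csv column names to the keys used by process()
            return {'line_count': 'line_count'}

The keys returned by keylist_mapping become columns in the csv that swift-log-stats-collector eventually uploads.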
+ +------------- +Log Uploading +------------- + +swift-log-uploader accepts a config file and a plugin name. It finds the log +files on disk according to the plugin config section and uploads them to the +swift cluster. This means one uploader process will run on each proxy server +node and each account server node. To avoid uploading partially-written log files, +the uploader will not upload files whose mtime is less than two hours old. +Rackspace runs this process once an hour via cron. + +-------------- +Log Processing +-------------- + +swift-log-stats-collector accepts a config file and generates a csv that is +uploaded to swift. It loads all plugins defined in the config file, generates +a list of all log files in swift that need to be processed, and passes an +iterable of the log file data to the appropriate plugin's process method. The +process method returns a dictionary of data in the log file keyed on (account, +year, month, day, hour). The log-stats-collector process then combines all +dictionaries from all calls to a process method into one dictionary. Key +collisions within each (account, year, month, day, hour) dictionary are +summed. Finally, the summed dictionary is mapped to the final csv values with +each plugin's keylist_mapping method. + +The resulting csv file has one line per (account, year, month, day, hour) for +all log files processed in that run of swift-log-stats-collector. + + +================================ +Running the stats system on SAIO +================================ + +#. Create a swift account to use for storing stats information, and note the + account hash. The hash will be used in config files. + +#. Install syslog-ng:: + + sudo apt-get install syslog-ng + +#. Add the following to the end of `/etc/syslog-ng/syslog-ng.conf`:: + + # Added for swift logging + destination df_local1 { file("/var/log/swift/proxy.log" owner() group()); }; + destination df_local1_err { file("/var/log/swift/proxy.error" owner() group()); }; + destination df_local1_hourly { file("/var/log/swift/hourly/$YEAR$MONTH$DAY$HOUR" owner() group()); }; + filter f_local1 { facility(local1) and level(info); }; + + filter f_local1_err { facility(local1) and not level(info); }; + + # local1.info -/var/log/swift/proxy.log + # write to local file and to remote log server + log { + source(s_all); + filter(f_local1); + destination(df_local1); + destination(df_local1_hourly); + }; + + # local1.error -/var/log/swift/proxy.error + # write to local file and to remote log server + log { + source(s_all); + filter(f_local1_err); + destination(df_local1_err); + }; + +#. Restart syslog-ng + +#. Create the log directories:: + + mkdir /var/log/swift/hourly + mkdir /var/log/swift/stats + chown -R : /var/log/swift + +#. Create `/etc/swift/log-processor.conf`:: + + [log-processor] + swift_account = + user = + + [log-processor-access] + swift_account = + container_name = log_data + log_dir = /var/log/swift/hourly/ + source_filename_format = %Y%m%d%H + class_path = swift.stats.access_processor.AccessLogProcessor + user = + + [log-processor-stats] + swift_account = + container_name = account_stats + log_dir = /var/log/swift/stats/ + source_filename_format = %Y%m%d%H_* + class_path = swift.stats.stats_processor.StatsLogProcessor + account_server_conf = /etc/swift/account-server/1.conf + user = + +#. Add the following under [app:proxy-server] in `/etc/swift/proxy-server.conf`:: + + log_facility = LOG_LOCAL1 + +#. Create a `cron` job to run once per hour to create the stats logs. In + `/etc/cron.d/swift-stats-log-creator`:: + + 0 * * * * swift-account-stats-logger /etc/swift/log-processor.conf + +#. 
Create a `cron` job to run once per hour to upload the stats logs. In + `/etc/cron.d/swift-stats-log-uploader`:: + + 10 * * * * swift-log-uploader /etc/swift/log-processor.conf stats + +#. Create a `cron` job to run once per hour to upload the access logs. In + `/etc/cron.d/swift-access-log-uploader`:: + + 5 * * * * swift-log-uploader /etc/swift/log-processor.conf access + +#. Create a `cron` job to run once per hour to process the logs. In + `/etc/cron.d/swift-stats-processor`:: + + 30 * * * * swift-log-stats-collector /etc/swift/log-processor.conf + +After running for a few hours, you should start to see .csv files in the +log_processing_data container in the swift stats account that was created +earlier. This file will have one entry per account per hour for each account +with activity in that hour. One .csv file should be produced per hour. Note +that the stats will be delayed by at least two hours by default. This can be +changed with the new_log_cutoff variable in the config file. See +`log-processing.conf-sample` for more details. \ No newline at end of file diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample new file mode 100644 index 0000000000..11805add0b --- /dev/null +++ b/etc/log-processing.conf-sample @@ -0,0 +1,39 @@ +# plugin section format is named "log-processor-" + +[log-processor] +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +# container_name = log_processing_data +# proxy_server_conf = /etc/swift/proxy-server.conf +# log_facility = LOG_LOCAL0 +# log_level = INFO +# lookback_hours = 120 +# lookback_window = 120 +# user = swift + +[log-processor-access] +# log_dir = /var/log/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +container_name = log_data +source_filename_format = access-%Y%m%d%H +# new_log_cutoff = 7200 +# unlink_log = True +class_path = swift.stats.access_processor.AccessLogProcessor +# service ips is for client ip addresses that should be counted as servicenet +# service_ips = +# load balancer private ips is for load balancer ip addresses that should be +# counted as servicenet +# lb_private_ips = +# server_name = proxy +# user = swift +# warn_percent = 0.8 + +[log-processor-stats] +# log_dir = /var/log/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +container_name = account_stats +source_filename_format = stats-%Y%m%d%H_* +# new_log_cutoff = 7200 +# unlink_log = True +class_path = swift.stats.stats_processor.StatsLogProcessor +# account_server_conf = /etc/swift/account-server.conf +# user = swift \ No newline at end of file diff --git a/setup.py b/setup.py index 936bf92f8e..b50fe7d4b1 100644 --- a/setup.py +++ b/setup.py @@ -76,6 +76,9 @@ setup( 'bin/swift-ring-builder', 'bin/swift-stats-populate', 'bin/swift-stats-report', 'bin/swift-bench', + 'bin/swift-log-uploader', + 'bin/swift-log-stats-collector', + 'bin/swift-account-stats-logger', ], entry_points={ 'paste.app_factory': [ diff --git a/swift/common/compressing_file_reader.py b/swift/common/compressing_file_reader.py new file mode 100644 index 0000000000..d6de9154eb --- /dev/null +++ b/swift/common/compressing_file_reader.py @@ -0,0 +1,73 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zlib +import struct + + +class CompressingFileReader(object): + ''' + Wraps a file object and provides a read method that returns gzip'd data. + + One warning: if read is called with a small value, the data returned may + be bigger than the value. In this case, the "compressed" data will be + bigger than the original data. To solve this, use a bigger read buffer. + + An example use case: + Given an uncompressed file on disk, provide a way to read compressed data + without buffering the entire file data in memory. Using this class, an + uncompressed log file could be uploaded as compressed data with chunked + transfer encoding. + + gzip header and footer code taken from the python stdlib gzip module + + :param file_obj: File object to read from + :param compresslevel: compression level + ''' + + def __init__(self, file_obj, compresslevel=9): + self._f = file_obj + self._compressor = zlib.compressobj(compresslevel, + zlib.DEFLATED, + -zlib.MAX_WBITS, + zlib.DEF_MEM_LEVEL, + 0) + self.done = False + self.first = True + self.crc32 = 0 + self.total_size = 0 + + def read(self, *a, **kw): + if self.done: + return '' + x = self._f.read(*a, **kw) + if x: + self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL + self.total_size += len(x) + compressed = self._compressor.compress(x) + if not compressed: + compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH) + else: + compressed = self._compressor.flush(zlib.Z_FINISH) + crc32 = struct.pack(" 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + if not (200 <= resp.status_int < 300): + return False + return True + + def get_object(self, account, container, object_name): + """ + Get object. + + :param account: account name object is in + :param container: container name object is in + :param object_name: name of object to get + :returns: iterator for object data + """ + req = webob.Request.blank('/v1/%s/%s/%s' % + (account, container, object_name), + environ={'REQUEST_METHOD': 'GET'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + return resp.status_int, resp.app_iter + + def create_container(self, account, container): + """ + Create container. 
+ + :param account: account name to put the container in + :param container: container name to create + :returns: True if successful, otherwise False + """ + req = webob.Request.blank('/v1/%s/%s' % (account, container), + environ={'REQUEST_METHOD': 'PUT'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + return 200 <= resp.status_int < 300 + + def get_container_list(self, account, container, marker=None, limit=None, + prefix=None, delimiter=None, full_listing=True): + """ + Get container listing. + + :param account: account name for the container + :param container: container name to get the listing of + :param marker: marker query + :param limit: limit to query + :param prefix: prefix query + :param delimeter: delimeter for query + :param full_listing: if True, make enough requests to get all listings + :returns: list of objects + """ + if full_listing: + rv = [] + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + while listing: + rv.extend(listing) + if not delimiter: + marker = listing[-1]['name'] + else: + marker = listing[-1].get('name', listing[-1].get('subdir')) + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + return rv + path = '/v1/%s/%s' % (account, container) + qs = 'format=json' + if marker: + qs += '&marker=%s' % quote(marker) + if limit: + qs += '&limit=%d' % limit + if prefix: + qs += '&prefix=%s' % quote(prefix) + if delimiter: + qs += '&delimiter=%s' % quote(delimiter) + path += '?%s' % qs + req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + if resp.status_int == 204: + return [] + if 200 <= resp.status_int < 300: + return json_loads(resp.body) diff --git a/swift/common/utils.py b/swift/common/utils.py index 768422153c..f8feb73968 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -553,12 +553,13 @@ def cache_from_env(env): return item_from_env(env, 'swift.cache') -def readconf(conf, section_name, log_name=None, defaults=None): +def readconf(conf, section_name=None, log_name=None, defaults=None): """ Read config file and return config items as a dict :param conf: path to config file - :param section_name: config section to read + :param section_name: config section to read (will return all sections if + not defined) :param log_name: name to be used with logging (will use section_name if not defined) :param defaults: dict of default values to pre-populate the config with @@ -570,16 +571,24 @@ def readconf(conf, section_name, log_name=None, defaults=None): if not c.read(conf): print "Unable to read config file %s" % conf sys.exit(1) - if c.has_section(section_name): - conf = dict(c.items(section_name)) - else: - print "Unable to find %s config section in %s" % (section_name, conf) - sys.exit(1) - if "log_name" not in conf: - if log_name is not None: - conf['log_name'] = log_name + if section_name: + if c.has_section(section_name): + conf = dict(c.items(section_name)) else: - 
conf['log_name'] = section_name + print "Unable to find %s config section in %s" % (section_name, + conf) + sys.exit(1) + if "log_name" not in conf: + if log_name is not None: + conf['log_name'] = log_name + else: + conf['log_name'] = section_name + else: + conf = {} + for s in c.sections(): + conf.update({s: dict(c.items(s))}) + if 'log_name' not in conf: + conf['log_name'] = log_name return conf diff --git a/swift/stats/__init__.py b/swift/stats/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py new file mode 100644 index 0000000000..5d8766b9df --- /dev/null +++ b/swift/stats/access_processor.py @@ -0,0 +1,239 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +from urllib import unquote +import copy + +from swift.common.utils import split_path, get_logger + +month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split() + + +class AccessLogProcessor(object): + """Transform proxy server access logs""" + + def __init__(self, conf): + self.server_name = conf.get('server_name', 'proxy') + self.lb_private_ips = [x.strip() for x in \ + conf.get('lb_private_ips', '').split(',')\ + if x.strip()] + self.service_ips = [x.strip() for x in \ + conf.get('service_ips', '').split(',')\ + if x.strip()] + self.warn_percent = float(conf.get('warn_percent', '0.8')) + self.logger = get_logger(conf) + + def log_line_parser(self, raw_log): + '''given a raw access log line, return a dict of the good parts''' + d = {} + try: + (_, + server, + client_ip, + lb_ip, + timestamp, + method, + request, + http_version, + code, + referrer, + user_agent, + auth_token, + bytes_in, + bytes_out, + etag, + trans_id, + headers, + processing_time) = (unquote(x) for x in raw_log[16:].split(' ')) + except ValueError: + self.logger.debug('Bad line data: %s' % repr(raw_log)) + return {} + if server != self.server_name: + # incorrect server name in log line + self.logger.debug('Bad server name: found "%s" expected "%s"' \ + % (server, self.server_name)) + return {} + (version, + account, + container_name, + object_name) = split_path(request, 2, 4, True) + if container_name is not None: + container_name = container_name.split('?', 1)[0] + if object_name is not None: + object_name = object_name.split('?', 1)[0] + account = account.split('?', 1)[0] + query = None + if '?' in request: + request, query = request.split('?', 1) + args = query.split('&') + # Count each query argument. This is used later to aggregate + # the number of format, prefix, etc. queries. + for q in args: + if '=' in q: + k, v = q.split('=', 1) + else: + k = q + # Certain keys will get summmed in stats reporting + # (format, path, delimiter, etc.). Save a "1" here + # to indicate that this request is 1 request for + # its respective key. 
+ d[k] = 1 + d['client_ip'] = client_ip + d['lb_ip'] = lb_ip + d['method'] = method + d['request'] = request + if query: + d['query'] = query + d['http_version'] = http_version + d['code'] = code + d['referrer'] = referrer + d['user_agent'] = user_agent + d['auth_token'] = auth_token + d['bytes_in'] = bytes_in + d['bytes_out'] = bytes_out + d['etag'] = etag + d['trans_id'] = trans_id + d['processing_time'] = processing_time + day, month, year, hour, minute, second = timestamp.split('/') + d['day'] = day + month = ('%02s' % month_map.index(month)).replace(' ', '0') + d['month'] = month + d['year'] = year + d['hour'] = hour + d['minute'] = minute + d['second'] = second + d['tz'] = '+0000' + d['account'] = account + d['container_name'] = container_name + d['object_name'] = object_name + d['bytes_out'] = int(d['bytes_out'].replace('-', '0')) + d['bytes_in'] = int(d['bytes_in'].replace('-', '0')) + d['code'] = int(d['code']) + return d + + def process(self, obj_stream, account, container, object_name): + '''generate hourly groupings of data from one access log file''' + hourly_aggr_info = {} + total_lines = 0 + bad_lines = 0 + for line in obj_stream: + line_data = self.log_line_parser(line) + total_lines += 1 + if not line_data: + bad_lines += 1 + continue + account = line_data['account'] + container_name = line_data['container_name'] + year = line_data['year'] + month = line_data['month'] + day = line_data['day'] + hour = line_data['hour'] + bytes_out = line_data['bytes_out'] + bytes_in = line_data['bytes_in'] + method = line_data['method'] + code = int(line_data['code']) + object_name = line_data['object_name'] + client_ip = line_data['client_ip'] + + op_level = None + if not container_name: + op_level = 'account' + elif container_name and not object_name: + op_level = 'container' + elif object_name: + op_level = 'object' + + aggr_key = (account, year, month, day, hour) + d = hourly_aggr_info.get(aggr_key, {}) + if line_data['lb_ip'] in self.lb_private_ips: + source = 'service' + else: + source = 'public' + + if line_data['client_ip'] in self.service_ips: + source = 'service' + + d[(source, 'bytes_out')] = d.setdefault(( + source, 'bytes_out'), 0) + bytes_out + d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \ + bytes_in + + d['format_query'] = d.setdefault('format_query', 0) + \ + line_data.get('format', 0) + d['marker_query'] = d.setdefault('marker_query', 0) + \ + line_data.get('marker', 0) + d['prefix_query'] = d.setdefault('prefix_query', 0) + \ + line_data.get('prefix', 0) + d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \ + line_data.get('delimiter', 0) + path = line_data.get('path', 0) + d['path_query'] = d.setdefault('path_query', 0) + path + + code = '%dxx' % (code / 100) + key = (source, op_level, method, code) + d[key] = d.setdefault(key, 0) + 1 + + hourly_aggr_info[aggr_key] = d + if bad_lines > (total_lines * self.warn_percent): + name = '/'.join([account, container, object_name]) + self.logger.warning('I found a bunch of bad lines in %s '\ + '(%d bad, %d total)' % (name, bad_lines, total_lines)) + return hourly_aggr_info + + def keylist_mapping(self): + source_keys = 'service public'.split() + level_keys = 'account container object'.split() + verb_keys = 'GET PUT POST DELETE HEAD COPY'.split() + code_keys = '2xx 4xx 5xx'.split() + + keylist_mapping = { + # : or + 'service_bw_in': ('service', 'bytes_in'), + 'service_bw_out': ('service', 'bytes_out'), + 'public_bw_in': ('public', 'bytes_in'), + 'public_bw_out': ('public', 'bytes_out'), + 
'account_requests': set(), + 'container_requests': set(), + 'object_requests': set(), + 'service_request': set(), + 'public_request': set(), + 'ops_count': set(), + } + for verb in verb_keys: + keylist_mapping[verb] = set() + for code in code_keys: + keylist_mapping[code] = set() + for source in source_keys: + for level in level_keys: + for verb in verb_keys: + for code in code_keys: + keylist_mapping['account_requests'].add( + (source, 'account', verb, code)) + keylist_mapping['container_requests'].add( + (source, 'container', verb, code)) + keylist_mapping['object_requests'].add( + (source, 'object', verb, code)) + keylist_mapping['service_request'].add( + ('service', level, verb, code)) + keylist_mapping['public_request'].add( + ('public', level, verb, code)) + keylist_mapping[verb].add( + (source, level, verb, code)) + keylist_mapping[code].add( + (source, level, verb, code)) + keylist_mapping['ops_count'].add( + (source, level, verb, code)) + return keylist_mapping diff --git a/swift/stats/account_stats.py b/swift/stats/account_stats.py new file mode 100644 index 0000000000..ddf4192119 --- /dev/null +++ b/swift/stats/account_stats.py @@ -0,0 +1,111 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +from paste.deploy import appconfig +import shutil +import hashlib + +from swift.account.server import DATADIR as account_server_data_dir +from swift.common.db import AccountBroker +from swift.common.internal_proxy import InternalProxy +from swift.common.utils import renamer, get_logger, readconf, mkdirs +from swift.common.constraints import check_mount +from swift.common.daemon import Daemon + + +class AccountStat(Daemon): + """ + Extract storage stats from account databases on the account + storage nodes + """ + + def __init__(self, stats_conf): + super(AccountStat, self).__init__(stats_conf) + target_dir = stats_conf.get('log_dir', '/var/log/swift') + account_server_conf_loc = stats_conf.get('account_server_conf', + '/etc/swift/account-server.conf') + server_conf = appconfig('config:%s' % account_server_conf_loc, + name='account-server') + filename_format = stats_conf['source_filename_format'] + if filename_format.count('*') > 1: + raise Exception('source filename format should have at max one *') + self.filename_format = filename_format + self.target_dir = target_dir + mkdirs(self.target_dir) + self.devices = server_conf.get('devices', '/srv/node') + self.mount_check = server_conf.get('mount_check', 'true').lower() in \ + ('true', 't', '1', 'on', 'yes', 'y') + self.logger = get_logger(stats_conf, 'swift-account-stats-logger') + + def run_once(self): + self.logger.info("Gathering account stats") + start = time.time() + self.find_and_process() + self.logger.info("Gathering account stats complete (%0.2f minutes)" % + ((time.time() - start) / 60)) + + def find_and_process(self): + src_filename = time.strftime(self.filename_format) + working_dir = os.path.join(self.target_dir, '.stats_tmp') + shutil.rmtree(working_dir, 
ignore_errors=True) + mkdirs(working_dir) + tmp_filename = os.path.join(working_dir, src_filename) + hasher = hashlib.md5() + with open(tmp_filename, 'wb') as statfile: + # csv has the following columns: + # Account Name, Container Count, Object Count, Bytes Used + for device in os.listdir(self.devices): + if self.mount_check and not check_mount(self.devices, device): + self.logger.error("Device %s is not mounted, skipping." % + device) + continue + accounts = os.path.join(self.devices, + device, + account_server_data_dir) + if not os.path.exists(accounts): + self.logger.debug("Path %s does not exist, skipping." % + accounts) + continue + for root, dirs, files in os.walk(accounts, topdown=False): + for filename in files: + if filename.endswith('.db'): + db_path = os.path.join(root, filename) + broker = AccountBroker(db_path) + if not broker.is_deleted(): + (account_name, + _, _, _, + container_count, + object_count, + bytes_used, + _, _) = broker.get_info() + line_data = '"%s",%d,%d,%d\n' % ( + account_name, container_count, + object_count, bytes_used) + statfile.write(line_data) + hasher.update(line_data) + file_hash = hasher.hexdigest() + hash_index = src_filename.find('*') + if hash_index < 0: + # if there is no * in the target filename, the uploader probably + # won't work because we are crafting a filename that doesn't + # fit the pattern + src_filename = '_'.join([src_filename, file_hash]) + else: + parts = src_filename[:hash_index], src_filename[hash_index + 1:] + src_filename = ''.join([parts[0], file_hash, parts[1]]) + renamer(tmp_filename, os.path.join(self.target_dir, src_filename)) + shutil.rmtree(working_dir, ignore_errors=True) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py new file mode 100644 index 0000000000..6fd6c68597 --- /dev/null +++ b/swift/stats/log_processor.py @@ -0,0 +1,424 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ConfigParser import ConfigParser +import zlib +import time +import datetime +import cStringIO +import collections +from paste.deploy import appconfig +import multiprocessing +import Queue +import cPickle +import hashlib + +from swift.common.internal_proxy import InternalProxy +from swift.common.exceptions import ChunkReadTimeout +from swift.common.utils import get_logger, readconf +from swift.common.daemon import Daemon + + +class BadFileDownload(Exception): + pass + + +class LogProcessor(object): + """Load plugins, process logs""" + + def __init__(self, conf, logger): + if isinstance(logger, tuple): + self.logger = get_logger(*logger) + else: + self.logger = logger + + self.conf = conf + self._internal_proxy = None + + # load the processing plugins + self.plugins = {} + plugin_prefix = 'log-processor-' + for section in (x for x in conf if x.startswith(plugin_prefix)): + plugin_name = section[len(plugin_prefix):] + plugin_conf = conf.get(section, {}) + self.plugins[plugin_name] = plugin_conf + class_path = self.plugins[plugin_name]['class_path'] + import_target, class_name = class_path.rsplit('.', 1) + module = __import__(import_target, fromlist=[import_target]) + klass = getattr(module, class_name) + self.plugins[plugin_name]['instance'] = klass(plugin_conf) + self.logger.debug('Loaded plugin "%s"' % plugin_name) + + @property + def internal_proxy(self): + if self._internal_proxy is None: + stats_conf = self.conf.get('log-processor', {}) + proxy_server_conf_loc = stats_conf.get('proxy_server_conf', + '/etc/swift/proxy-server.conf') + proxy_server_conf = appconfig( + 'config:%s' % proxy_server_conf_loc, + name='proxy-server') + self._internal_proxy = InternalProxy(proxy_server_conf, + self.logger, + retries=3) + else: + return self._internal_proxy + + def process_one_file(self, plugin_name, account, container, object_name): + self.logger.info('Processing %s/%s/%s with plugin "%s"' % (account, + container, + object_name, + plugin_name)) + # get an iter of the object data + compressed = object_name.endswith('.gz') + stream = self.get_object_data(account, container, object_name, + compressed=compressed) + # look up the correct plugin and send the stream to it + return self.plugins[plugin_name]['instance'].process(stream, + account, + container, + object_name) + + def get_data_list(self, start_date=None, end_date=None, + listing_filter=None): + total_list = [] + for plugin_name, data in self.plugins.items(): + account = data['swift_account'] + container = data['container_name'] + listing = self.get_container_listing(account, + container, + start_date, + end_date) + for object_name in listing: + # The items in this list end up being passed as positional + # parameters to process_one_file. + x = (plugin_name, account, container, object_name) + if x not in listing_filter: + total_list.append(x) + return total_list + + def get_container_listing(self, swift_account, container_name, + start_date=None, end_date=None, + listing_filter=None): + ''' + Get a container listing, filtered by start_date, end_date, and + listing_filter. 
Dates, if given, should be in YYYYMMDDHH format + ''' + search_key = None + if start_date is not None: + date_parts = [] + try: + year, start_date = start_date[:4], start_date[4:] + if year: + date_parts.append(year) + month, start_date = start_date[:2], start_date[2:] + if month: + date_parts.append(month) + day, start_date = start_date[:2], start_date[2:] + if day: + date_parts.append(day) + hour, start_date = start_date[:2], start_date[2:] + if hour: + date_parts.append(hour) + except IndexError: + pass + else: + search_key = '/'.join(date_parts) + end_key = None + if end_date is not None: + date_parts = [] + try: + year, end_date = end_date[:4], end_date[4:] + if year: + date_parts.append(year) + month, end_date = end_date[:2], end_date[2:] + if month: + date_parts.append(month) + day, end_date = end_date[:2], end_date[2:] + if day: + date_parts.append(day) + hour, end_date = end_date[:2], end_date[2:] + if hour: + date_parts.append(hour) + except IndexError: + pass + else: + end_key = '/'.join(date_parts) + container_listing = self.internal_proxy.get_container_list( + swift_account, + container_name, + marker=search_key) + results = [] + if container_listing is not None: + if listing_filter is None: + listing_filter = set() + for item in container_listing: + name = item['name'] + if end_key and name > end_key: + break + if name not in listing_filter: + results.append(name) + return results + + def get_object_data(self, swift_account, container_name, object_name, + compressed=False): + '''reads an object and yields its lines''' + code, o = self.internal_proxy.get_object(swift_account, + container_name, + object_name) + if code < 200 or code >= 300: + return + last_part = '' + last_compressed_part = '' + # magic in the following zlib.decompressobj argument is courtesy of + # Python decompressing gzip chunk-by-chunk + # http://stackoverflow.com/questions/2423866 + d = zlib.decompressobj(16 + zlib.MAX_WBITS) + try: + for chunk in o: + if compressed: + try: + chunk = d.decompress(chunk) + except zlib.error: + self.logger.debug('Bad compressed data for %s/%s/%s' % + (swift_account, + container_name, + object_name)) + raise BadFileDownload() # bad compressed data + parts = chunk.split('\n') + parts[0] = last_part + parts[0] + for part in parts[:-1]: + yield part + last_part = parts[-1] + if last_part: + yield last_part + except ChunkReadTimeout: + raise BadFileDownload() + + def generate_keylist_mapping(self): + keylist = {} + for plugin in self.plugins: + plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping() + if not plugin_keylist: + continue + for k, v in plugin_keylist.items(): + o = keylist.get(k) + if o: + if isinstance(o, set): + if isinstance(v, set): + o.update(v) + else: + o.update([v]) + else: + o = set(o) + if isinstance(v, set): + o.update(v) + else: + o.update([v]) + else: + o = v + keylist[k] = o + return keylist + + +class LogProcessorDaemon(Daemon): + """ + Gather raw log data and farm proccessing to generate a csv that is + uploaded to swift. 
+ """ + + def __init__(self, conf): + c = conf.get('log-processor') + super(LogProcessorDaemon, self).__init__(c) + self.total_conf = conf + self.logger = get_logger(c) + self.log_processor = LogProcessor(conf, self.logger) + self.lookback_hours = int(c.get('lookback_hours', '120')) + self.lookback_window = int(c.get('lookback_window', + str(self.lookback_hours))) + self.log_processor_account = c['swift_account'] + self.log_processor_container = c.get('container_name', + 'log_processing_data') + self.worker_count = int(c.get('worker_count', '1')) + + def run_once(self): + self.logger.info("Beginning log processing") + start = time.time() + if self.lookback_hours == 0: + lookback_start = None + lookback_end = None + else: + delta_hours = datetime.timedelta(hours=self.lookback_hours) + lookback_start = datetime.datetime.now() - delta_hours + lookback_start = lookback_start.strftime('%Y%m%d%H') + if self.lookback_window == 0: + lookback_end = None + else: + delta_window = datetime.timedelta(hours=self.lookback_window) + lookback_end = datetime.datetime.now() - \ + delta_hours + \ + delta_window + lookback_end = lookback_end.strftime('%Y%m%d%H') + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + try: + # Note: this file (or data set) will grow without bound. + # In practice, if it becomes a problem (say, after many months of + # running), one could manually prune the file to remove older + # entries. Automatically pruning on each run could be dangerous. + # There is not a good way to determine when an old entry should be + # pruned (lookback_hours could be set to anything and could change) + processed_files_stream = self.log_processor.get_object_data( + self.log_processor_account, + self.log_processor_container, + 'processed_files.pickle.gz', + compressed=True) + buf = '\n'.join(x for x in processed_files_stream) + if buf: + already_processed_files = cPickle.loads(buf) + else: + already_processed_files = set() + except: + already_processed_files = set() + self.logger.debug('found %d processed files' % \ + len(already_processed_files)) + logs_to_process = self.log_processor.get_data_list(lookback_start, + lookback_end, + already_processed_files) + self.logger.info('loaded %d files to process' % len(logs_to_process)) + if not logs_to_process: + self.logger.info("Log processing done (%0.2f minutes)" % + ((time.time() - start) / 60)) + return + + # map + processor_args = (self.total_conf, self.logger) + results = multiprocess_collate(processor_args, logs_to_process, + self.worker_count) + + #reduce + aggr_data = {} + processed_files = already_processed_files + for item, data in results: + # since item contains the plugin and the log name, new plugins will + # "reprocess" the file and the results will be in the final csv. 
+ processed_files.add(item) + for k, d in data.items(): + existing_data = aggr_data.get(k, {}) + for i, j in d.items(): + current = existing_data.get(i, 0) + # merging strategy for key collisions is addition + # processing plugins need to realize this + existing_data[i] = current + j + aggr_data[k] = existing_data + + # group + # reduce a large number of keys in aggr_data[k] to a small number of + # output keys + keylist_mapping = self.log_processor.generate_keylist_mapping() + final_info = collections.defaultdict(dict) + for account, data in aggr_data.items(): + for key, mapping in keylist_mapping.items(): + if isinstance(mapping, (list, set)): + value = 0 + for k in mapping: + try: + value += data[k] + except KeyError: + pass + else: + try: + value = data[mapping] + except KeyError: + value = 0 + final_info[account][key] = value + + # output + sorted_keylist_mapping = sorted(keylist_mapping) + columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) + out_buf = [columns] + for (account, year, month, day, hour), d in final_info.items(): + data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) + row = [data_ts] + row.append('%s' % account) + for k in sorted_keylist_mapping: + row.append('%s' % d[k]) + out_buf.append(','.join(row)) + out_buf = '\n'.join(out_buf) + h = hashlib.md5(out_buf).hexdigest() + upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h + f = cStringIO.StringIO(out_buf) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + upload_name) + + # cleanup + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + 'processed_files.pickle.gz') + + self.logger.info("Log processing done (%0.2f minutes)" % + ((time.time() - start) / 60)) + + +def multiprocess_collate(processor_args, logs_to_process, worker_count): + '''yield hourly data from logs_to_process''' + results = [] + in_queue = multiprocessing.Queue() + out_queue = multiprocessing.Queue() + for _ in range(worker_count): + p = multiprocessing.Process(target=collate_worker, + args=(processor_args, + in_queue, + out_queue)) + p.start() + results.append(p) + for x in logs_to_process: + in_queue.put(x) + for _ in range(worker_count): + in_queue.put(None) + count = 0 + while True: + try: + item, data = out_queue.get_nowait() + count += 1 + if data: + yield item, data + if count >= len(logs_to_process): + # this implies that one result will come from every request + break + except Queue.Empty: + time.sleep(.1) + for r in results: + r.join() + + +def collate_worker(processor_args, in_queue, out_queue): + '''worker process for multiprocess_collate''' + p = LogProcessor(*processor_args) + while True: + try: + item = in_queue.get_nowait() + if item is None: + break + except Queue.Empty: + time.sleep(.1) + else: + ret = p.process_one_file(*item) + out_queue.put((item, ret)) diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py new file mode 100644 index 0000000000..a8cc92739f --- /dev/null +++ b/swift/stats/log_uploader.py @@ -0,0 +1,170 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement +import os +import hashlib +import time +import gzip +import glob +from paste.deploy import appconfig + +from swift.common.internal_proxy import InternalProxy +from swift.common.daemon import Daemon +from swift.common import utils + + +class LogUploader(Daemon): + ''' + Given a local directory, a swift account, and a container name, LogParser + will upload all files in the local directory to the given account/ + container. All but the newest files will be uploaded, and the files' md5 + sum will be computed. The hash is used to prevent duplicate data from + being uploaded multiple times in different files (ex: log lines). Since + the hash is computed, it is also used as the uploaded object's etag to + ensure data integrity. + + Note that after the file is successfully uploaded, it will be unlinked. + + The given proxy server config is used to instantiate a proxy server for + the object uploads. + ''' + + def __init__(self, uploader_conf, plugin_name): + super(LogUploader, self).__init__(uploader_conf) + log_dir = uploader_conf.get('log_dir', '/var/log/swift/') + swift_account = uploader_conf['swift_account'] + container_name = uploader_conf['container_name'] + source_filename_format = uploader_conf['source_filename_format'] + proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', + '/etc/swift/proxy-server.conf') + proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, + name='proxy-server') + new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200')) + unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \ + ('true', 'on', '1', 'yes') + self.unlink_log = unlink_log + self.new_log_cutoff = new_log_cutoff + if not log_dir.endswith('/'): + log_dir = log_dir + '/' + self.log_dir = log_dir + self.swift_account = swift_account + self.container_name = container_name + self.filename_format = source_filename_format + self.internal_proxy = InternalProxy(proxy_server_conf) + log_name = 'swift-log-uploader-%s' % plugin_name + self.logger = utils.get_logger(uploader_conf, plugin_name) + + def run_once(self): + self.logger.info("Uploading logs") + start = time.time() + self.upload_all_logs() + self.logger.info("Uploading logs complete (%0.2f minutes)" % + ((time.time() - start) / 60)) + + def upload_all_logs(self): + i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()] + i.sort() + year_offset = month_offset = day_offset = hour_offset = None + base_offset = len(self.log_dir) + for start, c in i: + offset = base_offset + start + if c == '%Y': + year_offset = offset, offset + 4 + # Add in the difference between len(%Y) and the expanded + # version of %Y (????). This makes sure the codes after this + # one will align properly in the final filename. 
+ base_offset += 2 + elif c == '%m': + month_offset = offset, offset + 2 + elif c == '%d': + day_offset = offset, offset + 2 + elif c == '%H': + hour_offset = offset, offset + 2 + if not (year_offset and month_offset and day_offset and hour_offset): + # don't have all the parts, can't upload anything + return + glob_pattern = self.filename_format + glob_pattern = glob_pattern.replace('%Y', '????', 1) + glob_pattern = glob_pattern.replace('%m', '??', 1) + glob_pattern = glob_pattern.replace('%d', '??', 1) + glob_pattern = glob_pattern.replace('%H', '??', 1) + filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) + current_hour = int(time.strftime('%H')) + today = int(time.strftime('%Y%m%d')) + self.internal_proxy.create_container(self.swift_account, + self.container_name) + for filename in filelist: + try: + # From the filename, we need to derive the year, month, day, + # and hour for the file. These values are used in the uploaded + # object's name, so they should be a reasonably accurate + # representation of the time for which the data in the file was + # collected. The file's last modified time is not a reliable + # representation of the data in the file. For example, an old + # log file (from hour A) may be uploaded or moved into the + # log_dir in hour Z. The file's modified time will be for hour + # Z, and therefore the object's name in the system will not + # represent the data in it. + # If the filename doesn't match the format, it shouldn't be + # uploaded. + year = filename[slice(*year_offset)] + month = filename[slice(*month_offset)] + day = filename[slice(*day_offset)] + hour = filename[slice(*hour_offset)] + except IndexError: + # unexpected filename format, move on + self.logger.error("Unexpected log: %s" % filename) + continue + if ((time.time() - os.stat(filename).st_mtime) < + self.new_log_cutoff): + # don't process very new logs + self.logger.debug( + "Skipping log: %s (< %d seconds old)" % (filename, + self.new_log_cutoff)) + continue + self.upload_one_log(filename, year, month, day, hour) + + def upload_one_log(self, filename, year, month, day, hour): + if os.path.getsize(filename) == 0: + self.logger.debug("Log %s is 0 length, skipping" % filename) + return + self.logger.debug("Processing log: %s" % filename) + filehash = hashlib.md5() + already_compressed = True if filename.endswith('.gz') else False + opener = gzip.open if already_compressed else open + f = opener(filename, 'rb') + try: + for line in f: + # filter out bad lines here? + filehash.update(line) + finally: + f.close() + filehash = filehash.hexdigest() + # By adding a hash to the filename, we ensure that uploaded files + # have unique filenames and protect against uploading one file + # more than one time. By using md5, we get an etag for free. + target_filename = '/'.join([year, month, day, hour, filehash + '.gz']) + if self.internal_proxy.upload_file(filename, + self.swift_account, + self.container_name, + target_filename, + compress=(not already_compressed)): + self.logger.debug("Uploaded log %s to %s" % + (filename, target_filename)) + if self.unlink_log: + os.unlink(filename) + else: + self.logger.error("ERROR: Upload of log %s failed!" % filename) diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py new file mode 100644 index 0000000000..6caaae7840 --- /dev/null +++ b/swift/stats/stats_processor.py @@ -0,0 +1,68 @@ +# Copyright (c) 2010 OpenStack, LLC. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from swift.common.utils import get_logger + + +class StatsLogProcessor(object): + """Transform account storage stat logs""" + + def __init__(self, conf): + self.logger = get_logger(conf) + + def process(self, obj_stream, account, container, object_name): + '''generate hourly groupings of data from one stats log file''' + account_totals = {} + year, month, day, hour, _ = object_name.split('/') + for line in obj_stream: + if not line: + continue + try: + (account, + container_count, + object_count, + bytes_used) = line.split(',') + except (IndexError, ValueError): + # bad line data + self.logger.debug('Bad line data: %s' % repr(line)) + continue + account = account.strip('"') + container_count = int(container_count.strip('"')) + object_count = int(object_count.strip('"')) + bytes_used = int(bytes_used.strip('"')) + aggr_key = (account, year, month, day, hour) + d = account_totals.get(aggr_key, {}) + d['replica_count'] = d.setdefault('replica_count', 0) + 1 + d['container_count'] = d.setdefault('container_count', 0) + \ + container_count + d['object_count'] = d.setdefault('object_count', 0) + \ + object_count + d['bytes_used'] = d.setdefault('bytes_used', 0) + \ + bytes_used + account_totals[aggr_key] = d + return account_totals + + def keylist_mapping(self): + ''' + returns a dictionary of final keys mapped to source keys + ''' + keylist_mapping = { + # : or + 'bytes_used': 'bytes_used', + 'container_count': 'container_count', + 'object_count': 'object_count', + 'replica_count': 'replica_count', + } + return keylist_mapping diff --git a/test/unit/common/test_compressing_file_reader.py b/test/unit/common/test_compressing_file_reader.py new file mode 100644 index 0000000000..5394a97a72 --- /dev/null +++ b/test/unit/common/test_compressing_file_reader.py @@ -0,0 +1,34 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" Tests for swift.common.compressing_file_reader """ + +import unittest +import cStringIO + +from swift.common.compressing_file_reader import CompressingFileReader + +class TestCompressingFileReader(unittest.TestCase): + + def test_read(self): + plain = 'obj\ndata' + s = cStringIO.StringIO(plain) + expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\ + 'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\ + '\x00' + x = CompressingFileReader(s) + compressed = ''.join(iter(lambda: x.read(), '')) + self.assertEquals(compressed, expected) + self.assertEquals(x.read(), '') diff --git a/test/unit/common/test_internal_proxy.py b/test/unit/common/test_internal_proxy.py new file mode 100644 index 0000000000..248bf1cf23 --- /dev/null +++ b/test/unit/common/test_internal_proxy.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.common import internal_proxy + + +class TestInternalProxy(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 344cee4ec8..92be1077c0 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -247,5 +247,33 @@ class TestUtils(unittest.TestCase): self.assert_(callable( utils.load_libc_function('some_not_real_function'))) + def test_readconf(self): + conf = '''[section1] +foo = bar + +[section2] +log_name = yarr''' + f = open('/tmp/test', 'wb') + f.write(conf) + f.close() + result = utils.readconf('/tmp/test') + expected = {'log_name': None, + 'section1': {'foo': 'bar'}, + 'section2': {'log_name': 'yarr'}} + self.assertEquals(result, expected) + result = utils.readconf('/tmp/test', 'section1') + expected = {'log_name': 'section1', 'foo': 'bar'} + self.assertEquals(result, expected) + result = utils.readconf('/tmp/test', 'section2').get('log_name') + expected = 'yarr' + self.assertEquals(result, expected) + result = utils.readconf('/tmp/test', 'section1', log_name='foo').get('log_name') + expected = 'foo' + self.assertEquals(result, expected) + result = utils.readconf('/tmp/test', 'section1', defaults={'bar': 'baz'}) + expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'} + self.assertEquals(result, expected) + os.unlink('/tmp/test') + if __name__ == '__main__': unittest.main() diff --git a/test/unit/stats/__init__.py b/test/unit/stats/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/unit/stats/test_access_processor.py b/test/unit/stats/test_access_processor.py new file mode 100644 index 0000000000..47013ca8ae --- /dev/null +++ b/test/unit/stats/test_access_processor.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.stats import access_processor + + +class TestAccessProcessor(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/stats/test_account_stats.py b/test/unit/stats/test_account_stats.py new file mode 100644 index 0000000000..e318739dda --- /dev/null +++ b/test/unit/stats/test_account_stats.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.stats import account_stats + + +class TestAccountStats(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py new file mode 100644 index 0000000000..4ff73eccf3 --- /dev/null +++ b/test/unit/stats/test_log_processor.py @@ -0,0 +1,227 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +from swift.stats import log_processor + +class DumbLogger(object): + def __getattr__(self, n): + return self.foo + + def foo(self, *a, **kw): + pass + +class DumbInternalProxy(object): + def get_container_list(self, account, container, marker=None): + n = '2010/03/14/13/obj1' + if marker is None or n > marker: + return [{'name': n}] + else: + return [] + + def get_object(self, account, container, object_name): + code = 200 + if object_name.endswith('.gz'): + # same data as below, compressed with gzip -9 + def data(): + yield '\x1f\x8b\x08' + yield '\x08"\xd79L' + yield '\x02\x03te' + yield 'st\x00\xcbO' + yield '\xca\xe2JI,I' + yield '\xe4\x02\x00O\xff' + yield '\xa3Y\t\x00\x00\x00' + else: + def data(): + yield 'obj\n' + yield 'data' + return code, data() + +class TestLogProcessor(unittest.TestCase): + + access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\ + '09/Jul/2010/04/14/30 GET '\ + '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ + 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ + '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' + stats_test_line = 'account,1,2,3' + proxy_config = {'log-processor': { + + } + } + + def test_access_log_line_parser(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + result = p.plugins['access']['instance'].log_line_parser(self.access_test_line) + self.assertEquals(result, {'code': 200, + 'processing_time': '0.0262', + 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd', + 'month': '07', + 'second': '30', + 'year': '2010', + 'query': 'format=json&foo', + 'tz': '+0000', + 'http_version': 'HTTP/1.0', + 'object_name': 'bar', + 'etag': '-', + 'foo': 1, + 'method': 'GET', + 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90', + 'client_ip': '1.2.3.4', + 'format': 1, + 'bytes_out': 95, + 'container_name': 'foo', + 'day': '09', + 'minute': '14', + 'account': 'acct', + 'hour': '04', + 'referrer': '-', + 'request': '/v1/acct/foo/bar', + 'user_agent': 'curl', + 'bytes_in': 6, + 'lb_ip': '4.5.6.7'}) + + def test_process_one_access_file(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + def get_object_data(*a, **kw): + return [self.access_test_line] + p.get_object_data = get_object_data + result = p.process_one_file('access', 'a', 'c', 'o') + expected = {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}} + self.assertEquals(result, expected) + + def test_get_container_listing(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + result = p.get_container_listing('a', 'foo') + expected = ['2010/03/14/13/obj1'] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', listing_filter=expected) + expected = [] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031412', + end_date='2010031414') + expected = 
['2010/03/14/13/obj1'] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031414') + expected = [] + self.assertEquals(result, expected) + result = p.get_container_listing('a', 'foo', start_date='2010031410', + end_date='2010031412') + expected = [] + self.assertEquals(result, expected) + + def test_get_object_data(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + result = list(p.get_object_data('a', 'c', 'o', False)) + expected = ['obj','data'] + self.assertEquals(result, expected) + result = list(p.get_object_data('a', 'c', 'o.gz', True)) + self.assertEquals(result, expected) + + def test_get_stat_totals(self): + stats_proxy_config = self.proxy_config.copy() + stats_proxy_config.update({ + 'log-processor-stats': { + 'class_path': + 'swift.stats.stats_processor.StatsLogProcessor' + }}) + p = log_processor.LogProcessor(stats_proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.stats_test_line] + p.get_object_data = get_object_data + result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o') + expected = {('account', 'y', 'm', 'd', 'h'): + {'replica_count': 1, + 'object_count': 2, + 'container_count': 1, + 'bytes_used': 3}} + self.assertEquals(result, expected) + + def test_generate_keylist_mapping(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + result = p.generate_keylist_mapping() + expected = {} + print p.plugins + self.assertEquals(result, expected) + + def test_generate_keylist_mapping_with_dummy_plugins(self): + class Plugin1(object): + def keylist_mapping(self): + return {'a': 'b', 'c': 'd', 'e': ['f', 'g']} + class Plugin2(object): + def keylist_mapping(self): + return {'a': '1', 'e': '2', 'h': '3'} + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + p.plugins['plugin1'] = {'instance': Plugin1()} + p.plugins['plugin2'] = {'instance': Plugin2()} + result = p.generate_keylist_mapping() + expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']), + 'h': '3'} + self.assertEquals(result, expected) + + def test_access_keylist_mapping_format(self): + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(proxy_config, DumbLogger()) + mapping = p.generate_keylist_mapping() + for k, v in mapping.items(): + # these only work for Py2.7+ + #self.assertIsInstance(k, str) + self.assertTrue(isinstance(k, str), type(k)) + + def test_stats_keylist_mapping_format(self): + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-stats': { + 'class_path': + 'swift.stats.stats_processor.StatsLogProcessor' + }}) + p = log_processor.LogProcessor(proxy_config, DumbLogger()) + mapping = p.generate_keylist_mapping() + for k, v in mapping.items(): + # these only work for Py2.7+ + #self.assertIsInstance(k, str) + self.assertTrue(isinstance(k, str), type(k)) diff --git a/test/unit/stats/test_log_uploader.py b/test/unit/stats/test_log_uploader.py new file mode 100644 index 0000000000..8e889ad918 --- /dev/null +++ b/test/unit/stats/test_log_uploader.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.stats import log_uploader + + +class TestLogUploader(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/stats/test_stats_processor.py b/test/unit/stats/test_stats_processor.py new file mode 100644 index 0000000000..4720d1f035 --- /dev/null +++ b/test/unit/stats/test_stats_processor.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.stats import stats_processor + + +class TestStatsProcessor(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main()
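For reference, a minimal sketch of how the StatsLogProcessor added by this patch turns one hour's worth of account-stats csv rows into per-account totals. This is illustrative only and not part of the patch: the account name, rows, and object name are made-up placeholders, and it assumes get_logger() tolerates a bare dict config (the unit tests above sidestep the logger entirely with DumbLogger).

from swift.stats.stats_processor import StatsLogProcessor

# Assumption: an empty conf dict is enough for get_logger() in this context.
processor = StatsLogProcessor({})

# Each csv row is one replica's view of an account:
#   account, container_count, object_count, bytes_used
rows = ['"AUTH_test",1,2,3',
        '"AUTH_test",1,2,3']

# The object name encodes when the stats were collected: year/month/day/hour/basename
totals = processor.process(iter(rows), 'a', 'c', '2010/03/14/13/stats.csv')
print totals
# expected contents (key order may vary):
# {('AUTH_test', '2010', '03', '14', '13'):
#     {'replica_count': 2, 'container_count': 2, 'object_count': 4, 'bytes_used': 6}}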