set up log-stats-collector as a daemon process to create csv files

John Dickinson 2010-09-10 15:08:06 -05:00
parent de83ed2f9f
commit d8ad8ae473
8 changed files with 265 additions and 59 deletions

View File

@@ -0,0 +1,27 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from swift.stats.log_processor import LogProcessorDaemon
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: swift-log-stats-collector CONFIG_FILE"
sys.exit()
conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
stats = LogProcessorDaemon(conf).run(once=True)
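For illustration, a possible way to run the new collector by hand (the config path is an assumption based on the sample file below; since run_once prints its report to stdout, redirecting the output captures the CSV):

swift-log-stats-collector /etc/swift/log-processing.conf > /tmp/swift-stats.csv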

View File

@@ -15,7 +15,6 @@
# limitations under the License.
import sys
import time
from swift.stats.log_uploader import LogUploader
from swift.common import utils

View File

@@ -1,8 +1,8 @@
# plugin section format is named "log-processor-<plugin>"
# section "log-processor" is the generic defaults (overridden by plugins)
[log-processor]
# working_dir = /tmp/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# container_name = log_processing_data
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
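For illustration, a hypothetical plugin section following the "log-processor-<plugin>" naming convention described above; the option names inside the section (class_path in particular) are assumptions about how a plugin might be wired up, not part of the shipped sample:

[log-processor-access]
# per-plugin options override the [log-processor] defaults above
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# class_path is an assumed option naming the plugin class to load
class_path = swift.stats.access_processor.AccessLogProcessor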

View File

@@ -530,19 +530,26 @@ def item_from_env(env, item_name):
def cache_from_env(env):
return item_from_env(env, 'swift.cache')
def readconf(conf, section_name, log_name=None):
def readconf(conf, section_name=None, log_name=None):
c = ConfigParser()
if not c.read(conf):
print "Unable to read config file %s" % conf
sys.exit(1)
if c.has_section(section_name):
conf = dict(c.items(section_name))
else:
print "Unable to find %s config section in %s" % (section_name, conf)
sys.exit(1)
if "log_name" not in conf:
if log_name is not None:
conf['log_name'] = log_name
if section_name:
if c.has_section(section_name):
conf = dict(c.items(section_name))
else:
conf['log_name'] = section_name
print "Unable to find %s config section in %s" % (section_name, conf)
sys.exit(1)
if "log_name" not in conf:
if log_name is not None:
conf['log_name'] = log_name
else:
conf['log_name'] = section_name
else:
conf = {}
for s in c.sections():
conf.update({s:dict(c.items(s))})
if 'log_name' not in conf:
conf['log_name'] = log_name
return conf
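A minimal usage sketch of the reworked readconf (file paths are hypothetical): with a section name it still returns that one section as a flat dict, while with no section name it now returns every section keyed by section name, which is what lets the stats collector hand a single config file to several plugins.

from swift.common.utils import readconf

# One section: a flat dict of that section's options (plus log_name).
proxy_conf = readconf('/etc/swift/proxy-server.conf', 'proxy-server')

# No section: {section_name: {option: value}} for the whole file.
all_conf = readconf('/etc/swift/log-processing.conf',
                    log_name='log-stats-collector')
plugin_sections = [s for s in all_conf if s.startswith('log-processor-')]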

View File

@@ -15,6 +15,7 @@
import collections
from urllib import unquote
import copy
from swift.common.utils import split_path
@@ -116,7 +117,7 @@ class AccessLogProcessor(object):
d['code'] = int(d['code'])
return d
def process(self, obj_stream):
def process(self, obj_stream, account, container, object_name):
'''generate hourly groupings of data from one access log file'''
hourly_aggr_info = {}
for line in obj_stream:
@@ -176,3 +177,48 @@ class AccessLogProcessor(object):
hourly_aggr_info[aggr_key] = d
return hourly_aggr_info
def keylist_mapping(self):
source_keys = 'service public'.split()
level_keys = 'account container object'.split()
verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
code_keys = '2xx 4xx 5xx'.split()
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'service_bw_in': ('service', 'bytes_in'),
'service_bw_out': ('service', 'bytes_out'),
'public_bw_in': ('public', 'bytes_in'),
'public_bw_out': ('public', 'bytes_out'),
'account_requests': set(),
'container_requests': set(),
'object_requests': set(),
'service_request': set(),
'public_request': set(),
'ops_count': set(),
}
for verb in verb_keys:
keylist_mapping[verb] = set()
for code in code_keys:
keylist_mapping[code] = set()
for source in source_keys:
for level in level_keys:
for verb in verb_keys:
for code in code_keys:
keylist_mapping['account_requests'].add(
(source, 'account', verb, code))
keylist_mapping['container_requests'].add(
(source, 'container', verb, code))
keylist_mapping['object_requests'].add(
(source, 'object', verb, code))
keylist_mapping['service_request'].add(
('service', level, verb, code))
keylist_mapping['public_request'].add(
('public', level, verb, code))
keylist_mapping[verb].add(
(source, level, verb, code))
keylist_mapping[code].add(
(source, level, verb, code))
keylist_mapping['ops_count'].add(
(source, level, verb, code))
return keylist_mapping
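A minimal sketch (with invented numbers) of how this mapping is meant to be consumed by the daemon's reduce step: a tuple value names a single hourly counter to copy into a CSV column, while a set of tuples names several counters whose values are summed into one column.

# hypothetical hourly counters as produced by process() for one hour
hourly_row = {
    ('public', 'bytes_out'): 1024,
    ('public', 'object', 'GET', '2xx'): 7,
    ('service', 'object', 'GET', '2xx'): 3,
}
# two representative entries from keylist_mapping()
mapping = {
    'public_bw_out': ('public', 'bytes_out'),
    'GET': set([('public', 'object', 'GET', '2xx'),
                ('service', 'object', 'GET', '2xx')]),
}
columns = {}
for column, source in mapping.items():
    if isinstance(source, set):
        # sum every matching counter
        columns[column] = sum(hourly_row.get(k, 0) for k in source)
    else:
        # copy the single counter
        columns[column] = hourly_row.get(source, 0)
# columns == {'GET': 10, 'public_bw_out': 1024}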

View File

@@ -48,7 +48,7 @@ class AccountStat(Daemon):
src_filename = time.strftime(self.filename_format)
tmp_filename = os.path.join('/tmp', src_filename)
with open(tmp_filename, 'wb') as statfile:
#statfile.write('Account Name, Container Count, Object Count, Bytes Used, Created At\n')
#statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
for device in os.listdir(self.devices):
if self.mount_check and \
not os.path.ismount(os.path.join(self.devices, device)):
@@ -68,16 +68,14 @@
broker = AccountBroker(os.path.join(root, filename))
if not broker.is_deleted():
account_name,
created_at,
_, _,
_, _, _,
container_count,
object_count,
bytes_used,
_, _ = broker.get_info()
line_data = '"%s",%d,%d,%d,%s\n' % (account_name,
line_data = '"%s",%d,%d,%d\n' % (account_name,
container_count,
object_count,
bytes_used,
created_at)
bytes_used)
statfile.write(line_data)
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
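For illustration (values invented), a line in the uploaded stats file now carries four fields, the quoted account name followed by container count, object count, and bytes used, with the created_at column dropped:

"AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31",42,1024,8388608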

View File

@@ -15,36 +15,23 @@
from ConfigParser import ConfigParser
import zlib
import time
import datetime
import cStringIO
import collections
from swift.common.internal_proxy import InternalProxy
from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger
class ConfigError(Exception):
pass
class MissingProxyConfig(ConfigError):
pass
from swift.common.utils import get_logger, readconf
class LogProcessor(object):
def __init__(self, conf, logger):
stats_conf = conf.get('log-processor', {})
working_dir = stats_conf.get('working_dir', '/tmp/swift/')
if working_dir.endswith('/') and len(working_dir) > 1:
working_dir = working_dir[:-1]
self.working_dir = working_dir
proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
try:
c = ConfigParser()
c.read(proxy_server_conf_loc)
proxy_server_conf = dict(c.items('proxy-server'))
except:
raise
raise MissingProxyConfig()
self.proxy_server_conf = proxy_server_conf
self.proxy_server_conf = readconf(proxy_server_conf_loc, 'proxy-server')
if isinstance(logger, tuple):
self.logger = get_logger(*logger)
else:
@@ -71,7 +58,10 @@ class LogProcessor(object):
stream = self.get_object_data(account, container, object_name,
compressed=compressed)
# look up the correct plugin and send the stream to it
return self.plugins[plugin_name]['instance'].process(stream)
return self.plugins[plugin_name]['instance'].process(stream,
account,
container,
object_name)
def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
total_list = []
@@ -81,6 +71,8 @@ class LogProcessor(object):
l = self.get_container_listing(account, container, start_date,
end_date, listing_filter)
for i in l:
# The items in this list end up being passed as positional
# parameters to process_one_file.
total_list.append((p, account, container, i))
return total_list
@@ -174,21 +166,146 @@ class LogProcessor(object):
except ChunkReadTimeout:
raise BadFileDownload()
def multiprocess_collate(processor_args,
start_date=None,
end_date=None,
listing_filter=None):
'''get listing of files and yield hourly data from them'''
p = LogProcessor(*processor_args)
all_files = p.get_data_list(start_date, end_date, listing_filter)
def generate_keylist_mapping(self):
keylist = {}
for plugin in self.plugins:
plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
for k, v in plugin_keylist.items():
o = keylist.get(k)
if o:
if isinstance(o, set):
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = set(o)
if isinstance(v, set):
o.update(v)
else:
o.update([v])
else:
o = v
keylist[k] = o
return keylist
p.logger.info('loaded %d files to process' % len(all_files))
if not all_files:
# no work to do
return
class LogProcessorDaemon(Daemon):
def __init__(self, conf):
super(LogProcessorDaemon, self).__init__(conf)
self.log_processor = LogProcessor(conf, self.logger)
c = conf.get('log-processor', {})
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window', '%s' % self.lookback_hours))
self.log_processor_account = c['swift_account']
self.log_processor_container = c.get('container_name', 'log_processing_data')
worker_count = multiprocessing.cpu_count() - 1
def run_once(self):
self.logger.info("Beginning log processing")
start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
lookback_start = datetime.datetime.now() - \
datetime.timedelta(hours=self.lookback_hours)
lookback_start = lookback_start.strftime('%Y%m%d')
if self.lookback_window == 0:
lookback_end = None
else:
lookback_end = datetime.datetime.now() - \
datetime.timedelta(hours=self.lookback_hours) + \
datetime.timedelta(hours=self.lookback_window)
lookback_end = lookback_end.strftime('%Y%m%d')
try:
processed_files_stream = self.log_processor.get_object_data(
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz',
compressed=True)
buf = ''.join(x for x in processed_files_stream)
already_processed_files = cPickle.loads(buf)
except:
already_processed_files = set()
logs_to_process = self.log_processor.get_data_list(lookback_start,
lookback_end,
already_processed_files)
self.logger.info('loaded %d files to process' % len(logs_to_process))
if not logs_to_process:
self.logger.info("Log processing done (%0.2f minutes)" %
((time.time()-start)/60))
return
# map
processor_args = (self.conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process)
#reduce
aggr_data = {}
processed_files = already_processed_files
for item, data in results.items():
# since item contains both the plugin name and the log file name, a
# newly added plugin will "reprocess" files that were already handled
# and its results will show up in the final csv.
processed_files.add(item)
for k, d in data.items():
existing_data = aggr_data.get(k, {})
for i, j in d.items():
current = existing_data.get(i, 0)
# merging strategy for key collisions is addition
# processing plugins need to realize this
existing_data[i] = current + j
aggr_data[k] = existing_data
# group
# reduce a large number of keys in aggr_data[k] to a small number of
# output keys
keylist_mapping = self.log_processor.generate_keylist_mapping()
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
for key, mapping in keylist_mapping.items():
if isinstance(mapping, (list, set)):
value = 0
for k in mapping:
try:
value += data[k]
except KeyError:
pass
else:
try:
value = data[mapping]
except KeyError:
value = 0
final_info[account][key] = value
# output
sorted_keylist_mapping = sorted(keylist_mapping)
columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping)
print columns
for (account, year, month, day, hour), d in final_info.items():
bill_ts = ''
data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
row = [bill_ts, data_ts]
row.append('%s' % account)
for k in sorted_keylist_mapping:
row.append('%s'%d[k])
print ','.join(row)
# cleanup
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz')
self.logger.info("Log processing done (%0.2f minutes)" %
((time.time()-start)/60))
def multiprocess_collate(processor_args, logs_to_process):
'''yield hourly data from logs_to_process'''
worker_count = multiprocessing.cpu_count()
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
@@ -199,7 +316,7 @@ def multiprocess_collate(processor_args,
out_queue))
p.start()
results.append(p)
for x in all_files:
for x in logs_to_process:
in_queue.put(x)
for _ in range(worker_count):
in_queue.put(None)
@@ -229,6 +346,5 @@ def collate_worker(processor_args, in_queue, out_queue):
except Queue.Empty:
time.sleep(.1)
else:
ret = None
ret = p.process_one_file(item)
ret = p.process_one_file(*item)
out_queue.put((item, ret))
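For illustration, the shape of the report run_once prints: the column names come from the loaded plugins' keylist mappings (sorted), bill_ts is left blank, and data_ts is the hour the data covers. The account, the values, and the truncated column list shown here are invented.

bill_ts,data_ts,account,2xx,GET,PUT,bytes_used,container_count,object_count,...
,2010/09/10 15:00:00,AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31,42,37,5,8388608,3,1024,...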

View File

@@ -18,9 +18,10 @@ class StatsLogProcessor(object):
def __init__(self, conf):
pass
def process(self, obj_stream):
def process(self, obj_stream, account, container, object_name):
'''generate hourly groupings of data from one stats log file'''
account_totals = {}
year, month, day, hour, _ = object_name.split('/')
for line in obj_stream:
if not line:
continue
@@ -35,17 +36,29 @@ class StatsLogProcessor(object):
object_count = int(object_count.strip('"'))
bytes_used = int(bytes_used.strip('"'))
created_at = created_at.strip('"')
d = account_totals.get(account, {})
d['count'] = d.setdefault('count', 0) + 1
aggr_key = (account, year, month, day, hour)
d = account_totals.get(aggr_key, {})
d['replica_count'] = d.setdefault('replica_count', 0) + 1
d['container_count'] = d.setdefault('container_count', 0) + \
container_count
d['object_count'] = d.setdefault('object_count', 0) + \
object_count
d['bytes_used'] = d.setdefault('bytes_used', 0) + \
bytes_used
d['created_at'] = created_at
account_totals[account] = d
except (IndexError, ValueError):
# bad line data
pass
return account_totals
def keylist_mapping(self):
'''
returns a dictionary of final keys mapped to source keys
'''
keylist_mapping = {
# <db key> : <row key> or <set of row keys>
'bytes_used': 'bytes_used',
'container_count': 'container_count',
'object_count': 'object_count',
'replica_count': 'replica_count',
}