diff --git a/bin/st b/bin/st index 51a7637881..1a4bf34312 100755 --- a/bin/st +++ b/bin/st @@ -1132,6 +1132,11 @@ def st_stat(options, args): if not args: try: headers = conn.head_account() + if options.verbose > 1: + options.print_queue.put(''' +StorageURL: %s +Auth Token: %s +'''.strip('\n') % (conn.url, conn.token)) container_count = int(headers.get('x-account-container-count', 0)) object_count = int(headers.get('x-account-object-count', 0)) bytes_used = int(headers.get('x-account-bytes-used', 0)) @@ -1397,8 +1402,10 @@ Example: '''.strip('\n') % globals()) parser.add_option('-s', '--snet', action='store_true', dest='snet', default=False, help='Use SERVICENET internal network') - parser.add_option('-q', '--quiet', action='store_false', dest='verbose', - default=True, help='Suppress status output') + parser.add_option('-v', '--verbose', action='count', dest='verbose', + default=1, help='Print more info') + parser.add_option('-q', '--quiet', action='store_const', dest='verbose', + const=0, default=1, help='Suppress status output') parser.add_option('-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicate that you really want the ' 'whole account for commands that require --all in such ' diff --git a/bin/swift-account-stats-logger b/bin/swift-account-stats-logger new file mode 100755 index 0000000000..c42554de82 --- /dev/null +++ b/bin/swift-account-stats-logger @@ -0,0 +1,27 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from swift.stats.account_stats import AccountStat +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 2: + print "Usage: swift-account-stats-logger CONFIG_FILE" + sys.exit() + stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats') + stats = AccountStat(stats_conf).run(once=True) diff --git a/bin/swift-bench b/bin/swift-bench new file mode 100755 index 0000000000..e058882acb --- /dev/null +++ b/bin/swift-bench @@ -0,0 +1,132 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import os +import sys +import signal +import uuid +from optparse import OptionParser + +from swift.common.bench import BenchController +from swift.common.utils import readconf, NamedLogger + +# The defaults should be sufficient to run swift-bench on a SAIO +CONF_DEFAULTS = { + 'auth': '', + 'user': '', + 'key': '', + 'object_sources': '', + 'put_concurrency': '10', + 'get_concurrency': '10', + 'del_concurrency': '10', + 'concurrency': '', + 'object_size': '1', + 'num_objects': '1000', + 'num_gets': '10000', + 'delete': 'yes', + 'container_name': uuid.uuid4().hex, + 'use_proxy': 'yes', + 'url': '', + 'account': '', + 'devices': 'sdb1', + 'log_level': 'INFO', + 'timeout': '10', + } + +SAIO_DEFAULTS = { + 'auth': 'http://saio:11000/v1.0', + 'user': 'test:tester', + 'key': 'testing', + } + +if __name__ == '__main__': + usage = "usage: %prog [OPTIONS] [CONF_FILE]" + usage += """\n\nConf file with SAIO defaults: + + [bench] + auth = http://saio:11000/v1.0 + user = test:tester + key = testing + concurrency = 10 + object_size = 1 + num_objects = 1000 + num_gets = 10000 + delete = yes + """ + parser = OptionParser(usage=usage) + parser.add_option('', '--saio', dest='saio', action='store_true', + default=False, help='Run benchmark with SAIO defaults') + parser.add_option('-A', '--auth', dest='auth', + help='URL for obtaining an auth token') + parser.add_option('-U', '--user', dest='user', + help='User name for obtaining an auth token') + parser.add_option('-K', '--key', dest='key', + help='Key for obtaining an auth token') + parser.add_option('-u', '--url', dest='url', + help='Storage URL') + parser.add_option('-c', '--concurrency', dest='concurrency', + help='Number of concurrent connections to use') + parser.add_option('-s', '--object-size', dest='object_size', + help='Size of objects to PUT (in bytes)') + parser.add_option('-n', '--num-objects', dest='num_objects', + help='Number of objects to PUT') + parser.add_option('-g', '--num-gets', dest='num_gets', + help='Number of GET operations to perform') + parser.add_option('-x', '--no-delete', dest='delete', action='store_false', + help='If set, will not delete the objects created') + + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + options, args = parser.parse_args() + if options.saio: + CONF_DEFAULTS.update(SAIO_DEFAULTS) + if args: + conf = args[0] + if not os.path.exists(conf): + sys.exit("No such conf file: %s" % conf) + conf = readconf(conf, 'bench', log_name='swift-bench', + defaults=CONF_DEFAULTS) + else: + conf = CONF_DEFAULTS + parser.set_defaults(**conf) + options, _ = parser.parse_args() + if options.concurrency != '': + options.put_concurrency = options.concurrency + options.get_concurrency = options.concurrency + options.del_concurrency = options.concurrency + + def sigterm(signum, frame): + sys.exit('Termination signal received.') + signal.signal(signal.SIGTERM, sigterm) + + logger = logging.getLogger() + logger.setLevel({ + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL}.get( + options.log_level.lower(), logging.INFO)) + loghandler = logging.StreamHandler() + logformat = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + loghandler.setFormatter(logformat) + logger.addHandler(loghandler) + logger = NamedLogger(logger, 'swift-bench') + + controller = BenchController(logger, options) + controller.run() diff --git a/bin/swift-log-stats-collector b/bin/swift-log-stats-collector new file mode 100755 index
0000000000..d21135b35c --- /dev/null +++ b/bin/swift-log-stats-collector @@ -0,0 +1,27 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from swift.stats.log_processor import LogProcessorDaemon +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 2: + print "Usage: swift-log-stats-collector CONFIG_FILE" + sys.exit() + conf = utils.readconf(sys.argv[1], log_name='log-stats-collector') + stats = LogProcessorDaemon(conf).run(once=True) diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader new file mode 100755 index 0000000000..b557e4c167 --- /dev/null +++ b/bin/swift-log-uploader @@ -0,0 +1,31 @@ +#!/usr/bin/python +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from swift.stats.log_uploader import LogUploader +from swift.common import utils + +if __name__ == '__main__': + if len(sys.argv) < 3: + print "Usage: swift-log-uploader CONFIG_FILE plugin" + sys.exit() + uploader_conf = utils.readconf(sys.argv[1], 'log-processor') + plugin = sys.argv[2] + section_name = 'log-processor-%s' % plugin + plugin_conf = utils.readconf(sys.argv[1], section_name) + uploader_conf.update(plugin_conf) + uploader = LogUploader(uploader_conf, plugin).run(once=True) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 0c2e52e59a..df5b4f642d 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -47,7 +47,7 @@ If you need more throughput to either Account or Container Services, they may each be deployed to their own servers. For example you might use faster (but more expensive) SAS or even SSD drives to get faster disk I/O to the databases. -Load balancing and network design is left as an excercise to the reader, +Load balancing and network design are left as an exercise for the reader, but this is a very important part of the cluster, so time should be spent designing the network for a Swift cluster. @@ -59,7 +59,7 @@ Preparing the Ring The first step is to determine the number of partitions that will be in the ring. We recommend that there be a minimum of 100 partitions per drive to -insure even distribution accross the drives. A good starting point might be +ensure even distribution across the drives. A good starting point might be to figure out the maximum number of drives the cluster will contain, and then multiply by 100, and then round up to the nearest power of two.
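To make the ring-sizing rule above concrete, here is a worked example (the drive count is hypothetical, chosen only for illustration): a cluster expected to grow to 5,000 drives needs at least 5,000 * 100 = 500,000 partitions, and the nearest power of two at or above that is 2**19 = 524,288, so the ring would be built with a partition power of 19. The same arithmetic as a Python sketch::

    import math

    max_drives = 5000             # assumed maximum drive count (illustrative)
    min_parts = max_drives * 100  # at least 100 partitions per drive
    part_power = int(math.ceil(math.log(min_parts, 2)))
    print part_power              # 19
    print 2 ** part_power         # 524288 partitions in the ring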
@@ -154,8 +154,8 @@ Option Default Description ------------------ ---------- --------------------------------------------- swift_dir /etc/swift Swift configuration directory devices /srv/node Parent directory of where devices are mounted -mount_check true Weather or not check if the devices are - mounted to prevent accidently writing +mount_check true Whether or not to check if the devices are + mounted to prevent accidentally writing to the root device bind_ip 0.0.0.0 IP Address for server to bind to bind_port 6000 Port for server to bind to @@ -173,7 +173,7 @@ use paste.deploy entry point for the object log_name object-server Label used when logging log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level -log_requests True Weather or not to log each request +log_requests True Whether or not to log each request user swift User to run as node_timeout 3 Request timeout to external services conn_timeout 0.5 Connection timeout to external services @@ -193,7 +193,7 @@ Option Default Description log_name object-replicator Label used when logging log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level -daemonize yes Weather or not to run replication as a +daemonize yes Whether or not to run replication as a daemon run_pause 30 Time in seconds to wait between replication passes @@ -249,9 +249,9 @@ The following configuration options are available: Option Default Description ------------------ ---------- -------------------------------------------- swift_dir /etc/swift Swift configuration directory -devices /srv/node Parent irectory of where devices are mounted -mount_check true Weather or not check if the devices are - mounted to prevent accidently writing +devices /srv/node Parent directory of where devices are mounted +mount_check true Whether or not to check if the devices are + mounted to prevent accidentally writing to the root device bind_ip 0.0.0.0 IP Address for server to bind to bind_port 6001 Port for server to bind to @@ -339,8 +339,8 @@ Option Default Description ------------------ ---------- --------------------------------------------- swift_dir /etc/swift Swift configuration directory devices /srv/node Parent directory or where devices are mounted -mount_check true Weather or not check if the devices are - mounted to prevent accidently writing +mount_check true Whether or not to check if the devices are + mounted to prevent accidentally writing to the root device bind_ip 0.0.0.0 IP Address for server to bind to bind_port 6002 Port for server to bind to @@ -353,7 +353,7 @@ user swift User to run as ================== ============== ========================================== Option Default Description ------------------ -------------- ------------------------------------------ -use paste.deploy entry point for the account +use Entry point for paste.deploy for the account server. For most cases, this should be `egg:swift#account`. log_name account-server Label used when logging @@ -412,6 +412,11 @@ conn_timeout 0.5 Connection timeout to external services Proxy Server Configuration -------------------------- +An example Proxy Server configuration can be found at +etc/proxy-server.conf-sample in the source code repository.
+ +The following configuration options are available: + [DEFAULT] ============================ =============== ============================= key_file Path to the ssl .key ============================ =============== ============================= Option Default Description ---------------------------- --------------- ----------------------------- -use paste.deploy entry point for +use Entry point for paste.deploy for the proxy server. For most cases, this should be `egg:swift#proxy`. @@ -443,10 +448,10 @@ log_headers True If True, log headers in each request recheck_account_existence 60 Cache timeout in seconds to send memcached for account - existance + existence recheck_container_existence 60 Cache timeout in seconds to send memcached for container - existance + existence object_chunk_size 65536 Chunk size to read from object servers client_chunk_size 65536 Chunk size to read from @@ -474,7 +479,7 @@ rate_limit_account_whitelist Comma separated list of rate limit rate_limit_account_blacklist Comma separated list of account name hashes to block - completly + completely ============================ =============== ============================= [auth] ============ =================================== ======================== Option Default Description ------------ ----------------------------------- ------------------------ -use paste.deploy entry point +use Entry point for paste.deploy to use for auth. To use the swift dev auth, set to: @@ -500,7 +505,7 @@ Memcached Considerations ------------------------ Several of the Services rely on Memcached for caching certain types of -lookups, such as auth tokens, and container/account existance. Swift does +lookups, such as auth tokens, and container/account existence. Swift does not do any caching of actual object data. Memcached should be able to run on any servers that have available RAM and CPU. At Rackspace, we run Memcached on the proxy servers. The `memcache_servers` config option @@ -526,7 +531,7 @@ Most services support either a worker or concurrency value in the settings. This allows the services to make effective use of the cores available. A good starting point to set the concurrency level for the proxy and storage services to 2 times the number of cores available. If more than one service is -sharing a server, then some experimentaiton may be needed to find the best +sharing a server, then some experimentation may be needed to find the best balance. At Rackspace, our Proxy servers have dual quad core processors, giving us 8 @@ -548,7 +553,7 @@ Filesystem Considerations ------------------------- Swift is designed to be mostly filesystem agnostic--the only requirement -beeing that the filesystem supports extended attributes (xattrs). After +being that the filesystem supports extended attributes (xattrs). After thorough testing with our use cases and hardware configurations, XFS was the best all-around choice. If you decide to use a filesystem other than XFS, we highly recommend thorough testing. @@ -611,5 +616,5 @@ Logging Considerations Swift is set up to log directly to syslog. Every service can be configured with the `log_facility` option to set the syslog log facility destination. -It is recommended to use syslog-ng to route the logs to specific log +We recommend using syslog-ng to route the logs to specific log files locally on the server and also to remote log collecting servers.
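To tie the concurrency guidance above to a concrete setting, the fragment below sketches what the 2x-cores rule of thumb could look like in a server configuration; the value is an assumption for the 8-core machines described above, not a default shipped by this patch::

    [DEFAULT]
    # dual quad core = 8 cores; 8 * 2 = 16 per the rule of thumb above
    workers = 16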
diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 9365665cb9..17a443d692 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -7,9 +7,7 @@ Instructions for setting up a dev VM ------------------------------------ This documents setting up a virtual machine for doing Swift development. The -virtual machine will emulate running a four node Swift cluster. It assumes -you're using *VMware Fusion 3* on *Mac OS X Snow Leopard*, but should give a -good idea what to do on other environments. +virtual machine will emulate running a four node Swift cluster. * Get the *Ubuntu 10.04 LTS (Lucid Lynx)* server image: @@ -17,20 +15,9 @@ good idea what to do on other environments. - Ubuntu Live/Install: http://cdimage.ubuntu.com/releases/10.04/release/ubuntu-10.04-dvd-amd64.iso (4.1 GB) - Ubuntu Mirrors: https://launchpad.net/ubuntu/+cdmirrors -* Create guest virtual machine: - - #. `Continue without disc` - #. `Use operating system installation disc image file`, pick the .iso - from above. - #. Select `Linux` and `Ubuntu 64-bit`. - #. Fill in the *Linux Easy Install* details. - #. `Customize Settings`, name the image whatever you want - (`SAIO` for instance.) - #. When the `Settings` window comes up, select `Hard Disk`, create an - extra disk (the defaults are fine). - #. Start the virtual machine up and wait for the easy install to - finish. - +* Create guest virtual machine from the Ubuntu image (if you are going to use + a separate partition for swift data, be sure to add another device when + creating the VM) * As root on guest (you'll have to log in as you, then `sudo su -`): #. `apt-get install python-software-properties` #. `add-apt-repository ppa:swift-core/ppa` #. `apt-get update` #. `apt-get install curl gcc bzr memcached python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr sqlite3 xfsprogs python-webob python-eventlet python-greenlet python-pastedeploy` #. Install anything else you want, like screen, ssh, vim, etc. - #. `fdisk /dev/sdb` (set up a single partition) - #. `mkfs.xfs -i size=1024 /dev/sdb1` + #. If you would like to use another partition for storage: + + #. `fdisk /dev/sdb` (set up a single partition) + #. `mkfs.xfs -i size=1024 /dev/sdb1` + #. Edit `/etc/fstab` and add + `/dev/sdb1 /mnt/sdb1 xfs noatime,nodiratime,nobarrier,logbufs=8 0 0` + + #. If you would like to use a loopback device instead of another partition: + + #. `dd if=/dev/zero of=/srv/swift-disk bs=1024 count=0 seek=1000000` + (modify seek to make a larger or smaller partition) + #. `mkfs.xfs -i size=1024 /srv/swift-disk` + #. Edit `/etc/fstab` and add + `/srv/swift-disk /mnt/sdb1 xfs loop,noatime,nodiratime,nobarrier,logbufs=8 0 0` + #. `mkdir /mnt/sdb1` - #. Edit `/etc/fstab` and add - `/dev/sdb1 /mnt/sdb1 xfs noatime,nodiratime,nobarrier,logbufs=8 0 0` #. `mount /mnt/sdb1` #. `mkdir /mnt/sdb1/1 /mnt/sdb1/2 /mnt/sdb1/3 /mnt/sdb1/4 /mnt/sdb1/test` #. `chown <your-user-name>:<your-group-name> /mnt/sdb1/*` @@ -56,7 +54,7 @@ good idea what to do on other environments. #. Add to `/etc/rc.local` (before the `exit 0`):: mkdir /var/run/swift - chown <your-user-name>:<your-group-name> /var/run/swift + chown <your-user-name>:<your-group-name> /var/run/swift #. Create /etc/rsyncd.conf:: uid = <your-user-name> gid = <your-group-name> log file = /var/log/rsyncd.log pid file = /var/run/rsyncd.pid - + address = 127.0.0.1 [account6012] max connections = 25 @@ -472,6 +470,11 @@ good idea what to do on other environments. sudo service rsyslog restart sudo service memcached restart + .. note:: + + If you are using a loopback device, substitute `/dev/sdb1` above with + `/srv/swift-disk` + #.
Create `~/bin/remakerings`:: #!/bin/bash diff --git a/doc/source/index.rst b/doc/source/index.rst index d782eb99cb..6e5a7f6592 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -24,6 +24,7 @@ Overview: overview_reaper overview_auth overview_replication + overview_stats rate_limiting Development: diff --git a/doc/source/overview_stats.rst b/doc/source/overview_stats.rst new file mode 100644 index 0000000000..6364de4611 --- /dev/null +++ b/doc/source/overview_stats.rst @@ -0,0 +1,184 @@ +================== +Swift stats system +================== + +The swift stats system is composed of three parts: log creation, log +uploading, and log processing. The system handles two types of logs (access +and account stats), but it can be extended to handle other types of logs. + +--------- +Log Types +--------- + +*********** +Access logs +*********** + +Access logs are the proxy server logs. Rackspace uses syslog-ng to redirect +the proxy log output to an hourly log file. For example, a proxy request that +is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412. +This allows easy log rotation and easy per-hour log processing. + +****************** +Account stats logs +****************** + +Account stats logs are generated by a stats system process. +swift-account-stats-logger runs on each account server (via cron) and walks +the filesystem looking for account databases. When an account database is +found, the logger selects the account hash, bytes_used, container_count, and +object_count. These values are then written out as one line in a csv file. One +csv file is produced for every run of swift-account-stats-logger. This means +that, system wide, one csv file is produced for every storage node. Rackspace +runs the account stats logger every hour. Therefore, in a cluster of ten +account servers, ten csv files are produced every hour. Also, every account +will have one entry for every replica in the system. On average, there will be +three copies of each account in the aggregate of all account stat csv files +created in one system-wide run. + +---------------------- +Log Processing plugins +---------------------- + +The swift stats system is written to allow a plugin to be defined for every +log type. Swift includes plugins for both access logs and storage stats logs. +Each plugin is responsible for defining, in a config section, where the logs +are stored on disk, where the logs will be stored in swift (account and +container), the filename format of the logs on disk, the location of the +plugin class definition, and any plugin-specific config values. + +The plugin class definition defines three methods. The constructor must accept +one argument (the dict representation of the plugin's config section). The +process method must accept an iterator, and the account, container, and object +name of the log. The keylist_mapping method accepts no parameters. A minimal +example of such a plugin is sketched below. + +------------- +Log Uploading +------------- + +swift-log-uploader accepts a config file and a plugin name. It finds the log +files on disk according to the plugin config section and uploads them to the +swift cluster. This means one uploader process will run on each proxy server +node and each account server node. To avoid uploading partially-written log +files, the uploader will not upload files whose mtime is less than two hours +old. Rackspace runs this process once an hour via cron.
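The plugin interface described in the Log Processing plugins section above is small enough to sketch. Below is a minimal, hypothetical plugin; the class name and the line_count counter are invented for illustration, and only the three-method shape and the (account, year, month, day, hour) keying come from this document::

    class LineCountProcessor(object):
        '''Hypothetical plugin: counts log lines per account per hour.'''

        def __init__(self, conf):
            # conf is the dict representation of this plugin's config section
            self.conf = conf

        def process(self, obj_stream, account, container, object_name):
            # assumes the hourly log naming described above, e.g. 2010080412
            year, month, day, hour = (object_name[:4], object_name[4:6],
                                      object_name[6:8], object_name[8:10])
            key = (account, year, month, day, hour)
            counts = {key: {'line_count': 0}}
            for line in obj_stream:
                counts[key]['line_count'] += 1
            return counts

        def keylist_mapping(self):
            # maps the final csv column name to the keys used by process()
            return {'line_count': 'line_count'}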
+ +-------------- +Log Processing +-------------- + +swift-log-stats-collector accepts a config file and generates a csv that is +uploaded to swift. It loads all plugins defined in the config file, generates +a list of all log files in swift that need to be processed, and passes an +iterable of the log file data to the appropriate plugin's process method. The +process method returns a dictionary of data in the log file keyed on (account, +year, month, day, hour). The log-stats-collector process then combines all +dictionaries from all calls to a process method into one dictionary. Key +collisions within each (account, year, month, day, hour) dictionary are +summed. Finally, the summed dictionary is mapped to the final csv values with +each plugin's keylist_mapping method. + +The resulting csv file has one line per (account, year, month, day, hour) for +all log files processed in that run of swift-log-stats-collector. + + +================================ +Running the stats system on SAIO +================================ + +#. Create a swift account to use for storing stats information, and note the + account hash. The hash will be used in config files. + +#. Install syslog-ng:: + + sudo apt-get install syslog-ng + +#. Add the following to the end of `/etc/syslog-ng/syslog-ng.conf`:: + + # Added for swift logging + destination df_local1 { file("/var/log/swift/proxy.log" owner(<your-user-name>) group(<your-group-name>)); }; + destination df_local1_err { file("/var/log/swift/proxy.error" owner(<your-user-name>) group(<your-group-name>)); }; + destination df_local1_hourly { file("/var/log/swift/hourly/$YEAR$MONTH$DAY$HOUR" owner(<your-user-name>) group(<your-group-name>)); }; + filter f_local1 { facility(local1) and level(info); }; + + filter f_local1_err { facility(local1) and not level(info); }; + + # local1.info -/var/log/swift/proxy.log + # write to local file and to remote log server + log { + source(s_all); + filter(f_local1); + destination(df_local1); + destination(df_local1_hourly); + }; + + # local1.error -/var/log/swift/proxy.error + # write to local file and to remote log server + log { + source(s_all); + filter(f_local1_err); + destination(df_local1_err); + }; + +#. Restart syslog-ng + +#. Create the log directories:: + + mkdir /var/log/swift/hourly + mkdir /var/log/swift/stats + chown -R <your-user-name>:<your-group-name> /var/log/swift + +#. Create `/etc/swift/log-processor.conf`:: + + [log-processor] + swift_account = <your-account-hash> + user = <your-user-name> + + [log-processor-access] + swift_account = <your-account-hash> + container_name = log_data + log_dir = /var/log/swift/hourly/ + source_filename_format = %Y%m%d%H + class_path = swift.stats.access_processor.AccessLogProcessor + user = <your-user-name> + + [log-processor-stats] + swift_account = <your-account-hash> + container_name = account_stats + log_dir = /var/log/swift/stats/ + source_filename_format = %Y%m%d%H_* + class_path = swift.stats.stats_processor.StatsLogProcessor + account_server_conf = /etc/swift/account-server/1.conf + user = <your-user-name> + +#. Add the following under [app:proxy-server] in `/etc/swift/proxy-server.conf`:: + + log_facility = LOG_LOCAL1 + +#. Create a `cron` job to run once per hour to create the stats logs. In + `/etc/cron.d/swift-stats-log-creator`:: + + 0 * * * * swift-account-stats-logger /etc/swift/log-processor.conf + +#. Create a `cron` job to run once per hour to upload the stats logs. In + `/etc/cron.d/swift-stats-log-uploader`:: + + 10 * * * * swift-log-uploader /etc/swift/log-processor.conf stats + +#. Create a `cron` job to run once per hour to upload the access logs. In + `/etc/cron.d/swift-access-log-uploader`:: + + 5 * * * * swift-log-uploader /etc/swift/log-processor.conf access + +#.
Create a `cron` job to run once per hour to process the logs. In + `/etc/cron.d/swift-stats-processor`:: + + 30 * * * * swift-log-stats-collector /etc/swift/log-processor.conf + +After running for a few hours, you should start to see .csv files in the +log_processing_data container in the swift stats account that was created +earlier. Each file will have one entry for each account with activity in that +hour. One .csv file should be produced per hour. Note +that the stats will be delayed by at least two hours by default. This can be +changed with the new_log_cutoff variable in the config file. See +`log-processing.conf-sample` for more details. \ No newline at end of file diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample new file mode 100644 index 0000000000..11805add0b --- /dev/null +++ b/etc/log-processing.conf-sample @@ -0,0 +1,39 @@ +# plugin section format is named "log-processor-<plugin>" + +[log-processor] +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +# container_name = log_processing_data +# proxy_server_conf = /etc/swift/proxy-server.conf +# log_facility = LOG_LOCAL0 +# log_level = INFO +# lookback_hours = 120 +# lookback_window = 120 +# user = swift + +[log-processor-access] +# log_dir = /var/log/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +container_name = log_data +source_filename_format = access-%Y%m%d%H +# new_log_cutoff = 7200 +# unlink_log = True +class_path = swift.stats.access_processor.AccessLogProcessor +# service ips is for client ip addresses that should be counted as servicenet +# service_ips = +# load balancer private ips is for load balancer ip addresses that should be +# counted as servicenet +# lb_private_ips = +# server_name = proxy +# user = swift +# warn_percent = 0.8 + +[log-processor-stats] +# log_dir = /var/log/swift/ +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 +container_name = account_stats +source_filename_format = stats-%Y%m%d%H_* +# new_log_cutoff = 7200 +# unlink_log = True +class_path = swift.stats.stats_processor.StatsLogProcessor +# account_server_conf = /etc/swift/account-server.conf +# user = swift \ No newline at end of file diff --git a/setup.py b/setup.py index 7a8898d643..4cb3ac576e 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,11 @@ setup( 'bin/swift-object-server', 'bin/swift-object-updater', 'bin/swift-proxy-server', 'bin/swift-ring-builder', 'bin/swift-stats-populate', - 'bin/swift-stats-report' + 'bin/swift-stats-report', + 'bin/swift-bench', + 'bin/swift-log-uploader', + 'bin/swift-log-stats-collector', + 'bin/swift-account-stats-logger', ], entry_points={ 'paste.app_factory': [ diff --git a/swift/common/bench.py b/swift/common/bench.py new file mode 100644 index 0000000000..bb1dbf3af2 --- /dev/null +++ b/swift/common/bench.py @@ -0,0 +1,236 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import uuid +import time +import random +from urlparse import urlparse +from contextlib import contextmanager + +import eventlet.pools +from eventlet.green.httplib import CannotSendRequest + +from swift.common.utils import TRUE_VALUES +from swift.common import client +from swift.common import direct_client + + +class ConnectionPool(eventlet.pools.Pool): + + def __init__(self, url, size): + self.url = url + eventlet.pools.Pool.__init__(self, size, size) + + def create(self): + return client.http_connection(self.url) + + +class Bench(object): + + def __init__(self, logger, conf, names): + self.logger = logger + self.user = conf.user + self.key = conf.key + self.auth_url = conf.auth + self.use_proxy = conf.use_proxy in TRUE_VALUES + if self.use_proxy: + url, token = client.get_auth(self.auth_url, self.user, self.key) + self.token = token + self.account = url.split('/')[-1] + if conf.url == '': + self.url = url + else: + self.url = conf.url + else: + self.token = 'SlapChop!' + self.account = conf.account + self.url = conf.url + self.ip, self.port = self.url.split('/')[2].split(':') + self.container_name = conf.container_name + + self.object_size = int(conf.object_size) + self.object_sources = conf.object_sources + self.files = [] + if self.object_sources: + self.object_sources = self.object_sources.split() + self.files = [file(f, 'rb').read() for f in self.object_sources] + + self.put_concurrency = int(conf.put_concurrency) + self.get_concurrency = int(conf.get_concurrency) + self.del_concurrency = int(conf.del_concurrency) + self.total_objects = int(conf.num_objects) + self.total_gets = int(conf.num_gets) + self.timeout = int(conf.timeout) + self.devices = conf.devices.split() + self.names = names + self.conn_pool = ConnectionPool(self.url, + max(self.put_concurrency, self.get_concurrency, + self.del_concurrency)) + + def _log_status(self, title): + total = time.time() - self.beginbeat + self.logger.info('%s %s [%s failures], %.01f/s' % ( + self.complete, title, self.failures, + (float(self.complete) / total), + )) + + @contextmanager + def connection(self): + try: + hc = self.conn_pool.get() + try: + yield hc + except CannotSendRequest: + self.logger.info("CannotSendRequest. 
Skipping...") + try: + hc.close() + except: + pass + self.failures += 1 + hc = self.conn_pool.create() + finally: + self.conn_pool.put(hc) + + def run(self): + pool = eventlet.GreenPool(self.concurrency) + events = [] + self.beginbeat = self.heartbeat = time.time() + self.heartbeat -= 13 # just to get the first report quicker + self.failures = 0 + self.complete = 0 + for i in xrange(self.total): + pool.spawn_n(self._run, i) + pool.waitall() + self._log_status(self.msg + ' **FINAL**') + + def _run(self, thread): + return + + +class BenchController(object): + + def __init__(self, logger, conf): + self.logger = logger + self.conf = conf + self.names = [] + self.delete = conf.delete in TRUE_VALUES + self.gets = int(conf.num_gets) + + def run(self): + puts = BenchPUT(self.logger, self.conf, self.names) + puts.run() + if self.gets: + gets = BenchGET(self.logger, self.conf, self.names) + gets.run() + if self.delete: + dels = BenchDELETE(self.logger, self.conf, self.names) + dels.run() + + +class BenchDELETE(Bench): + + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.del_concurrency + self.total = len(names) + self.msg = 'DEL' + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('DEL') + device, partition, name = self.names.pop() + with self.connection() as conn: + try: + if self.use_proxy: + client.delete_object(self.url, self.token, + self.container_name, name, http_conn=conn) + else: + node = {'ip': self.ip, 'port': self.port, 'device': device} + direct_client.direct_delete_object(node, partition, + self.account, self.container_name, name) + except client.ClientException, e: + self.logger.debug(str(e)) + self.failures += 1 + self.complete += 1 + + +class BenchGET(Bench): + + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.get_concurrency + self.total = self.total_gets + self.msg = 'GETS' + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('GETS') + device, partition, name = random.choice(self.names) + with self.connection() as conn: + try: + if self.use_proxy: + client.get_object(self.url, self.token, + self.container_name, name, http_conn=conn) + else: + node = {'ip': self.ip, 'port': self.port, 'device': device} + direct_client.direct_get_object(node, partition, + self.account, self.container_name, name) + except client.ClientException, e: + self.logger.debug(str(e)) + self.failures += 1 + self.complete += 1 + + +class BenchPUT(Bench): + + def __init__(self, logger, conf, names): + Bench.__init__(self, logger, conf, names) + self.concurrency = self.put_concurrency + self.total = self.total_objects + self.msg = 'PUTS' + if self.use_proxy: + with self.connection() as conn: + client.put_container(self.url, self.token, + self.container_name, http_conn=conn) + + def _run(self, thread): + if time.time() - self.heartbeat >= 15: + self.heartbeat = time.time() + self._log_status('PUTS') + name = uuid.uuid4().hex + if self.object_sources: + source = random.choice(self.files) + else: + source = '0' * self.object_size + device = random.choice(self.devices) + partition = str(random.randint(1, 3000)) + with self.connection() as conn: + try: + if self.use_proxy: + client.put_object(self.url, self.token, + self.container_name, name, source, + content_length=len(source), http_conn=conn) + else: + node = {'ip': self.ip, 'port': self.port, 'device': device} 
+ direct_client.direct_put_object(node, partition, + self.account, self.container_name, name, source, + content_length=len(source)) + except client.ClientException, e: + self.logger.debug(str(e)) + self.failures += 1 + self.names.append((device, partition, name)) + self.complete += 1 diff --git a/swift/common/client.py b/swift/common/client.py index 22562ede37..06c3dab067 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -18,7 +18,7 @@ Cloud Files client library used internally """ import socket from cStringIO import StringIO -from httplib import HTTPConnection, HTTPException, HTTPSConnection +from httplib import HTTPException, HTTPSConnection from re import compile, DOTALL from tokenize import generate_tokens, STRING, NAME, OP from urllib import quote as _quote, unquote @@ -29,6 +29,8 @@ try: except: from time import sleep +from swift.common.bufferedhttp \ + import BufferedHTTPConnection as HTTPConnection def quote(value, safe='/'): """ diff --git a/swift/common/compressing_file_reader.py b/swift/common/compressing_file_reader.py new file mode 100644 index 0000000000..d6de9154eb --- /dev/null +++ b/swift/common/compressing_file_reader.py @@ -0,0 +1,73 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zlib +import struct + + +class CompressingFileReader(object): + ''' + Wraps a file object and provides a read method that returns gzip'd data. + + One warning: if read is called with a small value, the data returned may + be bigger than the value. In this case, the "compressed" data will be + bigger than the original data. To solve this, use a bigger read buffer. + + An example use case: + Given an uncompressed file on disk, provide a way to read compressed data + without buffering the entire file data in memory. Using this class, an + uncompressed log file could be uploaded as compressed data with chunked + transfer encoding. 
+ + gzip header and footer code taken from the python stdlib gzip module + + :param file_obj: File object to read from + :param compresslevel: compression level + ''' + + def __init__(self, file_obj, compresslevel=9): + self._f = file_obj + self._compressor = zlib.compressobj(compresslevel, + zlib.DEFLATED, + -zlib.MAX_WBITS, + zlib.DEF_MEM_LEVEL, + 0) + self.done = False + self.first = True + self.crc32 = 0 + self.total_size = 0 + + def read(self, *a, **kw): + if self.done: + return '' + x = self._f.read(*a, **kw) + if x: + self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL + self.total_size += len(x) + compressed = self._compressor.compress(x) + if not compressed: + compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH) + else: + compressed = self._compressor.flush(zlib.Z_FINISH) + # gzip footer: crc32 and total size, little-endian + crc32 = struct.pack("<L", self.crc32 & 0xffffffffL) + size = struct.pack("<L", self.total_size & 0xffffffffL) + compressed += crc32 + size + self.done = True + if self.first: + self.first = False + # gzip header, as in the stdlib gzip module + header = '\037\213\010\000\000\000\000\000\002\377' + compressed = header + compressed + return compressed diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py --- a/swift/common/direct_client.py +++ b/swift/common/direct_client.py +def direct_put_object(node, part, account, container, name, contents, + content_length=None, etag=None, content_type=None, + headers=None, conn_timeout=5, response_timeout=15): + """ + Put object directly to the object server. + """ + path = '/%s/%s/%s' % (account, container, name) + if headers is None: + headers = {} + if etag: + headers['ETag'] = etag.strip('"') + if content_length is not None: + headers['Content-Length'] = str(content_length) + if content_type is not None: + headers['Content-Type'] = content_type + headers['X-Timestamp'] = normalize_timestamp(time()) + with Timeout(conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], part, + 'PUT', path, headers=headers) + conn.send(contents) + with Timeout(response_timeout): + resp = conn.getresponse() + resp.read() + if resp.status < 200 or resp.status >= 300: + raise ClientException( + 'Object server %s:%s direct PUT %s gave status %s' % + (node['ip'], node['port'], + repr('/%s/%s%s' % (node['device'], part, path)), + resp.status), + http_host=node['ip'], http_port=node['port'], + http_device=node['device'], http_status=resp.status, + http_reason=resp.reason) + return resp.getheader('etag').strip('"') + + def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers={}): """ diff --git a/swift/common/internal_proxy.py b/swift/common/internal_proxy.py new file mode 100644 index 0000000000..9951083ac6 --- /dev/null +++ b/swift/common/internal_proxy.py @@ -0,0 +1,210 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import webob +from urllib import quote, unquote +from json import loads as json_loads + +from swift.common.compressing_file_reader import CompressingFileReader +from swift.proxy.server import BaseApplication + + +class MemcacheStub(object): + + def get(self, *a, **kw): + return None + + def set(self, *a, **kw): + return None + + def incr(self, *a, **kw): + return 0 + + def delete(self, *a, **kw): + return None + + def set_multi(self, *a, **kw): + return None + + def get_multi(self, *a, **kw): + return [] + + +class InternalProxy(object): + """ + Set up a private instance of a proxy server that allows normal requests + to be made without having to actually send the request to the proxy. + This also doesn't log the requests to the normal proxy logs. + + :param proxy_server_conf: proxy server configuration dictionary + :param logger: logger to log requests to + :param retries: number of times to retry each request + """ + + def __init__(self, proxy_server_conf=None, logger=None, retries=0): + self.upload_app = BaseApplication(proxy_server_conf, + memcache=MemcacheStub(), + logger=logger) + self.retries = retries + + def upload_file(self, source_file, account, container, object_name, + compress=True, content_type='application/x-gzip', + etag=None): + """ + Upload a file to cloud files.
+ + :param source_file: path to or file like object to upload + :param account: account to upload to + :param container: container to upload to + :param object_name: name of object being uploaded + :param compress: if True, compresses object as it is uploaded + :param content_type: content-type of object + :param etag: etag for object to check successful upload + :returns: True if successful, False otherwise + """ + target_name = '/v1/%s/%s/%s' % (account, container, object_name) + + # create the container + if not self.create_container(account, container): + return False + + # upload the file to the account + req = webob.Request.blank(target_name, + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Transfer-Encoding': 'chunked'}) + if compress: + if hasattr(source_file, 'read'): + compressed_file = CompressingFileReader(source_file) + else: + compressed_file = CompressingFileReader( + open(source_file, 'rb')) + req.body_file = compressed_file + else: + if not hasattr(source_file, 'read'): + source_file = open(source_file, 'rb') + req.body_file = source_file + req.account = account + req.content_type = content_type + req.content_length = None # to make sure we send chunked data + if etag: + req.etag = etag + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + if not (200 <= resp.status_int < 300): + return False + return True + + def get_object(self, account, container, object_name): + """ + Get object. + + :param account: account name object is in + :param container: container name object is in + :param object_name: name of object to get + :returns: iterator for object data + """ + req = webob.Request.blank('/v1/%s/%s/%s' % + (account, container, object_name), + environ={'REQUEST_METHOD': 'GET'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + return resp.status_int, resp.app_iter + + def create_container(self, account, container): + """ + Create container. + + :param account: account name to put the container in + :param container: container name to create + :returns: True if successful, otherwise False + """ + req = webob.Request.blank('/v1/%s/%s' % (account, container), + environ={'REQUEST_METHOD': 'PUT'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + return 200 <= resp.status_int < 300 + + def get_container_list(self, account, container, marker=None, limit=None, + prefix=None, delimiter=None, full_listing=True): + """ + Get container listing. 
+ + :param account: account name for the container + :param container: container name to get the listing of + :param marker: marker query + :param limit: limit to query + :param prefix: prefix query + :param delimiter: delimiter for query + :param full_listing: if True, make enough requests to get all listings + :returns: list of objects + """ + if full_listing: + rv = [] + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + while listing: + rv.extend(listing) + if not delimiter: + marker = listing[-1]['name'] + else: + marker = listing[-1].get('name', listing[-1].get('subdir')) + listing = self.get_container_list(account, container, marker, + limit, prefix, delimiter, full_listing=False) + return rv + path = '/v1/%s/%s' % (account, container) + qs = 'format=json' + if marker: + qs += '&marker=%s' % quote(marker) + if limit: + qs += '&limit=%d' % limit + if prefix: + qs += '&prefix=%s' % quote(prefix) + if delimiter: + qs += '&delimiter=%s' % quote(delimiter) + path += '?%s' % qs + req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) + req.account = account + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries <= self.retries: + resp = self.upload_app.handle_request( + self.upload_app.update_request(req)) + tries += 1 + if resp.status_int == 204: + return [] + if 200 <= resp.status_int < 300: + return json_loads(resp.body) diff --git a/swift/common/utils.py b/swift/common/utils.py index 59c9ee7c3c..f8feb73968 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -553,30 +553,42 @@ def cache_from_env(env): return item_from_env(env, 'swift.cache') -def readconf(conf, section_name, log_name=None): +def readconf(conf, section_name=None, log_name=None, defaults=None): """ Read config file and return config items as a dict :param conf: path to config file - :param section_name: config section to read + :param section_name: config section to read (will return all sections if + not defined) :param log_name: name to be used with logging (will use section_name if not defined) + :param defaults: dict of default values to pre-populate the config with :returns: dict of config items """ - c = ConfigParser() + if defaults is None: + defaults = {} + c = ConfigParser(defaults) if not c.read(conf): print "Unable to read config file %s" % conf sys.exit(1) - if c.has_section(section_name): - conf = dict(c.items(section_name)) - else: - print "Unable to find %s config section in %s" % (section_name, conf) - sys.exit(1) - if "log_name" not in conf: - if log_name is not None: - conf['log_name'] = log_name + if section_name: + if c.has_section(section_name): + conf = dict(c.items(section_name)) else: - conf['log_name'] = section_name + print "Unable to find %s config section in %s" % (section_name, + conf) + sys.exit(1) + if "log_name" not in conf: + if log_name is not None: + conf['log_name'] = log_name + else: + conf['log_name'] = section_name + else: + conf = {} + for s in c.sections(): + conf.update({s: dict(c.items(s))}) + if 'log_name' not in conf: + conf['log_name'] = log_name return conf diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 306f0e2980..5628517264 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -95,7 +95,7 @@ def run_wsgi(conf_file, app_section, *args, **kwargs): # pragma: no cover retry_until = time.time() + 30 while not sock and time.time() < retry_until: try: - sock =
listen(bind_addr) + sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096))) if 'cert_file' in conf: sock = ssl.wrap_socket(sock, certfile=conf['cert_file'], keyfile=conf['key_file']) diff --git a/swift/stats/__init__.py b/swift/stats/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py new file mode 100644 index 0000000000..5d8766b9df --- /dev/null +++ b/swift/stats/access_processor.py @@ -0,0 +1,239 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +from urllib import unquote +import copy + +from swift.common.utils import split_path, get_logger + +month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split() + + +class AccessLogProcessor(object): + """Transform proxy server access logs""" + + def __init__(self, conf): + self.server_name = conf.get('server_name', 'proxy') + self.lb_private_ips = [x.strip() for x in \ + conf.get('lb_private_ips', '').split(',')\ + if x.strip()] + self.service_ips = [x.strip() for x in \ + conf.get('service_ips', '').split(',')\ + if x.strip()] + self.warn_percent = float(conf.get('warn_percent', '0.8')) + self.logger = get_logger(conf) + + def log_line_parser(self, raw_log): + '''given a raw access log line, return a dict of the good parts''' + d = {} + try: + (_, + server, + client_ip, + lb_ip, + timestamp, + method, + request, + http_version, + code, + referrer, + user_agent, + auth_token, + bytes_in, + bytes_out, + etag, + trans_id, + headers, + processing_time) = (unquote(x) for x in raw_log[16:].split(' ')) + except ValueError: + self.logger.debug('Bad line data: %s' % repr(raw_log)) + return {} + if server != self.server_name: + # incorrect server name in log line + self.logger.debug('Bad server name: found "%s" expected "%s"' \ + % (server, self.server_name)) + return {} + (version, + account, + container_name, + object_name) = split_path(request, 2, 4, True) + if container_name is not None: + container_name = container_name.split('?', 1)[0] + if object_name is not None: + object_name = object_name.split('?', 1)[0] + account = account.split('?', 1)[0] + query = None + if '?' in request: + request, query = request.split('?', 1) + args = query.split('&') + # Count each query argument. This is used later to aggregate + # the number of format, prefix, etc. queries. + for q in args: + if '=' in q: + k, v = q.split('=', 1) + else: + k = q + # Certain keys will get summed in stats reporting + # (format, path, delimiter, etc.). Save a "1" here + # to indicate that this request is 1 request for + # its respective key.
+ d[k] = 1 + d['client_ip'] = client_ip + d['lb_ip'] = lb_ip + d['method'] = method + d['request'] = request + if query: + d['query'] = query + d['http_version'] = http_version + d['code'] = code + d['referrer'] = referrer + d['user_agent'] = user_agent + d['auth_token'] = auth_token + d['bytes_in'] = bytes_in + d['bytes_out'] = bytes_out + d['etag'] = etag + d['trans_id'] = trans_id + d['processing_time'] = processing_time + day, month, year, hour, minute, second = timestamp.split('/') + d['day'] = day + month = ('%02s' % month_map.index(month)).replace(' ', '0') + d['month'] = month + d['year'] = year + d['hour'] = hour + d['minute'] = minute + d['second'] = second + d['tz'] = '+0000' + d['account'] = account + d['container_name'] = container_name + d['object_name'] = object_name + d['bytes_out'] = int(d['bytes_out'].replace('-', '0')) + d['bytes_in'] = int(d['bytes_in'].replace('-', '0')) + d['code'] = int(d['code']) + return d + + def process(self, obj_stream, account, container, object_name): + '''generate hourly groupings of data from one access log file''' + hourly_aggr_info = {} + total_lines = 0 + bad_lines = 0 + for line in obj_stream: + line_data = self.log_line_parser(line) + total_lines += 1 + if not line_data: + bad_lines += 1 + continue + account = line_data['account'] + container_name = line_data['container_name'] + year = line_data['year'] + month = line_data['month'] + day = line_data['day'] + hour = line_data['hour'] + bytes_out = line_data['bytes_out'] + bytes_in = line_data['bytes_in'] + method = line_data['method'] + code = int(line_data['code']) + object_name = line_data['object_name'] + client_ip = line_data['client_ip'] + + op_level = None + if not container_name: + op_level = 'account' + elif container_name and not object_name: + op_level = 'container' + elif object_name: + op_level = 'object' + + aggr_key = (account, year, month, day, hour) + d = hourly_aggr_info.get(aggr_key, {}) + if line_data['lb_ip'] in self.lb_private_ips: + source = 'service' + else: + source = 'public' + + if line_data['client_ip'] in self.service_ips: + source = 'service' + + d[(source, 'bytes_out')] = d.setdefault(( + source, 'bytes_out'), 0) + bytes_out + d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \ + bytes_in + + d['format_query'] = d.setdefault('format_query', 0) + \ + line_data.get('format', 0) + d['marker_query'] = d.setdefault('marker_query', 0) + \ + line_data.get('marker', 0) + d['prefix_query'] = d.setdefault('prefix_query', 0) + \ + line_data.get('prefix', 0) + d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \ + line_data.get('delimiter', 0) + path = line_data.get('path', 0) + d['path_query'] = d.setdefault('path_query', 0) + path + + code = '%dxx' % (code / 100) + key = (source, op_level, method, code) + d[key] = d.setdefault(key, 0) + 1 + + hourly_aggr_info[aggr_key] = d + if bad_lines > (total_lines * self.warn_percent): + name = '/'.join([account, container, object_name]) + self.logger.warning('I found a bunch of bad lines in %s '\ + '(%d bad, %d total)' % (name, bad_lines, total_lines)) + return hourly_aggr_info + + def keylist_mapping(self): + source_keys = 'service public'.split() + level_keys = 'account container object'.split() + verb_keys = 'GET PUT POST DELETE HEAD COPY'.split() + code_keys = '2xx 4xx 5xx'.split() + + keylist_mapping = { + # : or + 'service_bw_in': ('service', 'bytes_in'), + 'service_bw_out': ('service', 'bytes_out'), + 'public_bw_in': ('public', 'bytes_in'), + 'public_bw_out': ('public', 'bytes_out'), + 
'account_requests': set(), + 'container_requests': set(), + 'object_requests': set(), + 'service_request': set(), + 'public_request': set(), + 'ops_count': set(), + } + for verb in verb_keys: + keylist_mapping[verb] = set() + for code in code_keys: + keylist_mapping[code] = set() + for source in source_keys: + for level in level_keys: + for verb in verb_keys: + for code in code_keys: + keylist_mapping['account_requests'].add( + (source, 'account', verb, code)) + keylist_mapping['container_requests'].add( + (source, 'container', verb, code)) + keylist_mapping['object_requests'].add( + (source, 'object', verb, code)) + keylist_mapping['service_request'].add( + ('service', level, verb, code)) + keylist_mapping['public_request'].add( + ('public', level, verb, code)) + keylist_mapping[verb].add( + (source, level, verb, code)) + keylist_mapping[code].add( + (source, level, verb, code)) + keylist_mapping['ops_count'].add( + (source, level, verb, code)) + return keylist_mapping diff --git a/swift/stats/account_stats.py b/swift/stats/account_stats.py new file mode 100644 index 0000000000..ddf4192119 --- /dev/null +++ b/swift/stats/account_stats.py @@ -0,0 +1,111 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +from paste.deploy import appconfig +import shutil +import hashlib + +from swift.account.server import DATADIR as account_server_data_dir +from swift.common.db import AccountBroker +from swift.common.internal_proxy import InternalProxy +from swift.common.utils import renamer, get_logger, readconf, mkdirs +from swift.common.constraints import check_mount +from swift.common.daemon import Daemon + + +class AccountStat(Daemon): + """ + Extract storage stats from account databases on the account + storage nodes + """ + + def __init__(self, stats_conf): + super(AccountStat, self).__init__(stats_conf) + target_dir = stats_conf.get('log_dir', '/var/log/swift') + account_server_conf_loc = stats_conf.get('account_server_conf', + '/etc/swift/account-server.conf') + server_conf = appconfig('config:%s' % account_server_conf_loc, + name='account-server') + filename_format = stats_conf['source_filename_format'] + if filename_format.count('*') > 1: + raise Exception('source filename format should have at most one *') + self.filename_format = filename_format + self.target_dir = target_dir + mkdirs(self.target_dir) + self.devices = server_conf.get('devices', '/srv/node') + self.mount_check = server_conf.get('mount_check', 'true').lower() in \ + ('true', 't', '1', 'on', 'yes', 'y') + self.logger = get_logger(stats_conf, 'swift-account-stats-logger') + + def run_once(self): + self.logger.info("Gathering account stats") + start = time.time() + self.find_and_process() + self.logger.info("Gathering account stats complete (%0.2f minutes)" % + ((time.time() - start) / 60)) + + def find_and_process(self): + src_filename = time.strftime(self.filename_format) + working_dir = os.path.join(self.target_dir, '.stats_tmp') + shutil.rmtree(working_dir,
+        mkdirs(working_dir)
+        tmp_filename = os.path.join(working_dir, src_filename)
+        hasher = hashlib.md5()
+        with open(tmp_filename, 'wb') as statfile:
+            # csv has the following columns:
+            # Account Name, Container Count, Object Count, Bytes Used
+            for device in os.listdir(self.devices):
+                if self.mount_check and not check_mount(self.devices, device):
+                    self.logger.error("Device %s is not mounted, skipping." %
+                        device)
+                    continue
+                accounts = os.path.join(self.devices,
+                                        device,
+                                        account_server_data_dir)
+                if not os.path.exists(accounts):
+                    self.logger.debug("Path %s does not exist, skipping." %
+                        accounts)
+                    continue
+                for root, dirs, files in os.walk(accounts, topdown=False):
+                    for filename in files:
+                        if filename.endswith('.db'):
+                            db_path = os.path.join(root, filename)
+                            broker = AccountBroker(db_path)
+                            if not broker.is_deleted():
+                                (account_name,
+                                _, _, _,
+                                container_count,
+                                object_count,
+                                bytes_used,
+                                _, _) = broker.get_info()
+                                line_data = '"%s",%d,%d,%d\n' % (
+                                    account_name, container_count,
+                                    object_count, bytes_used)
+                                statfile.write(line_data)
+                                hasher.update(line_data)
+        file_hash = hasher.hexdigest()
+        hash_index = src_filename.find('*')
+        if hash_index < 0:
+            # if there is no * in the target filename, the uploader probably
+            # won't work because we are crafting a filename that doesn't
+            # fit the pattern
+            src_filename = '_'.join([src_filename, file_hash])
+        else:
+            parts = src_filename[:hash_index], src_filename[hash_index + 1:]
+            src_filename = ''.join([parts[0], file_hash, parts[1]])
+        renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
+        shutil.rmtree(working_dir, ignore_errors=True)
diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py
new file mode 100644
index 0000000000..6fd6c68597
--- /dev/null
+++ b/swift/stats/log_processor.py
@@ -0,0 +1,424 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
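+
+# This module ties the stats system together: LogProcessor loads one
+# processing plugin per 'log-processor-*' config section, LogProcessorDaemon
+# collates the plugins' hourly aggregates into a single csv upload, and
+# multiprocess_collate/collate_worker fan the per-file processing out to
+# worker processes.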
+
+from ConfigParser import ConfigParser
+import zlib
+import time
+import datetime
+import cStringIO
+import collections
+from paste.deploy import appconfig
+import multiprocessing
+import Queue
+import cPickle
+import hashlib
+
+from swift.common.internal_proxy import InternalProxy
+from swift.common.exceptions import ChunkReadTimeout
+from swift.common.utils import get_logger, readconf
+from swift.common.daemon import Daemon
+
+
+class BadFileDownload(Exception):
+    pass
+
+
+class LogProcessor(object):
+    """Load plugins, process logs"""
+
+    def __init__(self, conf, logger):
+        if isinstance(logger, tuple):
+            self.logger = get_logger(*logger)
+        else:
+            self.logger = logger
+
+        self.conf = conf
+        self._internal_proxy = None
+
+        # load the processing plugins
+        self.plugins = {}
+        plugin_prefix = 'log-processor-'
+        for section in (x for x in conf if x.startswith(plugin_prefix)):
+            plugin_name = section[len(plugin_prefix):]
+            plugin_conf = conf.get(section, {})
+            self.plugins[plugin_name] = plugin_conf
+            class_path = self.plugins[plugin_name]['class_path']
+            import_target, class_name = class_path.rsplit('.', 1)
+            module = __import__(import_target, fromlist=[import_target])
+            klass = getattr(module, class_name)
+            self.plugins[plugin_name]['instance'] = klass(plugin_conf)
+            self.logger.debug('Loaded plugin "%s"' % plugin_name)
+
+    @property
+    def internal_proxy(self):
+        if self._internal_proxy is None:
+            stats_conf = self.conf.get('log-processor', {})
+            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
+                                            '/etc/swift/proxy-server.conf')
+            proxy_server_conf = appconfig(
+                                        'config:%s' % proxy_server_conf_loc,
+                                        name='proxy-server')
+            self._internal_proxy = InternalProxy(proxy_server_conf,
+                                                 self.logger,
+                                                 retries=3)
+        return self._internal_proxy
+
+    def process_one_file(self, plugin_name, account, container, object_name):
+        self.logger.info('Processing %s/%s/%s with plugin "%s"' %
+                         (account, container, object_name, plugin_name))
+        # get an iter of the object data
+        compressed = object_name.endswith('.gz')
+        stream = self.get_object_data(account, container, object_name,
+                                      compressed=compressed)
+        # look up the correct plugin and send the stream to it
+        return self.plugins[plugin_name]['instance'].process(stream,
+                                                             account,
+                                                             container,
+                                                             object_name)
+
+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
+        if listing_filter is None:
+            listing_filter = set()
+        total_list = []
+        for plugin_name, data in self.plugins.items():
+            account = data['swift_account']
+            container = data['container_name']
+            listing = self.get_container_listing(account,
+                                                 container,
+                                                 start_date,
+                                                 end_date)
+            for object_name in listing:
+                # The items in this list end up being passed as positional
+                # parameters to process_one_file.
+                x = (plugin_name, account, container, object_name)
+                if x not in listing_filter:
+                    total_list.append(x)
+        return total_list
+
+    def get_container_listing(self, swift_account, container_name,
+                              start_date=None, end_date=None,
+                              listing_filter=None):
+        '''
+        Get a container listing, filtered by start_date, end_date, and
+        listing_filter.
+        Dates, if given, should be in YYYYMMDDHH format
+        '''
+        search_key = None
+        if start_date is not None:
+            date_parts = []
+            try:
+                year, start_date = start_date[:4], start_date[4:]
+                if year:
+                    date_parts.append(year)
+                month, start_date = start_date[:2], start_date[2:]
+                if month:
+                    date_parts.append(month)
+                day, start_date = start_date[:2], start_date[2:]
+                if day:
+                    date_parts.append(day)
+                hour, start_date = start_date[:2], start_date[2:]
+                if hour:
+                    date_parts.append(hour)
+            except IndexError:
+                pass
+            else:
+                search_key = '/'.join(date_parts)
+        end_key = None
+        if end_date is not None:
+            date_parts = []
+            try:
+                year, end_date = end_date[:4], end_date[4:]
+                if year:
+                    date_parts.append(year)
+                month, end_date = end_date[:2], end_date[2:]
+                if month:
+                    date_parts.append(month)
+                day, end_date = end_date[:2], end_date[2:]
+                if day:
+                    date_parts.append(day)
+                hour, end_date = end_date[:2], end_date[2:]
+                if hour:
+                    date_parts.append(hour)
+            except IndexError:
+                pass
+            else:
+                end_key = '/'.join(date_parts)
+        container_listing = self.internal_proxy.get_container_list(
+                                    swift_account,
+                                    container_name,
+                                    marker=search_key)
+        results = []
+        if container_listing is not None:
+            if listing_filter is None:
+                listing_filter = set()
+            for item in container_listing:
+                name = item['name']
+                if end_key and name > end_key:
+                    break
+                if name not in listing_filter:
+                    results.append(name)
+        return results
+
+    def get_object_data(self, swift_account, container_name, object_name,
+                        compressed=False):
+        '''reads an object and yields its lines'''
+        code, o = self.internal_proxy.get_object(swift_account,
+                                                 container_name,
+                                                 object_name)
+        if code < 200 or code >= 300:
+            return
+        last_part = ''
+        last_compressed_part = ''
+        # magic in the following zlib.decompressobj argument is courtesy of
+        # Python decompressing gzip chunk-by-chunk
+        # http://stackoverflow.com/questions/2423866
+        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
+        try:
+            for chunk in o:
+                if compressed:
+                    try:
+                        chunk = d.decompress(chunk)
+                    except zlib.error:
+                        self.logger.debug('Bad compressed data for %s/%s/%s' %
+                                          (swift_account,
+                                           container_name,
+                                           object_name))
+                        raise BadFileDownload()  # bad compressed data
+                parts = chunk.split('\n')
+                parts[0] = last_part + parts[0]
+                for part in parts[:-1]:
+                    yield part
+                last_part = parts[-1]
+            if last_part:
+                yield last_part
+        except ChunkReadTimeout:
+            raise BadFileDownload()
+
+    def generate_keylist_mapping(self):
+        keylist = {}
+        for plugin in self.plugins:
+            plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
+            if not plugin_keylist:
+                continue
+            for k, v in plugin_keylist.items():
+                o = keylist.get(k)
+                if o:
+                    if isinstance(o, set):
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                    else:
+                        o = set([o])
+                        if isinstance(v, set):
+                            o.update(v)
+                        else:
+                            o.update([v])
+                else:
+                    o = v
+                keylist[k] = o
+        return keylist
+
+
+class LogProcessorDaemon(Daemon):
+    """
+    Gather raw log data and farm out processing to generate a csv that is
+    uploaded to swift.
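+    The csv has one row per (account, year, month, day, hour) aggregation
+    key, with a data_ts column, the account, and one column per final key
+    from generate_keylist_mapping.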
+ """ + + def __init__(self, conf): + c = conf.get('log-processor') + super(LogProcessorDaemon, self).__init__(c) + self.total_conf = conf + self.logger = get_logger(c) + self.log_processor = LogProcessor(conf, self.logger) + self.lookback_hours = int(c.get('lookback_hours', '120')) + self.lookback_window = int(c.get('lookback_window', + str(self.lookback_hours))) + self.log_processor_account = c['swift_account'] + self.log_processor_container = c.get('container_name', + 'log_processing_data') + self.worker_count = int(c.get('worker_count', '1')) + + def run_once(self): + self.logger.info("Beginning log processing") + start = time.time() + if self.lookback_hours == 0: + lookback_start = None + lookback_end = None + else: + delta_hours = datetime.timedelta(hours=self.lookback_hours) + lookback_start = datetime.datetime.now() - delta_hours + lookback_start = lookback_start.strftime('%Y%m%d%H') + if self.lookback_window == 0: + lookback_end = None + else: + delta_window = datetime.timedelta(hours=self.lookback_window) + lookback_end = datetime.datetime.now() - \ + delta_hours + \ + delta_window + lookback_end = lookback_end.strftime('%Y%m%d%H') + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + try: + # Note: this file (or data set) will grow without bound. + # In practice, if it becomes a problem (say, after many months of + # running), one could manually prune the file to remove older + # entries. Automatically pruning on each run could be dangerous. + # There is not a good way to determine when an old entry should be + # pruned (lookback_hours could be set to anything and could change) + processed_files_stream = self.log_processor.get_object_data( + self.log_processor_account, + self.log_processor_container, + 'processed_files.pickle.gz', + compressed=True) + buf = '\n'.join(x for x in processed_files_stream) + if buf: + already_processed_files = cPickle.loads(buf) + else: + already_processed_files = set() + except: + already_processed_files = set() + self.logger.debug('found %d processed files' % \ + len(already_processed_files)) + logs_to_process = self.log_processor.get_data_list(lookback_start, + lookback_end, + already_processed_files) + self.logger.info('loaded %d files to process' % len(logs_to_process)) + if not logs_to_process: + self.logger.info("Log processing done (%0.2f minutes)" % + ((time.time() - start) / 60)) + return + + # map + processor_args = (self.total_conf, self.logger) + results = multiprocess_collate(processor_args, logs_to_process, + self.worker_count) + + #reduce + aggr_data = {} + processed_files = already_processed_files + for item, data in results: + # since item contains the plugin and the log name, new plugins will + # "reprocess" the file and the results will be in the final csv. 
+            processed_files.add(item)
+            for k, d in data.items():
+                existing_data = aggr_data.get(k, {})
+                for i, j in d.items():
+                    current = existing_data.get(i, 0)
+                    # merging strategy for key collisions is addition
+                    # processing plugins need to realize this
+                    existing_data[i] = current + j
+                aggr_data[k] = existing_data
+
+        # group
+        # reduce a large number of keys in aggr_data[k] to a small number of
+        # output keys
+        keylist_mapping = self.log_processor.generate_keylist_mapping()
+        final_info = collections.defaultdict(dict)
+        for account, data in aggr_data.items():
+            for key, mapping in keylist_mapping.items():
+                if isinstance(mapping, (list, set)):
+                    value = 0
+                    for k in mapping:
+                        try:
+                            value += data[k]
+                        except KeyError:
+                            pass
+                else:
+                    try:
+                        value = data[mapping]
+                    except KeyError:
+                        value = 0
+                final_info[account][key] = value
+
+        # output
+        sorted_keylist_mapping = sorted(keylist_mapping)
+        columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
+        out_buf = [columns]
+        for (account, year, month, day, hour), d in final_info.items():
+            data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
+            row = [data_ts]
+            row.append('%s' % account)
+            for k in sorted_keylist_mapping:
+                row.append('%s' % d[k])
+            out_buf.append(','.join(row))
+        out_buf = '\n'.join(out_buf)
+        h = hashlib.md5(out_buf).hexdigest()
+        upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
+        f = cStringIO.StringIO(out_buf)
+        self.log_processor.internal_proxy.upload_file(f,
+                                        self.log_processor_account,
+                                        self.log_processor_container,
+                                        upload_name)
+
+        # cleanup
+        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
+        f = cStringIO.StringIO(s)
+        self.log_processor.internal_proxy.upload_file(f,
+                                        self.log_processor_account,
+                                        self.log_processor_container,
+                                        'processed_files.pickle.gz')
+
+        self.logger.info("Log processing done (%0.2f minutes)" %
+                         ((time.time() - start) / 60))
+
+
+def multiprocess_collate(processor_args, logs_to_process, worker_count):
+    '''yield hourly data from logs_to_process'''
+    results = []
+    in_queue = multiprocessing.Queue()
+    out_queue = multiprocessing.Queue()
+    for _ in range(worker_count):
+        p = multiprocessing.Process(target=collate_worker,
+                                    args=(processor_args,
+                                          in_queue,
+                                          out_queue))
+        p.start()
+        results.append(p)
+    for x in logs_to_process:
+        in_queue.put(x)
+    for _ in range(worker_count):
+        in_queue.put(None)
+    count = 0
+    while True:
+        try:
+            item, data = out_queue.get_nowait()
+            count += 1
+            if data:
+                yield item, data
+            if count >= len(logs_to_process):
+                # this implies that one result will come from every request
+                break
+        except Queue.Empty:
+            time.sleep(.1)
+    for r in results:
+        r.join()
+
+
+def collate_worker(processor_args, in_queue, out_queue):
+    '''worker process for multiprocess_collate'''
+    p = LogProcessor(*processor_args)
+    while True:
+        try:
+            item = in_queue.get_nowait()
+            if item is None:
+                break
+        except Queue.Empty:
+            time.sleep(.1)
+        else:
+            ret = p.process_one_file(*item)
+            out_queue.put((item, ret))
diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py
new file mode 100644
index 0000000000..a8cc92739f
--- /dev/null
+++ b/swift/stats/log_uploader.py
@@ -0,0 +1,170 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import with_statement
+import os
+import hashlib
+import time
+import gzip
+import glob
+from paste.deploy import appconfig
+
+from swift.common.internal_proxy import InternalProxy
+from swift.common.daemon import Daemon
+from swift.common import utils
+
+
+class LogUploader(Daemon):
+    '''
+    Given a local directory, a swift account, and a container name,
+    LogUploader will upload all files in the local directory to the given
+    account/container. All but the newest files will be uploaded, and the
+    files' md5 sum will be computed. The hash is used to prevent duplicate
+    data from being uploaded multiple times in different files (ex: log
+    lines). Since the hash is computed, it is also used as the uploaded
+    object's etag to ensure data integrity.
+
+    Note that after the file is successfully uploaded, it will be unlinked.
+
+    The given proxy server config is used to instantiate a proxy server for
+    the object uploads.
+    '''
+
+    def __init__(self, uploader_conf, plugin_name):
+        super(LogUploader, self).__init__(uploader_conf)
+        log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
+        swift_account = uploader_conf['swift_account']
+        container_name = uploader_conf['container_name']
+        source_filename_format = uploader_conf['source_filename_format']
+        proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
+                                            '/etc/swift/proxy-server.conf')
+        proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
+                                      name='proxy-server')
+        new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
+        unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
+                ('true', 'on', '1', 'yes')
+        self.unlink_log = unlink_log
+        self.new_log_cutoff = new_log_cutoff
+        if not log_dir.endswith('/'):
+            log_dir = log_dir + '/'
+        self.log_dir = log_dir
+        self.swift_account = swift_account
+        self.container_name = container_name
+        self.filename_format = source_filename_format
+        self.internal_proxy = InternalProxy(proxy_server_conf)
+        log_name = 'swift-log-uploader-%s' % plugin_name
+        self.logger = utils.get_logger(uploader_conf, log_name)
+
+    def run_once(self):
+        self.logger.info("Uploading logs")
+        start = time.time()
+        self.upload_all_logs()
+        self.logger.info("Uploading logs complete (%0.2f minutes)" %
+                         ((time.time() - start) / 60))
+
+    def upload_all_logs(self):
+        i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
+        i.sort()
+        year_offset = month_offset = day_offset = hour_offset = None
+        base_offset = len(self.log_dir)
+        for start, c in i:
+            offset = base_offset + start
+            if c == '%Y':
+                year_offset = offset, offset + 4
+                # Add in the difference between len(%Y) and the expanded
+                # version of %Y (????). This makes sure the codes after this
+                # one will align properly in the final filename.
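+                # (e.g. with a format like 'access-%Y%m%d%H': '%Y' is two
+                # characters in the format string but four in the expanded
+                # name, so every later offset shifts right by two)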
+                base_offset += 2
+            elif c == '%m':
+                month_offset = offset, offset + 2
+            elif c == '%d':
+                day_offset = offset, offset + 2
+            elif c == '%H':
+                hour_offset = offset, offset + 2
+        if not (year_offset and month_offset and day_offset and hour_offset):
+            # don't have all the parts, can't upload anything
+            return
+        glob_pattern = self.filename_format
+        glob_pattern = glob_pattern.replace('%Y', '????', 1)
+        glob_pattern = glob_pattern.replace('%m', '??', 1)
+        glob_pattern = glob_pattern.replace('%d', '??', 1)
+        glob_pattern = glob_pattern.replace('%H', '??', 1)
+        filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
+        current_hour = int(time.strftime('%H'))
+        today = int(time.strftime('%Y%m%d'))
+        self.internal_proxy.create_container(self.swift_account,
+                                             self.container_name)
+        for filename in filelist:
+            try:
+                # From the filename, we need to derive the year, month, day,
+                # and hour for the file. These values are used in the uploaded
+                # object's name, so they should be a reasonably accurate
+                # representation of the time for which the data in the file
+                # was collected. The file's last modified time is not a
+                # reliable representation of the data in the file. For
+                # example, an old log file (from hour A) may be uploaded or
+                # moved into the log_dir in hour Z. The file's modified time
+                # will be for hour Z, and therefore the object's name in the
+                # system will not represent the data in it.
+                # If the filename doesn't match the format, it shouldn't be
+                # uploaded.
+                year = filename[slice(*year_offset)]
+                month = filename[slice(*month_offset)]
+                day = filename[slice(*day_offset)]
+                hour = filename[slice(*hour_offset)]
+            except IndexError:
+                # unexpected filename format, move on
+                self.logger.error("Unexpected log: %s" % filename)
+                continue
+            if ((time.time() - os.stat(filename).st_mtime) <
+                    self.new_log_cutoff):
+                # don't process very new logs
+                self.logger.debug(
+                    "Skipping log: %s (< %d seconds old)" % (filename,
+                                                       self.new_log_cutoff))
+                continue
+            self.upload_one_log(filename, year, month, day, hour)
+
+    def upload_one_log(self, filename, year, month, day, hour):
+        if os.path.getsize(filename) == 0:
+            self.logger.debug("Log %s is 0 length, skipping" % filename)
+            return
+        self.logger.debug("Processing log: %s" % filename)
+        filehash = hashlib.md5()
+        already_compressed = filename.endswith('.gz')
+        opener = gzip.open if already_compressed else open
+        f = opener(filename, 'rb')
+        try:
+            for line in f:
+                # filter out bad lines here?
+                filehash.update(line)
+        finally:
+            f.close()
+        filehash = filehash.hexdigest()
+        # By adding a hash to the filename, we ensure that uploaded files
+        # have unique filenames and protect against uploading one file
+        # more than one time. By using md5, we get an etag for free.
+        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
+        if self.internal_proxy.upload_file(filename,
+                                           self.swift_account,
+                                           self.container_name,
+                                           target_filename,
+                                           compress=(not already_compressed)):
+            self.logger.debug("Uploaded log %s to %s" %
+                              (filename, target_filename))
+            if self.unlink_log:
+                os.unlink(filename)
+        else:
+            self.logger.error("ERROR: Upload of log %s failed!" % filename)
diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py
new file mode 100644
index 0000000000..6caaae7840
--- /dev/null
+++ b/swift/stats/stats_processor.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from swift.common.utils import get_logger
+
+
+class StatsLogProcessor(object):
+    """Transform account storage stat logs"""
+
+    def __init__(self, conf):
+        self.logger = get_logger(conf)
+
+    def process(self, obj_stream, account, container, object_name):
+        '''generate hourly groupings of data from one stats log file'''
+        account_totals = {}
+        year, month, day, hour, _ = object_name.split('/')
+        for line in obj_stream:
+            if not line:
+                continue
+            try:
+                (account,
+                 container_count,
+                 object_count,
+                 bytes_used) = line.split(',')
+            except (IndexError, ValueError):
+                # bad line data
+                self.logger.debug('Bad line data: %s' % repr(line))
+                continue
+            account = account.strip('"')
+            container_count = int(container_count.strip('"'))
+            object_count = int(object_count.strip('"'))
+            bytes_used = int(bytes_used.strip('"'))
+            aggr_key = (account, year, month, day, hour)
+            d = account_totals.get(aggr_key, {})
+            d['replica_count'] = d.setdefault('replica_count', 0) + 1
+            d['container_count'] = d.setdefault('container_count', 0) + \
+                container_count
+            d['object_count'] = d.setdefault('object_count', 0) + \
+                object_count
+            d['bytes_used'] = d.setdefault('bytes_used', 0) + \
+                bytes_used
+            account_totals[aggr_key] = d
+        return account_totals
+
+    def keylist_mapping(self):
+        '''
+        returns a dictionary of final keys mapped to source keys
+        '''
+        keylist_mapping = {
+        #   <db key> : <row key> or <set of row keys>
+            'bytes_used': 'bytes_used',
+            'container_count': 'container_count',
+            'object_count': 'object_count',
+            'replica_count': 'replica_count',
+        }
+        return keylist_mapping
diff --git a/test/unit/account/test_replicator.py b/test/unit/account/test_replicator.py
new file mode 100644
index 0000000000..6b3d045eaa
--- /dev/null
+++ b/test/unit/account/test_replicator.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from swift.account import replicator
+
+
+class TestReplicator(unittest.TestCase):
+    """
+    swift.account.replicator is currently just a subclass with some class
+    variables overridden, but at least this test stub will ensure proper
+    Python syntax.
+ """ + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/common/middleware/test_auth.py b/test/unit/common/middleware/test_auth.py index a7ec9199a2..800ecb4cb7 100644 --- a/test/unit/common/middleware/test_auth.py +++ b/test/unit/common/middleware/test_auth.py @@ -67,25 +67,33 @@ def mock_http_connect(response, headers=None, with_exc=False): self.headers = headers if self.headers is None: self.headers = {} + def getresponse(self): if self.with_exc: raise Exception('test') return self + def getheader(self, header): return self.headers[header] + def read(self, amt=None): return '' + def close(self): return + return lambda *args, **kwargs: FakeConn(response, headers, with_exc) class Logger(object): + def __init__(self): self.error_value = None self.exception_value = None + def error(self, msg, *args, **kwargs): self.error_value = (msg, args, kwargs) + def exception(self, msg, *args, **kwargs): _, exc, _ = sys.exc_info() self.exception_value = (msg, @@ -99,7 +107,7 @@ class FakeApp(object): def __call__(self, env, start_response): self.i_was_called = True - req = Request(env) + req = Request.blank('', environ=env) if 'swift.authorize' in env: resp = env['swift.authorize'](req) if resp: @@ -110,6 +118,7 @@ class FakeApp(object): def start_response(*args): pass + class TestAuth(unittest.TestCase): def setUp(self): @@ -418,6 +427,5 @@ class TestAuth(unittest.TestCase): self.assert_(resp.startswith('403'), resp) - if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_bench.py b/test/unit/common/test_bench.py new file mode 100644 index 0000000000..7b75aba79e --- /dev/null +++ b/test/unit/common/test_bench.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.common import bench + + +class TestBench(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/common/test_compressing_file_reader.py b/test/unit/common/test_compressing_file_reader.py new file mode 100644 index 0000000000..5394a97a72 --- /dev/null +++ b/test/unit/common/test_compressing_file_reader.py @@ -0,0 +1,34 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" Tests for swift.common.compressing_file_reader """ + +import unittest +import cStringIO + +from swift.common.compressing_file_reader import CompressingFileReader + +class TestCompressingFileReader(unittest.TestCase): + + def test_read(self): + plain = 'obj\ndata' + s = cStringIO.StringIO(plain) + expected = '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xcaO\xca\xe2JI,'\ + 'I\x04\x00\x00\x00\xff\xff\x03\x00P(\xa8\x1f\x08\x00\x00'\ + '\x00' + x = CompressingFileReader(s) + compressed = ''.join(iter(lambda: x.read(), '')) + self.assertEquals(compressed, expected) + self.assertEquals(x.read(), '') diff --git a/test/unit/common/test_daemon.py b/test/unit/common/test_daemon.py new file mode 100644 index 0000000000..e2db43caa6 --- /dev/null +++ b/test/unit/common/test_daemon.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: Tests + +import unittest +from swift.common import daemon + + +class TestDaemon(unittest.TestCase): + + def test_placeholder(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/common/test_direct_client.py b/test/unit/common/test_direct_client.py index a925c118bb..35ed07ffd7 100644 --- a/test/unit/common/test_direct_client.py +++ b/test/unit/common/test_direct_client.py @@ -17,7 +17,10 @@ import unittest -class TestAuditor(unittest.TestCase): +from swift.common import direct_client + + +class TestDirectClient(unittest.TestCase): def test_placeholder(self): pass diff --git a/test/unit/common/test_exceptions.py b/test/unit/common/test_exceptions.py index bfb251b139..35a5801e77 100644 --- a/test/unit/common/test_exceptions.py +++ b/test/unit/common/test_exceptions.py @@ -18,7 +18,8 @@ import unittest from swift.common import exceptions -class TestAuditor(unittest.TestCase): + +class TestExceptions(unittest.TestCase): def test_placeholder(self): pass diff --git a/test/unit/common/test_internal_proxy.py b/test/unit/common/test_internal_proxy.py new file mode 100644 index 0000000000..248bf1cf23 --- /dev/null +++ b/test/unit/common/test_internal_proxy.py @@ -0,0 +1,29 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+# TODO: Tests
+
+import unittest
+from swift.common import internal_proxy
+
+
+class TestInternalProxy(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 344cee4ec8..92be1077c0 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -247,5 +247,33 @@ class TestUtils(unittest.TestCase):
         self.assert_(callable(
             utils.load_libc_function('some_not_real_function')))

+    def test_readconf(self):
+        conf = '''[section1]
+foo = bar
+
+[section2]
+log_name = yarr'''
+        f = open('/tmp/test', 'wb')
+        f.write(conf)
+        f.close()
+        result = utils.readconf('/tmp/test')
+        expected = {'log_name': None,
+                    'section1': {'foo': 'bar'},
+                    'section2': {'log_name': 'yarr'}}
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1')
+        expected = {'log_name': 'section1', 'foo': 'bar'}
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section2').get('log_name')
+        expected = 'yarr'
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1',
+                                log_name='foo').get('log_name')
+        expected = 'foo'
+        self.assertEquals(result, expected)
+        result = utils.readconf('/tmp/test', 'section1',
+                                defaults={'bar': 'baz'})
+        expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
+        self.assertEquals(result, expected)
+        os.unlink('/tmp/test')
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py
new file mode 100644
index 0000000000..8f7a032893
--- /dev/null
+++ b/test/unit/container/test_replicator.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from swift.container import replicator
+
+
+class TestReplicator(unittest.TestCase):
+    """
+    swift.container.replicator is currently just a subclass with some class
+    variables overridden, but at least this test stub will ensure proper
+    Python syntax.
+    """
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/stats/__init__.py b/test/unit/stats/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/test/unit/stats/test_access_processor.py b/test/unit/stats/test_access_processor.py
new file mode 100644
index 0000000000..47013ca8ae
--- /dev/null
+++ b/test/unit/stats/test_access_processor.py
@@ -0,0 +1,29 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.stats import access_processor
+
+
+class TestAccessProcessor(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/stats/test_account_stats.py b/test/unit/stats/test_account_stats.py
new file mode 100644
index 0000000000..e318739dda
--- /dev/null
+++ b/test/unit/stats/test_account_stats.py
@@ -0,0 +1,29 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.stats import account_stats
+
+
+class TestAccountStats(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py
new file mode 100644
index 0000000000..4ff73eccf3
--- /dev/null
+++ b/test/unit/stats/test_log_processor.py
@@ -0,0 +1,227 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
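+
+# These tests exercise LogProcessor against in-memory stand-ins: DumbLogger
+# swallows all log calls, and DumbInternalProxy fakes container listings and
+# object data (including a gzipped variant), so no proxy server is needed.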
+
+import unittest
+
+from swift.stats import log_processor
+
+class DumbLogger(object):
+    def __getattr__(self, n):
+        return self.foo
+
+    def foo(self, *a, **kw):
+        pass
+
+class DumbInternalProxy(object):
+    def get_container_list(self, account, container, marker=None):
+        n = '2010/03/14/13/obj1'
+        if marker is None or n > marker:
+            return [{'name': n}]
+        else:
+            return []
+
+    def get_object(self, account, container, object_name):
+        code = 200
+        if object_name.endswith('.gz'):
+            # same data as below, compressed with gzip -9
+            def data():
+                yield '\x1f\x8b\x08'
+                yield '\x08"\xd79L'
+                yield '\x02\x03te'
+                yield 'st\x00\xcbO'
+                yield '\xca\xe2JI,I'
+                yield '\xe4\x02\x00O\xff'
+                yield '\xa3Y\t\x00\x00\x00'
+        else:
+            def data():
+                yield 'obj\n'
+                yield 'data'
+        return code, data()
+
+class TestLogProcessor(unittest.TestCase):
+
+    access_test_line = 'Jul  9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
+                    '09/Jul/2010/04/14/30 GET '\
+                    '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
+                    'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
+                    '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
+    stats_test_line = 'account,1,2,3'
+    proxy_config = {'log-processor': {
+                    }
+                   }
+
+    def test_access_log_line_parser(self):
+        access_proxy_config = self.proxy_config.copy()
+        access_proxy_config.update({
+            'log-processor-access': {
+                'source_filename_format': '%Y%m%d%H*',
+                'class_path':
+                    'swift.stats.access_processor.AccessLogProcessor'
+            }})
+        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
+        result = p.plugins['access']['instance'].log_line_parser(
+                                                    self.access_test_line)
+        self.assertEquals(result, {'code': 200,
+            'processing_time': '0.0262',
+            'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
+            'month': '07',
+            'second': '30',
+            'year': '2010',
+            'query': 'format=json&foo',
+            'tz': '+0000',
+            'http_version': 'HTTP/1.0',
+            'object_name': 'bar',
+            'etag': '-',
+            'foo': 1,
+            'method': 'GET',
+            'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
+            'client_ip': '1.2.3.4',
+            'format': 1,
+            'bytes_out': 95,
+            'container_name': 'foo',
+            'day': '09',
+            'minute': '14',
+            'account': 'acct',
+            'hour': '04',
+            'referrer': '-',
+            'request': '/v1/acct/foo/bar',
+            'user_agent': 'curl',
+            'bytes_in': 6,
+            'lb_ip': '4.5.6.7'})
+
+    def test_process_one_access_file(self):
+        access_proxy_config = self.proxy_config.copy()
+        access_proxy_config.update({
+            'log-processor-access': {
+                'source_filename_format': '%Y%m%d%H*',
+                'class_path':
+                    'swift.stats.access_processor.AccessLogProcessor'
+            }})
+        p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
+
+        def get_object_data(*a, **kw):
+            return [self.access_test_line]
+        p.get_object_data = get_object_data
+        result = p.process_one_file('access', 'a', 'c', 'o')
+        expected = {('acct', '2010', '07', '09', '04'):
+                        {('public', 'object', 'GET', '2xx'): 1,
+                         ('public', 'bytes_out'): 95,
+                         'marker_query': 0,
+                         'format_query': 1,
+                         'delimiter_query': 0,
+                         'path_query': 0,
+                         ('public', 'bytes_in'): 6,
+                         'prefix_query': 0}}
+        self.assertEquals(result, expected)
+
+    def test_get_container_listing(self):
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy()
+        result = p.get_container_listing('a', 'foo')
+        expected = ['2010/03/14/13/obj1']
+        self.assertEquals(result, expected)
+        result = p.get_container_listing('a', 'foo', listing_filter=expected)
+        expected = []
+        self.assertEquals(result, expected)
+        result = p.get_container_listing('a', 'foo', start_date='2010031412',
+                                         end_date='2010031414')
+        expected = ['2010/03/14/13/obj1']
+        self.assertEquals(result, expected)
+        result = p.get_container_listing('a', 'foo', start_date='2010031414')
+        expected = []
+        self.assertEquals(result, expected)
+        result = p.get_container_listing('a', 'foo', start_date='2010031410',
+                                         end_date='2010031412')
+        expected = []
+        self.assertEquals(result, expected)
+
+    def test_get_object_data(self):
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy()
+        result = list(p.get_object_data('a', 'c', 'o', False))
+        expected = ['obj', 'data']
+        self.assertEquals(result, expected)
+        result = list(p.get_object_data('a', 'c', 'o.gz', True))
+        self.assertEquals(result, expected)
+
+    def test_get_stat_totals(self):
+        stats_proxy_config = self.proxy_config.copy()
+        stats_proxy_config.update({
+            'log-processor-stats': {
+                'class_path':
+                    'swift.stats.stats_processor.StatsLogProcessor'
+            }})
+        p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
+        p._internal_proxy = DumbInternalProxy()
+
+        def get_object_data(*a, **kw):
+            return [self.stats_test_line]
+        p.get_object_data = get_object_data
+        result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
+        expected = {('account', 'y', 'm', 'd', 'h'):
+                        {'replica_count': 1,
+                         'object_count': 2,
+                         'container_count': 1,
+                         'bytes_used': 3}}
+        self.assertEquals(result, expected)
+
+    def test_generate_keylist_mapping(self):
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        result = p.generate_keylist_mapping()
+        expected = {}
+        self.assertEquals(result, expected)
+
+    def test_generate_keylist_mapping_with_dummy_plugins(self):
+
+        class Plugin1(object):
+            def keylist_mapping(self):
+                return {'a': 'b', 'c': 'd', 'e': ['f', 'g']}
+
+        class Plugin2(object):
+            def keylist_mapping(self):
+                return {'a': '1', 'e': '2', 'h': '3'}
+
+        p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
+        p.plugins['plugin1'] = {'instance': Plugin1()}
+        p.plugins['plugin2'] = {'instance': Plugin2()}
+        result = p.generate_keylist_mapping()
+        expected = {'a': set(['b', '1']), 'c': 'd', 'e': set(['2', 'f', 'g']),
+                    'h': '3'}
+        self.assertEquals(result, expected)
+
+    def test_access_keylist_mapping_format(self):
+        proxy_config = self.proxy_config.copy()
+        proxy_config.update({
+            'log-processor-access': {
+                'source_filename_format': '%Y%m%d%H*',
+                'class_path':
+                    'swift.stats.access_processor.AccessLogProcessor'
+            }})
+        p = log_processor.LogProcessor(proxy_config, DumbLogger())
+        mapping = p.generate_keylist_mapping()
+        for k, v in mapping.items():
+            # these only work for Py2.7+
+            #self.assertIsInstance(k, str)
+            self.assertTrue(isinstance(k, str), type(k))
+
+    def test_stats_keylist_mapping_format(self):
+        proxy_config = self.proxy_config.copy()
+        proxy_config.update({
+            'log-processor-stats': {
+                'class_path':
+                    'swift.stats.stats_processor.StatsLogProcessor'
+            }})
+        p = log_processor.LogProcessor(proxy_config, DumbLogger())
+        mapping = p.generate_keylist_mapping()
+        for k, v in mapping.items():
+            # these only work for Py2.7+
+            #self.assertIsInstance(k, str)
+            self.assertTrue(isinstance(k, str), type(k))
diff --git a/test/unit/stats/test_log_uploader.py b/test/unit/stats/test_log_uploader.py
new file mode 100644
index 0000000000..8e889ad918
--- /dev/null
+++ b/test/unit/stats/test_log_uploader.py
@@ -0,0 +1,29 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.stats import log_uploader
+
+
+class TestLogUploader(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/stats/test_stats_processor.py b/test/unit/stats/test_stats_processor.py
new file mode 100644
index 0000000000..4720d1f035
--- /dev/null
+++ b/test/unit/stats/test_stats_processor.py
@@ -0,0 +1,29 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.stats import stats_processor
+
+
+class TestStatsProcessor(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()