Can run swift-bench across multiple cores/servers.
You run one or more swift-bench-client processes like this:
    $ swift-bench-client 127.0.0.1 20001
    $ swift-bench-client 127.0.0.1 20002
Then you run swift-bench with a new option, --bench-clients (-b), which is
specified once for each swift-bench-client:
    $ swift-bench -b 127.0.0.1:20001 -b 127.0.0.1:20002
You get log lines from each client (interleaved) along with a final report
for all clients:
    127.0.0.1:20002 swift-bench-server 2012-08-25 22:44:06,148 INFO Auth version: 1.0
    127.0.0.1:20001 swift-bench-server 2012-08-25 22:44:06,148 INFO Auth version: 1.0
    127.0.0.1:20001 swift-bench-server 2012-08-25 22:44:12,249 INFO 83 PUTS [0 failures], 41.5/s
    127.0.0.1:20002 swift-bench-server 2012-08-25 22:44:14,430 INFO 74 PUTS [0 failures], 34.3/s
    ...
    127.0.0.1:20002 swift-bench-server 2012-08-25 22:45:18,942 INFO Auth version: 1.0
    127.0.0.1:20002 swift-bench-server 2012-08-25 22:45:20,946 INFO 238 DEL [2 failures], 118.9/s
    swift-bench 2012-08-25 22:45:27,549 INFO 2000 PUTS **FINAL** [0 failures], 56.8/s
    swift-bench 2012-08-25 22:45:27,550 INFO 30000 GETS **FINAL** [50 failures], 974.6/s
    swift-bench 2012-08-25 22:45:27,550 INFO 2000 DEL **FINAL** [20 failures], 237.1/s
The concurrency, PUT count, and GET count config settings are divided by
the number of bench_clients.  In other words, the same volume of work is
attempted (vs. not specifying --bench-clients), but it can now span
servers and CPU cores.
Benchmark containers are created (if use_proxy = yes) and deleted (if
delete = yes), with appropriate concurrency, in the initiating
swift-bench process, not any of the swift-bench-client processes.
Change-Id: Idbf31a23093244ab357a9bf77e6031257774f24a
			
			
This commit is contained in:
		
				
					committed by
					
						
						Gerrit Code Review
					
				
			
			
				
	
			
			
			
						parent
						
							c509ac2371
						
					
				
				
					commit
					ed3b12d05c
				
			@@ -21,7 +21,8 @@ import signal
 | 
			
		||||
import uuid
 | 
			
		||||
from optparse import OptionParser
 | 
			
		||||
 | 
			
		||||
from swift.common.bench import BenchController
 | 
			
		||||
from swift.common.bench import (BenchController, DistributedBenchController,
 | 
			
		||||
                                create_containers, delete_containers)
 | 
			
		||||
from swift.common.utils import readconf, LogAdapter
 | 
			
		||||
 | 
			
		||||
# The defaults should be sufficient to run swift-bench on a SAIO
 | 
			
		||||
@@ -49,6 +50,8 @@ CONF_DEFAULTS = {
 | 
			
		||||
    'devices': 'sdb1',  # space-sep list
 | 
			
		||||
    'log_level': 'INFO',
 | 
			
		||||
    'timeout': '10',
 | 
			
		||||
    'auth_version': '1.0',
 | 
			
		||||
    'bench_clients': [],
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SAIO_DEFAULTS = {
 | 
			
		||||
@@ -81,6 +84,13 @@ if __name__ == '__main__':
 | 
			
		||||
                      help='User name for obtaining an auth token')
 | 
			
		||||
    parser.add_option('-K', '--key', dest='key',
 | 
			
		||||
                      help='Key for obtaining an auth token')
 | 
			
		||||
    parser.add_option('-b', '--bench-clients', action='append',
 | 
			
		||||
                      metavar='<ip>:<port>',
 | 
			
		||||
                      help=('A string of the form "<ip>:<port>" which matches '
 | 
			
		||||
                            'the arguments supplied to a swift-bench-client '
 | 
			
		||||
                            'process.  This argument must be specified '
 | 
			
		||||
                            'once per swift-bench-client you want to '
 | 
			
		||||
                            'utilize.'))
 | 
			
		||||
    parser.add_option('-u', '--url', dest='url',
 | 
			
		||||
                      help='Storage URL')
 | 
			
		||||
    parser.add_option('-c', '--concurrency', dest='concurrency',
 | 
			
		||||
@@ -125,6 +135,8 @@ if __name__ == '__main__':
 | 
			
		||||
        options.put_concurrency = options.concurrency
 | 
			
		||||
        options.get_concurrency = options.concurrency
 | 
			
		||||
        options.del_concurrency = options.concurrency
 | 
			
		||||
    options.containers = ['%s_%d' % (options.container_name, i)
 | 
			
		||||
                          for i in xrange(int(options.num_containers))]
 | 
			
		||||
 | 
			
		||||
    def sigterm(signum, frame):
 | 
			
		||||
        sys.exit('Termination signal received.')
 | 
			
		||||
@@ -145,5 +157,13 @@ if __name__ == '__main__':
 | 
			
		||||
                                  '%(message)s')
 | 
			
		||||
    loghandler.setFormatter(logformat)
 | 
			
		||||
 | 
			
		||||
    controller = BenchController(logger, options)
 | 
			
		||||
    if options.use_proxy:
 | 
			
		||||
        create_containers(logger, options)
 | 
			
		||||
 | 
			
		||||
    controller_class = DistributedBenchController if options.bench_clients \
 | 
			
		||||
        else BenchController
 | 
			
		||||
    controller = controller_class(logger, options)
 | 
			
		||||
    controller.run()
 | 
			
		||||
 | 
			
		||||
    if options.delete:
 | 
			
		||||
        delete_containers(logger, options)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										59
									
								
								bin/swift-bench-client
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										59
									
								
								bin/swift-bench-client
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,59 @@
 | 
			
		||||
#!/usr/bin/env python
 | 
			
		||||
# Copyright (c) 2010-2012 OpenStack, LLC.
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
# you may not use this file except in compliance with the License.
 | 
			
		||||
# You may obtain a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | 
			
		||||
# implied.
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import sys
 | 
			
		||||
import signal
 | 
			
		||||
from optparse import OptionParser
 | 
			
		||||
 | 
			
		||||
from swift.common.bench import BenchServer
 | 
			
		||||
from swift.common.utils import LogAdapter
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    usage = "usage: %prog <ip> <port>"
 | 
			
		||||
    usage += "\n\nRun a client for distributed swift-bench runs."
 | 
			
		||||
    parser = OptionParser(usage=usage)
 | 
			
		||||
    parser.add_option('-o', '--log-level', dest='log_level',
 | 
			
		||||
                      default='info',
 | 
			
		||||
                      help='Logging level (debug, info, etc)')
 | 
			
		||||
 | 
			
		||||
    if len(sys.argv) != 3:
 | 
			
		||||
        parser.print_help()
 | 
			
		||||
        sys.exit(1)
 | 
			
		||||
    options, args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    logger = logging.getLogger()
 | 
			
		||||
    logger.setLevel({
 | 
			
		||||
        'debug': logging.DEBUG,
 | 
			
		||||
        'info': logging.INFO,
 | 
			
		||||
        'warning': logging.WARNING,
 | 
			
		||||
        'error': logging.ERROR,
 | 
			
		||||
        'critical': logging.CRITICAL}.get(
 | 
			
		||||
            options.log_level.lower(), logging.INFO))
 | 
			
		||||
    loghandler = logging.StreamHandler()
 | 
			
		||||
    logger.addHandler(loghandler)
 | 
			
		||||
    logger = LogAdapter(logger, 'swift-bench-client')
 | 
			
		||||
    logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
 | 
			
		||||
                                  '%(message)s')
 | 
			
		||||
    loghandler.setFormatter(logformat)
 | 
			
		||||
 | 
			
		||||
    def sigterm(signum, frame):
 | 
			
		||||
        sys.exit('Termination signal received.')
 | 
			
		||||
    signal.signal(signal.SIGTERM, sigterm)
 | 
			
		||||
    signal.signal(signal.SIGINT, sigterm)
 | 
			
		||||
 | 
			
		||||
    server = BenchServer(logger, args[0], args[1])
 | 
			
		||||
    server.run()
 | 
			
		||||
							
								
								
									
										1
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								setup.py
									
									
									
									
									
								
							@@ -47,6 +47,7 @@ setup(
 | 
			
		||||
        'bin/swift-account-replicator',
 | 
			
		||||
        'bin/swift-account-server',
 | 
			
		||||
        'bin/swift-bench',
 | 
			
		||||
        'bin/swift-bench-client',
 | 
			
		||||
        'bin/swift-container-auditor',
 | 
			
		||||
        'bin/swift-container-replicator',
 | 
			
		||||
        'bin/swift-container-server',
 | 
			
		||||
 
 | 
			
		||||
@@ -13,21 +13,61 @@
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
 | 
			
		||||
import re
 | 
			
		||||
import sys
 | 
			
		||||
import uuid
 | 
			
		||||
import time
 | 
			
		||||
import random
 | 
			
		||||
import signal
 | 
			
		||||
import socket
 | 
			
		||||
import logging
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
from optparse import Values
 | 
			
		||||
 | 
			
		||||
import eventlet
 | 
			
		||||
import eventlet.pools
 | 
			
		||||
from eventlet.green.httplib import CannotSendRequest
 | 
			
		||||
 | 
			
		||||
from swift.common.utils import TRUE_VALUES
 | 
			
		||||
from swift.common.utils import TRUE_VALUES, LogAdapter
 | 
			
		||||
import swiftclient as client
 | 
			
		||||
from swift.common import direct_client
 | 
			
		||||
from swift.common.http import HTTP_CONFLICT
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    import simplejson as json
 | 
			
		||||
except ImportError:
 | 
			
		||||
    import json
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _func_on_containers(logger, conf, concurrency_key, func):
 | 
			
		||||
    """Run a function on each container with concurrency."""
 | 
			
		||||
 | 
			
		||||
    bench = Bench(logger, conf, [])
 | 
			
		||||
    pool = eventlet.GreenPool(int(getattr(conf, concurrency_key)))
 | 
			
		||||
    for container in conf.containers:
 | 
			
		||||
        pool.spawn_n(func, bench.url, bench.token, container)
 | 
			
		||||
    pool.waitall()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def delete_containers(logger, conf):
 | 
			
		||||
    """Utility function to delete benchmark containers."""
 | 
			
		||||
 | 
			
		||||
    def _deleter(url, token, container):
 | 
			
		||||
        try:
 | 
			
		||||
            client.delete_container(url, token, container)
 | 
			
		||||
        except client.ClientException, e:
 | 
			
		||||
            if e.http_status != HTTP_CONFLICT:
 | 
			
		||||
                logger.warn("Unable to delete container '%s'. "
 | 
			
		||||
                    "Got http status '%d'." % (container, e.http_status))
 | 
			
		||||
 | 
			
		||||
    _func_on_containers(logger, conf, 'del_concurrency', _deleter)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def create_containers(logger, conf):
 | 
			
		||||
    """Utility function to create benchmark containers."""
 | 
			
		||||
 | 
			
		||||
    _func_on_containers(logger, conf, 'put_concurrency', client.put_container)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ConnectionPool(eventlet.pools.Pool):
 | 
			
		||||
 | 
			
		||||
@@ -39,6 +79,62 @@ class ConnectionPool(eventlet.pools.Pool):
 | 
			
		||||
        return client.http_connection(self.url)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BenchServer(object):
 | 
			
		||||
    """
 | 
			
		||||
    A BenchServer binds to an IP/port and listens for bench jobs.  A bench
 | 
			
		||||
    job consists of the normal conf "dict" encoded in JSON, terminated with an
 | 
			
		||||
    EOF.  The log level is at least INFO, but DEBUG may also be specified in
 | 
			
		||||
    the conf dict.
 | 
			
		||||
 | 
			
		||||
    The server will wait forever for jobs, running them one at a time.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, logger, bind_ip, bind_port):
 | 
			
		||||
        self.logger = logger
 | 
			
		||||
        self.bind_ip = bind_ip
 | 
			
		||||
        self.bind_port = int(bind_port)
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
        self.logger.info('Binding to %s:%s', self.bind_ip, self.bind_port)
 | 
			
		||||
        s.bind((self.bind_ip, self.bind_port))
 | 
			
		||||
        s.listen(20)
 | 
			
		||||
        while True:
 | 
			
		||||
            client, address = s.accept()
 | 
			
		||||
            self.logger.debug('Accepting connection from %s:%s', *address)
 | 
			
		||||
            client_file = client.makefile('rb+', 1)
 | 
			
		||||
            json_data = client_file.read()
 | 
			
		||||
            conf = Values(json.loads(json_data))
 | 
			
		||||
 | 
			
		||||
            self.logger.info(
 | 
			
		||||
                'Starting run for %s:%s [put/get/del_concurrency: %s/%s/%s, '
 | 
			
		||||
                'num_objects: %s, num_gets: %s]', address[0], address[1],
 | 
			
		||||
                conf.put_concurrency, conf.get_concurrency,
 | 
			
		||||
                conf.del_concurrency, conf.num_objects, conf.num_gets)
 | 
			
		||||
 | 
			
		||||
            logger = logging.getLogger('bench-server')
 | 
			
		||||
            level = logging.DEBUG if conf.log_level.lower() == 'debug' \
 | 
			
		||||
                else logging.INFO
 | 
			
		||||
            logger.setLevel(level)
 | 
			
		||||
            loghandler = logging.StreamHandler(stream=client_file)
 | 
			
		||||
            logformat = logging.Formatter(
 | 
			
		||||
                '%(server)s %(asctime)s %(levelname)s %(message)s')
 | 
			
		||||
            loghandler.setFormatter(logformat)
 | 
			
		||||
            logger.addHandler(loghandler)
 | 
			
		||||
            logger = LogAdapter(logger, 'swift-bench-server')
 | 
			
		||||
 | 
			
		||||
            controller = BenchController(logger, conf)
 | 
			
		||||
            try:
 | 
			
		||||
                controller.run()
 | 
			
		||||
            except socket.error:
 | 
			
		||||
                logger.warning('Socket error', exc_info=1)
 | 
			
		||||
 | 
			
		||||
            logger.logger.removeHandler(loghandler)
 | 
			
		||||
            client_file.close()
 | 
			
		||||
            client.close()
 | 
			
		||||
 | 
			
		||||
            self.logger.info('...bench run completed; waiting for next run.')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Bench(object):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, logger, conf, names):
 | 
			
		||||
@@ -64,8 +160,6 @@ class Bench(object):
 | 
			
		||||
            self.account = conf.account
 | 
			
		||||
            self.url = conf.url
 | 
			
		||||
            self.ip, self.port = self.url.split('/')[2].split(':')
 | 
			
		||||
        self.containers = ['%s_%d' % (conf.container_name, i)
 | 
			
		||||
            for i in xrange(int(conf.num_containers))]
 | 
			
		||||
 | 
			
		||||
        self.object_size = int(conf.object_size)
 | 
			
		||||
        self.object_sources = conf.object_sources
 | 
			
		||||
@@ -129,6 +223,88 @@ class Bench(object):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DistributedBenchController(object):
 | 
			
		||||
    """
 | 
			
		||||
    This class manages a distributed swift-bench run.  For this Controller
 | 
			
		||||
    class to make sense, the conf.bench_clients list must contain at least one
 | 
			
		||||
    entry.
 | 
			
		||||
 | 
			
		||||
    The idea is to split the configured load between one or more
 | 
			
		||||
    swift-bench-client processes, each of which use eventlet for concurrency.
 | 
			
		||||
    We deliberately take a simple, naive approach with these limitations:
 | 
			
		||||
        1) Concurrency, num_objects, and num_gets are spread evenly between the
 | 
			
		||||
           swift-bench-client processes.  With a low concurrency to
 | 
			
		||||
           swift-bench-client count ratio, rounding may result in a greater
 | 
			
		||||
           than desired aggregate concurrency.
 | 
			
		||||
        2) Each swift-bench-client process runs independently so some may
 | 
			
		||||
           finish up before others, i.e. the target aggregate concurrency is
 | 
			
		||||
           not necessarily present the whole time.  This may bias aggregate
 | 
			
		||||
           reported rates lower than a more efficient architecture.
 | 
			
		||||
        3) Because of #2, some swift-bench-client processes may be running GETs
 | 
			
		||||
           while others are still runinng their PUTs.  Because of this
 | 
			
		||||
           potential skew, distributed runs will not isolate one operation at a
 | 
			
		||||
           time like a single swift-bench run will.
 | 
			
		||||
        3) Reported aggregate rates are simply the sum of each
 | 
			
		||||
           swift-bench-client process reported FINAL number.  That's probably
 | 
			
		||||
           inaccurate somehow.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, logger, conf):
 | 
			
		||||
        self.logger = logger
 | 
			
		||||
        # ... INFO 1000 PUTS **FINAL** [0 failures], 34.9/s
 | 
			
		||||
        self.final_re = re.compile(
 | 
			
		||||
            'INFO (\d+) (.*) \*\*FINAL\*\* \[(\d+) failures\], (\d+\.\d+)/s')
 | 
			
		||||
        self.clients = conf.bench_clients
 | 
			
		||||
        del conf.bench_clients
 | 
			
		||||
        for k in ['put_concurrency', 'get_concurrency', 'del_concurrency',
 | 
			
		||||
                  'num_objects', 'num_gets']:
 | 
			
		||||
            setattr(conf, k, max(1, int(getattr(conf, k)) / len(self.clients)))
 | 
			
		||||
        self.conf = conf
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        eventlet.patcher.monkey_patch(socket=True)
 | 
			
		||||
        pool = eventlet.GreenPool(size=len(self.clients))
 | 
			
		||||
        pile = eventlet.GreenPile(pool)
 | 
			
		||||
        for client in self.clients:
 | 
			
		||||
            pile.spawn(self.do_run, client)
 | 
			
		||||
        results = {
 | 
			
		||||
            'PUTS': dict(count=0, failures=0, rate=0.0),
 | 
			
		||||
            'GETS': dict(count=0, failures=0, rate=0.0),
 | 
			
		||||
            'DEL': dict(count=0, failures=0, rate=0.0),
 | 
			
		||||
        }
 | 
			
		||||
        for result in pile:
 | 
			
		||||
            for k, v in result.iteritems():
 | 
			
		||||
                target = results[k]
 | 
			
		||||
                target['count'] += int(v['count'])
 | 
			
		||||
                target['failures'] += int(v['failures'])
 | 
			
		||||
                target['rate'] += float(v['rate'])
 | 
			
		||||
        for k in ['PUTS', 'GETS', 'DEL']:
 | 
			
		||||
            v = results[k]
 | 
			
		||||
            self.logger.info('%d %s **FINAL** [%d failures], %.1f/s' % (
 | 
			
		||||
                v['count'], k, v['failures'], v['rate']))
 | 
			
		||||
 | 
			
		||||
    def do_run(self, client):
 | 
			
		||||
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
        ip, port = client.split(':')
 | 
			
		||||
        s.connect((ip, int(port)))
 | 
			
		||||
        s.sendall(json.dumps(self.conf.__dict__))
 | 
			
		||||
        s.shutdown(socket.SHUT_WR)
 | 
			
		||||
        s_file = s.makefile('rb', 1)
 | 
			
		||||
        result = {}
 | 
			
		||||
        for line in s_file:
 | 
			
		||||
            match = self.final_re.search(line)
 | 
			
		||||
            if match:
 | 
			
		||||
                g = match.groups()
 | 
			
		||||
                result[g[1]] = {
 | 
			
		||||
                    'count': g[0],
 | 
			
		||||
                    'failures': g[2],
 | 
			
		||||
                    'rate': g[3],
 | 
			
		||||
                }
 | 
			
		||||
            else:
 | 
			
		||||
                sys.stderr.write('%s %s' % (client, line))
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BenchController(object):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, logger, conf):
 | 
			
		||||
@@ -177,16 +353,6 @@ class BenchDELETE(Bench):
 | 
			
		||||
        self.total = len(names)
 | 
			
		||||
        self.msg = 'DEL'
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        Bench.run(self)
 | 
			
		||||
        for container in self.containers:
 | 
			
		||||
            try:
 | 
			
		||||
                client.delete_container(self.url, self.token, container)
 | 
			
		||||
            except client.ClientException, e:
 | 
			
		||||
                if e.http_status != HTTP_CONFLICT:
 | 
			
		||||
                    self._log_status("Unable to delete container '%s'. " \
 | 
			
		||||
                        "Got http status '%d'." % (container, e.http_status))
 | 
			
		||||
 | 
			
		||||
    def _run(self, thread):
 | 
			
		||||
        if time.time() - self.heartbeat >= 15:
 | 
			
		||||
            self.heartbeat = time.time()
 | 
			
		||||
@@ -242,11 +408,7 @@ class BenchPUT(Bench):
 | 
			
		||||
        self.concurrency = self.put_concurrency
 | 
			
		||||
        self.total = self.total_objects
 | 
			
		||||
        self.msg = 'PUTS'
 | 
			
		||||
        if self.use_proxy:
 | 
			
		||||
            with self.connection() as conn:
 | 
			
		||||
                for container_name in self.containers:
 | 
			
		||||
                    client.put_container(self.url, self.token,
 | 
			
		||||
                        container_name, http_conn=conn)
 | 
			
		||||
        self.containers = conf.containers
 | 
			
		||||
 | 
			
		||||
    def _run(self, thread):
 | 
			
		||||
        if time.time() - self.heartbeat >= 15:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user