Refactored swift-bench to reuse client.py and direct_client.py
This commit is contained in:
parent
b4a964e32d
commit
143e9efb0a
|
@ -27,8 +27,9 @@ CONF_DEFAULTS = {
|
|||
'container_name': uuid.uuid4().hex,
|
||||
'use_proxy': 'yes',
|
||||
'url': '',
|
||||
'devices': 'sdb',
|
||||
'logging_level': 'INFO',
|
||||
'account': '',
|
||||
'devices': 'sdb1',
|
||||
'log_level': 'INFO',
|
||||
'timeout': '10',
|
||||
}
|
||||
|
||||
|
@ -93,7 +94,7 @@ if __name__ == '__main__':
|
|||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL}.get(
|
||||
options.logging_level, logging.INFO))
|
||||
options.log_level.lower(), logging.INFO))
|
||||
loghandler = logging.StreamHandler()
|
||||
logformat = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
||||
loghandler.setFormatter(logformat)
|
||||
|
|
|
@ -2,33 +2,28 @@ import uuid
|
|||
import time
|
||||
import random
|
||||
from urlparse import urlparse
|
||||
from contextlib import contextmanager
|
||||
|
||||
import eventlet.pools
|
||||
from eventlet.green.httplib import HTTPSConnection, \
|
||||
HTTPResponse, CannotSendRequest
|
||||
from eventlet.green.httplib import CannotSendRequest
|
||||
|
||||
from swift.common.bufferedhttp \
|
||||
import BufferedHTTPConnection as HTTPConnection
|
||||
from swift.common.utils import TRUE_VALUES
|
||||
from swift.common import client
|
||||
from swift.common import direct_client
|
||||
|
||||
|
||||
class ConnectionPool(eventlet.pools.Pool):
|
||||
|
||||
def __init__(self, url, size):
|
||||
self.url = url
|
||||
self.url_parsed = urlparse(self.url)
|
||||
eventlet.pools.Pool.__init__(self, size, size)
|
||||
|
||||
def create(self):
|
||||
if self.url_parsed[0] == 'https':
|
||||
hc = HTTPSConnection(self.url_parsed[1])
|
||||
elif self.url_parsed[0] == 'http':
|
||||
hc = HTTPConnection(self.url_parsed[1])
|
||||
else:
|
||||
raise Exception("Can't handle %s" % self.url_parsed[0])
|
||||
return hc
|
||||
return client.http_connection(self.url)
|
||||
|
||||
|
||||
class Bench(object):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
self.logger = logger
|
||||
self.user = conf.user
|
||||
|
@ -36,33 +31,18 @@ class Bench(object):
|
|||
self.auth_url = conf.auth
|
||||
self.use_proxy = conf.use_proxy in TRUE_VALUES
|
||||
if self.use_proxy:
|
||||
# Get the auth token
|
||||
parsed = urlparse(self.auth_url)
|
||||
if parsed.scheme == 'http':
|
||||
hc = HTTPConnection(parsed.netloc)
|
||||
elif parsed.scheme == 'https':
|
||||
hc = HTTPSConnection(parsed.netloc)
|
||||
else:
|
||||
raise ClientException(
|
||||
'Cannot handle protocol scheme %s for url %s' %
|
||||
(parsed.scheme, self.auth_url))
|
||||
hc_args = ('GET', parsed.path, None,
|
||||
{'X-Auth-User': self.user, 'X-Auth-Key': self.key})
|
||||
hc.request(*hc_args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
if hcr.status != 204:
|
||||
raise Exception("Could not authenticate (%s)" % hcr.status)
|
||||
self.token = hcr.getheader('x-auth-token')
|
||||
self.account = hcr.getheader('x-storage-url').split('/')[-1]
|
||||
url, token = client.get_auth(self.auth_url, self.user, self.key)
|
||||
self.token = token
|
||||
self.account = url.split('/')[-1]
|
||||
if conf.url == '':
|
||||
self.url = hcr.getheader('x-storage-url')
|
||||
self.url = url
|
||||
else:
|
||||
self.url = conf.url
|
||||
else:
|
||||
self.token = 'SlapChop!'
|
||||
self.account = conf.account
|
||||
self.url = conf.url
|
||||
self.ip, self.port = self.url.split('/')[2].split(':')
|
||||
self.container_name = conf.container_name
|
||||
|
||||
self.object_size = int(conf.object_size)
|
||||
|
@ -78,7 +58,6 @@ class Bench(object):
|
|||
self.total_objects = int(conf.num_objects)
|
||||
self.total_gets = int(conf.num_gets)
|
||||
self.timeout = int(conf.timeout)
|
||||
self.url_parsed = urlparse(self.url)
|
||||
self.devices = conf.devices.split()
|
||||
self.names = names
|
||||
self.conn_pool = ConnectionPool(self.url,
|
||||
|
@ -92,24 +71,12 @@ class Bench(object):
|
|||
(float(self.complete) / total),
|
||||
))
|
||||
|
||||
def _create_connection(self):
|
||||
if self.url_parsed[0] == 'https':
|
||||
hc = HTTPSConnection(self.url_parsed[1])
|
||||
elif self.url_parsed[0] == 'http':
|
||||
hc = HTTPConnection(self.url_parsed[1])
|
||||
else:
|
||||
raise Exception("Can't handle %s" % self.url_parsed[0])
|
||||
return hc
|
||||
|
||||
def _send_request(self, *args):
|
||||
hc = self.conn_pool.get()
|
||||
@contextmanager
|
||||
def connection(self):
|
||||
try:
|
||||
start = time.time()
|
||||
hc = self.conn_pool.get()
|
||||
try:
|
||||
hc.request(*args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
hcr.close()
|
||||
yield hc
|
||||
except CannotSendRequest:
|
||||
self.logger.info("CannotSendRequest. Skipping...")
|
||||
try:
|
||||
|
@ -117,16 +84,7 @@ class Bench(object):
|
|||
except:
|
||||
pass
|
||||
self.failures += 1
|
||||
hc = self._create_connection()
|
||||
return
|
||||
total = time.time() - start
|
||||
self.logger.debug("%s %s: %04f" %
|
||||
(args[0], args[1], total))
|
||||
if hcr.status < 200 or hcr.status > 299:
|
||||
self.failures += 1
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
hc = self.conn_pool.create()
|
||||
finally:
|
||||
self.conn_pool.put(hc)
|
||||
|
||||
|
@ -147,6 +105,7 @@ class Bench(object):
|
|||
|
||||
|
||||
class BenchController(object):
|
||||
|
||||
def __init__(self, logger, conf):
|
||||
self.logger = logger
|
||||
self.conf = conf
|
||||
|
@ -166,6 +125,7 @@ class BenchController(object):
|
|||
|
||||
|
||||
class BenchDELETE(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.del_concurrency
|
||||
|
@ -176,24 +136,24 @@ class BenchDELETE(Bench):
|
|||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('DEL')
|
||||
device, partition, path = self.names.pop()
|
||||
headers = {
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
'X-ID': str(uuid.uuid4()),
|
||||
'X-Auth-Token': self.token,
|
||||
}
|
||||
if self.use_proxy:
|
||||
hc_args = ('DELETE', "/v1/%s/%s/%s" %
|
||||
(self.account, self.container_name, path), '', headers)
|
||||
else:
|
||||
hc_args = ('DELETE', "/%s/%s/%s/%s/%s" %
|
||||
(device, partition, self.account, self.container_name, path),
|
||||
'', headers)
|
||||
self._send_request(*hc_args)
|
||||
device, partition, name = self.names.pop()
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.delete_object(self.url, self.token,
|
||||
self.container_name, name, http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_delete_object(node, partition,
|
||||
self.account, self.container_name, name)
|
||||
except client.ClientException, e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchGET(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.get_concurrency
|
||||
|
@ -205,42 +165,32 @@ class BenchGET(Bench):
|
|||
self.heartbeat = time.time()
|
||||
self._log_status('GETS')
|
||||
device, partition, name = random.choice(self.names)
|
||||
headers = {
|
||||
'X-Auth-Token': self.token,
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
}
|
||||
if self.use_proxy:
|
||||
hc_args = ('GET', '/v1/%s/%s/%s' %
|
||||
(self.account, self.container_name, name), '', headers)
|
||||
else:
|
||||
hc_args = ('GET', '/%s/%s/%s/%s/%s' %
|
||||
(device, partition, self.account, self.container_name, name),
|
||||
'', headers)
|
||||
self._send_request(*hc_args)
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.get_object(self.url, self.token,
|
||||
self.container_name, name, http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_get_object(node, partition,
|
||||
self.account, self.container_name, name)
|
||||
except client.ClientException, e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchPUT(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.put_concurrency
|
||||
self.total = self.total_objects
|
||||
self.msg = 'PUTS'
|
||||
if self.use_proxy:
|
||||
# Create the container
|
||||
if self.url.startswith('http://'):
|
||||
hc = HTTPConnection(self.url.split('/')[2])
|
||||
else:
|
||||
hc = HTTPSConnection(self.url.split('/')[2])
|
||||
hc_args = ('PUT',
|
||||
'/v1/%s/%s' % (self.account, self.container_name),
|
||||
None, {'X-Auth-Token': self.token})
|
||||
hc.request(*hc_args)
|
||||
hcr = hc.getresponse()
|
||||
hcrd = hcr.read()
|
||||
if hcr.status < 200 or hcr.status > 299:
|
||||
raise Exception('Could not create container %s: code: %s' %
|
||||
(self.container_name, hcr.status))
|
||||
with self.connection() as conn:
|
||||
client.put_container(self.url, self.token,
|
||||
self.container_name, http_conn=conn)
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
|
@ -251,21 +201,21 @@ class BenchPUT(Bench):
|
|||
source = random.choice(self.files)
|
||||
else:
|
||||
source = '0' * self.object_size
|
||||
headers = {
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-ID': str(uuid.uuid4()),
|
||||
'X-Auth-Token': self.token,
|
||||
'X-Timestamp': "%013.05f" % time.time(),
|
||||
}
|
||||
device = random.choice(self.devices)
|
||||
partition = str(random.randint(1, 3000))
|
||||
if self.use_proxy:
|
||||
hc_args = ('PUT', '/v1/%s/%s/%s' %
|
||||
(self.account, self.container_name, name), source, headers)
|
||||
else:
|
||||
hc_args = ('PUT', '/%s/%s/%s/%s/%s' %
|
||||
(device, partition, self.account, self.container_name, name),
|
||||
source, headers)
|
||||
if self._send_request(*hc_args):
|
||||
self.names.append((device, partition, name))
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.put_object(self.url, self.token,
|
||||
self.container_name, name, source,
|
||||
content_length=len(source), http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_put_object(node, partition,
|
||||
self.account, self.container_name, name, source,
|
||||
content_length=len(source))
|
||||
except client.ClientException, e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
self.names.append((device, partition, name))
|
||||
self.complete += 1
|
||||
|
|
Loading…
Reference in New Issue