35deeef798
Currently, Zaqar bench doesn't pass insecure or cert when building the Zaqar client, which causes it not to work with SSL. This patch fixes it and also deprecates the 'verbose' option, which is replaced with 'debug'. Closes-Bug: #1607124 Change-Id: I7411b9a310abb5e51f91bac0766a6fe7d684741c
196 lines
6.0 KiB
Python
196 lines
6.0 KiB
Python
# Copyright (c) 2014 Rackspace, Inc.
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import multiprocessing as mp
|
|
import random
|
|
import sys
|
|
import time
|
|
|
|
from gevent import monkey as curious_george
|
|
# Apply gevent's monkey patching before the imports that follow;
# thread and select are explicitly left unpatched.
curious_george.patch_all(thread=False, select=False)
|
|
import gevent
|
|
import marktime
|
|
from zaqarclient.transport import errors
|
|
|
|
from zaqar.bench import config
|
|
from zaqar.bench import helpers
|
|
|
|
# Benchmark configuration; run() reads consumer_processes, consumer_workers,
# num_queues, time, server_url, messages_per_claim and debug from it.
CONF = config.conf
|
|
|
|
|
|
def claim_delete(queues, stats, test_duration, ttl, grace, limit):
    """Consumer Worker

    Repeatedly claims a batch of messages from a randomly chosen queue
    and deletes every claimed message until ``test_duration`` seconds
    have passed. The time spent in each successful claim and delete is
    accumulated so the caller can derive throughput and latency.

    :param queues: sequence of queue objects exposing ``claim``
    :param stats: object with a ``put`` method; receives one summary dict
    :param test_duration: how long to keep working, in seconds
    :param ttl: forwarded to ``queue.claim`` as ``ttl``
    :param grace: forwarded to ``queue.claim`` as ``grace``
    :param limit: forwarded to ``queue.claim`` as ``limit``
    """

    deadline = time.time() + test_duration

    claims_done = 0
    claims_elapsed = 0
    deletes_done = 0
    deletes_elapsed = 0
    failures = 0

    while time.time() < deadline:
        # NOTE(kgriffs): Distribute requests across all queues evenly.
        target = random.choice(queues)

        try:
            marktime.start('claim_message')
            claimed = target.claim(ttl=ttl, grace=grace, limit=limit)
            claims_elapsed += marktime.stop('claim_message').seconds
            claims_done += 1
        except errors.TransportError as ex:
            sys.stderr.write("Could not claim messages : {0}\n".format(ex))
            failures += 1
            # Nothing was claimed, so there is nothing to delete.
            continue

        for message in claimed:
            try:
                marktime.start('delete_message')
                message.delete()
                deletes_elapsed += marktime.stop('delete_message').seconds
                deletes_done += 1
            except errors.TransportError as ex:
                sys.stderr.write(
                    "Could not delete messages: {0}\n".format(ex))
                failures += 1

    # Failed requests count towards the total but have no timing data.
    stats.put({
        'total_requests': claims_done + deletes_done + failures,
        'claim_total_requests': claims_done,
        'delete_total_requests': deletes_done,
        'claim_total_elapsed': claims_elapsed,
        'delete_total_elapsed': deletes_elapsed,
    })
|
|
|
|
|
|
def load_generator(stats, num_workers, num_queues,
                   test_duration, url, ttl, grace, limit):
    """Spawn ``num_workers`` consumer greenlets and wait for all of them.

    A client is obtained from ``helpers.get_new_client()`` and one queue
    object is created per name in ``helpers.queue_names``. Every greenlet
    runs ``claim_delete`` against that shared list of queues.

    ``num_queues`` and ``url`` are accepted but not used by this function.
    """

    client = helpers.get_new_client()
    queues = [client.queue(name) for name in helpers.queue_names]

    workers = []
    for _ in range(num_workers):
        workers.append(gevent.spawn(
            claim_delete, queues, stats, test_duration, ttl, grace, limit))

    gevent.joinall(workers)
|
|
|
|
|
|
def crunch(stats):
    """Sum up every worker summary currently available on ``stats``.

    Entries are removed with ``get_nowait`` until ``stats.empty()`` reports
    true.

    :param stats: queue-like object providing ``empty`` and ``get_nowait``
    :returns: tuple ``(total_requests, claim_total_elapsed,
        delete_total_elapsed, claim_total_requests, delete_total_requests)``
    """
    # Keys in the same order as the returned tuple.
    fields = (
        'total_requests',
        'claim_total_elapsed',
        'delete_total_elapsed',
        'claim_total_requests',
        'delete_total_requests',
    )
    sums = [0, 0.0, 0.0, 0, 0]

    while not stats.empty():
        summary = stats.get_nowait()
        for position, key in enumerate(fields):
            sums[position] += summary[key]

    return tuple(sums)
|
|
|
|
|
|
def run(upstream_queue):
    """Run the consumer benchmark and publish its summary.

    Starts ``CONF.consumer_processes`` processes, each executing
    ``load_generator`` with ``CONF.consumer_workers`` workers, collects the
    per-worker stats through ``crunch`` and puts a single
    ``{'consumer': {...}}`` dict on ``upstream_queue``. When either setting
    is falsy no process is started and every reported value is zero.

    :param upstream_queue: object with a ``put`` method that receives the
        summary dict
    """
    num_procs = CONF.consumer_processes
    num_workers = CONF.consumer_workers
    num_queues = CONF.num_queues

    # Stats that will be reported
    duration = 0
    total_requests = 0
    successful_requests = 0
    claim_total_requests = 0
    delete_total_requests = 0
    throughput = 0
    claim_latency = 0
    delete_latency = 0

    # Performance test
    if num_procs and num_workers:
        stats = mp.Queue()
        # TODO(TheSriram) : Make ttl and grace configurable
        # Positional order matches load_generator: 300 is ttl, 200 is grace.
        args = (stats, num_workers, num_queues, CONF.time, CONF.server_url,
                300, 200, CONF.messages_per_claim)

        procs = [mp.Process(target=load_generator, args=args)
                 for _ in range(num_procs)]

        if CONF.debug:
            print('\nStarting consumers (cp={0}, cw={1})...'.format(
                num_procs, num_workers))

        start = time.time()

        for each_proc in procs:
            each_proc.start()

        # NOTE: Drain the stats queue while the children are still running.
        # A process that has put items on a multiprocessing queue cannot
        # exit until its buffered items are flushed to the underlying pipe,
        # so joining before reading can deadlock once the results no
        # longer fit in the pipe buffer.
        totals = [0, 0.0, 0.0, 0, 0]
        running = list(procs)
        while running:
            # Returns as soon as that child exits, or after the timeout.
            running[0].join(0.05)
            running = [proc for proc in running if proc.is_alive()]
            # When ``running`` is now empty every child has flushed its
            # results, so this pass picks up everything that is left.
            for index, value in enumerate(crunch(stats)):
                totals[index] += value

        for each_proc in procs:
            each_proc.join()

        (total_requests, claim_total_elapsed, delete_total_elapsed,
         claim_total_requests, delete_total_requests) = totals

        successful_requests = claim_total_requests + delete_total_requests
        duration = time.time() - start

        # NOTE(kgriffs): Duration should never be zero
        throughput = successful_requests / duration

        # Latencies are reported in milliseconds per request.
        if claim_total_requests:
            claim_latency = (1000 * claim_total_elapsed /
                             claim_total_requests)

        if delete_total_requests:
            delete_latency = (1000 * delete_total_elapsed /
                              delete_total_requests)

    upstream_queue.put({
        'consumer': {
            'duration_sec': duration,
            'total_reqs': total_requests,
            'claim_total_requests': claim_total_requests,
            'successful_reqs': successful_requests,
            'messages_processed': delete_total_requests,
            'reqs_per_sec': throughput,
            'ms_per_claim': claim_latency,
            'ms_per_delete': delete_latency,
        }
    })
|