Easy profiling of benchmarks

Tyler Hobbs
2013-11-22 18:00:09 -06:00
parent 95537a7eb9
commit f01df9ad48
6 changed files with 89 additions and 112 deletions

View File

@@ -1,8 +1,11 @@
from cProfile import Profile
import logging
import os.path
import sys
from threading import Thread
import time
from optparse import OptionParser
from greplin import scales
dirname = os.path.dirname(os.path.abspath(__file__))
@@ -68,7 +71,7 @@ def teardown(hosts):
session.execute("DROP KEYSPACE " + KEYSPACE)
def benchmark(run_fn):
def benchmark(thread_class):
options, args = parse_options()
for conn_class in options.supported_reactors:
setup(options.hosts)
@@ -87,10 +90,24 @@ def benchmark(run_fn):
""".format(table=TABLE))
values = {'key': 'key', 'a': 'a', 'b': 'b'}
per_thread = options.num_ops / options.threads
threads = []
log.debug("Beginning inserts...")
start = time.time()
try:
run_fn(session, query, values, options.num_ops, options.threads)
for i in range(options.threads):
thread = thread_class(i, session, query, values, per_thread, options.profile)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
end = time.time()
finally:
teardown(options.hosts)
@@ -137,6 +154,9 @@ def parse_options():
help='enable and print metrics for operations')
parser.add_option('-l', '--log-level', default='info',
help='logging level: debug, info, warning, or error')
parser.add_option('-p', '--profile', action='store_true', dest='profile',
help='Profile the run')
options, args = parser.parse_args()
options.hosts = options.hosts.split(',')
@@ -154,3 +174,24 @@ def parse_options():
options.supported_reactors = supported_reactors
return options, args
class BenchmarkThread(Thread):
def __init__(self, thread_num, session, query, values, num_queries, profile):
Thread.__init__(self)
self.thread_num = thread_num
self.session = session
self.query = query
self.values = values
self.num_queries = num_queries
self.profiler = Profile() if profile else None
def start_profile(self):
if self.profiler:
self.profiler.enable()
def finish_profile(self):
if self.profiler:
self.profiler.disable()
self.profiler.dump_stats('profile-%d' % self.thread_num)

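To illustrate the contract that base.py now establishes, here is a minimal sketch (not code from this commit, though it closely mirrors the synchronous module shown at the end of the page): each benchmark module defines a BenchmarkThread subclass whose run() brackets its work with start_profile()/finish_profile(), and passes the class to benchmark(), which creates one instance per thread and splits num_ops across them.

from base import benchmark, BenchmarkThread

class Runner(BenchmarkThread):
    def run(self):
        self.start_profile()
        # issue this thread's share of inserts synchronously;
        # the other modules swap in async/callback strategies here
        for i in range(self.num_queries):
            self.session.execute(self.query, self.values)
        self.finish_profile()

if __name__ == "__main__":
    benchmark(Runner)
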
View File

@@ -1,24 +1,20 @@
from base import benchmark
import logging
from itertools import count
from threading import Event, Thread
import logging
from threading import Event
from base import benchmark, BenchmarkThread
log = logging.getLogger(__name__)
initial = object()
class Runner(Thread):
class Runner(BenchmarkThread):
def __init__(self, session, query, values, num_queries, *args, **kwargs):
self.session = session
self.query = query
self.values = values
self.num_queries = num_queries
def __init__(self, *args, **kwargs):
BenchmarkThread.__init__(self, *args, **kwargs)
self.num_started = count()
self.num_finished = count()
self.event = Event()
Thread.__init__(self)
def handle_error(self, exc):
log.error("Error on insert: %r", exc)
@@ -36,27 +32,15 @@ class Runner(Thread):
future.add_callbacks(self.insert_next, self.handle_error)
def run(self):
self.start_profile()
for i in range(120):
self.insert_next(initial)
self.event.wait()
def execute(session, query, values, num_queries, num_threads):
per_thread = num_queries / num_threads
threads = []
for i in range(num_threads):
thread = Runner(session, query, values, per_thread)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
self.finish_profile()
if __name__ == "__main__":
benchmark(execute)
benchmark(Runner)

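The insert_next callback referenced above falls outside the hunk boundaries. Roughly, it chains each completed insert into the next one until the thread's quota is reached, then sets the Event that run() waits on. A hypothetical sketch of that pattern (an assumption based on the visible calls, not the committed code):

def insert_next(self, previous_result):
    if previous_result is not initial:
        # a completed insert; count() is zero-based, so the last
        # one returns num_queries - 1
        if next(self.num_finished) >= self.num_queries - 1:
            self.event.set()
            return
    if next(self.num_started) < self.num_queries:
        future = self.session.execute_async(self.query, self.values)
        future.add_callbacks(self.insert_next, self.handle_error)
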
View File

@@ -1,19 +1,18 @@
import logging
import Queue
from threading import Thread
from base import benchmark
from base import benchmark, BenchmarkThread
log = logging.getLogger(__name__)
def execute(session, query, values, num_queries, num_threads):
class Runner(BenchmarkThread):
per_thread = num_queries / num_threads
def run():
def run(self):
futures = Queue.Queue(maxsize=121)
for i in range(per_thread):
self.start_profile()
for i in range(self.num_queries):
if i > 0 and i % 120 == 0:
# clear the existing queue
while True:
@@ -22,7 +21,7 @@ def execute(session, query, values, num_queries, num_threads):
except Queue.Empty:
break
future = session.execute_async(query, values)
future = self.session.execute_async(self.query, self.values)
futures.put_nowait(future)
while True:
@@ -31,19 +30,8 @@ def execute(session, query, values, num_queries, num_threads):
except Queue.Empty:
break
threads = []
for i in range(num_threads):
thread = Thread(target=run)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
self.finish_profile()
if __name__ == "__main__":
benchmark(execute)
benchmark(Runner)

View File

@@ -1,24 +1,23 @@
import logging
import Queue
from threading import Thread
from base import benchmark
from base import benchmark, BenchmarkThread
log = logging.getLogger(__name__)
def execute(session, query, values, num_queries, num_threads):
class Runner(BenchmarkThread):
per_thread = num_queries / num_threads
def run():
def run(self):
futures = Queue.Queue(maxsize=121)
for i in range(per_thread):
self.start_profile()
for i in range(self.num_queries):
if i >= 120:
old_future = futures.get_nowait()
old_future.result()
future = session.execute_async(query, values)
future = self.session.execute_async(self.query, self.values)
futures.put_nowait(future)
while True:
@@ -27,19 +26,8 @@ def execute(session, query, values, num_queries, num_threads):
except Queue.Empty:
break
threads = []
for i in range(num_threads):
thread = Thread(target=run)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
self.finish_profile()
if __name__ == "__main__":
benchmark(execute)
benchmark(Runner)

View File

@@ -1,37 +1,25 @@
import logging
from threading import Thread
from base import benchmark
from base import benchmark, BenchmarkThread
log = logging.getLogger(__name__)
def execute(session, query, values, num_queries, num_threads):
class Runner(BenchmarkThread):
per_thread = num_queries / num_threads
def run():
def run(self):
futures = []
for i in range(per_thread):
future = session.execute_async(query, values)
self.start_profile()
for i in range(self.num_queries):
future = self.session.execute_async(self.query, self.values)
futures.append(future)
for future in futures:
future.result()
threads = []
for i in range(num_threads):
thread = Thread(target=run)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
self.finish_profile()
if __name__ == "__main__":
benchmark(execute)
benchmark(Runner)

View File

@@ -1,27 +1,15 @@
from threading import Thread
from base import benchmark, BenchmarkThread
from base import benchmark
class Runner(BenchmarkThread):
def execute(session, query, values, num_queries, num_threads):
def run(self):
self.start_profile()
per_thread = num_queries / num_threads
for i in xrange(self.num_queries):
self.session.execute(self.query, self.values)
def run():
for i in xrange(per_thread):
session.execute(query, values)
self.finish_profile()
threads = []
for i in range(num_threads):
thread = Thread(target=run)
thread.daemon = True
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
while thread.is_alive():
thread.join(timeout=0.5)
if __name__ == "__main__":
benchmark(execute)
benchmark(Runner)
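
When a module is run with the new -p/--profile flag, each thread leaves behind a profile-<thread_num> dump via dump_stats() in base.py above. These are ordinary cProfile dumps, so they can be merged and inspected with the standard pstats module; for example, assuming the benchmark was run with four threads from the current directory:

import pstats

# load the first thread's dump, fold in the rest, and show the
# 20 most expensive calls by cumulative time
stats = pstats.Stats('profile-0')
for i in range(1, 4):
    stats.add('profile-%d' % i)
stats.sort_stats('cumulative').print_stats(20)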