diff --git a/benchmarks/base.py b/benchmarks/base.py index a1e5d8a4..d1c58895 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -65,7 +65,7 @@ def teardown(): def benchmark(run_fn): for conn_class in supported_reactors: setup() - log.info("Testing %s" % (conn_class.__name__,)) + log.info("==== %s ====" % (conn_class.__name__,)) cluster = Cluster(['127.0.0.1']) cluster.connection_class = conn_class diff --git a/benchmarks/single_thread_callback_full_pipeline.py b/benchmarks/single_thread_callback_full_pipeline.py index fa7cbf56..248a90e9 100644 --- a/benchmarks/single_thread_callback_full_pipeline.py +++ b/benchmarks/single_thread_callback_full_pipeline.py @@ -1,13 +1,11 @@ from base import benchmark import logging -from collections import deque from itertools import count from threading import Event log = logging.getLogger(__name__) -futures = deque() initial = object() def execute(session, query, values, num_queries): diff --git a/benchmarks/single_thread_future_batches.py b/benchmarks/single_thread_future_batches.py new file mode 100644 index 00000000..8396916a --- /dev/null +++ b/benchmarks/single_thread_future_batches.py @@ -0,0 +1,32 @@ +from base import benchmark + +import logging +import Queue + +log = logging.getLogger(__name__) + +def execute(session, query, values, num_queries): + + futures = Queue.Queue(maxsize=121) + + for i in range(num_queries): + if i > 0 and i % 120 == 0: + # clear the existing queue + while True: + try: + futures.get_nowait().result() + except Queue.Empty: + break + + future = session.execute_async(query, values) + futures.put_nowait(future) + + while True: + try: + futures.get_nowait().result() + except Queue.Empty: + break + + +if __name__ == "__main__": + benchmark(execute) diff --git a/benchmarks/single_thread_future_full_pipeline.py b/benchmarks/single_thread_future_full_pipeline.py new file mode 100644 index 00000000..a4b11ebc --- /dev/null +++ b/benchmarks/single_thread_future_full_pipeline.py @@ -0,0 +1,28 @@ +from base import benchmark + +import logging +import Queue + +log = logging.getLogger(__name__) + +def execute(session, query, values, num_queries): + + futures = Queue.Queue(maxsize=121) + + for i in range(num_queries): + if i >= 120: + old_future = futures.get_nowait() + old_future.result() + + future = session.execute_async(query, values) + futures.put_nowait(future) + + while True: + try: + futures.get_nowait().result() + except Queue.Empty: + break + + +if __name__ == "__main__": + benchmark(execute) diff --git a/benchmarks/single_thread_future_full_throttle.py b/benchmarks/single_thread_future_full_throttle.py new file mode 100644 index 00000000..18484dac --- /dev/null +++ b/benchmarks/single_thread_future_full_throttle.py @@ -0,0 +1,20 @@ +from base import benchmark + +import logging + +log = logging.getLogger(__name__) + +def execute(session, query, values, num_queries): + + futures = [] + + for i in range(num_queries): + future = session.execute_async(query, values) + futures.append(future) + + for future in futures: + future.result() + + +if __name__ == "__main__": + benchmark(execute)