From d12f6c0a3b71f577f43911cd7475731bfcb7a5ff Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 09:11:37 -0400 Subject: [PATCH 1/7] Do not use protocol version and set_core_connections_per_host in setup/teardown --- benchmarks/base.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 812db42a..ec8242b8 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -63,8 +63,7 @@ TABLE = "testtable" def setup(hosts): log.info("Using 'cassandra' package from %s", cassandra.__path__) - cluster = Cluster(hosts, protocol_version=1) - cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) + cluster = Cluster(hosts) try: session = cluster.connect() @@ -91,8 +90,7 @@ def setup(hosts): def teardown(hosts): - cluster = Cluster(hosts, protocol_version=1) - cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) + cluster = Cluster(hosts) session = cluster.connect() session.execute("DROP KEYSPACE " + KEYSPACE) cluster.shutdown() From a01d8d8b70eee509b46826ac694e6bb6a07a960a Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 10:04:34 -0400 Subject: [PATCH 2/7] Add a CLI option to specify the number of columns for the schema --- benchmarks/base.py | 49 ++++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index ec8242b8..39912be4 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -60,10 +60,10 @@ KEYSPACE = "testkeyspace" + str(int(time.time())) TABLE = "testtable" -def setup(hosts): +def setup(options): log.info("Using 'cassandra' package from %s", cassandra.__path__) - cluster = Cluster(hosts) + cluster = Cluster(options.hosts, protocol_version=options.protocol_version) try: session = cluster.connect() @@ -77,20 +77,21 @@ def setup(hosts): session.set_keyspace(KEYSPACE) log.debug("Creating table...") - session.execute(""" - CREATE TABLE %s ( + 
create_table_query = """ + CREATE TABLE {} ( thekey text, - col1 text, - col2 text, - PRIMARY KEY (thekey, col1) - ) - """ % TABLE) + """ + for i in range(options.num_columns): + create_table_query += "col{} text,\n".format(i) + create_table_query += "PRIMARY KEY (thekey))" + + session.execute(create_table_query.format(TABLE)) finally: cluster.shutdown() -def teardown(hosts): - cluster = Cluster(hosts) +def teardown(options): + cluster = Cluster(options.hosts, protocol_version=options.protocol_version) session = cluster.connect() session.execute("DROP KEYSPACE " + KEYSPACE) cluster.shutdown() @@ -99,7 +100,7 @@ def teardown(hosts): def benchmark(thread_class): options, args = parse_options() for conn_class in options.supported_reactors: - setup(options.hosts) + setup(options) log.info("==== %s ====" % (conn_class.__name__,)) kwargs = {'metrics_enabled': options.enable_metrics, @@ -112,10 +113,18 @@ def benchmark(thread_class): log.debug("Sleeping for two seconds...") time.sleep(2.0) - query = session.prepare(""" - INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?) 
- """.format(table=TABLE)) - values = ('key', 'a', 'b') + # Generate the INSERT query + insert_query = "INSERT INTO {} (thekey".format(TABLE) + for i in range(options.num_columns): + insert_query += ", col{}".format(i) + + insert_query += ") VALUES ('{}'".format('key') + + for i in range(options.num_columns): + insert_query += ", '{}'".format(i) + insert_query += ")" + + values = None per_thread = options.num_ops // options.threads threads = [] @@ -125,7 +134,7 @@ def benchmark(thread_class): try: for i in range(options.threads): thread = thread_class( - i, session, query, values, per_thread, + i, session, insert_query, values, per_thread, cluster.protocol_version, options.profile) thread.daemon = True threads.append(thread) @@ -140,7 +149,7 @@ def benchmark(thread_class): end = time.time() finally: cluster.shutdown() - teardown(options.hosts) + teardown(options) total = end - start log.info("Total time: %0.2fs" % total) @@ -188,8 +197,10 @@ def parse_options(): help='logging level: debug, info, warning, or error') parser.add_option('-p', '--profile', action='store_true', dest='profile', help='Profile the run') - parser.add_option('--protocol-version', type='int', dest='protocol_version', + parser.add_option('--protocol-version', type='int', dest='protocol_version', default=4, help='Native protocol version to use') + parser.add_option('-c', '--num-columns', type='int', dest='num_columns', default=2, + help='Specify the number of columns for the schema') options, args = parser.parse_args() From aaf78afb3a9bf2442f944cf315d7d861dadb1a0f Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 10:16:57 -0400 Subject: [PATCH 3/7] Add a CLI option to specify the keyspace name and the ability to keep the data after the benchmark --- benchmarks/base.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 39912be4..b3b0fb04 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ 
-71,10 +71,10 @@ def setup(options): session.execute(""" CREATE KEYSPACE %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } - """ % KEYSPACE) + """ % options.keyspace) log.debug("Setting keyspace...") - session.set_keyspace(KEYSPACE) + session.set_keyspace(options.keyspace) log.debug("Creating table...") create_table_query = """ @@ -93,7 +93,8 @@ def setup(options): def teardown(options): cluster = Cluster(options.hosts, protocol_version=options.protocol_version) session = cluster.connect() - session.execute("DROP KEYSPACE " + KEYSPACE) + if not options.keep_data: + session.execute("DROP KEYSPACE " + options.keyspace) cluster.shutdown() @@ -108,7 +109,7 @@ def benchmark(thread_class): if options.protocol_version: kwargs['protocol_version'] = options.protocol_version cluster = Cluster(options.hosts, **kwargs) - session = cluster.connect(KEYSPACE) + session = cluster.connect(options.keyspace) log.debug("Sleeping for two seconds...") time.sleep(2.0) @@ -201,6 +202,10 @@ def parse_options(): help='Native protocol version to use') parser.add_option('-c', '--num-columns', type='int', dest='num_columns', default=2, help='Specify the number of columns for the schema') + parser.add_option('-k', '--keyspace', type='str', dest='keyspace', default=KEYSPACE, + help='Specify the keyspace name for the schema') + parser.add_option('--keep-data', action='store_true', dest='keep_data', default=False, + help='Keep the data after the benchmark') options, args = parser.parse_args() From 1d52f7afdd85d00e550f383c8aacf7f851e443d1 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 10:43:07 -0400 Subject: [PATCH 4/7] Add a CLI option to specify the column type for the schema --- benchmarks/base.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index b3b0fb04..6178d80f 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -19,6 +19,7 @@ import sys from threading 
import Thread import time from optparse import OptionParser +import uuid from greplin import scales @@ -59,6 +60,14 @@ except ImportError as exc: KEYSPACE = "testkeyspace" + str(int(time.time())) TABLE = "testtable" +COLUMN_VALUES = { + 'int': 42, + 'text': "'42'", + 'float': 42.0, + 'uuid': uuid.uuid4(), + 'timestamp': "'2016-02-03 04:05+0000'" +} + def setup(options): log.info("Using 'cassandra' package from %s", cassandra.__path__) @@ -82,7 +91,7 @@ def setup(options): thekey text, """ for i in range(options.num_columns): - create_table_query += "col{} text,\n".format(i) + create_table_query += "col{} {},\n".format(i, options.column_type) create_table_query += "PRIMARY KEY (thekey))" session.execute(create_table_query.format(TABLE)) @@ -122,7 +131,7 @@ def benchmark(thread_class): insert_query += ") VALUES ('{}'".format('key') for i in range(options.num_columns): - insert_query += ", '{}'".format(i) + insert_query += ", {}".format(COLUMN_VALUES[options.column_type]) insert_query += ")" values = None @@ -206,6 +215,9 @@ def parse_options(): help='Specify the keyspace name for the schema') parser.add_option('--keep-data', action='store_true', dest='keep_data', default=False, help='Keep the data after the benchmark') + parser.add_option('--column-type', type='str', dest='column_type', default='text', + help='Specify the column type for the schema (supported: int, text, float, uuid, timestamp)') + options, args = parser.parse_args() From 857bcbfcff55b18f2fed6bbff1e0b9d050261205 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 10:49:09 -0400 Subject: [PATCH 5/7] Reuse keyspace and table if they exist --- benchmarks/base.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 6178d80f..4e173f89 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -77,12 +77,16 @@ def setup(options): session = cluster.connect() log.debug("Creating keyspace...") - 
session.execute(""" - CREATE KEYSPACE %s - WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } - """ % options.keyspace) + try: + session.execute(""" + CREATE KEYSPACE %s + WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } + """ % options.keyspace) + + log.debug("Setting keyspace...") + except cassandra.AlreadyExists: + log.debug("Keyspace already exists") - log.debug("Setting keyspace...") session.set_keyspace(options.keyspace) log.debug("Creating table...") @@ -94,7 +98,11 @@ def setup(options): create_table_query += "col{} {},\n".format(i, options.column_type) create_table_query += "PRIMARY KEY (thekey))" - session.execute(create_table_query.format(TABLE)) + try: + session.execute(create_table_query.format(TABLE)) + except cassandra.AlreadyExists: + log.debug("Table already exists.") + finally: cluster.shutdown() From 3673fe0da82f66ad41631575930d6c8eb817428a Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 11:34:13 -0400 Subject: [PATCH 6/7] all requests now use a different partition key --- benchmarks/base.py | 2 +- benchmarks/callback_full_pipeline.py | 6 ++++-- benchmarks/future_batches.py | 3 ++- benchmarks/future_full_pipeline.py | 3 ++- benchmarks/future_full_throttle.py | 5 +++-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 4e173f89..074c61d1 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -136,7 +136,7 @@ def benchmark(thread_class): for i in range(options.num_columns): insert_query += ", col{}".format(i) - insert_query += ") VALUES ('{}'".format('key') + insert_query += ") VALUES ('{key}'" for i in range(options.num_columns): insert_query += ", {}".format(COLUMN_VALUES[options.column_type]) diff --git a/benchmarks/callback_full_pipeline.py b/benchmarks/callback_full_pipeline.py index 3736991b..da043d82 100644 --- a/benchmarks/callback_full_pipeline.py +++ b/benchmarks/callback_full_pipeline.py @@ -41,8 +41,10 
@@ class Runner(BenchmarkThread): if next(self.num_finished) >= self.num_queries: self.event.set() - if next(self.num_started) <= self.num_queries: - future = self.session.execute_async(self.query, self.values, timeout=None) + i = next(self.num_started) + if i <= self.num_queries: + key = "{}-{}".format(self.thread_num, i) + future = self.session.execute_async(self.query.format(key=key), timeout=None) future.add_callbacks(self.insert_next, self.insert_next) def run(self): diff --git a/benchmarks/future_batches.py b/benchmarks/future_batches.py index 91c250bc..c134d520 100644 --- a/benchmarks/future_batches.py +++ b/benchmarks/future_batches.py @@ -35,7 +35,8 @@ class Runner(BenchmarkThread): except queue.Empty: break - future = self.session.execute_async(self.query, self.values) + key = "{}-{}".format(self.thread_num, i) + future = self.session.execute_async(self.query.format(key=key)) futures.put_nowait(future) while True: diff --git a/benchmarks/future_full_pipeline.py b/benchmarks/future_full_pipeline.py index 40682e04..b2b2c7d1 100644 --- a/benchmarks/future_full_pipeline.py +++ b/benchmarks/future_full_pipeline.py @@ -31,7 +31,8 @@ class Runner(BenchmarkThread): old_future = futures.get_nowait() old_future.result() - future = self.session.execute_async(self.query, self.values) + key = "{}-{}".format(self.thread_num, i) + future = self.session.execute_async(self.query.format(key=key)) futures.put_nowait(future) while True: diff --git a/benchmarks/future_full_throttle.py b/benchmarks/future_full_throttle.py index 27d87442..a98bfcec 100644 --- a/benchmarks/future_full_throttle.py +++ b/benchmarks/future_full_throttle.py @@ -25,8 +25,9 @@ class Runner(BenchmarkThread): self.start_profile() - for _ in range(self.num_queries): - future = self.session.execute_async(self.query, self.values) + for i in range(self.num_queries): + key = "{}-{}".format(self.thread_num, i) + future = self.session.execute_async(self.query.format(key=key)) futures.append(future) for future 
in futures: From 86a793f0510dddb0cd92ebcd8f0f248eeb7ea2d8 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 26 Apr 2016 12:42:24 -0400 Subject: [PATCH 7/7] Add read benchmark support --- benchmarks/base.py | 31 +++++++++++++++++----------- benchmarks/callback_full_pipeline.py | 2 +- benchmarks/future_batches.py | 2 +- benchmarks/future_full_pipeline.py | 2 +- benchmarks/future_full_throttle.py | 2 +- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 074c61d1..e9184697 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -131,28 +131,30 @@ def benchmark(thread_class): log.debug("Sleeping for two seconds...") time.sleep(2.0) - # Generate the INSERT query - insert_query = "INSERT INTO {} (thekey".format(TABLE) - for i in range(options.num_columns): - insert_query += ", col{}".format(i) - insert_query += ") VALUES ('{key}'" + # Generate the query + if options.read: + query = "SELECT * FROM {} WHERE thekey = '{{key}}'".format(TABLE) + else: + query = "INSERT INTO {} (thekey".format(TABLE) + for i in range(options.num_columns): + query += ", col{}".format(i) - for i in range(options.num_columns): - insert_query += ", {}".format(COLUMN_VALUES[options.column_type]) - insert_query += ")" - - values = None + query += ") VALUES ('{key}'" + for i in range(options.num_columns): + query += ", {}".format(COLUMN_VALUES[options.column_type]) + query += ")" + values = None # we don't use that anymore. Keeping it in case we go back to prepared statements. 
per_thread = options.num_ops // options.threads threads = [] - log.debug("Beginning inserts...") + log.debug("Beginning {}...".format('reads' if options.read else 'inserts')) start = time.time() try: for i in range(options.threads): thread = thread_class( - i, session, insert_query, values, per_thread, + i, session, query, values, per_thread, cluster.protocol_version, options.profile) thread.daemon = True threads.append(thread) @@ -225,6 +227,8 @@ def parse_options(): help='Keep the data after the benchmark') parser.add_option('--column-type', type='str', dest='column_type', default='text', help='Specify the column type for the schema (supported: int, text, float, uuid, timestamp)') + parser.add_option('--read', action='store_true', dest='read', default=False, + help='Read mode') options, args = parser.parse_args() @@ -269,6 +273,9 @@ class BenchmarkThread(Thread): if self.profiler: self.profiler.enable() + def run_query(self, key, **kwargs): + return self.session.execute_async(self.query.format(key=key), **kwargs) + def finish_profile(self): if self.profiler: self.profiler.disable() diff --git a/benchmarks/callback_full_pipeline.py b/benchmarks/callback_full_pipeline.py index da043d82..a7990d80 100644 --- a/benchmarks/callback_full_pipeline.py +++ b/benchmarks/callback_full_pipeline.py @@ -44,7 +44,7 @@ class Runner(BenchmarkThread): i = next(self.num_started) if i <= self.num_queries: key = "{}-{}".format(self.thread_num, i) - future = self.session.execute_async(self.query.format(key=key), timeout=None) + future = self.run_query(key, timeout=None) future.add_callbacks(self.insert_next, self.insert_next) def run(self): diff --git a/benchmarks/future_batches.py b/benchmarks/future_batches.py index c134d520..c8305369 100644 --- a/benchmarks/future_batches.py +++ b/benchmarks/future_batches.py @@ -36,7 +36,7 @@ class Runner(BenchmarkThread): break key = "{}-{}".format(self.thread_num, i) - future = self.session.execute_async(self.query.format(key=key)) + future = 
self.run_query(key) futures.put_nowait(future) while True: diff --git a/benchmarks/future_full_pipeline.py b/benchmarks/future_full_pipeline.py index b2b2c7d1..ecc2ce6f 100644 --- a/benchmarks/future_full_pipeline.py +++ b/benchmarks/future_full_pipeline.py @@ -32,7 +32,7 @@ class Runner(BenchmarkThread): old_future.result() key = "{}-{}".format(self.thread_num, i) - future = self.session.execute_async(self.query.format(key=key)) + future = self.run_query(key) futures.put_nowait(future) while True: diff --git a/benchmarks/future_full_throttle.py b/benchmarks/future_full_throttle.py index a98bfcec..2e47b19f 100644 --- a/benchmarks/future_full_throttle.py +++ b/benchmarks/future_full_throttle.py @@ -27,7 +27,7 @@ class Runner(BenchmarkThread): for i in range(self.num_queries): key = "{}-{}".format(self.thread_num, i) - future = self.session.execute_async(self.query.format(key=key)) + future = self.run_query(key) futures.append(future) for future in futures: