diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 3ea397f0..4c3a3ab0 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -203,7 +203,7 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True): if start: log.debug("Starting ccm %s cluster", cluster_name) cluster.start(wait_for_binary_proto=True, wait_other_notice=True) - setup_test_keyspace() + setup_keyspace() CCM_CLUSTER = cluster except Exception: @@ -230,7 +230,7 @@ def teardown_package(): log.warn('Did not find cluster: %s' % cluster_name) -def setup_test_keyspace(): +def setup_keyspace(): # wait for nodes to startup time.sleep(10) diff --git a/tests/integration/long/test_custom_payload.py b/tests/integration/long/test_custom_payload.py new file mode 100644 index 00000000..f131a5e7 --- /dev/null +++ b/tests/integration/long/test_custom_payload.py @@ -0,0 +1,201 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from cassandra.query import (SimpleStatement, BatchStatement, BatchType) +from cassandra.cluster import Cluster + +from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace + + +def setup_module(): + """ + We need some custom setup for this module. All unit tests in this module + require protocol >=4. We won't bother going through the setup required unless that is the + protocol version we are using. + """ + + # If we aren't at protocol v 4 or greater don't waste time setting anything up, all tests will be skipped + if PROTOCOL_VERSION >= 4: + # Don't start the ccm cluster until we get the custom jvm argument specified + use_singledc(start=False) + ccm_cluster = get_cluster() + # if needed stop CCM cluster + ccm_cluster.stop() + # This will enable the Mirroring query handler which will echo our custom payload k,v pairs back to us + jmv_args = [ + " -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"] + ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=jmv_args) + # wait for nodes to startup + setup_keyspace() + + +def teardown_module(): + """ + The rests of the tests don't need our custom payload query handle so stop the cluster so we + don't impact other tests + """ + + ccm_cluster = get_cluster() + if ccm_cluster is not None: + ccm_cluster.stop() + + +class CustomPayloadTests(unittest.TestCase): + + def setUp(self): + """ + Test is skipped if run with cql version <4 + """ + + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest( + "Native protocol 4,0+ is required for custom payloads, currently using %r" + % (PROTOCOL_VERSION,)) + + self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect() + + def tearDown(self): + + self.cluster.shutdown() + + def test_custom_query_basic(self): + """ + Test to validate that custom payloads work with simple queries + + creates a simple query and ensures that custom payloads are passed to C*. A custom + query provider is used with C* so we can validate that same custom payloads are sent back + with the results + + + @since 2.6 + @jira_ticket PYTHON-280 + @expected_result valid custom payloads should be sent and received + + @test_category queries:custom_payload + """ + + # Create a simple query statement a + query = "SELECT * FROM system.local" + statement = SimpleStatement(query) + # Validate that various types of custom payloads are sent and received okay + self.validate_various_custom_payloads(statement=statement) + + def test_custom_query_batching(self): + """ + Test to validate that custom payloads work with batch queries + + creates a batch query and ensures that custom payloads are passed to C*. A custom + query provider is used with C* so we can validate that same custom payloads are sent back + with the results + + + @since 2.6 + @jira_ticket PYTHON-280 + @expected_result valid custom payloads should be sent and received + + @test_category queries:custom_payload + """ + + # Construct Batch Statement + batch = BatchStatement(BatchType.LOGGED) + for i in range(10): + batch.add(SimpleStatement("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)"), (i, i)) + + # Validate that various types of custom payloads are sent and received okay + self.validate_various_custom_payloads(statement=batch) + + def test_custom_query_prepared(self): + """ + Test to validate that custom payloads work with prepared queries + + creates a batch query and ensures that custom payloads are passed to C*. A custom + query provider is used with C* so we can validate that same custom payloads are sent back + with the results + + + @since 2.6 + @jira_ticket PYTHON-280 + @expected_result valid custom payloads should be sent and received + + @test_category queries:custom_payload + """ + + # Construct prepared statement + prepared = self.session.prepare( + """ + INSERT INTO test3rf.test (k, v) VALUES (?, ?) + """) + + bound = prepared.bind((1, None)) + + # Validate that various custom payloads are validated correctly + self.validate_various_custom_payloads(statement=bound) + + def validate_various_custom_payloads(self, statement): + """ + This is a utility method that given a statement will attempt + to submit the statement with various custom payloads. It will + validate that the custom payloads are sent and received correctly. + + @param statement The statement to validate the custom queries in conjunction with + """ + + # Simple key value + custom_payload = {'test': 'test_return'} + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + # no key value + custom_payload = {'': ''} + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + # Space value + custom_payload = {' ': ' '} + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + # Long key value pair + key_value = "x" * 10000 + custom_payload = {key_value: key_value} + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + # Max supported value key pairs according C* binary protocol v4 should be 65534 (unsigned short max value) + for i in range(65534): + custom_payload[str(i)] = str(i) + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + # Add one custom payload to this is too many key value pairs and should fail + custom_payload[str(65535)] = str(65535) + with self.assertRaises(ValueError): + self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload) + + def execute_async_validate_custom_payload(self, statement, custom_payload): + """ + This is just a simple method that submits a statement with a payload, and validates + that the custom payload we submitted matches the one that we got back + @param statement The statement to execute + @param custom_payload The custom payload to submit with + """ + + # Submit the statement with our custom payload. Validate the one + # we receive from the server matches + response_future = self.session.execute_async(statement, custom_payload=custom_payload) + response_future.result(timeout=10.0) + returned_custom_payload = response_future.custom_payload + self.assertEqual(custom_payload, returned_custom_payload) \ No newline at end of file diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 7f84f9ab..6b139c03 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -26,7 +26,7 @@ from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement, from cassandra.cluster import Cluster from cassandra.policies import HostDistance -from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions +from tests.integration import use_singledc, PROTOCOL_VERSION def setup_module(): @@ -93,40 +93,38 @@ class QueryTests(unittest.TestCase): Test to validate that client trace contains client ip information. creates a simple query and ensures that the client trace information is present. This will - only be the case if the c* version is 3.0 or greater + only be the case if the c* version is 2.2 or greater - - @since 3.0 + @since 2.2 @jira_ticket PYTHON-235 - @expected_result client address should be present in C* >= 3, otherwise should be none. + @expected_result client address should be present in C* >= 2.2, otherwise should be none. @test_category tracing -+ """ - #The current version on the trunk doesn't have the version set to 3.0 yet. - #For now we will use the protocol version. Once they update the version on C* trunk - #we can use the C*. See below - #self._cass_version, self._cql_version = get_server_versions() - #if self._cass_version < (3, 0): - # raise unittest.SkipTest("Client IP was not present in trace until C* 3.0") + """ + if PROTOCOL_VERSION < 4: - raise unittest.SkipTest( - "Protocol 4+ is required for client ip tracing, currently testing against %r" - % (PROTOCOL_VERSION,)) + raise unittest.SkipTest( + "Protocol 4+ is required for client ip tracing, currently testing against %r" + % (PROTOCOL_VERSION,)) cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = cluster.connect() + # Make simple query with trace enabled query = "SELECT * FROM system.local" statement = SimpleStatement(query) response_future = session.execute_async(statement, trace=True) - response_future.result(10.0) + response_future.result(timeout=10.0) current_host = response_future._current_host.address + # Fetch the client_ip from the trace. - trace = response_future.get_query_trace(2.0) + trace = response_future.get_query_trace(max_wait=2.0) client_ip = trace.client - # Ensure that ip is set for c* >3 - self.assertIsNotNone(client_ip,"Client IP was not set in trace with C* > 3.0") - self.assertEqual(client_ip,current_host,"Client IP from trace did not match the expected value") + + # Ensure that ip is set for c* >2.2 + self.assertIsNotNone(client_ip, "Client IP was not set in trace with C* > 2.2") + self.assertEqual(client_ip, current_host, "Client IP from trace did not match the expected value") + cluster.shutdown() @@ -547,3 +545,4 @@ class BatchStatementDefaultRoutingKeyTests(unittest.TestCase): self.assertIsNotNone(batch.routing_key) self.assertEqual(batch.routing_key, self.prepared.bind((1, 0)).routing_key) + diff --git a/tox.ini b/tox.ini index 6c4aa31a..d694e7ab 100644 --- a/tox.ini +++ b/tox.ini @@ -10,8 +10,8 @@ deps = nose [testenv] deps = {[base]deps} + sure==1.2.3 blist - sure setenv = USE_CASS_EXTERNAL=1 commands = {envpython} setup.py build_ext --inplace nosetests --verbosity=2 tests/unit/