Merge pull request #759 from datastax/python-jenkin-fixes
Python jenkin fixes
@@ -63,4 +63,4 @@ MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
 notwindows = unittest.skipUnless(not "Windows" in platform.system(), "This test is not adequate for windows")
 notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
-notmonkeypatch = unittest.skipUnless(MONKEY_PATCH_LOOP, "Skpping this test because monkey patching is required")
+notmonkeypatch = unittest.skipUnless(MONKEY_PATCH_LOOP, "Skipping this test because monkey patching is required")
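For context, these module-level markers are plain unittest skip decorators. A minimal sketch of how a test module might apply one of them (the decorator definition is taken from the hunk above; the test class and method are hypothetical, for illustration only):

    import platform
    import unittest

    notwindows = unittest.skipUnless(not "Windows" in platform.system(),
                                     "This test is not adequate for windows")


    class ReactorTests(unittest.TestCase):  # hypothetical test class

        @notwindows
        def test_fork_based_behaviour(self):
            # Runs everywhere except Windows; skipped with the message above otherwise.
            self.assertTrue(True)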
@@ -529,11 +529,17 @@ def setup_keyspace(ipformat=None, wait=True):
             WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}'''
         execute_with_long_wait_retry(session, ddl)

-        ddl = '''
+        ddl_3f = '''
             CREATE TABLE test3rf.test (
                 k int PRIMARY KEY,
                 v int )'''
-        execute_with_long_wait_retry(session, ddl)
+        execute_with_long_wait_retry(session, ddl_3f)
+
+        ddl_1f = '''
+            CREATE TABLE test1rf.test (
+                k int PRIMARY KEY,
+                v int )'''
+        execute_with_long_wait_retry(session, ddl_1f)

     except Exception:
         traceback.print_exc()
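A minimal sketch of what the renamed DDL blocks do at the CQL level, using only the public driver API (keyspace and table names are taken from the hunk above; the standalone-script form and the local contact point are assumptions, this is not the fixture itself):

    from cassandra.cluster import Cluster

    cluster = Cluster()  # assumes a local test cluster, as the integration tests do
    session = cluster.connect()

    # rf=3 keyspace/table, mirrored by ddl_3f above
    session.execute("CREATE KEYSPACE IF NOT EXISTS test3rf WITH replication = "
                    "{'class': 'SimpleStrategy', 'replication_factor': '3'}")
    session.execute("CREATE TABLE IF NOT EXISTS test3rf.test (k int PRIMARY KEY, v int)")

    # rf=1 keyspace/table, mirrored by ddl_1f above
    session.execute("CREATE KEYSPACE IF NOT EXISTS test1rf WITH replication = "
                    "{'class': 'SimpleStrategy', 'replication_factor': '1'}")
    session.execute("CREATE TABLE IF NOT EXISTS test1rf.test (k int PRIMARY KEY, v int)")

    cluster.shutdown()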
@@ -675,7 +681,7 @@ class BasicSharedKeyspaceUnitTestCase(BasicKeyspaceUnitTestCase):
         drop_keyspace_shutdown_cluster(cls.ks_name, cls.session, cls.cluster)


-class BasicSharedKeyspaceUnitTestCaseWTable(BasicSharedKeyspaceUnitTestCase):
+class BasicSharedKeyspaceUnitTestCaseRF1(BasicSharedKeyspaceUnitTestCase):
     """
     This is basic unit test case that can be leveraged to scope a keyspace to a specific test class.
     creates a keyspace named after the testclass with a rf of 1, and a table named after the class
@@ -695,16 +701,6 @@ class BasicSharedKeyspaceUnitTestCaseRF2(BasicSharedKeyspaceUnitTestCase):
         self.common_setup(2)


-class BasicSharedKeyspaceUnitTestCaseWTable(BasicSharedKeyspaceUnitTestCase):
-    """
-    This is basic unit test case that can be leveraged to scope a keyspace to a specific test class.
-    creates a keyspace named after the testc lass with a rf of 2, and a table named after the class
-    """
-    @classmethod
-    def setUpClass(self):
-        self.common_setup(3, True, True, True)
-
-
 class BasicSharedKeyspaceUnitTestCaseRF3(BasicSharedKeyspaceUnitTestCase):
     """
     This is basic unit test case that can be leveraged to scope a keyspace to a specific test class.
@@ -715,14 +711,14 @@ class BasicSharedKeyspaceUnitTestCaseRF3(BasicSharedKeyspaceUnitTestCase):
         self.common_setup(3)


-class BasicSharedKeyspaceUnitTestCaseRF3WTable(BasicSharedKeyspaceUnitTestCase):
+class BasicSharedKeyspaceUnitTestCaseRF3WM(BasicSharedKeyspaceUnitTestCase):
     """
     This is basic unit test case that can be leveraged to scope a keyspace to a specific test class.
-    creates a keyspace named after the test class with a rf of 3 and a table named after the class
+    creates a keyspace named after the test class with a rf of 3 with metrics enabled
     """
     @classmethod
     def setUpClass(self):
-        self.common_setup(3, True)
+        self.common_setup(3, True, True, True)


 class BasicSharedKeyspaceUnitTestCaseWFunctionTable(BasicSharedKeyspaceUnitTestCase):
@@ -11,6 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest # noqa
+
 try:
     from ccmlib import common
 except ImportError as e:
@@ -12,11 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import sys,logging, traceback, time
+import sys,logging, traceback, time, re

 from cassandra import (ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,
                        FunctionFailure, ProtocolVersion)
-from cassandra.cluster import Cluster, NoHostAvailable
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.policies import WhiteListRoundRobinPolicy
 from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.query import SimpleStatement
 from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
@@ -134,7 +135,7 @@ class ClientExceptionTests(unittest.TestCase):
         # Ensure all nodes not on the list, but that are currently set to failing are enabled
         for node in self.nodes_currently_failing:
             if node not in failing_nodes:
-                node.stop(wait_other_notice=True, gently=False)
+                node.stop(wait_other_notice=True, gently=True)
                 node.start(wait_for_binary_proto=True, wait_other_notice=True)
                 self.nodes_currently_failing.remove(node)
@@ -324,22 +325,28 @@ class TimeoutTimerTest(unittest.TestCase):
         """

         # self.node1, self.node2, self.node3 = get_cluster().nodes.values()
-        self.node1 = get_node(1)
-        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
-        self.session = self.cluster.connect()
+        node1 = ExecutionProfile(
+            load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1'])
+        )
+        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, execution_profiles={EXEC_PROFILE_DEFAULT: node1})
+        self.session = self.cluster.connect(wait_for_all_pools=True)
+
+        self.control_connection_host_number = 1
+        self.node_to_stop = get_node(self.control_connection_host_number)

         ddl = '''
             CREATE TABLE test3rf.timeout (
                 k int PRIMARY KEY,
                 v int )'''
         self.session.execute(ddl)
-        self.node1.pause()
+        self.node_to_stop.pause()

     def tearDown(self):
         """
         Shutdown cluster and resume node1
         """
-        self.node1.resume()
+        self.node_to_stop.resume()
         self.session.execute("DROP TABLE test3rf.timeout")
         self.cluster.shutdown()
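The setUp change above moves from a bare Cluster to execution profiles so the test always talks to one known host. A minimal sketch of pinning the default profile with WhiteListRoundRobinPolicy (the 127.0.0.1 contact point is an assumption for a local ccm cluster):

    from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
    from cassandra.policies import WhiteListRoundRobinPolicy

    # Route every request made through the default profile to 127.0.0.1 only.
    pinned = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
    cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: pinned})
    session = cluster.connect(wait_for_all_pools=True)

    session.execute("SELECT release_version FROM system.local")
    cluster.shutdown()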
@@ -16,7 +16,7 @@ import struct, time, logging, sys, traceback

 from cassandra import ConsistencyLevel, Unavailable, OperationTimedOut, ReadTimeout, ReadFailure, \
     WriteTimeout, WriteFailure
-from cassandra.cluster import Cluster, NoHostAvailable, Session
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile
 from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.metadata import murmur3
 from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy,
@@ -475,9 +475,19 @@ class LoadBalancingPolicyTests(unittest.TestCase):
                            '(k1, k2, i) '
                            'VALUES '
                            '(?, ?, ?)' % table)
-        session.execute(prepared.bind((1, 2, 3)))
+        bound = prepared.bind((1, 2, 3))
+        result = session.execute(bound)
+        self.assertIn(result.response_future.attempted_hosts[0],
+                      cluster.metadata.get_replicas(keyspace, bound.routing_key))

+        # There could be race condition with querying a node
+        # which doesn't yet have the data so we query one of
+        # the replicas
+        results = session.execute(SimpleStatement('SELECT * FROM %s WHERE k1 = 1 AND k2 = 2' % table,
+                                                  routing_key=bound.routing_key))
+        self.assertIn(results.response_future.attempted_hosts[0],
+                      cluster.metadata.get_replicas(keyspace, bound.routing_key))

-        results = session.execute('SELECT * FROM %s WHERE k1 = 1 AND k2 = 2' % table)
         self.assertTrue(results[0].i)

         cluster.shutdown()
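The new assertions compare the coordinator the driver actually contacted against the replica set the token map predicts for the statement's routing key. A hedged sketch of the same check outside the test harness (the ks.tbl schema is a placeholder):

    from cassandra.cluster import Cluster

    cluster = Cluster()
    session = cluster.connect()

    prepared = session.prepare("INSERT INTO ks.tbl (k1, k2, i) VALUES (?, ?, ?)")  # placeholder schema
    bound = prepared.bind((1, 2, 3))
    result = session.execute(bound)

    # attempted_hosts[0] is the coordinator the driver actually used;
    # get_replicas() is the set token-aware routing should have chosen from.
    coordinator = result.response_future.attempted_hosts[0]
    replicas = cluster.metadata.get_replicas('ks', bound.routing_key)
    print(coordinator in replicas)

    cluster.shutdown()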
@@ -134,9 +134,10 @@ class SchemaTests(unittest.TestCase):
         cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=0.001)
         session = cluster.connect(wait_for_all_pools=True)

-        rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
+        rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
         self.check_and_wait_for_agreement(session, rs, False)
-        rs = session.execute("CREATE TABLE test_schema_disagreement.cf (key int PRIMARY KEY, value int)")
+        rs = session.execute(SimpleStatement("CREATE TABLE test_schema_disagreement.cf (key int PRIMARY KEY, value int)",
+                                             consistency_level=ConsistencyLevel.ALL))
         self.check_and_wait_for_agreement(session, rs, False)
         rs = session.execute("DROP KEYSPACE test_schema_disagreement")
         self.check_and_wait_for_agreement(session, rs, False)
@@ -145,9 +146,10 @@ class SchemaTests(unittest.TestCase):
         # These should have schema agreement
         cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=100)
         session = cluster.connect()
-        rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
+        rs = session.execute("CREATE KEYSPACE test_schema_disagreement WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
         self.check_and_wait_for_agreement(session, rs, True)
-        rs = session.execute("CREATE TABLE test_schema_disagreement.cf (key int PRIMARY KEY, value int)")
+        rs = session.execute(SimpleStatement("CREATE TABLE test_schema_disagreement.cf (key int PRIMARY KEY, value int)",
+                                             consistency_level=ConsistencyLevel.ALL))
         self.check_and_wait_for_agreement(session, rs, True)
         rs = session.execute("DROP KEYSPACE test_schema_disagreement")
         self.check_and_wait_for_agreement(session, rs, True)
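check_and_wait_for_agreement ultimately inspects the statement's ResponseFuture. A minimal sketch of observing schema agreement directly (the deliberately tiny max_schema_agreement_wait mirrors the test above; the example_ks keyspace name is hypothetical):

    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster
    from cassandra.query import SimpleStatement

    cluster = Cluster(max_schema_agreement_wait=0.001)  # give the driver almost no time to see agreement
    session = cluster.connect()

    ddl = SimpleStatement(
        "CREATE KEYSPACE example_ks WITH replication = "
        "{'class': 'SimpleStrategy', 'replication_factor': 3}",
        consistency_level=ConsistencyLevel.ALL)
    rs = session.execute(ddl)

    # is_schema_agreed reports whether all hosts showed the same schema version
    # before the (deliberately tiny) wait expired.
    print(rs.response_future.is_schema_agreed)

    session.execute("DROP KEYSPACE example_ks")
    cluster.shutdown()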
@@ -192,13 +192,23 @@ class ClusterTests(unittest.TestCase):

         @test_category connection
         """
+        # Test with empty list
+        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
         with self.assertRaises(NoHostAvailable):
-            Session(Cluster(protocol_version=PROTOCOL_VERSION), [])
+            Session(cluster, [])
+        cluster.shutdown()

+        # Test with only invalid
+        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
         with self.assertRaises(NoHostAvailable):
-            Session(Cluster(protocol_version=PROTOCOL_VERSION), [Host("1.2.3.4", SimpleConvictionPolicy)])
-        session = Session(Cluster(protocol_version=PROTOCOL_VERSION), [Host(x, SimpleConvictionPolicy) for x in
+            Session(cluster, [Host("1.2.3.4", SimpleConvictionPolicy)])
+        cluster.shutdown()
+
+        # Test with valid and invalid hosts
+        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
+        Session(cluster, [Host(x, SimpleConvictionPolicy) for x in
                           ("127.0.0.1", "127.0.0.2", "1.2.3.4")])
-        session.shutdown()
+        cluster.shutdown()

     def test_protocol_negotiation(self):
         """
@@ -282,15 +292,15 @@ class ClusterTests(unittest.TestCase):
         session = cluster.connect()
         result = session.execute(
             """
-            INSERT INTO test3rf.test (k, v) VALUES (8889, 8889)
+            INSERT INTO test1rf.test (k, v) VALUES (8889, 8889)
             """)
         self.assertFalse(result)

-        result = session.execute("SELECT * FROM test3rf.test")
-        self.assertEqual([(8889, 8889)], result)
+        result = session.execute("SELECT * FROM test1rf.test")
+        self.assertEqual([(8889, 8889)], result, "Rows in ResultSet are {0}".format(result.current_rows))

         # test_connect_on_keyspace
-        session2 = cluster.connect('test3rf')
+        session2 = cluster.connect('test1rf')
         result2 = session2.execute("SELECT * FROM test")
         self.assertEqual(result, result2)
         cluster.shutdown()
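The strengthened assertion message leans on ResultSet.current_rows. A small sketch of what that exposes (the table name is taken from the hunk above; the standalone form is illustrative):

    from cassandra.cluster import Cluster

    cluster = Cluster()
    session = cluster.connect()

    result = session.execute("SELECT * FROM test1rf.test")

    # current_rows is the page of rows already fetched, handy for assertion messages.
    print("Rows in ResultSet are {0}".format(result.current_rows))

    cluster.shutdown()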
@@ -17,7 +17,6 @@ try:
 except ImportError:
     import unittest # noqa

-from nose.tools import nottest
 from functools import partial
 from six.moves import range
 import sys
@@ -38,7 +37,6 @@ from tests.integration import use_singledc, PROTOCOL_VERSION, get_node, CASSANDR

 try:
     from cassandra.io.libevreactor import LibevConnection
-    from cassandra.io.libevreactor import _cleanup as libev__cleanup
 except ImportError:
     LibevConnection = None
@@ -401,45 +399,3 @@ class LibevConnectionTests(ConnectionTests, unittest.TestCase):
             raise unittest.SkipTest(
                 'libev does not appear to be installed properly')
         ConnectionTests.setUp(self)
-
-    @nottest
-    def test_watchers_are_finished(self):
-        """
-        Test for asserting that watchers are closed in LibevConnection
-
-        It will open a connection to the Cluster and then abruptly clean it simulating,
-        a process termination without calling cluster.shutdown(), which would trigger
-        LibevConnection._libevloop._cleanup. Then it will check the watchers have been closed
-        Finally it will restore the LibevConnection reactor so it doesn't affect
-        the rest of the tests
-
-        @since 3.10
-        @jira_ticket PYTHON-747
-        @expected_result the watchers are closed
-
-        @test_category connection
-        """
-
-        # conn._write_watcher and conn._read_watcher will be closed
-        # when the request is finished so it may not be _cleanup the
-        # one who ends up cleaning them everytime.
-        for _ in range(10):
-            cluster = Cluster(connection_class=LibevConnection)
-            session = cluster.connect(wait_for_all_pools=True)
-
-            session.execute_async("SELECT * FROM system.local LIMIT 1")
-            # We have to make a copy because the connections shouldn't
-            # be alive when we verify them
-            live_connections = set(LibevConnection._libevloop._live_conns)
-
-            # This simulates the process ending without cluster.shutdown()
-            # being called, then with atexit _cleanup for libevreactor would
-            # be called
-            libev__cleanup(weakref.ref(LibevConnection._libevloop))
-
-            for conn in live_connections:
-                for watcher in (conn._write_watcher, conn._read_watcher):
-                    self.assertTrue(watcher is None or not watcher.is_active())
-
-            cluster.shutdown()
-            LibevConnection._libevloop = None
@@ -28,7 +28,7 @@ from cassandra.protocol import SyntaxException
 from cassandra.cluster import Cluster, NoHostAvailable
 from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION, execute_until_pass
 from greplin import scales
-from tests.integration import BasicSharedKeyspaceUnitTestCaseWTable, BasicExistingKeyspaceUnitTestCase, local
+from tests.integration import BasicSharedKeyspaceUnitTestCaseRF3WM, BasicExistingKeyspaceUnitTestCase, local

 def setup_module():
     use_singledc()
@@ -146,7 +146,7 @@ class MetricsTests(unittest.TestCase):
         # Sometimes this commands continues with the other nodes having not noticed
         # 1 is down, and a Timeout error is returned instead of an Unavailable
         get_node(1).stop(wait=True, wait_other_notice=True)
-        time.sleep(5)
+
         try:
             # Test write
             query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
@@ -179,7 +179,7 @@ class MetricsTests(unittest.TestCase):
         # pass


-class MetricsNamespaceTest(BasicSharedKeyspaceUnitTestCaseWTable):
+class MetricsNamespaceTest(BasicSharedKeyspaceUnitTestCaseRF3WM):
     @local
     def test_metrics_per_cluster(self):
         """
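MetricsNamespaceTest exercises the scales-backed metrics registry that the RF3WM fixture enables. A hedged sketch of turning metrics on and reading one stat (the exact stats exposed can vary by driver version):

    from cassandra.cluster import Cluster

    cluster = Cluster(metrics_enabled=True)
    session = cluster.connect()

    session.execute("SELECT release_version FROM system.local")

    # cluster.metrics is a cassandra.metrics.Metrics instance backed by greplin/scales.
    print(cluster.metrics.request_timer)

    cluster.shutdown()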
@@ -508,7 +508,8 @@ class PreparedStatementArgTest(unittest.TestCase):
         batch_statement = BatchStatement(consistency_level=ConsistencyLevel.ONE)
         batch_statement.add(insert_statement, (1, 2))
         session.execute(batch_statement)
-        select_results = session.execute("SELECT * FROM %s WHERE k = 1" % table)
+        select_results = session.execute(SimpleStatement("SELECT * FROM %s WHERE k = 1" % table,
+                                                         consistency_level=ConsistencyLevel.ALL))
         first_row = select_results[0][:2]
         self.assertEqual((1, 2), first_row)

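For reference, the surrounding test writes through a batch and then reads the row back; the fix reads back at ALL so every replica has to agree the batch landed. A minimal sketch of that pattern (the ks.tbl table is an assumed placeholder):

    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster
    from cassandra.query import BatchStatement, SimpleStatement

    cluster = Cluster()
    session = cluster.connect()

    insert = session.prepare("INSERT INTO ks.tbl (k, v) VALUES (?, ?)")  # placeholder table

    batch = BatchStatement(consistency_level=ConsistencyLevel.ONE)
    batch.add(insert, (1, 2))
    session.execute(batch)

    # Read back at ALL so the check does not race replication.
    rows = session.execute(SimpleStatement("SELECT * FROM ks.tbl WHERE k = 1",
                                           consistency_level=ConsistencyLevel.ALL))
    print(rows[0])

    cluster.shutdown()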
@@ -20,25 +20,24 @@ import errno
 import math
 from mock import patch, Mock
 import os
 import weakref
 import six
 from six import BytesIO
 from socket import error as socket_error
 import sys
 import time

+from cassandra.io.libevreactor import _cleanup as libev__cleanup
 from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
                                   ConnectionException, ProtocolError)

 from cassandra.protocol import (write_stringmultimap, write_int, write_string,
                                 SupportedMessage, ReadyMessage, ServerError)
 from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
 from tests.unit.io.utils import TimerCallback
 from tests.unit.io.utils import submit_and_wait_for_completion

 from tests import is_monkey_patched


 try:
-    from cassandra.io.libevreactor import LibevConnection
+    from cassandra.io.libevreactor import LibevConnection, LibevLoop
 except ImportError:
     LibevConnection = None # noqa
@@ -296,3 +295,37 @@ class LibevConnectionTest(unittest.TestCase):

         self.assertTrue(c.connected_event.is_set())
         self.assertFalse(c.is_defunct)
+
+    def test_watchers_are_finished(self, *args):
+        """
+        Test for asserting that watchers are closed in LibevConnection
+
+        This test simulates a process termination without calling cluster.shutdown(), which would trigger
+        LibevConnection._libevloop._cleanup. It will check the watchers have been closed
+        Finally it will restore the LibevConnection reactor so it doesn't affect
+        the rest of the tests
+
+        @since 3.10
+        @jira_ticket PYTHON-747
+        @expected_result the watchers are closed
+
+        @test_category connection
+        """
+        with patch.object(LibevConnection._libevloop, "_thread"), \
+                patch.object(LibevConnection._libevloop, "notify"):
+
+            self.make_connection()
+
+            # We have to make a copy because the connections shouldn't
+            # be alive when we verify them
+            live_connections = set(LibevConnection._libevloop._live_conns)
+
+            # This simulates the process ending without cluster.shutdown()
+            # being called, then with atexit _cleanup for libevreactor would
+            # be called
+            libev__cleanup(weakref.ref(LibevConnection._libevloop))
+            for conn in live_connections:
+                for watcher in (conn._write_watcher, conn._read_watcher):
+                    self.assertTrue(watcher.stop.mock_calls)
+
+            LibevConnection._libevloop._shutdown = False
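The relocated unit test relies on mock.patch.object to keep the libev loop from doing real I/O while the cleanup path runs. A generic sketch of that stubbing pattern (the EventLoop class and notify attribute here are stand-ins for illustration, not driver internals):

    import unittest
    from mock import patch  # unittest.mock on Python 3 works the same way


    class EventLoop(object):  # stand-in for an object with side effects
        def notify(self):
            raise RuntimeError("would touch a real socket")


    class PatchObjectExample(unittest.TestCase):
        def test_notify_is_stubbed(self):
            loop = EventLoop()
            # While patched, loop.notify is a Mock: calls are recorded, not executed.
            with patch.object(loop, "notify") as mock_notify:
                loop.notify()
            self.assertTrue(mock_notify.mock_calls)


    if __name__ == "__main__":
        unittest.main()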