Merge branch 'master' into 2.0

Conflicts:
	cassandra/__init__.py
	tests/integration/standard/test_connection.py
Tyler Hobbs
2014-04-22 18:02:04 -05:00
8 changed files with 94 additions and 25 deletions

View File

@@ -1,7 +1,28 @@
1.1.0
1.1.2
=====
In Progress
Bug Fixes
---------
* Update token metadata (for TokenAware calculations) when a node is removed
from the ring
* Fix file handle leak with gevent reactor due to blocking Greenlet kills when
closing excess connections
* Avoid handling a node coming up multiple times due to a reconnection attempt
succeeding close to the same time that an UP notification is pushed
1.1.1
=====
April 16, 2014
Bug Fixes
---------
* Fix unconditional import of nose in setup.py (github #111)
1.1.0
=====
April 16, 2014
Features
--------
* Gevent is now supported through monkey-patching the stdlib (PYTHON-7,
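
The gevent feature noted above is enabled by the application, not the driver; a minimal sketch of the intended usage (not part of this commit; the contact point and query are illustrative assumptions):

# Patch the stdlib before importing the driver so its sockets and
# threads become gevent-cooperative.
from gevent import monkey
monkey.patch_all()

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])  # hypothetical contact point
session = cluster.connect()
rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
cluster.shutdown()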

View File

@@ -49,7 +49,7 @@ you can use `freenode's web-based client <http://webchat.freenode.net/?channels=
Features to be Added
--------------------
* C extension for encoding/decoding messages
* Twisted, gevent support
* Twisted support
* Python 3 support
* IPv6 Support

View File

@@ -100,7 +100,7 @@ def benchmark(thread_class):
time.sleep(2.0)
query = session.prepare("""
INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?))
INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)
""".format(table=TABLE))
values = ('key', 'a', 'b')

View File

@@ -604,7 +604,7 @@ class Cluster(object):
host._handle_node_up_condition.acquire()
while host._currently_handling_node_up:
host._handle_node_up_condition.wait()
host.handling_up_down = True
host._currently_handling_node_up = True
host._handle_node_up_condition.release()
if host.is_up:
@@ -770,6 +770,7 @@ class Cluster(object):
session.on_remove(host)
for listener in self.listeners:
listener.on_remove(host)
self.control_connection.on_remove(host)
def signal_connection_failure(self, host, connection_exc, is_host_addition):
is_down = host.signal_connection_failure(connection_exc)
@@ -1716,10 +1717,10 @@ class ControlConnection(object):
if change_type == "UP":
if host is None:
# this is the first time we've seen the node
self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map)
self._cluster.scheduler.schedule(2, self.refresh_node_list_and_token_map)
else:
# this will be run by the scheduler
self._cluster.scheduler.schedule(1, self._cluster.on_up, host)
self._cluster.scheduler.schedule(2, self._cluster.on_up, host)
elif change_type == "DOWN":
# Note that there is a slight risk we can receive the event late and thus
# mark the host down even though we had already reconnected successfully.
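
The acquire/wait/flag sequence in the cluster.py hunk above is a condition-variable guard: it keeps a pushed UP notification and a successful reconnection from both running the node-up handling for the same host. A standalone sketch of that pattern, using hypothetical names (HostState, handle_up) rather than the driver's actual classes:

from threading import Condition, Thread

class HostState(object):
    def __init__(self):
        self._cond = Condition()
        self._handling_up = False
        self.is_up = False

    def handle_up(self, do_work):
        with self._cond:
            # Wait until no other thread is handling "up" for this host.
            while self._handling_up:
                self._cond.wait()
            if self.is_up:
                return  # another thread already finished the work
            self._handling_up = True
        try:
            do_work()  # reconnection / pool creation would happen here
            self.is_up = True
        finally:
            with self._cond:
                self._handling_up = False
                self._cond.notify_all()

# Two concurrent triggers (an UP event and a reconnect) result in one call.
state = HostState()
calls = []
threads = [Thread(target=state.handle_up, args=(lambda: calls.append(1),))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))  # 1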

View File

@@ -98,9 +98,9 @@ class GeventConnection(Connection):
log.debug("Closing connection (%s) to %s" % (id(self), self.host))
if self._read_watcher:
self._read_watcher.kill()
self._read_watcher.kill(block=False)
if self._write_watcher:
self._write_watcher.kill()
self._write_watcher.kill(block=False)
if self._socket:
self._socket.close()
log.debug("Closed socket to %s" % (self.host,))
@@ -122,8 +122,9 @@ class GeventConnection(Connection):
next_msg = self._write_queue.get()
run_select()
except Exception as exc:
log.debug("Exception during write select() for %s: %s", self, exc)
self.defunct(exc)
if not self.is_closed:
log.debug("Exception during write select() for %s: %s", self, exc)
self.defunct(exc)
return
try:
@@ -139,8 +140,9 @@ class GeventConnection(Connection):
try:
run_select()
except Exception as exc:
log.debug("Exception during read select() for %s: %s", self, exc)
self.defunct(exc)
if not self.is_closed:
log.debug("Exception during read select() for %s: %s", self, exc)
self.defunct(exc)
return
try:
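
The two geventreactor changes above work together: kill(block=False) lets close() return without waiting on a greenlet that may itself be blocked, and the is_closed check keeps that deliberate shutdown from being reported as a defunct connection. A small standalone sketch of the non-blocking kill (assumes gevent is installed; not driver code):

import gevent

def reader():
    # Stands in for the read loop blocked in select()/recv().
    while True:
        gevent.sleep(1)

watcher = gevent.spawn(reader)
watcher.kill(block=False)  # schedule GreenletExit and return immediately
gevent.sleep(0)            # yield to the hub so the kill is delivered
print(watcher.dead)        # normally True by this point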

View File

@@ -17,9 +17,11 @@ import sys
import ez_setup
ez_setup.use_setuptools()
run_gevent_nosetests = False
if __name__ == '__main__' and sys.argv[1] == "gevent_nosetests":
from gevent.monkey import patch_all
patch_all()
run_gevent_nosetests = True
from setuptools import setup
from distutils.command.build_ext import build_ext
@@ -39,8 +41,6 @@ try:
except ImportError:
has_subprocess = False
from nose.commands import nosetests
from cassandra import __version__
long_description = ""
@@ -48,8 +48,12 @@ with open("README.rst") as f:
long_description = f.read()
class gevent_nosetests(nosetests):
description = "run nosetests with gevent monkey patching"
gevent_nosetests = None
if run_gevent_nosetests:
from nose.commands import nosetests
class gevent_nosetests(nosetests):
description = "run nosetests with gevent monkey patching"
class DocCommand(Command):
@@ -169,7 +173,10 @@ On OSX, via homebrew:
def run_setup(extensions):
kw = {'cmdclass': {'doc': DocCommand, 'gevent_nosetests': gevent_nosetests}}
kw = {'cmdclass': {'doc': DocCommand}}
if gevent_nosetests is not None:
kw['cmdclass']['gevent_nosetests'] = gevent_nosetests
if extensions:
kw['cmdclass']['build_ext'] = build_extensions
kw['ext_modules'] = extensions
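
With the conditional registration above, the gevent-specific command is only added to cmdclass when it was actually requested on the command line, and the sys.argv check runs at module import, before setuptools or nose are loaded. A usage sketch, assuming gevent and nose are installed:

python setup.py gevent_nosetests

Because that first argument is checked before the other imports, patch_all() runs before anything that needs to be monkey-patched is pulled in.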

View File

@@ -367,14 +367,29 @@ class LoadBalancingPolicyTests(unittest.TestCase):
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
stop(2)
wait_for_down(cluster, 2, wait=True)
try:
self._query(session, keyspace, use_prepared=use_prepared)
self.fail()
except Unavailable:
pass
self.coordinator_stats.reset_counts()
start(2)
wait_for_up(cluster, 2, wait=True)
decommission(2)
wait_for_down(cluster, 2, wait=True)
self._query(session, keyspace, use_prepared=use_prepared)
self.coordinator_stats.assert_query_count_equals(self, 1, 6)
results = set([
self.coordinator_stats.get_query_count(1),
self.coordinator_stats.get_query_count(3)
])
self.assertEqual(results, set([0, 12]))
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
def test_token_aware_composite_key(self):
use_singledc()

View File

@@ -23,7 +23,8 @@ from functools import partial
import sys
from threading import Thread, Event
from cassandra import ConsistencyLevel
from cassandra import ConsistencyLevel, OperationTimedOut
from cassandra.cluster import NoHostAvailable
from cassandra.decoder import QueryMessage
from cassandra.io.asyncorereactor import AsyncoreConnection
@@ -37,11 +38,33 @@ class ConnectionTest(object):
klass = None
def get_connection(self):
"""
Helper method to work around automated testing issues within Jenkins.
Officially patched on the 2.0 branch in
17998ef72a2fe2e67d27dd602b6ced33a58ad8ef, but left as-is on the
1.0 branch to avoid possible regressions from fixing an
automated-testing edge case.
"""
conn = None
e = None
for i in xrange(5):
try:
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
break
except (OperationTimedOut, NoHostAvailable) as e:
continue
if conn:
return conn
else:
raise e
def test_single_connection(self):
"""
Test a single connection with sequential requests.
"""
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
conn = self.get_connection()
query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
event = Event()
@@ -64,7 +87,7 @@ class ConnectionTest(object):
"""
Test a single connection with pipelined requests.
"""
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
conn = self.get_connection()
query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
responses = [False] * 100
event = Event()
@@ -86,7 +109,7 @@ class ConnectionTest(object):
"""
Test multiple connections with pipelined requests.
"""
conns = [self.klass.factory(protocol_version=PROTOCOL_VERSION) for i in range(5)]
conns = [self.get_connection() for i in range(5)]
events = [Event() for i in range(5)]
query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
@@ -117,7 +140,7 @@ class ConnectionTest(object):
num_threads = 5
event = Event()
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
conn = self.get_connection()
query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
def cb(all_responses, thread_responses, request_num, *args, **kwargs):
@@ -174,7 +197,7 @@ class ConnectionTest(object):
threads = []
for i in range(num_conns):
conn = self.klass.factory(protocol_version=PROTOCOL_VERSION)
conn = self.get_connection()
t = Thread(target=send_msgs, args=(conn, events[i]))
threads.append(t)