From 43e4ebf40a8046610b274c2b1a1bfaf05870888d Mon Sep 17 00:00:00 2001 From: Colin Stolley Date: Fri, 6 Jun 2014 23:57:04 -0500 Subject: [PATCH 01/10] Added twisted-based event loop This should address PYTHON-8. - added unit tests - import twistedreactor in cluster if libev is not available - fixed unittest import and added copyright notice --- cassandra/cluster.py | 7 +- cassandra/io/twistedreactor.py | 279 +++++++++++++++++++++++++++ tests/unit/io/test_twistedreactor.py | 199 +++++++++++++++++++ 3 files changed, 483 insertions(+), 2 deletions(-) create mode 100644 cassandra/io/twistedreactor.py create mode 100644 tests/unit/io/test_twistedreactor.py diff --git a/cassandra/cluster.py b/cassandra/cluster.py index bc5f0814..0c6a94ee 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -66,14 +66,17 @@ from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement, named_tuple_factory, dict_factory) # default to gevent when we are monkey patched, otherwise if libev is available, use that as the -# default because it's faster than asyncore +# default because it's fastest. Try twisted otherwise, then fallback to asyncore. 
if 'gevent.monkey' in sys.modules: from cassandra.io.geventreactor import GeventConnection as DefaultConnection else: try: from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA except ImportError: - from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA + try: + from cassandra.io.twistedreactor import TwistedConnection as DefaultConnection # NOQA + except ImportError: + from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA # Forces load of utf8 encoding module to avoid deadlock that occurs # if code that is being imported tries to import the module in a seperate diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py new file mode 100644 index 00000000..442200c9 --- /dev/null +++ b/cassandra/io/twistedreactor.py @@ -0,0 +1,279 @@ +# Copyright 2013-2014 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Module that implements an event loop based on twisted +( https://twistedmatrix.com ). 
+""" +from twisted.internet import reactor, protocol +from threading import Event, Thread +from functools import partial +import logging +import weakref +import atexit +import os + +from six import BytesIO + +from cassandra import OperationTimedOut +from cassandra.connection import Connection, ConnectionShutdown +from cassandra.protocol import RegisterMessage +from cassandra.marshal import int32_unpack + + +log = logging.getLogger(__name__) + + +def _cleanup(cleanup_weakref): + try: + cleanup_weakref() + except ReferenceError: + return + + +class TwistedConnectionProtocol(protocol.Protocol): + """ + Twisted Protocol class for handling data received and connection + made events. + """ + def dataReceived(self, data): + """ + Callback function that is called when data has been received + on the connection. + + Reaches back to the Connection object and queues the data for + processing. + """ + self.transport.connector.factory.conn._iobuf.write(data) + self.transport.connector.factory.conn.handle_read() + + def connectionMade(self): + """ + Callback function that is called when a connection has succeeded. + + Reaches back to the Connection object and confirms that the connection + is ready. + """ + self.transport.connector.factory.conn.client_connection_made() + + +class TwistedConnectionClientFactory(protocol.ClientFactory): + def __init__(self, connection): + # ClientFactory does not define __init__() in parent classes + # and does not inherit from object. + self.conn = connection + + def buildProtocol(self, addr): + """ + Twisted function that defines which kind of protocol to use + in the ClientFactory. + """ + return TwistedConnectionProtocol() + + def clientConnectionFailed(self, connector, reason): + """ + Overridden twisted callback which is called when the + connection attempt fails. 
+ """ + log.debug("Connect failed: %s", reason) + self.conn.close() + + def clientConnectionLost(self, connector, reason): + """ + Overridden twisted callback which is called when the + connection goes away (cleanly or otherwise). + """ + log.debug("Connect lost: %s", reason) + self.conn.close() + + +class TwistedConnection(Connection): + """ + An implementation of :class:`.Connection` that utilizes the + Twisted event loop. + """ + + _total_reqd_bytes = 0 + + @classmethod + def factory(cls, *args, **kwargs): + """ + A factory function which returns connections which have + succeeded in connecting and are ready for service (or + raises an exception otherwise). + """ + timeout = kwargs.pop('timeout', 5.0) + conn = cls(*args, **kwargs) + conn.connected_event.wait(timeout) + if conn.last_error: + raise conn.last_error + elif not conn.connected_event.is_set(): + conn.close() + raise OperationTimedOut("Timed out creating connection") + else: + return conn + + def __init__(self, *args, **kwargs): + """ + Initialization method. + + Note that we can't call reactor methods directly here because + it's not thread-safe, so we schedule the reactor/connection + stuff to be run from the event loop thread when it gets the + chance. + """ + Connection.__init__(self, *args, **kwargs) + + self.connected_event = Event() + self._iobuf = BytesIO() + self._thread = None + self.is_closed = True + self.connector = None + + self._callbacks = {} + reactor.callFromThread(self.add_connection) + + if not reactor.running: # XXX: might want a lock here? + self._thread = Thread(target=reactor.run, + name="cassandra_driver_event_loop", + kwargs={'installSignalHandlers': False}) + self._thread.daemon = True + self._thread.start() + atexit.register(partial(_cleanup, weakref.ref(self))) + + def add_connection(self): + """ + Convenience function to connect and store the resulting + connector. 
+ """ + self.connector = reactor.connectTCP( + host=self.host, port=self.port, + factory=TwistedConnectionClientFactory(self)) + + def client_connection_made(self): + """ + Called by twisted protocol when a connection attempt has + succeeded. + """ + with self.lock: + self.is_closed = False + self._send_options_message() + + def close(self): + """ + Disconnect and error-out all callbacks. + """ + with self.lock: + if self.is_closed: + return + self.is_closed = True + + log.debug("Closing connection (%s) to %s", id(self), self.host) + self.connector.disconnect() + log.debug("Closed socket to %s", self.host) + + if not self.is_defunct: + self.error_all_callbacks( + ConnectionShutdown("Connection to %s was closed" % self.host)) + # don't leave in-progress operations hanging + self.connected_event.set() + + def handle_close(self): + """ + Not used, but kept for consistency with other reactors. + """ + self.close() + + def handle_write(self): + """ + This function is not needed, so if it is called, blow up. + """ + raise RuntimeError("handle_write() should not be called" + "in TwistedConnection") + + def handle_read(self): + """ + Process the incoming data buffer. 
+ """ + while True: + pos = self._iobuf.tell() + if pos < 8 or (self._total_reqd_bytes > 0 and + pos < self._total_reqd_bytes): + # we don't have a complete header yet or we + # already saw a header, but we don't have a + # complete message yet + return + else: + # have enough for header, read body len from header + self._iobuf.seek(4) + body_len = int32_unpack(self._iobuf.read(4)) + + # seek to end to get length of current buffer + self._iobuf.seek(0, os.SEEK_END) + pos = self._iobuf.tell() + + if pos >= body_len + 8: + # read message header and body + self._iobuf.seek(0) + msg = self._iobuf.read(8 + body_len) + + # leave leftover in current buffer + leftover = self._iobuf.read() + self._iobuf = BytesIO() + self._iobuf.write(leftover) + + self._total_reqd_bytes = 0 + self.process_msg(msg, body_len) + else: + self._total_reqd_bytes = body_len + 8 + return + + def push(self, data): + """ + This function is called when outgoing data should be queued + for sending. + + Note that we can't call transport.write() directly because + it is not thread-safe, so we schedule it to run from within + the event loop when it gets the chance. + """ + reactor.callFromThread(self.connector.transport.write, data) + + def _cleanup(self): + if self._thread: + reactor.callFromThread(reactor.stop) + self._thread.join(timeout=1.0) + if self._thread.is_alive(): + log.warning("Event loop thread could not be joined, so " + "shutdown may not be clean. Please call " + "Cluster.shutdown() to avoid this.") + log.debug("Event loop thread was joined") + + def register_watcher(self, event_type, callback, register_timeout=None): + """ + Register a callback for a given event type. + """ + self._push_watchers[event_type].add(callback) + self.wait_for_response( + RegisterMessage(event_list=[event_type]), + timeout=register_timeout) + + def register_watchers(self, type_callback_dict, register_timeout=None): + """ + Register multiple callback/event type pairs, expressed as a dict. 
+ """ + for event_type, callback in type_callback_dict.items(): + self._push_watchers[event_type].add(callback) + self.wait_for_response( + RegisterMessage(event_list=type_callback_dict.keys()), + timeout=register_timeout) diff --git a/tests/unit/io/test_twistedreactor.py b/tests/unit/io/test_twistedreactor.py new file mode 100644 index 00000000..6772a35a --- /dev/null +++ b/tests/unit/io/test_twistedreactor.py @@ -0,0 +1,199 @@ +# Copyright 2013-2014 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + import unittest2 as unittest +except ImportError: + import unittest +from mock import Mock, patch + +try: + from twisted.test import proto_helpers + from cassandra.io import twistedreactor +except ImportError: + twistedreactor = None + + +class TestTwistedProtocol(unittest.TestCase): + def setUp(self): + if twistedreactor is None: + raise unittest.SkipTest("Twisted libraries not available") + self.tr = proto_helpers.StringTransportWithDisconnection() + self.tr.connector = Mock() + self.mock_connection = Mock() + self.tr.connector.factory = twistedreactor.TwistedConnectionClientFactory( + self.mock_connection) + self.obj_ut = twistedreactor.TwistedConnectionProtocol() + self.tr.protocol = self.obj_ut + + def tearDown(self): + pass + + def test_makeConnection(self): + """ + Verify that the protocol class notifies the connection + object that a successful connection was made. 
+ """ + self.obj_ut.makeConnection(self.tr) + self.assertTrue(self.mock_connection.client_connection_made.called) + + def test_receiving_data(self): + """ + Verify that the dataReceived() callback writes the data to + the connection object's buffer and calls handle_read(). + """ + self.obj_ut.makeConnection(self.tr) + self.obj_ut.dataReceived('foobar') + self.assertTrue(self.mock_connection.handle_read.called) + self.mock_connection._iobuf.write.assert_called_with("foobar") + + +class TestTwistedClientFactory(unittest.TestCase): + def setUp(self): + if twistedreactor is None: + raise unittest.SkipTest("Twisted libraries not available") + self.mock_connection = Mock() + self.obj_ut = twistedreactor.TwistedConnectionClientFactory( + self.mock_connection) + + def test_client_connection_failed(self): + """ + Verify that connection failed causes the connection object to close. + """ + self.obj_ut.clientConnectionFailed(None, 'a test') + self.mock_connection.close.assert_called_with() + + def test_client_connection_lost(self): + """ + Verify that connection lost causes the connection object to close. 
+ """ + self.obj_ut.clientConnectionLost(None, 'a test') + self.mock_connection.close.assert_called_with() + + +class TestTwistedConnection(unittest.TestCase): + def setUp(self): + if twistedreactor is None: + raise unittest.SkipTest("Twisted libraries not available") + self.reactor_cft_patcher = patch( + 'twisted.internet.reactor.callFromThread') + self.reactor_running_patcher = patch( + 'twisted.internet.reactor.running', False) + self.reactor_run_patcher = patch('twisted.internet.reactor.run') + self.mock_reactor_cft = self.reactor_cft_patcher.start() + self.mock_reactor_run = self.reactor_run_patcher.start() + self.obj_ut = twistedreactor.TwistedConnection('1.2.3.4', + cql_version='3.0.1') + + def tearDown(self): + self.reactor_cft_patcher.stop() + self.reactor_run_patcher.stop() + if self.obj_ut._thread: + self.obj_ut._thread.join() + + def test_connection_initialization(self): + """ + Verify that __init__() works correctly. + """ + self.mock_reactor_cft.assert_called_with(self.obj_ut.add_connection) + self.obj_ut._thread.join() # make sure thread exits before checking + self.mock_reactor_run.assert_called_with(installSignalHandlers=False) + + @patch('twisted.internet.reactor.connectTCP') + def test_add_connection(self, mock_connectTCP): + """ + Verify that add_connection() gives us a valid twisted connector. + """ + self.obj_ut.add_connection() + self.assertTrue(self.obj_ut.connector is not None) + self.assertTrue(mock_connectTCP.called) + + def test_client_connection_made(self): + """ + Verify that _send_options_message() is called in + client_connection_made() + """ + self.obj_ut._send_options_message = Mock() + self.obj_ut.client_connection_made() + self.obj_ut._send_options_message.assert_called_with() + + @patch('twisted.internet.reactor.connectTCP') + def test_close(self, mock_connectTCP): + """ + Verify that close() disconnects the connector and errors callbacks. 
+ """ + self.obj_ut.error_all_callbacks = Mock() + self.obj_ut.add_connection() + self.obj_ut.is_closed = False + self.obj_ut.close() + self.obj_ut.connector.disconnect.assert_called_with() + self.assertTrue(self.obj_ut.connected_event.is_set()) + self.assertTrue(self.obj_ut.error_all_callbacks.called) + + def test_handle_close(self): + """ + Skipped for now, since it just calls close() and isn't really used. + """ + + def test_handle_write(self): + """ + Verify that this raises an exception if called. + """ + self.assertRaises(RuntimeError, self.obj_ut.handle_write) + + def test_handle_read__incomplete(self): + """ + Verify that handle_read() processes incomplete messages properly. + """ + self.obj_ut.process_msg = Mock() + self.assertEqual(self.obj_ut._iobuf.getvalue(), '') # buf starts empty + # incomplete header + self.obj_ut._iobuf.write('\xff\x00\x00\x00') + self.obj_ut.handle_read() + self.assertEqual(self.obj_ut._iobuf.getvalue(), '\xff\x00\x00\x00') + + # full header, but incomplete body + self.obj_ut._iobuf.write('\x00\x00\x00\x15') + self.obj_ut.handle_read() + self.assertEqual(self.obj_ut._iobuf.getvalue(), + '\xff\x00\x00\x00\x00\x00\x00\x15') + self.assertEqual(self.obj_ut._total_reqd_bytes, 29) + + # verify we never attempted to process the incomplete message + self.assertFalse(self.obj_ut.process_msg.called) + + def test_handle_read__fullmessage(self): + """ + Verify that handle_read() processes complete messages properly. 
+ """ + self.obj_ut.process_msg = Mock() + self.assertEqual(self.obj_ut._iobuf.getvalue(), '') # buf starts empty + + # write a complete message, plus 'NEXT' (to simulate next message) + self.obj_ut._iobuf.write( + '\xff\x00\x00\x00\x00\x00\x00\x15this is the drum rollNEXT') + self.obj_ut.handle_read() + self.assertEqual(self.obj_ut._iobuf.getvalue(), 'NEXT') + self.obj_ut.process_msg.assert_called_with( + '\xff\x00\x00\x00\x00\x00\x00\x15this is the drum roll', 21) + + @patch('twisted.internet.reactor.connectTCP') + def test_push(self, mock_connectTCP): + """ + Verify that push() calls transport.write(data). + """ + self.obj_ut.add_connection() + self.obj_ut.push('123 pickup') + self.mock_reactor_cft.assert_called_with( + self.obj_ut.connector.transport.write, '123 pickup') From 9a799c76d9eb2cb03261441623b9902be520cd10 Mon Sep 17 00:00:00 2001 From: Colin Stolley Date: Wed, 18 Jun 2014 13:16:20 -0500 Subject: [PATCH 02/10] Use io.BytesIO instead of six.BytesIO --- cassandra/io/twistedreactor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index 442200c9..6b33f461 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -23,7 +23,7 @@ import weakref import atexit import os -from six import BytesIO +from io import BytesIO from cassandra import OperationTimedOut from cassandra.connection import Connection, ConnectionShutdown From 6bc5f944433e89769e8e4f4bf9afdfbde473b680 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:03:05 -0500 Subject: [PATCH 03/10] Remove unused methods --- cassandra/io/twistedreactor.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index 6b33f461..8821b003 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -188,19 +188,6 @@ class TwistedConnection(Connection): # don't leave in-progress operations 
hanging self.connected_event.set() - def handle_close(self): - """ - Not used, but kept for consistency with other reactors. - """ - self.close() - - def handle_write(self): - """ - This function is not needed, so if it is called, blow up. - """ - raise RuntimeError("handle_write() should not be called" - "in TwistedConnection") - def handle_read(self): """ Process the incoming data buffer. From 7e4f90c90830dca9f96d0d99750a5a5a3f6df300 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:16:36 -0500 Subject: [PATCH 04/10] Minor refactor of loop state handling --- cassandra/io/twistedreactor.py | 61 ++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index 8821b003..9ab5765d 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -16,7 +16,7 @@ Module that implements an event loop based on twisted ( https://twistedmatrix.com ). """ from twisted.internet import reactor, protocol -from threading import Event, Thread +from threading import Event, Thread, Lock from functools import partial import logging import weakref @@ -36,7 +36,7 @@ log = logging.getLogger(__name__) def _cleanup(cleanup_weakref): try: - cleanup_weakref() + cleanup_weakref()._cleanup() except ReferenceError: return @@ -46,6 +46,7 @@ class TwistedConnectionProtocol(protocol.Protocol): Twisted Protocol class for handling data received and connection made events. """ + def dataReceived(self, data): """ Callback function that is called when data has been received @@ -68,6 +69,7 @@ class TwistedConnectionProtocol(protocol.Protocol): class TwistedConnectionClientFactory(protocol.ClientFactory): + def __init__(self, connection): # ClientFactory does not define __init__() in parent classes # and does not inherit from object. 
@@ -97,14 +99,49 @@ class TwistedConnectionClientFactory(protocol.ClientFactory): self.conn.close() +class TwistedLoop(object): + + _lock = None + _thread = None + + def __init__(self): + self._lock = Lock() + + def maybe_start(self): + with self._lock: + if not reactor.running: + self._thread = Thread(target=reactor.run, + name="cassandra_driver_event_loop", + kwargs={'installSignalHandlers': False}) + self._thread.daemon = True + self._thread.start() + atexit.register(partial(_cleanup, weakref.ref(self))) + + def _cleanup(self): + if self._thread: + reactor.callFromThread(reactor.stop) + self._thread.join(timeout=1.0) + if self._thread.is_alive(): + log.warning("Event loop thread could not be joined, so " + "shutdown may not be clean. Please call " + "Cluster.shutdown() to avoid this.") + log.debug("Event loop thread was joined") + + class TwistedConnection(Connection): """ An implementation of :class:`.Connection` that utilizes the Twisted event loop. """ + _loop = None _total_reqd_bytes = 0 + @classmethod + def initialize_reactor(cls): + if not cls._loop: + cls._loop = TwistedLoop() + @classmethod def factory(cls, *args, **kwargs): """ @@ -136,20 +173,12 @@ class TwistedConnection(Connection): self.connected_event = Event() self._iobuf = BytesIO() - self._thread = None self.is_closed = True self.connector = None self._callbacks = {} reactor.callFromThread(self.add_connection) - - if not reactor.running: # XXX: might want a lock here? 
- self._thread = Thread(target=reactor.run, - name="cassandra_driver_event_loop", - kwargs={'installSignalHandlers': False}) - self._thread.daemon = True - self._thread.start() - atexit.register(partial(_cleanup, weakref.ref(self))) + self._loop.maybe_start() def add_connection(self): """ @@ -236,16 +265,6 @@ class TwistedConnection(Connection): """ reactor.callFromThread(self.connector.transport.write, data) - def _cleanup(self): - if self._thread: - reactor.callFromThread(reactor.stop) - self._thread.join(timeout=1.0) - if self._thread.is_alive(): - log.warning("Event loop thread could not be joined, so " - "shutdown may not be clean. Please call " - "Cluster.shutdown() to avoid this.") - log.debug("Event loop thread was joined") - def register_watcher(self, event_type, callback, register_timeout=None): """ Register a callback for a given event type. From 57a47995f5aca652ffe523f39489fe9058245d13 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:44:40 -0500 Subject: [PATCH 05/10] Defunct where appropriate --- cassandra/io/twistedreactor.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index 9ab5765d..9ca401dd 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -67,6 +67,10 @@ class TwistedConnectionProtocol(protocol.Protocol): """ self.transport.connector.factory.conn.client_connection_made() + def connectionLost(self, reason): + # reason is a Failure instance + self.transport.connector.factory.conn.defunct(reason.value) + class TwistedConnectionClientFactory(protocol.ClientFactory): @@ -88,15 +92,21 @@ class TwistedConnectionClientFactory(protocol.ClientFactory): connection attempt fails. 
""" log.debug("Connect failed: %s", reason) - self.conn.close() + self.conn.defunct(reason.value) def clientConnectionLost(self, connector, reason): """ Overridden twisted callback which is called when the connection goes away (cleanly or otherwise). + + It should be safe to call defunct() here instead of just close, because + we can assume that if the connection was closed cleanly, there are no + callbacks to error out. If this assumption turns out to be false, we + can call close() instead of defunct() when "reason" is an appropriate + type. """ log.debug("Connect lost: %s", reason) - self.conn.close() + self.conn.defunct(reason.value) class TwistedLoop(object): From 83c019ea4b7ef5d4d0886a6efc0560f17456ac54 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:44:55 -0500 Subject: [PATCH 06/10] Add twisted reactor to benchmarking tools --- benchmarks/base.py | 64 +++++++++++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 6b719ef8..8355fcf2 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -26,6 +26,7 @@ dirname = os.path.dirname(os.path.abspath(__file__)) sys.path.append(dirname) sys.path.append(os.path.join(dirname, '..')) +import cassandra from cassandra.cluster import Cluster from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.policies import HostDistance @@ -44,39 +45,47 @@ try: except ImportError as exc: pass -KEYSPACE = "testkeyspace" +have_twisted = False +try: + from cassandra.io.twistedreactor import TwistedConnection + have_twisted = True + supported_reactors.append(TwistedConnection) +except ImportError as exc: + log.exception("Error importing twisted") + pass + +KEYSPACE = "testkeyspace" + str(int(time.time())) TABLE = "testtable" def setup(hosts): + log.info("Using 'cassandra' package from %s", cassandra.__path__) cluster = Cluster(hosts) cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) - session 
= cluster.connect() + try: + session = cluster.connect() - rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") - if KEYSPACE in [row[0] for row in rows]: - log.debug("dropping existing keyspace...") - session.execute("DROP KEYSPACE " + KEYSPACE) + log.debug("Creating keyspace...") + session.execute(""" + CREATE KEYSPACE %s + WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } + """ % KEYSPACE) - log.debug("Creating keyspace...") - session.execute(""" - CREATE KEYSPACE %s - WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' } - """ % KEYSPACE) + log.debug("Setting keyspace...") + session.set_keyspace(KEYSPACE) - log.debug("Setting keyspace...") - session.set_keyspace(KEYSPACE) - - log.debug("Creating table...") - session.execute(""" - CREATE TABLE %s ( - thekey text, - col1 text, - col2 text, - PRIMARY KEY (thekey, col1) - ) - """ % TABLE) + log.debug("Creating table...") + session.execute(""" + CREATE TABLE %s ( + thekey text, + col1 text, + col2 text, + PRIMARY KEY (thekey, col1) + ) + """ % TABLE) + finally: + cluster.shutdown() def teardown(hosts): @@ -84,6 +93,7 @@ def teardown(hosts): cluster.set_core_connections_per_host(HostDistance.LOCAL, 1) session = cluster.connect() session.execute("DROP KEYSPACE " + KEYSPACE) + cluster.shutdown() def benchmark(thread_class): @@ -124,6 +134,7 @@ def benchmark(thread_class): end = time.time() finally: + cluster.shutdown() teardown(options.hosts) total = end - start @@ -164,6 +175,8 @@ def parse_options(): help='only benchmark with asyncore connections') parser.add_option('--libev-only', action='store_true', dest='libev_only', help='only benchmark with libev connections') + parser.add_option('--twisted-only', action='store_true', dest='twisted_only', + help='only benchmark with Twisted connections') parser.add_option('-m', '--metrics', action='store_true', dest='enable_metrics', help='enable and print metrics for operations') parser.add_option('-l', 
'--log-level', default='info', @@ -184,6 +197,11 @@ def parse_options(): log.error("libev is not available") sys.exit(1) options.supported_reactors = [LibevConnection] + elif options.twisted_only: + if not have_twisted: + log.error("Twisted is not available") + sys.exit(1) + options.supported_reactors = [TwistedConnection] else: options.supported_reactors = supported_reactors if not have_libev: From f58c315fcd9306751ca03025795ba594483afdd2 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:45:32 -0500 Subject: [PATCH 07/10] Prefer asyncore instead of Twisted --- cassandra/cluster.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index c4c32e73..f9d04ad2 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -66,17 +66,14 @@ from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement, named_tuple_factory, dict_factory) # default to gevent when we are monkey patched, otherwise if libev is available, use that as the -# default because it's fastest. Try twisted otherwise, then fallback to asyncore. +# default because it's fastest. Otherwise, use asyncore. 
if 'gevent.monkey' in sys.modules: from cassandra.io.geventreactor import GeventConnection as DefaultConnection else: try: from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA except ImportError: - try: - from cassandra.io.twistedreactor import TwistedConnection as DefaultConnection # NOQA - except ImportError: - from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA + from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA # Forces load of utf8 encoding module to avoid deadlock that occurs # if code that is being imported tries to import the module in a seperate From 0e620db28a61bf8413b89733e10ffb36e5d78fce Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:47:01 -0500 Subject: [PATCH 08/10] Update docs about available reactors --- cassandra/cluster.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index f9d04ad2..1456ad88 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -309,6 +309,8 @@ class Cluster(object): * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` * :class:`cassandra.io.libevreactor.LibevConnection` + * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching) + * :class:`cassandra.io.twistedreactor.TwistedConnection` By default, ``AsyncoreConnection`` will be used, which uses the ``asyncore`` module in the Python standard library. The @@ -316,6 +318,9 @@ class Cluster(object): supported on a wider range of systems. If ``libev`` is installed, ``LibevConnection`` will be used instead. + + If gevent monkey-patching of the standard library is detected, + GeventConnection will be used automatically. 
""" control_connection_timeout = 2.0 From 0edfe56d773be0cbfed3e60c58488147ef4a7bac Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:48:31 -0500 Subject: [PATCH 09/10] Update changelog --- CHANGELOG.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d6ab3dc6..17feba9a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,10 +1,13 @@ -2.0.3 +2.1.0 ===== In Progress Features -------- * Use io.BytesIO for reduced CPU consumption (github #143) +* Support Twisted as a reactor. Note that a Twisted-compatible + API is not exposed (so no Deferreds), this is just a reactor + implementation. (github #135, PYTHON-8) Bug Fixes --------- From 2d425af0699a32b4eecc7d80db6d4602593cf57c Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 18 Jun 2014 14:53:37 -0500 Subject: [PATCH 10/10] Update twisted reactor unit tests --- tests/unit/io/test_twistedreactor.py | 33 ++++++++++++---------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/tests/unit/io/test_twistedreactor.py b/tests/unit/io/test_twistedreactor.py index 6772a35a..de22415b 100644 --- a/tests/unit/io/test_twistedreactor.py +++ b/tests/unit/io/test_twistedreactor.py @@ -20,15 +20,18 @@ from mock import Mock, patch try: from twisted.test import proto_helpers + from twisted.python.failure import Failure from cassandra.io import twistedreactor except ImportError: - twistedreactor = None + twistedreactor = None # NOQA class TestTwistedProtocol(unittest.TestCase): + def setUp(self): if twistedreactor is None: raise unittest.SkipTest("Twisted libraries not available") + twistedreactor.TwistedConnection.initialize_reactor() self.tr = proto_helpers.StringTransportWithDisconnection() self.tr.connector = Mock() self.mock_connection = Mock() @@ -63,6 +66,7 @@ class TestTwistedClientFactory(unittest.TestCase): def setUp(self): if twistedreactor is None: raise unittest.SkipTest("Twisted libraries not available") + 
twistedreactor.TwistedConnection.initialize_reactor() self.mock_connection = Mock() self.obj_ut = twistedreactor.TwistedConnectionClientFactory( self.mock_connection) @@ -71,21 +75,24 @@ class TestTwistedClientFactory(unittest.TestCase): """ Verify that connection failed causes the connection object to close. """ - self.obj_ut.clientConnectionFailed(None, 'a test') - self.mock_connection.close.assert_called_with() + exc = Exception('a test') + self.obj_ut.clientConnectionFailed(None, Failure(exc)) + self.mock_connection.defunct.assert_called_with(exc) def test_client_connection_lost(self): """ Verify that connection lost causes the connection object to close. """ - self.obj_ut.clientConnectionLost(None, 'a test') - self.mock_connection.close.assert_called_with() + exc = Exception('a test') + self.obj_ut.clientConnectionLost(None, Failure(exc)) + self.mock_connection.defunct.assert_called_with(exc) class TestTwistedConnection(unittest.TestCase): def setUp(self): if twistedreactor is None: raise unittest.SkipTest("Twisted libraries not available") + twistedreactor.TwistedConnection.initialize_reactor() self.reactor_cft_patcher = patch( 'twisted.internet.reactor.callFromThread') self.reactor_running_patcher = patch( @@ -99,15 +106,14 @@ class TestTwistedConnection(unittest.TestCase): def tearDown(self): self.reactor_cft_patcher.stop() self.reactor_run_patcher.stop() - if self.obj_ut._thread: - self.obj_ut._thread.join() + self.obj_ut._loop._cleanup() def test_connection_initialization(self): """ Verify that __init__() works correctly. 
""" self.mock_reactor_cft.assert_called_with(self.obj_ut.add_connection) - self.obj_ut._thread.join() # make sure thread exits before checking + self.obj_ut._loop._cleanup() self.mock_reactor_run.assert_called_with(installSignalHandlers=False) @patch('twisted.internet.reactor.connectTCP') @@ -141,17 +147,6 @@ class TestTwistedConnection(unittest.TestCase): self.assertTrue(self.obj_ut.connected_event.is_set()) self.assertTrue(self.obj_ut.error_all_callbacks.called) - def test_handle_close(self): - """ - Skipped for now, since it just calls close() and isn't really used. - """ - - def test_handle_write(self): - """ - Verify that this raises an exception if called. - """ - self.assertRaises(RuntimeError, self.obj_ut.handle_write) - def test_handle_read__incomplete(self): """ Verify that handle_read() processes incomplete messages properly.