From 9ea4987d8c91151d5962c872fbea840014d6fb9a Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 28 Jun 2013 15:39:28 -0500 Subject: [PATCH] Update unit tests for Connection inheritance --- cassandra/connection.py | 2 +- tests/unit/io/__init__.py | 0 tests/unit/io/test_pyevreactor.py | 226 ++++++++++++++++++++++++++++ tests/unit/test_connection.py | 237 +++++------------------------- 4 files changed, 265 insertions(+), 200 deletions(-) create mode 100644 tests/unit/io/__init__.py create mode 100644 tests/unit/io/test_pyevreactor.py diff --git a/cassandra/connection.py b/cassandra/connection.py index c9f34b95..b3b34b09 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -106,7 +106,7 @@ class Connection(object): def close(self): raise NotImplementedError() - def defunct(self): + def defunct(self, exc): raise NotImplementedError() def send_msg(self, msg, cb): diff --git a/tests/unit/io/__init__.py b/tests/unit/io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/io/test_pyevreactor.py b/tests/unit/io/test_pyevreactor.py new file mode 100644 index 00000000..e1052cbd --- /dev/null +++ b/tests/unit/io/test_pyevreactor.py @@ -0,0 +1,226 @@ +import errno +from StringIO import StringIO +from socket import error as socket_error +import unittest + +from mock import patch, Mock + +from cassandra.connection import (PROTOCOL_VERSION, + HEADER_DIRECTION_TO_CLIENT, + ProtocolError, + ConnectionException) +from cassandra.io.pyevreactor import PyevConnection +from cassandra.decoder import (write_stringmultimap, write_int, write_string, + SupportedMessage, ReadyMessage, ServerError) +from cassandra.marshal import uint8_pack, uint32_pack + +@patch('socket.socket') +@patch('pyev.Io') +@patch('cassandra.io.asyncorereactor._start_loop') +class PyevConnectionTest(unittest.TestCase): + + def make_connection(self): + c = PyevConnection('1.2.3.4') + c._socket = Mock() + c._socket.send.side_effect = lambda x: len(x) + return c + + def make_header_prefix(self, message_class, version=PROTOCOL_VERSION, stream_id=0): + return ''.join(map(uint8_pack, [ + 0xff & (HEADER_DIRECTION_TO_CLIENT | version), + 0, # flags (compression) + stream_id, + message_class.opcode # opcode + ])) + + def make_options_body(self): + options_buf = StringIO() + write_stringmultimap(options_buf, { + 'CQL_VERSION': ['3.0.1'], + 'COMPRESSION': [] + }) + return options_buf.getvalue() + + def make_error_body(self, code, msg): + buf = StringIO() + write_int(buf, code) + write_string(buf, msg) + return buf.getvalue() + + def make_msg(self, header, body=""): + return header + uint32_pack(len(body)) + body + + def test_successful_connection(self, *args): + c = self.make_connection() + + # let it write the OptionsMessage + c.handle_write(None, None) + + # read in a SupportedMessage response + header = self.make_header_prefix(SupportedMessage) + options = self.make_options_body() + c._socket.recv.return_value = self.make_msg(header, options) + c.handle_read(None, None) + + # let it write out a StartupMessage + c.handle_write(None, None) + + header = self.make_header_prefix(ReadyMessage, stream_id=1) + c._socket.recv.return_value = self.make_msg(header) + c.handle_read(None, None) + + self.assertTrue(c.connected_event.is_set()) + + def test_protocol_error(self, *args): + c = self.make_connection() + + # let it write the OptionsMessage + c.handle_write(None, None) + + # read in a SupportedMessage response + header = self.make_header_prefix(SupportedMessage, version=0x04) + options = self.make_options_body() + c._socket.recv.return_value = self.make_msg(header, options) + c.handle_read(None, None) + + # make sure it errored correctly + self.assertTrue(c.is_defunct) + self.assertTrue(isinstance(c.last_error, ProtocolError)) + self.assertTrue(c.connected_event.is_set()) + + def test_error_message_on_startup(self, *args): + c = self.make_connection() + + # let it write the OptionsMessage + c.handle_write(None, None) + + # read in a SupportedMessage response + header = self.make_header_prefix(SupportedMessage) + options = self.make_options_body() + c._socket.recv.return_value = self.make_msg(header, options) + c.handle_read(None, None) + + # let it write out a StartupMessage + c.handle_write(None, None) + + header = self.make_header_prefix(ServerError, stream_id=1) + body = self.make_error_body(ServerError.error_code, ServerError.summary) + c._socket.recv.return_value = self.make_msg(header, body) + c.handle_read(None, None) + + # make sure it errored correctly + self.assertTrue(c.is_defunct) + self.assertTrue(isinstance(c.last_error, ConnectionException)) + self.assertTrue(c.connected_event.is_set()) + + def test_socket_error_on_write(self, *args): + c = self.make_connection() + + # make the OptionsMessage write fail + c._socket.send.side_effect = socket_error(errno.EIO, "bad stuff!") + c.handle_write(None, None) + + # make sure it errored correctly + self.assertTrue(c.is_defunct) + self.assertTrue(isinstance(c.last_error, socket_error)) + self.assertTrue(c.connected_event.is_set()) + + def test_blocking_on_write(self, *args): + c = self.make_connection() + + # make the OptionsMessage write block + c._socket.send.side_effect = socket_error(errno.EAGAIN, "socket busy") + c.handle_write(None, None) + + self.assertFalse(c.is_defunct) + + # try again with normal behavior + c._socket.send.side_effect = lambda x: len(x) + c.handle_write(None, None) + self.assertFalse(c.is_defunct) + self.assertTrue(c._socket.send.call_args is not None) + + def test_partial_send(self, *args): + c = self.make_connection() + + # only write the first four bytes of the OptionsMessage + c._socket.send.side_effect = None + c._socket.send.return_value = 4 + c.handle_write(None, None) + + orig_msg = c._socket.send.call_args[0][0] + self.assertFalse(c.is_defunct) + + # try again with normal behavior + c._socket.send.side_effect = lambda x: len(x) + c.handle_write(None, None) + self.assertFalse(c.is_defunct) + self.assertEqual(c._socket.send.call_args[0][0], orig_msg[4:]) + + def test_socket_error_on_read(self, *args): + c = self.make_connection() + + # let it write the OptionsMessage + c.handle_write(None, None) + + # read in a SupportedMessage response + c._socket.recv.side_effect = socket_error(errno.EIO, "busy socket") + c.handle_read(None, None) + + # make sure it errored correctly + self.assertTrue(c.is_defunct) + self.assertTrue(isinstance(c.last_error, socket_error)) + self.assertTrue(c.connected_event.is_set()) + + def test_partial_header_read(self, *args): + c = self.make_connection() + + header = self.make_header_prefix(SupportedMessage) + options = self.make_options_body() + message = self.make_msg(header, options) + + # read in the first byte + c._socket.recv.return_value = message[0] + c.handle_read(None, None) + self.assertEquals(c._buf, message[0]) + + c._socket.recv.return_value = message[1:] + c.handle_read(None, None) + self.assertEquals("", c._buf) + + # let it write out a StartupMessage + c.handle_write(None, None) + + header = self.make_header_prefix(ReadyMessage, stream_id=1) + c._socket.recv.return_value = self.make_msg(header) + c.handle_read(None, None) + + self.assertTrue(c.connected_event.is_set()) + self.assertFalse(c.is_defunct) + + def test_partial_message_read(self, *args): + c = self.make_connection() + + header = self.make_header_prefix(SupportedMessage) + options = self.make_options_body() + message = self.make_msg(header, options) + + # read in the first nine bytes + c._socket.recv.return_value = message[:9] + c.handle_read(None, None) + self.assertEquals(c._buf, message[:9]) + + # ... then read in the rest + c._socket.recv.return_value = message[9:] + c.handle_read(None, None) + self.assertEquals("", c._buf) + + # let it write out a StartupMessage + c.handle_write(None, None) + + header = self.make_header_prefix(ReadyMessage, stream_id=1) + c._socket.recv.return_value = self.make_msg(header) + c.handle_read(None, None) + + self.assertTrue(c.connected_event.is_set()) + self.assertFalse(c.is_defunct) diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 25018ab5..396aa9dc 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -1,21 +1,18 @@ -import errno from StringIO import StringIO -from socket import error as socket_error import unittest -from mock import patch, Mock +from mock import patch, Mock, ANY from cassandra.connection import (Connection, PROTOCOL_VERSION, HEADER_DIRECTION_TO_CLIENT, - HEADER_DIRECTION_FROM_CLIENT, ProtocolError, - ConnectionException) + HEADER_DIRECTION_FROM_CLIENT, ProtocolError) from cassandra.decoder import (write_stringmultimap, write_int, write_string, - SupportedMessage, ReadyMessage, ServerError) -from cassandra.marshal import uint8_pack, uint32_pack, int32_pack + SupportedMessage) +from cassandra.marshal import uint8_pack, uint32_pack @patch('socket.socket') @patch('pyev.Io') -@patch('cassandra.connection._start_loop') +@patch('cassandra.io.asyncorereactor._start_loop') class ConnectionTest(unittest.TestCase): def make_connection(self): @@ -49,49 +46,28 @@ class ConnectionTest(unittest.TestCase): def make_msg(self, header, body=""): return header + uint32_pack(len(body)) + body - def test_successful_connection(self, *args): - c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) - - # read in a SupportedMessage response - header = self.make_header_prefix(SupportedMessage) - options = self.make_options_body() - c._socket.recv.return_value = self.make_msg(header, options) - c.handle_read(None, None) - - # let it write out a StartupMessage - c.handle_write(None, None) - - header = self.make_header_prefix(ReadyMessage, stream_id=1) - c._socket.recv.return_value = self.make_msg(header) - c.handle_read(None, None) - - self.assertTrue(c.connected_event.is_set()) - def test_bad_protocol_version(self, *args): c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) + c._id_queue.get_nowait() + c._callbacks = Mock() + c.defunct = Mock() # read in a SupportedMessage response header = self.make_header_prefix(SupportedMessage, version=0x04) options = self.make_options_body() - c._socket.recv.return_value = self.make_msg(header, options) - c.handle_read(None, None) + message = self.make_msg(header, options) + c.process_msg(message, len(message) - 8) # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, ProtocolError)) - self.assertTrue(c.connected_event.is_set()) + c.defunct.assert_called_once_with(ANY) + args, kwargs = c.defunct.call_args + self.assertIsInstance(args[0], ProtocolError) def test_bad_header_direction(self, *args): c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) + c._id_queue.get_nowait() + c._callbacks = Mock() + c.defunct = Mock() # read in a SupportedMessage response header = ''.join(map(uint8_pack, [ @@ -101,38 +77,38 @@ class ConnectionTest(unittest.TestCase): SupportedMessage.opcode # opcode ])) options = self.make_options_body() - c._socket.recv.return_value = self.make_msg(header, options) - c.handle_read(None, None) + message = self.make_msg(header, options) + c.process_msg(message, len(message) - 8) # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, ProtocolError)) - self.assertTrue(c.connected_event.is_set()) + c.defunct.assert_called_once_with(ANY) + args, kwargs = c.defunct.call_args + self.assertIsInstance(args[0], ProtocolError) def test_negative_body_length(self, *args): c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) + c._id_queue.get_nowait() + c._callbacks = Mock() + c.defunct = Mock() # read in a SupportedMessage response - header = self.make_header_prefix(SupportedMessage) + header = self.make_header_prefix(SupportedMessage, version=0x04) options = self.make_options_body() - c._socket.recv.return_value = header + int32_pack(-13) + options - c.handle_read(None, None) + message = self.make_msg(header, options) + c.process_msg(message, -13) # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, ProtocolError)) - self.assertTrue(c.connected_event.is_set()) + c.defunct.assert_called_once_with(ANY) + args, kwargs = c.defunct.call_args + self.assertIsInstance(args[0], ProtocolError) def test_unsupported_cql_version(self, *args): c = self.make_connection() + c._id_queue.get_nowait() + c._callbacks = {0: c._handle_options_response} + c.defunct = Mock() c.cql_version = "3.0.3" - # let it write the OptionsMessage - c.handle_write(None, None) - # read in a SupportedMessage response header = self.make_header_prefix(SupportedMessage) @@ -143,147 +119,10 @@ class ConnectionTest(unittest.TestCase): }) options = options_buf.getvalue() - c._socket.recv.return_value = self.make_msg(header, options) - c.handle_read(None, None) - - # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, ProtocolError)) - self.assertTrue(c.connected_event.is_set()) - - def test_error_message_on_startup(self, *args): - c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) - - # read in a SupportedMessage response - header = self.make_header_prefix(SupportedMessage) - options = self.make_options_body() - c._socket.recv.return_value = self.make_msg(header, options) - c.handle_read(None, None) - - # let it write out a StartupMessage - c.handle_write(None, None) - - header = self.make_header_prefix(ServerError, stream_id=1) - body = self.make_error_body(ServerError.error_code, ServerError.summary) - c._socket.recv.return_value = self.make_msg(header, body) - c.handle_read(None, None) - - # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, ConnectionException)) - self.assertTrue(c.connected_event.is_set()) - - def test_socket_error_on_write(self, *args): - c = self.make_connection() - - # make the OptionsMessage write fail - c._socket.send.side_effect = socket_error(errno.EIO, "bad stuff!") - c.handle_write(None, None) - - # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, socket_error)) - self.assertTrue(c.connected_event.is_set()) - - def test_blocking_on_write(self, *args): - c = self.make_connection() - - # make the OptionsMessage write block - c._socket.send.side_effect = socket_error(errno.EAGAIN, "socket busy") - c.handle_write(None, None) - - self.assertFalse(c.is_defunct) - - # try again with normal behavior - c._socket.send.side_effect = lambda x: len(x) - c.handle_write(None, None) - self.assertFalse(c.is_defunct) - self.assertTrue(c._socket.send.call_args is not None) - - def test_partial_send(self, *args): - c = self.make_connection() - - # only write the first four bytes of the OptionsMessage - c._socket.send.side_effect = None - c._socket.send.return_value = 4 - c.handle_write(None, None) - - orig_msg = c._socket.send.call_args[0][0] - self.assertFalse(c.is_defunct) - - # try again with normal behavior - c._socket.send.side_effect = lambda x: len(x) - c.handle_write(None, None) - self.assertFalse(c.is_defunct) - self.assertEqual(c._socket.send.call_args[0][0], orig_msg[4:]) - - def test_socket_error_on_read(self, *args): - c = self.make_connection() - - # let it write the OptionsMessage - c.handle_write(None, None) - - # read in a SupportedMessage response - c._socket.recv.side_effect = socket_error(errno.EIO, "busy socket") - c.handle_read(None, None) - - # make sure it errored correctly - self.assertTrue(c.is_defunct) - self.assertTrue(isinstance(c.last_error, socket_error)) - self.assertTrue(c.connected_event.is_set()) - - def test_partial_header_read(self, *args): - c = self.make_connection() - - header = self.make_header_prefix(SupportedMessage) - options = self.make_options_body() message = self.make_msg(header, options) + c.process_msg(message, len(message) - 8) - # read in the first byte - c._socket.recv.return_value = message[0] - c.handle_read(None, None) - self.assertEquals(c._buf, message[0]) - - c._socket.recv.return_value = message[1:] - c.handle_read(None, None) - self.assertEquals("", c._buf) - - # let it write out a StartupMessage - c.handle_write(None, None) - - header = self.make_header_prefix(ReadyMessage, stream_id=1) - c._socket.recv.return_value = self.make_msg(header) - c.handle_read(None, None) - - self.assertTrue(c.connected_event.is_set()) - self.assertFalse(c.is_defunct) - - def test_partial_message_read(self, *args): - c = self.make_connection() - - header = self.make_header_prefix(SupportedMessage) - options = self.make_options_body() - message = self.make_msg(header, options) - - # read in the first nine bytes - c._socket.recv.return_value = message[:9] - c.handle_read(None, None) - self.assertEquals(c._buf, message[:9]) - - # ... then read in the rest - c._socket.recv.return_value = message[9:] - c.handle_read(None, None) - self.assertEquals("", c._buf) - - # let it write out a StartupMessage - c.handle_write(None, None) - - header = self.make_header_prefix(ReadyMessage, stream_id=1) - c._socket.recv.return_value = self.make_msg(header) - c.handle_read(None, None) - - self.assertTrue(c.connected_event.is_set()) - self.assertFalse(c.is_defunct) + # make sure it errored correctly + c.defunct.assert_called_once_with(ANY) + args, kwargs = c.defunct.call_args + self.assertIsInstance(args[0], ProtocolError)