From 36d08e776444cf0e3ee450cc775c3276e4041099 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 19 Sep 2013 13:34:58 -0500 Subject: [PATCH] Add 1s timeout for initial connection This prevents hanging for unreasonable amounts of time when a connection cannot be made to the host. --- cassandra/io/asyncorereactor.py | 29 +++++++++++++++++++++++++++-- cassandra/io/libevreactor.py | 1 + 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 1ee42904..60fa2484 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -1,16 +1,18 @@ from collections import defaultdict, deque from functools import partial import logging +import os import socket import sys from threading import Event, Lock, Thread import traceback from Queue import Queue +from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL, EISCONN, errorcode import asyncore from cassandra.connection import (Connection, ResponseWaiter, ConnectionShutdown, - ConnectionBusy, NONBLOCKING) + ConnectionBusy, ConnectionException, NONBLOCKING) from cassandra.decoder import RegisterMessage from cassandra.marshal import int32_unpack @@ -96,7 +98,6 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): with _starting_conns_lock: _starting_conns.add(self) - log.debug("Opening socket to %s", self.host) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((self.host, self.port)) @@ -110,6 +111,30 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): # start the global event loop if needed _start_loop() + def create_socket(self, family, type): + # copied from asyncore, but with the line to set the socket in + # non-blocking mode removed (we will do that after connecting) + self.family_and_type = family, type + sock = socket.socket(family, type) + self.set_socket(sock) + + def connect(self, address): + # this is copied directly from asyncore.py, except that + # a timeout is set before connecting + self.connected = False + self.connecting = True + self.socket.settimeout(1.0) + err = self.socket.connect_ex(address) + if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \ + or err == EINVAL and os.name in ('nt', 'ce'): + raise ConnectionException("Timed out connecting to %s" % (address[0])) + if err in (0, EISCONN): + self.addr = address + self.setblocking(0) + self.handle_connect_event() + else: + raise socket.error(err, errorcode[err]) + def close(self): with self.lock: if self.is_closed: diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index a097b40b..a5325f74 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -104,6 +104,7 @@ class LibevConnection(Connection): self.deque = deque() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(1.0) # TODO potentially make this value configurable self._socket.connect((self.host, self.port)) self._socket.setblocking(0)