diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 6b7376e3..5aa52f9e 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -114,6 +114,7 @@ class LibevConnection(Connection): self._callbacks = {} self._push_watchers = defaultdict(set) self.deque = deque() + self._deque_lock = Lock() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if self.ssl_options: @@ -149,10 +150,11 @@ class LibevConnection(Connection): self.is_closed = True log.debug("Closing connection (%s) to %s", id(self), self.host) - if self._read_watcher: - self._read_watcher.stop() - if self._write_watcher: - self._write_watcher.stop() + with _loop_lock: + if self._read_watcher: + self._read_watcher.stop() + if self._write_watcher: + self._write_watcher.stop() self._socket.close() with _loop_lock: _loop_notifier.send() @@ -194,28 +196,49 @@ class LibevConnection(Connection): id(self), self.host, exc_info=True) def handle_write(self, watcher, revents): - try: - next_msg = self.deque.popleft() - except IndexError: - self._write_watcher.stop() + if revents & libev.EV_ERROR: + self.defunct(Exception("lbev reported an error")) return - try: - sent = self._socket.send(next_msg) - except socket.error as err: - if (err.args[0] in NONBLOCKING): - self.deque.appendleft(next_msg) + while True: + try: + with self._deque_lock: + next_msg = self.deque.popleft() + except IndexError: + with self._deque_lock: + if not self.deque: + with _loop_lock: + if self._write_watcher.is_active(): + self._write_watcher.stop() + return + + try: + sent = self._socket.send(next_msg) + except socket.error as err: + if (err.args[0] in NONBLOCKING): + with self._deque_lock: + self.deque.appendleft(next_msg) + _loop_notifier.send() + else: + self.defunct(err) + return else: - self.defunct(err) - return - else: - if sent < len(next_msg): - self.deque.appendleft(next_msg[sent:]) - - if not self.deque: - self._write_watcher.stop() + if sent < len(next_msg): + with self._deque_lock: + self.deque.appendleft(next_msg[sent:]) + _loop_notifier.send() + elif not self.deque: + with self._deque_lock: + if not self.deque: + with _loop_lock: + if self._write_watcher.is_active(): + self._write_watcher.stop() + return def handle_read(self, watcher, revents): + if revents & libev.EV_ERROR: + self.defunct(Exception("lbev reported an error")) + return try: while True: buf = self._socket.recv(self.in_buffer_size) @@ -280,13 +303,13 @@ class LibevConnection(Connection): else: chunks = [data] - with self.lock: + with self._deque_lock: self.deque.extend(chunks) + with _loop_lock: if not self._write_watcher.is_active(): - with _loop_lock: - self._write_watcher.start() - _loop_notifier.send() + self._write_watcher.start() + _loop_notifier.send() def send_msg(self, msg, cb, wait_for_id=False): if self.is_defunct: