Merge remote-tracking branch 'flupke/gevent-ssl' into PYTHON-174
This commit is contained in:
@@ -12,7 +12,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import gevent
|
import gevent
|
||||||
from gevent import select, socket
|
from gevent import select, socket, ssl
|
||||||
from gevent.event import Event
|
from gevent.event import Event
|
||||||
from gevent.queue import Queue
|
from gevent.queue import Queue
|
||||||
|
|
||||||
@@ -77,6 +77,11 @@ class GeventConnection(Connection):
|
|||||||
for (af, socktype, proto, canonname, sockaddr) in addresses:
|
for (af, socktype, proto, canonname, sockaddr) in addresses:
|
||||||
try:
|
try:
|
||||||
self._socket = socket.socket(af, socktype, proto)
|
self._socket = socket.socket(af, socktype, proto)
|
||||||
|
if self.ssl_options:
|
||||||
|
self._socket = ssl.wrap_socket(self._socket, **self.ssl_options)
|
||||||
|
self._ssl_socket = True
|
||||||
|
else:
|
||||||
|
self._ssl_socket = False
|
||||||
self._socket.settimeout(1.0)
|
self._socket.settimeout(1.0)
|
||||||
self._socket.connect(sockaddr)
|
self._socket.connect(sockaddr)
|
||||||
sockerr = None
|
sockerr = None
|
||||||
@@ -90,8 +95,8 @@ class GeventConnection(Connection):
|
|||||||
for args in self.sockopts:
|
for args in self.sockopts:
|
||||||
self._socket.setsockopt(*args)
|
self._socket.setsockopt(*args)
|
||||||
|
|
||||||
self._read_watcher = gevent.spawn(lambda: self.handle_read())
|
self._read_watcher = gevent.spawn(self.handle_read)
|
||||||
self._write_watcher = gevent.spawn(lambda: self.handle_write())
|
self._write_watcher = gevent.spawn(self.handle_write)
|
||||||
self._send_options_message()
|
self._send_options_message()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
@@ -152,6 +157,13 @@ class GeventConnection(Connection):
|
|||||||
try:
|
try:
|
||||||
buf = self._socket.recv(self.in_buffer_size)
|
buf = self._socket.recv(self.in_buffer_size)
|
||||||
self._iobuf.write(buf)
|
self._iobuf.write(buf)
|
||||||
|
if self._ssl_socket:
|
||||||
|
# We need to drain pending data when dealing with a SSL socket
|
||||||
|
data_left = self._socket.pending()
|
||||||
|
while data_left:
|
||||||
|
buf = self._socket.recv(data_left)
|
||||||
|
self._iobuf.write(buf)
|
||||||
|
data_left = self._socket.pending()
|
||||||
except socket.error as err:
|
except socket.error as err:
|
||||||
if not is_timeout(err):
|
if not is_timeout(err):
|
||||||
log.debug("Exception during socket recv for %s: %s", self, err)
|
log.debug("Exception during socket recv for %s: %s", self, err)
|
||||||
|
|||||||
Reference in New Issue
Block a user