greenio: pep8, pyflakes

This commit is contained in:
Sergey Shepelev
2012-12-14 17:30:55 +04:00
parent 27713348f7
commit f5fe024979
2 changed files with 103 additions and 83 deletions

View File

@@ -2,6 +2,7 @@ from eventlet.support import get_errno
from eventlet.hubs import trampoline from eventlet.hubs import trampoline
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
import array
import errno import errno
import os import os
import socket import socket
@@ -14,7 +15,7 @@ __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)) CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
CONNECT_SUCCESS = set((0, errno.EISCONN)) CONNECT_SUCCESS = set((0, errno.EISCONN))
if sys.platform[:3]=="win": if sys.platform[:3] == "win":
CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67 CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
# Emulate _fileobject class in 3.x implementation # Emulate _fileobject class in 3.x implementation
@@ -25,6 +26,7 @@ except AttributeError:
def _fileobject(sock, *args, **kwargs): def _fileobject(sock, *args, **kwargs):
return _original_socket.makefile(sock, *args, **kwargs) return _original_socket.makefile(sock, *args, **kwargs)
def socket_connect(descriptor, address): def socket_connect(descriptor, address):
""" """
Attempts to connect to the address, returns the descriptor if it succeeds, Attempts to connect to the address, returns the descriptor if it succeeds,
@@ -37,11 +39,13 @@ def socket_connect(descriptor, address):
raise socket.error(err, errno.errorcode[err]) raise socket.error(err, errno.errorcode[err])
return descriptor return descriptor
def socket_checkerr(descriptor): def socket_checkerr(descriptor):
err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err not in CONNECT_SUCCESS: if err not in CONNECT_SUCCESS:
raise socket.error(err, errno.errorcode[err]) raise socket.error(err, errno.errorcode[err])
def socket_accept(descriptor): def socket_accept(descriptor):
""" """
Attempts to accept() on the descriptor, returns a client,address tuple Attempts to accept() on the descriptor, returns a client,address tuple
@@ -56,7 +60,7 @@ def socket_accept(descriptor):
raise raise
if sys.platform[:3]=="win": if sys.platform[:3] == "win":
# winsock sometimes throws ENOTCONN # winsock sometimes throws ENOTCONN
SOCKET_BLOCKING = set((errno.EWOULDBLOCK,)) SOCKET_BLOCKING = set((errno.EWOULDBLOCK,))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)) SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
@@ -174,7 +178,7 @@ class GreenSocket(object):
return return
if time.time() >= end: if time.time() >= end:
raise socket.timeout("timed out") raise socket.timeout("timed out")
trampoline(fd, write=True, timeout=end-time.time(), trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout("timed out")) timeout_exc=socket.timeout("timed out"))
socket_checkerr(fd) socket_checkerr(fd)
@@ -197,7 +201,7 @@ class GreenSocket(object):
return 0 return 0
if time.time() >= end: if time.time() >= end:
raise socket.timeout(errno.EAGAIN) raise socket.timeout(errno.EAGAIN)
trampoline(fd, write=True, timeout=end-time.time(), trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout(errno.EAGAIN)) timeout_exc=socket.timeout(errno.EAGAIN))
socket_checkerr(fd) socket_checkerr(fd)
except socket.error, ex: except socket.error, ex:
@@ -316,6 +320,7 @@ class GreenSocket(object):
def gettimeout(self): def gettimeout(self):
return self._timeout return self._timeout
class _SocketDuckForFd(object): class _SocketDuckForFd(object):
""" Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls.""" """ Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls."""
def __init__(self, fileno): def __init__(self, fileno):
@@ -348,7 +353,7 @@ class _SocketDuckForFd(object):
if get_errno(e) != errno.EAGAIN: if get_errno(e) != errno.EAGAIN:
raise IOError(*e.args) raise IOError(*e.args)
total_sent = 0 total_sent = 0
while total_sent <len_data: while total_sent < len_data:
trampoline(self, write=True) trampoline(self, write=True)
try: try:
total_sent += os_write(fileno, data[total_sent:]) total_sent += os_write(fileno, data[total_sent:])
@@ -366,9 +371,11 @@ class _SocketDuckForFd(object):
def __repr__(self): def __repr__(self):
return "%s:%d" % (self.__class__.__name__, self._fileno) return "%s:%d" % (self.__class__.__name__, self._fileno)
def _operationOnClosedFile(*args, **kwargs): def _operationOnClosedFile(*args, **kwargs):
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
class GreenPipe(_fileobject): class GreenPipe(_fileobject):
""" """
GreenPipe is a cooperative replacement for file class. GreenPipe is a cooperative replacement for file class.
@@ -404,7 +411,8 @@ class GreenPipe(_fileobject):
self.softspace = 0 self.softspace = 0
@property @property
def name(self): return self._name def name(self):
return self._name
def __repr__(self): def __repr__(self):
return "<%s %s %r, mode %r at 0x%x>" % ( return "<%s %s %r, mode %r at 0x%x>" % (
@@ -412,7 +420,7 @@ class GreenPipe(_fileobject):
self.__class__.__name__, self.__class__.__name__,
self.name, self.name,
self.mode, self.mode,
(id(self) < 0) and (sys.maxint +id(self)) or id(self)) (id(self) < 0) and (sys.maxint + id(self)) or id(self))
def close(self): def close(self):
super(GreenPipe, self).close() super(GreenPipe, self).close()
@@ -432,7 +440,7 @@ class GreenPipe(_fileobject):
return iterator(self) return iterator(self)
def readinto(self, buf): def readinto(self, buf):
data = self.read(len(buf)) #FIXME could it be done without allocating intermediate? data = self.read(len(buf)) # FIXME could it be done without allocating intermediate?
n = len(data) n = len(data)
try: try:
buf[:n] = data buf[:n] = data
@@ -450,7 +458,7 @@ class GreenPipe(_fileobject):
def _clear_readahead_buf(self): def _clear_readahead_buf(self):
len = self._get_readahead_len() len = self._get_readahead_len()
if len>0: if len > 0:
self.read(len) self.read(len)
def tell(self): def tell(self):
@@ -462,7 +470,7 @@ class GreenPipe(_fileobject):
def seek(self, offset, whence=0): def seek(self, offset, whence=0):
self.flush() self.flush()
if whence == 1 and offset==0: # tell synonym if whence == 1 and offset == 0: # tell synonym
return self.tell() return self.tell()
if whence == 1: # adjust offset by what is read ahead if whence == 1: # adjust offset by what is read ahead
offset -= self.get_readahead_len() offset -= self.get_readahead_len()
@@ -477,7 +485,7 @@ class GreenPipe(_fileobject):
if getattr(file, "truncate", None): # not all OSes implement truncate if getattr(file, "truncate", None): # not all OSes implement truncate
def truncate(self, size=-1): def truncate(self, size=-1):
self.flush() self.flush()
if size ==-1: if size == -1:
size = self.tell() size = self.tell()
try: try:
rv = os.ftruncate(self.fileno(), size) rv = os.ftruncate(self.fileno(), size)
@@ -534,4 +542,3 @@ def shutdown_safe(sock):
# this will often be the case in an http server context # this will often be the case in an http server context
if get_errno(e) != errno.ENOTCONN: if get_errno(e) != errno.ENOTCONN:
raise raise

View File

@@ -1,11 +1,9 @@
import socket as _orig_sock import socket as _orig_sock
from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows
from eventlet import event from eventlet import event, greenio, debug
from eventlet import greenio from eventlet.hubs import get_hub
from eventlet import debug from eventlet.green import socket, time
from eventlet.support import get_errno from eventlet.support import get_errno
from eventlet.green import socket
from eventlet.green import time
import errno import errno
import eventlet import eventlet
@@ -14,6 +12,7 @@ import sys
import array import array
import tempfile, shutil import tempfile, shutil
def bufsized(sock, size=1): def bufsized(sock, size=1):
""" Resize both send and receive buffers on a socket. """ Resize both send and receive buffers on a socket.
Useful for testing trampoline. Returns the socket. Useful for testing trampoline. Returns the socket.
@@ -25,6 +24,7 @@ def bufsized(sock, size=1):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
return sock return sock
def min_buf_size(): def min_buf_size():
"""Return the minimum buffer size that the platform supports.""" """Return the minimum buffer size that the platform supports."""
test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -33,7 +33,6 @@ def min_buf_size():
def using_epoll_hub(_f): def using_epoll_hub(_f):
from eventlet.hubs import get_hub
try: try:
return 'epolls' in type(get_hub()).__module__ return 'epolls' in type(get_hub()).__module__
except Exception: except Exception:
@@ -42,7 +41,7 @@ def using_epoll_hub(_f):
class TestGreenSocket(LimitedTestCase): class TestGreenSocket(LimitedTestCase):
def assertWriteToClosedFileRaises(self, fd): def assertWriteToClosedFileRaises(self, fd):
if sys.version_info[0]<3: if sys.version_info[0] < 3:
# 2.x socket._fileobjects are odd: writes don't check # 2.x socket._fileobjects are odd: writes don't check
# whether the socket is closed or not, and you get an # whether the socket is closed or not, and you get an
# AttributeError during flush if it is closed # AttributeError during flush if it is closed
@@ -95,6 +94,7 @@ class TestGreenSocket(LimitedTestCase):
listener.listen(50) listener.listen(50)
evt = event.Event() evt = event.Event()
def server(): def server():
# accept the connection in another greenlet # accept the connection in another greenlet
sock, addr = listener.accept() sock, addr = listener.accept()
@@ -155,6 +155,7 @@ class TestGreenSocket(LimitedTestCase):
listener.listen(50) listener.listen(50)
evt = event.Event() evt = event.Event()
def server(): def server():
# accept the connection in another greenlet # accept the connection in another greenlet
sock, addr = listener.accept() sock, addr = listener.accept()
@@ -183,6 +184,7 @@ class TestGreenSocket(LimitedTestCase):
listener = bufsized(eventlet.listen(('', 0))) listener = bufsized(eventlet.listen(('', 0)))
evt = event.Event() evt = event.Event()
def server(): def server():
# accept the connection in another greenlet # accept the connection in another greenlet
sock, addr = listener.accept() sock, addr = listener.accept()
@@ -197,7 +199,7 @@ class TestGreenSocket(LimitedTestCase):
client.connect(addr) client.connect(addr)
try: try:
client.settimeout(0.00001) client.settimeout(0.00001)
msg = s2b("A")*(100000) # large enough number to overwhelm most buffers msg = s2b("A") * 100000 # large enough number to overwhelm most buffers
total_sent = 0 total_sent = 0
# want to exceed the size of the OS buffer so it'll block in a # want to exceed the size of the OS buffer so it'll block in a
@@ -218,6 +220,7 @@ class TestGreenSocket(LimitedTestCase):
listener.listen(50) listener.listen(50)
evt = event.Event() evt = event.Event()
def server(): def server():
# accept the connection in another greenlet # accept the connection in another greenlet
sock, addr = listener.accept() sock, addr = listener.accept()
@@ -232,7 +235,7 @@ class TestGreenSocket(LimitedTestCase):
client.connect(addr) client.connect(addr)
try: try:
msg = s2b("A")*(8*1024*1024) msg = s2b("A") * (8 << 20)
# want to exceed the size of the OS buffer so it'll block # want to exceed the size of the OS buffer so it'll block
client.sendall(msg) client.sendall(msg)
@@ -284,7 +287,7 @@ class TestGreenSocket(LimitedTestCase):
fd.close() fd.close()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 0)) server.bind(('0.0.0.0', 0))
server.listen(50) server.listen(50)
killer = eventlet.spawn(accept_close_early, server) killer = eventlet.spawn(accept_close_early, server)
@@ -292,7 +295,7 @@ class TestGreenSocket(LimitedTestCase):
killer.wait() killer.wait()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 0)) server.bind(('0.0.0.0', 0))
server.listen(50) server.listen(50)
killer = eventlet.spawn(accept_close_late, server) killer = eventlet.spawn(accept_close_late, server)
@@ -312,8 +315,9 @@ class TestGreenSocket(LimitedTestCase):
self.assertWriteToClosedFileRaises(conn) self.assertWriteToClosedFileRaises(conn)
finally: finally:
listener.close() listener.close()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0)) server.bind(('127.0.0.1', 0))
server.listen(50) server.listen(50)
killer = eventlet.spawn(accept_once, server) killer = eventlet.spawn(accept_once, server)
@@ -329,7 +333,7 @@ class TestGreenSocket(LimitedTestCase):
def test_full_duplex(self): def test_full_duplex(self):
large_data = s2b('*') * 10 * min_buf_size() large_data = s2b('*') * 10 * min_buf_size()
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0)) listener.bind(('127.0.0.1', 0))
listener.listen(50) listener.listen(50)
bufsized(listener) bufsized(listener)
@@ -371,16 +375,18 @@ class TestGreenSocket(LimitedTestCase):
# it may legitimately take a while, but will eventually complete # it may legitimately take a while, but will eventually complete
self.timer.cancel() self.timer.cancel()
second_bytes = 10 second_bytes = 10
def test_sendall_impl(many_bytes): def test_sendall_impl(many_bytes):
bufsize = max(many_bytes//15, 2) bufsize = max(many_bytes // 15, 2)
def sender(listener): def sender(listener):
(sock, addr) = listener.accept() (sock, addr) = listener.accept()
sock = bufsized(sock, size=bufsize) sock = bufsized(sock, size=bufsize)
sock.sendall(s2b('x')*many_bytes) sock.sendall(s2b('x') * many_bytes)
sock.sendall(s2b('y')*second_bytes) sock.sendall(s2b('y') * second_bytes)
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("", 0)) listener.bind(("", 0))
listener.listen(50) listener.listen(50)
sender_coro = eventlet.spawn(sender, listener) sender_coro = eventlet.spawn(sender, listener)
@@ -389,7 +395,7 @@ class TestGreenSocket(LimitedTestCase):
bufsized(client, size=bufsize) bufsized(client, size=bufsize)
total = 0 total = 0
while total < many_bytes: while total < many_bytes:
data = client.recv(min(many_bytes - total, many_bytes//10)) data = client.recv(min(many_bytes - total, many_bytes // 10))
if not data: if not data:
break break
total += len(data) total += len(data)
@@ -414,16 +420,16 @@ class TestGreenSocket(LimitedTestCase):
pass # pre-2.6 pass # pre-2.6
else: else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 0)) sock.bind(('127.0.0.1', 0))
sock.listen(50) sock.listen(50)
ssl_sock = ssl.wrap_socket(sock) ssl.wrap_socket(sock)
def test_timeout_and_final_write(self): def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've # This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch # stopped listening for doesn't result in an incorrect switch
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0)) server.bind(('127.0.0.1', 0))
server.listen(50) server.listen(50)
bound_port = server.getsockname()[1] bound_port = server.getsockname()[1]
@@ -437,7 +443,6 @@ class TestGreenSocket(LimitedTestCase):
s2.close() s2.close()
evt.send('sent via event') evt.send('sent via event')
from eventlet import event
evt = event.Event() evt = event.Event()
eventlet.spawn(sender, evt) eventlet.spawn(sender, evt)
eventlet.sleep(0) # lets the socket enter accept mode, which eventlet.sleep(0) # lets the socket enter accept mode, which
@@ -449,7 +454,7 @@ class TestGreenSocket(LimitedTestCase):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', bound_port)) client.connect(('127.0.0.1', bound_port))
wrap_rfile = client.makefile() wrap_rfile = client.makefile()
_c = wrap_rfile.read(1) wrap_rfile.read(1)
self.fail() self.fail()
except eventlet.TimeoutError: except eventlet.TimeoutError:
pass pass
@@ -467,10 +472,10 @@ class TestGreenSocket(LimitedTestCase):
sock.recv(1) sock.recv(1)
sock.sendall("a") sock.sendall("a")
raise eventlet.StopServe() raise eventlet.StopServe()
listener = eventlet.listen(('127.0.0.1', 0)) listener = eventlet.listen(('127.0.0.1', 0))
server = eventlet.spawn(eventlet.serve, eventlet.spawn(eventlet.serve, listener, handle)
listener,
handle)
def reader(s): def reader(s):
s.recv(1) s.recv(1)
@@ -490,7 +495,7 @@ class TestGreenSocket(LimitedTestCase):
try: try:
sock.sendall('hello world') sock.sendall('hello world')
except socket.error, e: except socket.error, e:
if get_errno(e)== errno.EPIPE: if get_errno(e) == errno.EPIPE:
return return
raise raise
@@ -523,6 +528,7 @@ class TestGreenSocket(LimitedTestCase):
port = eventlet.listen(('127.0.0.1', 0)).getsockname()[1] port = eventlet.listen(('127.0.0.1', 0)).getsockname()[1]
self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port)) self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
class TestGreenPipe(LimitedTestCase): class TestGreenPipe(LimitedTestCase):
@skip_on_windows @skip_on_windows
def setUp(self): def setUp(self):
@@ -534,17 +540,18 @@ class TestGreenPipe(LimitedTestCase):
super(self.__class__, self).tearDown() super(self.__class__, self).tearDown()
def test_pipe(self): def test_pipe(self):
r,w = os.pipe() r, w = os.pipe()
rf = greenio.GreenPipe(r, 'r'); rf = greenio.GreenPipe(r, 'r')
wf = greenio.GreenPipe(w, 'w', 0); wf = greenio.GreenPipe(w, 'w', 0)
def sender(f, content): def sender(f, content):
for ch in content: for ch in content:
eventlet.sleep(0.0001) eventlet.sleep(0.0001)
f.write(ch) f.write(ch)
f.close() f.close()
one_line = "12345\n"; one_line = "12345\n"
eventlet.spawn(sender, wf, one_line*5) eventlet.spawn(sender, wf, one_line * 5)
for i in xrange(5): for i in xrange(5):
line = rf.readline() line = rf.readline()
eventlet.sleep(0.01) eventlet.sleep(0.01)
@@ -587,7 +594,8 @@ class TestGreenPipe(LimitedTestCase):
r = greenio.GreenPipe(r) r = greenio.GreenPipe(r)
w = greenio.GreenPipe(w, 'w') w = greenio.GreenPipe(w, 'w')
large_message = "".join([1024*chr(i) for i in xrange(65)]) large_message = "".join([1024 * chr(i) for i in xrange(65)])
def writer(): def writer():
w.write(large_message) w.write(large_message)
w.close() w.close()
@@ -596,27 +604,27 @@ class TestGreenPipe(LimitedTestCase):
for i in xrange(65): for i in xrange(65):
buf = r.read(1024) buf = r.read(1024)
expected = 1024*chr(i) expected = 1024 * chr(i)
self.assertEquals(buf, expected, self.assertEquals(buf, expected,
"expected=%r..%r, found=%r..%r iter=%d" "expected=%r..%r, found=%r..%r iter=%d"
% (expected[:4], expected[-4:], buf[:4], buf[-4:], i)) % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
gt.wait() gt.wait()
def test_seek_on_buffered_pipe(self): def test_seek_on_buffered_pipe(self):
f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024) f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
self.assertEquals(f.tell(),0) self.assertEquals(f.tell(), 0)
f.seek(0,2) f.seek(0, 2)
self.assertEquals(f.tell(),0) self.assertEquals(f.tell(), 0)
f.write('1234567890') f.write('1234567890')
f.seek(0,2) f.seek(0, 2)
self.assertEquals(f.tell(),10) self.assertEquals(f.tell(), 10)
f.seek(0) f.seek(0)
value = f.read(1) value = f.read(1)
self.assertEqual(value, '1') self.assertEqual(value, '1')
self.assertEquals(f.tell(),1) self.assertEquals(f.tell(), 1)
value = f.read(1) value = f.read(1)
self.assertEqual(value, '2') self.assertEqual(value, '2')
self.assertEquals(f.tell(),2) self.assertEquals(f.tell(), 2)
f.seek(0, 1) f.seek(0, 1)
self.assertEqual(f.readline(), '34567890') self.assertEqual(f.readline(), '34567890')
f.seek(0) f.seek(0)
@@ -625,19 +633,21 @@ class TestGreenPipe(LimitedTestCase):
self.assertEqual(f.readline(), '') self.assertEqual(f.readline(), '')
def test_truncate(self): def test_truncate(self):
f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024) f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
f.write('1234567890') f.write('1234567890')
f.truncate(9) f.truncate(9)
self.assertEquals(f.tell(), 9) self.assertEquals(f.tell(), 9)
class TestGreenIoLong(LimitedTestCase): class TestGreenIoLong(LimitedTestCase):
TEST_TIMEOUT=10 # the test here might take a while depending on the OS TEST_TIMEOUT = 10 # the test here might take a while depending on the OS
@skip_with_pyevent @skip_with_pyevent
def test_multiple_readers(self, clibufsize=False): def test_multiple_readers(self, clibufsize=False):
debug.hub_prevent_multiple_readers(False) debug.hub_prevent_multiple_readers(False)
recvsize = 2 * min_buf_size() recvsize = 2 * min_buf_size()
sendsize = 10 * recvsize sendsize = 10 * recvsize
# test that we can have multiple coroutines reading # test that we can have multiple coroutines reading
# from the same fd. We make no guarantees about which one gets which # from the same fd. We make no guarantees about which one gets which
# bytes, but they should both get at least some # bytes, but they should both get at least some
@@ -651,9 +661,10 @@ class TestGreenIoLong(LimitedTestCase):
results1 = [] results1 = []
results2 = [] results2 = []
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0)) listener.bind(('127.0.0.1', 0))
listener.listen(50) listener.listen(50)
def server(): def server():
(sock, addr) = listener.accept() (sock, addr) = listener.accept()
sock = bufsized(sock) sock = bufsized(sock)
@@ -689,12 +700,14 @@ class TestGreenIoLong(LimitedTestCase):
def test_multiple_readers2(self): def test_multiple_readers2(self):
self.test_multiple_readers(clibufsize=True) self.test_multiple_readers(clibufsize=True)
class TestGreenIoStarvation(LimitedTestCase): class TestGreenIoStarvation(LimitedTestCase):
# fixme: this doesn't succeed, because of eventlet's predetermined # fixme: this doesn't succeed, because of eventlet's predetermined
# ordering. two processes, one with server, one with client eventlets # ordering. two processes, one with server, one with client eventlets
# might be more reliable? # might be more reliable?
TEST_TIMEOUT=300 # the test here might take a while depending on the OS TEST_TIMEOUT = 300 # the test here might take a while depending on the OS
@skipped # by rdw, because it fails but it's not clear how to make it pass @skipped # by rdw, because it fails but it's not clear how to make it pass
@skip_with_pyevent @skip_with_pyevent
def test_server_starvation(self, sendloops=15): def test_server_starvation(self, sendloops=15):
@@ -704,7 +717,7 @@ class TestGreenIoStarvation(LimitedTestCase):
results = [[] for i in xrange(5)] results = [[] for i in xrange(5)]
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0)) listener.bind(('127.0.0.1', 0))
port = listener.getsockname()[1] port = listener.getsockname()[1]
listener.listen(50) listener.listen(50)
@@ -712,7 +725,7 @@ class TestGreenIoStarvation(LimitedTestCase):
base_time = time.time() base_time = time.time()
def server(my_results): def server(my_results):
(sock, addr) = listener.accept() sock, addr = listener.accept()
datasize = 0 datasize = 0
@@ -726,7 +739,7 @@ class TestGreenIoStarvation(LimitedTestCase):
if not data: if not data:
t2 = time.time() - base_time t2 = time.time() - base_time
my_results.append(datasize) my_results.append(datasize)
my_results.append((t1,t2)) my_results.append((t1, t2))
break break
datasize += len(data) datasize += len(data)
finally: finally: