adopted test_socketserver.py from standard tests

This commit is contained in:
Denis Bilenko
2008-10-08 12:19:42 +07:00
parent 918686bc97
commit 8f97c238ba
2 changed files with 228 additions and 0 deletions

View File

@@ -0,0 +1,218 @@
# Test suite for SocketServer.py
from greentest import test_support
from greentest.test_support import (verbose, verify, TESTFN, TestSkipped,
reap_children)
test_support.requires('network')
import errno
import os
NREQ = 3
DELAY = 0.1
class MyMixinHandler:
def handle(self):
time.sleep(DELAY)
line = self.rfile.readline()
time.sleep(DELAY)
self.wfile.write(line)
class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
pass
class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
pass
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
teststring = "hello world\r\n"
def receive(sock, n, timeout=20):
#print 'select args: ', [sock], [], [], timeout
r, w, x = select.select([sock], [], [], timeout)
#print 'select results: ', r, w, x
if sock in r:
return sock.recv(n)
else:
raise RuntimeError, "timed out on %r" % (sock,)
def testdgram(proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(teststring, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
def teststream(proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(teststring)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
class ServerThread(threading.Thread):
def __init__(self, addr, svrcls, hdlrcls):
threading.Thread.__init__(self)
self.__addr = addr
self.__svrcls = svrcls
self.__hdlrcls = hdlrcls
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
if verbose: print "thread: creating server"
svr = svrcls(self.__addr, self.__hdlrcls)
# pull the address out of the server in case it changed
# this can happen if another process is using the port
addr = svr.server_address
if addr:
self.__addr = addr
if self.__addr != svr.socket.getsockname():
raise RuntimeError('server_address was %s, expected %s' %
(self.__addr, svr.socket.getsockname()))
if verbose: print "thread: serving three times"
svr.serve_a_few()
if verbose: print "thread: done"
seed = 0
def pickport():
global seed
seed += 1
return 10000 + (os.getpid() % 1000)*10 + seed
host = "localhost"
testfiles = []
def pickaddr(proto):
if proto == socket.AF_INET:
return (host, pickport())
else:
fn = TESTFN + str(pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
fn = os.path.join('\socket', fn)
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
testfiles.append(fn)
return fn
def cleanup():
for fn in testfiles:
try:
os.remove(fn)
except os.error:
pass
testfiles[:] = []
def testloop(proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = pickaddr(proto)
if verbose:
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print "server created"
t.start()
if verbose: print "server running"
for i in range(NREQ):
time.sleep(DELAY)
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print "waiting for server"
t.join()
if verbose: print "done"
class ForgivingTCPServer(TCPServer):
# prevent errors if another process is using the port we want
def server_bind(self):
host, default_port = self.server_address
# this code shamelessly stolen from test.test_support
# the ports were changed to protect the innocent
import sys
for port in [default_port, 3434, 8798, 23833]:
try:
self.server_address = host, port
TCPServer.server_bind(self)
break
except socket.error, (err, msg):
if err != errno.EADDRINUSE:
raise
print >>sys.__stderr__, \
' WARNING: failed to listen on port %d, trying another' % port
tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
tcpservers.append(ForkingTCPServer)
udpservers = [UDPServer, ThreadingUDPServer]
print 'WARNING: ThreadingUDPServer is disabled'
udpservers = [UDPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
udpservers.append(ForkingUDPServer)
if not hasattr(socket, 'AF_UNIX'):
streamservers = []
dgramservers = []
else:
class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
streamservers.append(ForkingUnixStreamServer)
class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
dgramservers.append(ForkingUnixDatagramServer)
def sloppy_cleanup():
# See http://python.org/sf/1540386
# We need to reap children here otherwise a child from one server
# can be left running for the next server and cause a test failure.
time.sleep(DELAY)
reap_children()
def testall():
testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
sloppy_cleanup()
testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
if hasattr(socket, 'AF_UNIX'):
sloppy_cleanup()
testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
def test_main():
import imp
if imp.lock_held():
# If the import lock is held, the threads will hang.
raise TestSkipped("can't run when import lock is held")
try:
testall()
finally:
cleanup()
reap_children()
if __name__ == "__main__":
test_main()

View File

@@ -0,0 +1,10 @@
import sys
if 'twisted.internet.reactor' not in sys.modules:
from twisted.internet import pollreactor; pollreactor.install()
from eventlet.green.SocketServer import *
from eventlet.green import socket
from eventlet.green import select
from eventlet.green import time
from eventlet.green import threading
execfile('test_socketserver.py')