- commit test code and add more test code

- fix bugs found with some unittest
This commit is contained in:
Hiroki Ohtani
2011-01-05 00:02:28 +09:00
parent a1538ab23f
commit 2be0632fd7
5 changed files with 208 additions and 17 deletions

9
data/header01.txt Normal file
View File

@@ -0,0 +1,9 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade: WebSocket
sec-websocket-location: http://localhost/r
sec-websocket-origin: http://localhost/r
some_header: something
ssssss
aaaaaaaa

9
data/header02.txt Normal file
View File

@@ -0,0 +1,9 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade WebSocket
sec-websocket-location: http://localhost/r
sec-websocket-origin: http://localhost/r
some_header: something
ssssss
aaaaaaaa

View File

@@ -1,6 +1,6 @@
from setuptools import setup
VERSION = "0.3"
VERSION = "0.4"
setup(

169
test.py Normal file
View File

@@ -0,0 +1,169 @@
# -*- coding: utf-8 -*-
#
import unittest
import websocket as ws
class StringSockMock:
def __init__(self):
self.set_data("")
self.sent = []
def set_data(self, data):
self.data = data
self.pos = 0
self.len = len(data)
def recv(self, bufsize):
if self.len < self.pos:
return
buf = self.data[self.pos: self.pos + bufsize]
self.pos += bufsize
return buf
def send(self, data):
self.sent.append(data)
class HeaderSockMock(StringSockMock):
def __init__(self, fname):
self.set_data(open(fname).read())
self.sent = []
class WebSocketTest(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def testDefaultTimeout(self):
self.assertEquals(ws.getdefaulttimeout(), None)
ws.setdefaulttimeout(10)
self.assertEquals(ws.getdefaulttimeout(), 10)
ws.setdefaulttimeout(None)
def testParseUrl(self):
p = ws._parse_url("ws://www.example.com/r")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 80)
self.assertEquals(p[2], "/r")
p = ws._parse_url("ws://www.example.com/")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 80)
self.assertEquals(p[2], "/")
p = ws._parse_url("ws://www.example.com")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 80)
self.assertEquals(p[2], "/")
p = ws._parse_url("ws://www.example.com:8080/r")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 8080)
self.assertEquals(p[2], "/r")
p = ws._parse_url("ws://www.example.com:8080/")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 8080)
self.assertEquals(p[2], "/")
p = ws._parse_url("ws://www.example.com:8080")
self.assertEquals(p[0], "www.example.com")
self.assertEquals(p[1], 8080)
self.assertEquals(p[2], "/")
# we do not support wss for a while
self.assertRaises(ValueError, ws._parse_url, "wss://www.example.com/r")
self.assertRaises(ValueError, ws._parse_url, "http://www.example.com/r")
def testWSKey(self):
n, k = ws._create_sec_websocket_key()
self.assert_(0 < n < (1<<32))
self.assert_(len(k) > 0)
k3 = ws._create_key3()
self.assertEquals(len(k3), 8)
def testWsUtils(self):
sock = ws.WebSocket()
self.assertNotEquals(sock._validate_resp(1,2,"test", "fuga"), True)
hashed = '6\xa3p\xb6#\xac\xb9=\xec\x0e\x96\xb5\xc1@\x1d\x90'
self.assertEquals(sock._validate_resp(1,2,"test", hashed), True)
hibi_header = {
"upgrade": "websocket",
"connection": "upgrade",
"sec-websocket-origin": "http://www.example.com",
"sec-websocket-location": "http://www.example.com",
}
self.assertEquals(sock._validate_header(hibi_header), (True, True))
header = hibi_header.copy()
header["upgrade"] = "http"
self.assertEquals(sock._validate_header(header), (False, False))
del header["upgrade"]
self.assertEquals(sock._validate_header(header), (False, False))
header = hibi_header.copy()
header["connection"] = "http"
self.assertEquals(sock._validate_header(header), (False, False))
del header["connection"]
self.assertEquals(sock._validate_header(header), (False, False))
header = hibi_header.copy()
header["sec-websocket-origin"] = "somewhere origin"
self.assertEquals(sock._validate_header(header), (True, True))
del header["sec-websocket-origin"]
self.assertEquals(sock._validate_header(header), (False, True))
header = hibi_header.copy()
header["sec-websocket-location"] = "somewhere location"
self.assertEquals(sock._validate_header(header), (True, True))
del header["sec-websocket-location"]
self.assertEquals(sock._validate_header(header), (False, True))
def testReadHeader(self):
sock = ws.WebSocket()
sock.sock = HeaderSockMock("data/header01.txt")
status, header = sock._read_headers()
self.assertEquals(status, 101)
self.assertEquals(header["connection"], "upgrade")
self.assertEquals("ssssss" in header, False)
self.assertEquals(sock._get_resp(), "ssssss\r\naaaaaaaa")
sock.sock = HeaderSockMock("data/header02.txt")
self.assertRaises(ws.WebSocketException, sock._read_headers)
def testSend(self):
sock = ws.WebSocket()
s = sock.sock = HeaderSockMock("data/header01.txt")
sock.send("Hello")
self.assertEquals(s.sent[0], "\x00Hello\xff")
sock.send("こんにちは")
self.assertEquals(s.sent[1], "\x00こんにちは\xff")
sock.send(u"こんにちは")
self.assertEquals(s.sent[1], "\x00こんにちは\xff")
def testRecv(self):
sock = ws.WebSocket()
s = sock.sock = StringSockMock()
s.set_data("\x00こんにちは\xff")
data = sock.recv()
self.assertEquals(data, "こんにちは")
s.set_data("\x01\x05Hello")
data = sock.recv()
self.assertEquals(data, "Hello")
s.set_data("\x01\x81\x7f" + ("a"*255))
data = sock.recv()
self.assertEquals(len(data), 255)
self.assertEquals(data, "a" * 255)
if __name__ == "__main__":
unittest.main()

View File

@@ -45,6 +45,7 @@ def _parse_url(url):
else:
port = 80
elif parsed.scheme == "wss":
# TODO: support wss
raise ValueError("scheme wss is not supported")
else:
raise ValueError("scheme %s is invalid" % parsed.scheme)
@@ -137,6 +138,7 @@ class WebSocket(object):
Connect to url. url is websocket url scheme. ie. ws://host:port/resource
"""
hostname, port, resource = _parse_url(url)
# TODO: we need to support proxy
self.sock.connect((hostname, port))
self._handshake(hostname, port, resource, **options)
@@ -166,7 +168,7 @@ class WebSocket(object):
header_str = "\r\n".join(headers)
sock.send(header_str)
if enableTrace:
if traceEnabled:
print "--- request header ---"
print header_str
print "-----------------------"
@@ -197,11 +199,8 @@ class WebSocket(object):
return resp == digest
def _get_resp(self):
to = self.sock.gettimeout()
self.sock.settimeout(3)
result = self._recv(16)
self.sock.settimeout(to)
if enableTrace:
if traceEnabled:
print "--- challenge response result ---"
print repr(result)
print "---------------------------------"
@@ -211,9 +210,10 @@ class WebSocket(object):
def _validate_header(self, headers):
#TODO: check other headers
for key, value in HEADERS_TO_CHECK.iteritems():
v = headers[key]
v = headers.get(key, None)
if value != v:
return False
return False, False
success = 0
for key in HEADERS_TO_EXIST_FOR_HYBI00:
if key in headers:
@@ -236,7 +236,7 @@ class WebSocket(object):
def _read_headers(self):
status = None
headers = {}
if enableTrace:
if traceEnabled:
print "--- response header ---"
while True:
@@ -244,16 +244,20 @@ class WebSocket(object):
if line == "\r\n":
break
line = line.strip()
if enableTrace:
if traceEnabled:
print line
if not status:
status_info = line.split(" ", 2)
status = int(status_info[1])
else:
key, value = line.split(":", 1)
headers[key.lower()] = value.strip().lower()
kv = line.split(":", 1)
if len(kv) == 2:
key, value = kv
headers[key.lower()] = value.strip().lower()
else:
raise WebSocketException("Invalid header")
if enableTrace:
if traceEnabled:
print "-----------------------"
return status, headers
@@ -281,7 +285,7 @@ class WebSocket(object):
else:
bytes.append(b)
return "".join(bytes)
elif (frame_type & 0x80) == 0x80:
elif 0 < frame_type < 0x80:
# which frame type is valid?
length = self._read_length()
bytes = self._recv_strict(length)
@@ -292,7 +296,7 @@ class WebSocket(object):
while True:
b = ord(self._recv(1))
length = length * (1 << 7) + (b & 0x7f)
if (b & 0x80) == 0x80:
if b < 0x80:
break
return length
@@ -318,13 +322,13 @@ class WebSocket(object):
return bytes
def _recv_strict(self, bufsize):
remaining = bufszie
remaining = bufsize
bytes = ""
while remaining:
bytes += self._recv(remaining)
remaining = bufsize - len(bytes)
return self._recv(bufsize)
return bytes
def _recv_line(self):
line = []