# -*- coding: utf-8 -*- # import sys sys.path[0:0] = [""] import os.path import base64 import socket try: from ssl import SSLError except ImportError: # dummy class of SSLError for ssl none-support environment. class SSLError(Exception): pass import unittest import uuid # websocket-client import websocket as ws # There are three tests to test the library's behaviour when the received # message is fragmented (please see [1] for details). They fail currently as # the library doesn't yet support fragmentation. Therefore the tests are # skipped unless the flag below is set. # # [1]: https://tools.ietf.org/html/rfc6455#section-5.4 # "RFC6455: 5.4. Fragmentation" # TEST_FRAGMENTATION = True TEST_WITH_INTERNET = True TRACABLE = False def create_mask_key(n): return "abcd" class SockMock(object): def __init__(self): self.data = [] self.sent = [] def add_packet(self, data): self.data.append(data) def recv(self, bufsize): if self.data: e = self.data.pop(0) if isinstance(e, Exception): raise e if len(e) > bufsize: self.data.insert(0, e[bufsize:]) return e[:bufsize] def send(self, data): self.sent.append(data) return len(data) class HeaderSockMock(SockMock): def __init__(self, fname): SockMock.__init__(self) path = os.path.join(os.path.dirname(__file__), fname) self.add_packet(open(path).read()) class WebSocketTest(unittest.TestCase): def setUp(self): ws.enableTrace(TRACABLE) def tearDown(self): pass def testDefaultTimeout(self): self.assertEquals(ws.getdefaulttimeout(), None) ws.setdefaulttimeout(10) self.assertEquals(ws.getdefaulttimeout(), 10) ws.setdefaulttimeout(None) def testParseUrl(self): p = ws._parse_url("ws://www.example.com/r") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 80) self.assertEquals(p[2], "/r") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com/r/") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 80) self.assertEquals(p[2], "/r/") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com/") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 80) self.assertEquals(p[2], "/") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 80) self.assertEquals(p[2], "/") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com:8080/r") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 8080) self.assertEquals(p[2], "/r") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com:8080/") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 8080) self.assertEquals(p[2], "/") self.assertEquals(p[3], False) p = ws._parse_url("ws://www.example.com:8080") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 8080) self.assertEquals(p[2], "/") self.assertEquals(p[3], False) p = ws._parse_url("wss://www.example.com:8080/r") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 8080) self.assertEquals(p[2], "/r") self.assertEquals(p[3], True) p = ws._parse_url("wss://www.example.com:8080/r?key=value") self.assertEquals(p[0], "www.example.com") self.assertEquals(p[1], 8080) self.assertEquals(p[2], "/r?key=value") self.assertEquals(p[3], True) self.assertRaises(ValueError, ws._parse_url, "http://www.example.com/r") def testWSKey(self): key = ws._create_sec_websocket_key() self.assert_(key != 24) self.assert_("¥n" not in key) def testWsUtils(self): sock = ws.WebSocket() key = "c6b8hTg4EeGb2gQMztV1/g==" required_header = { "upgrade": "websocket", "connection": "upgrade", "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=", } self.assertEquals(sock._validate_header(required_header, key), True) header = required_header.copy() header["upgrade"] = "http" self.assertEquals(sock._validate_header(header, key), False) del header["upgrade"] self.assertEquals(sock._validate_header(header, key), False) header = required_header.copy() header["connection"] = "something" self.assertEquals(sock._validate_header(header, key), False) del header["connection"] self.assertEquals(sock._validate_header(header, key), False) header = required_header.copy() header["sec-websocket-accept"] = "something" self.assertEquals(sock._validate_header(header, key), False) del header["sec-websocket-accept"] self.assertEquals(sock._validate_header(header, key), False) def testReadHeader(self): sock = ws.WebSocket() sock.sock = HeaderSockMock("data/header01.txt") status, header = sock._read_headers() self.assertEquals(status, 101) self.assertEquals(header["connection"], "upgrade") sock.sock = HeaderSockMock("data/header02.txt") self.assertRaises(ws.WebSocketException, sock._read_headers) def testSend(self): # TODO: add longer frame data sock = ws.WebSocket() sock.set_mask_key(create_mask_key) s = sock.sock = HeaderSockMock("data/header01.txt") sock.send("Hello") self.assertEquals(s.sent[0], "\x81\x85abcd)\x07\x0f\x08\x0e") sock.send("こんにちは") self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") sock.send(u"こんにちは") self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") def testRecv(self): # TODO: add longer frame data sock = ws.WebSocket() s = sock.sock = SockMock() s.add_packet("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") data = sock.recv() self.assertEquals(data, "こんにちは") s.add_packet("\x81\x85abcd)\x07\x0f\x08\x0e") data = sock.recv() self.assertEquals(data, "Hello") def testInternalRecvStrict(self): sock = ws.WebSocket() s = sock.sock = SockMock() s.add_packet("foo") s.add_packet(socket.timeout()) s.add_packet("bar") s.add_packet(SSLError("The read operation timed out")) s.add_packet("baz") with self.assertRaises(ws.WebSocketTimeoutException): data = sock._recv_strict(9) with self.assertRaises(ws.WebSocketTimeoutException): data = sock._recv_strict(9) data = sock._recv_strict(9) self.assertEquals(data, "foobarbaz") with self.assertRaises(ws.WebSocketConnectionClosedException): data = sock._recv_strict(1) def testRecvTimeout(self): sock = ws.WebSocket() s = sock.sock = SockMock() s.add_packet("\x81") s.add_packet(socket.timeout()) s.add_packet("\x8dabcd\x29\x07\x0f\x08\x0e") s.add_packet(socket.timeout()) s.add_packet("\x4e\x43\x33\x0e\x10\x0f\x00\x40") with self.assertRaises(ws.WebSocketTimeoutException): data = sock.recv() with self.assertRaises(ws.WebSocketTimeoutException): data = sock.recv() data = sock.recv() self.assertEquals(data, "Hello, World!") with self.assertRaises(ws.WebSocketConnectionClosedException): data = sock.recv() @unittest.skipUnless(TEST_FRAGMENTATION, "fragmentation not implemented") def testRecvWithSimpleFragmentation(self): sock = ws.WebSocket() s = sock.sock = SockMock() # OPCODE=TEXT, FIN=0, MSG="Brevity is " s.add_packet("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C") # OPCODE=CONT, FIN=1, MSG="the soul of wit" s.add_packet("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17") data = sock.recv() self.assertEqual(data, "Brevity is the soul of wit") with self.assertRaises(ws.WebSocketConnectionClosedException): sock.recv() @unittest.skipUnless(TEST_FRAGMENTATION, "fragmentation not implemented") def testRecvContFragmentation(self): sock = ws.WebSocket() s = sock.sock = SockMock() # OPCODE=CONT, FIN=1, MSG="the soul of wit" s.add_packet("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17") self.assertRaises(ws.WebSocketException, sock.recv) @unittest.skipUnless(TEST_FRAGMENTATION, "fragmentation not implemented") def testRecvWithProlongedFragmentation(self): sock = ws.WebSocket() s = sock.sock = SockMock() # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, " s.add_packet("\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15" \ "\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC") # OPCODE=CONT, FIN=0, MSG="dear friends, " s.add_packet("\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07" \ "\x17MB") # OPCODE=CONT, FIN=1, MSG="once more" s.add_packet("\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04") data = sock.recv() self.assertEqual(data, "Once more unto the breach, dear friends, " \ "once more") with self.assertRaises(ws.WebSocketConnectionClosedException): sock.recv() @unittest.skipUnless(TEST_FRAGMENTATION, "fragmentation not implemented") def testRecvWithFragmentationAndControlFrame(self): sock = ws.WebSocket() sock.set_mask_key(create_mask_key) s = sock.sock = SockMock() # OPCODE=TEXT, FIN=0, MSG="Too much " s.add_packet("\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA") # OPCODE=PING, FIN=1, MSG="Please PONG this" s.add_packet("\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17") # OPCODE=CONT, FIN=1, MSG="of a good thing" s.add_packet("\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c" \ "\x08\x0c\x04") data = sock.recv() self.assertEqual(data, "Too much of a good thing") with self.assertRaises(ws.WebSocketConnectionClosedException): sock.recv() self.assertEqual(s.sent[0], "\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D" \ "\x15\n\n\x17") @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testWebSocket(self): s = ws.create_connection("ws://echo.websocket.org/") self.assertNotEquals(s, None) s.send("Hello, World") result = s.recv() self.assertEquals(result, "Hello, World") s.send("こにゃにゃちは、世界") result = s.recv() self.assertEquals(result, "こにゃにゃちは、世界") s.close() @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testPingPong(self): s = ws.create_connection("ws://echo.websocket.org/") self.assertNotEquals(s, None) s.ping("Hello") s.pong("Hi") s.close() def testSecureWebSocket(self): try: import ssl s = ws.create_connection("wss://echo.websocket.org/") self.assertNotEquals(s, None) self.assert_(isinstance(s.sock, ssl.SSLSock)) s.send("Hello, World") result = s.recv() self.assertEquals(result, "Hello, World") s.send("こにゃにゃちは、世界") result = s.recv() self.assertEquals(result, "こにゃにゃちは、世界") s.close() except: pass @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testWebSocketWihtCustomHeader(self): s = ws.create_connection("ws://echo.websocket.org/", headers={"User-Agent": "PythonWebsocketClient"}) self.assertNotEquals(s, None) s.send("Hello, World") result = s.recv() self.assertEquals(result, "Hello, World") s.close() @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testAfterClose(self): from socket import error s = ws.create_connection("ws://echo.websocket.org/") self.assertNotEquals(s, None) s.close() self.assertRaises(error, s.send, "Hello") self.assertRaises(error, s.recv) def testUUID4(self): """ WebSocket key should be a UUID4. """ key = ws._create_sec_websocket_key() u = uuid.UUID(bytes=base64.b64decode(key)) self.assertEquals(4, u.version) class WebSocketAppTest(unittest.TestCase): class NotSetYet(object): """ A marker class for signalling that a value hasn't been set yet. """ def setUp(self): ws.enableTrace(TRACABLE) WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() def tearDown(self): WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testKeepRunning(self): """ A WebSocketApp should keep running as long as its self.keep_running is not False (in the boolean context). """ def on_open(self, *args, **kwargs): """ Set the keep_running flag for later inspection and immediately close the connection. """ WebSocketAppTest.keep_running_open = self.keep_running self.close() def on_close(self, *args, **kwargs): """ Set the keep_running flag for the test to use. """ WebSocketAppTest.keep_running_close = self.keep_running app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close) app.run_forever() self.assertFalse(isinstance(WebSocketAppTest.keep_running_open, WebSocketAppTest.NotSetYet)) self.assertFalse(isinstance(WebSocketAppTest.keep_running_close, WebSocketAppTest.NotSetYet)) self.assertEquals(True, WebSocketAppTest.keep_running_open) self.assertEquals(False, WebSocketAppTest.keep_running_close) @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testSockMaskKey(self): """ A WebSocketApp should forward the received mask_key function down to the actual socket. """ def my_mask_key_func(): pass def on_open(self, *args, **kwargs): """ Set the value so the test can use it later on and immediately close the connection. """ WebSocketAppTest.get_mask_key_id = id(self.get_mask_key) self.close() app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func) app.run_forever() # Note: We can't use 'is' for comparing the functions directly, need to use 'id'. self.assertEquals(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func)) class SockOptTest(unittest.TestCase): @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") def testSockOpt(self): sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),) s = ws.WebSocket(sockopt=sockopt) self.assertNotEquals(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt) self.assertNotEquals(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) if __name__ == "__main__": unittest.main()