diff --git a/compliance/test_fuzzingclient.py b/compliance/test_fuzzingclient.py index 038a2c6..63fc326 100644 --- a/compliance/test_fuzzingclient.py +++ b/compliance/test_fuzzingclient.py @@ -3,6 +3,11 @@ import websocket import json import traceback +import six + + + + SERVER = 'ws://127.0.0.1:8642' AGENT = 'py-websockets-cleint' @@ -30,8 +35,8 @@ for case in range(1, count+1): success += 1 except Exception as e: failed += 1 - print("[Faield] Test Case: " + str(case)) - print(traceback.format_exc()) + # print("[Faield] Test Case: " + str(case)) + #print(traceback.format_exc()) finally: ws.close() diff --git a/websocket/_core.py b/websocket/_core.py index ef1247f..a0ffc37 100644 --- a/websocket/_core.py +++ b/websocket/_core.py @@ -57,7 +57,7 @@ import logging # websocket modules from ._exceptions import * from ._abnf import ABNF -from ._utils import NoLock +from ._utils import NoLock, validate_utf8 """ websocket python client. @@ -732,6 +732,8 @@ class WebSocket(object): data = self._cont_data self._cont_data = None frame.data = data[1] + if not self.fire_cont_frame and data[0] == ABNF.OPCODE_TEXT and not validate_utf8(frame.data): + raise UnicodeDecodeError("utf-8", frame.data, 0, len(frame.data), "invalid utf-8 in text frame") return [data[0], frame] elif frame.opcode == ABNF.OPCODE_CLOSE: diff --git a/websocket/_utils.py b/websocket/_utils.py index c23bf6f..31f7d0c 100644 --- a/websocket/_utils.py +++ b/websocket/_utils.py @@ -19,6 +19,7 @@ Copyright (C) 2010 Hiroki Ohtani(liris) """ +import six class NoLock(object): def __enter__(self): @@ -27,3 +28,57 @@ class NoLock(object): def __exit__(self,type, value, traceback): pass + +# UTF-8 validator +# python implementation of http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ + +UTF8_ACCEPT = 0 +UTF8_REJECT=12 + +_UTF8D = [ + # The first part of the table maps bytes to character classes + # to reduce the size of the transition table and create bitmasks. 
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, + 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, + 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, + 10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8, + + # The second part is a transition table that maps a combination + # of a state of the automaton and a character class to a state. + 0,12,24,36,60,96,84,12,12,12,48,72, 12,12,12,12,12,12,12,12,12,12,12,12, + 12, 0,12,12,12,12,12, 0,12, 0,12,12, 12,24,12,12,12,12,12,24,12,24,12,12, + 12,12,12,12,12,12,12,24,12,12,12,12, 12,24,12,12,12,12,12,12,12,24,12,12, + 12,12,12,12,12,12,12,36,12,36,12,12, 12,36,12,12,12,12,12,36,12,36,12,12, + 12,36,12,12,12,12,12,12,12,12,12,12, ] + +def _decode(state, codep, ch): + tp = _UTF8D[ch] + + codep = (ch & 0x3f ) | (codep << 6) if (state != UTF8_ACCEPT) else (0xff >> tp) & (ch) + state = _UTF8D[256 + state + tp] + + return state, codep + +def validate_utf8(utfbytes): + """ + validate utf8 byte string. + utfbytes: utf byte string to check. + return value: True only if utfbytes is a complete, valid utf8 string. Otherwise, False. 
+ """ + state = UTF8_ACCEPT + codep = 0 + for i in utfbytes: + if six.PY2: + i = ord(i) + state, codep = _decode(state, codep, i) + if state == UTF8_REJECT: + return False + + return state == UTF8_ACCEPT + + + diff --git a/websocket/tests/test_websocket.py b/websocket/tests/test_websocket.py index 84ea5ca..eab6c47 100644 --- a/websocket/tests/test_websocket.py +++ b/websocket/tests/test_websocket.py @@ -25,6 +25,7 @@ import uuid # websocket-client import websocket as ws from websocket._core import _parse_url, _create_sec_websocket_key +from websocket._utils import validate_utf8 # Skip test to access the internet. @@ -32,7 +33,6 @@ TEST_WITH_INTERNET = False # Skip Secure WebSocket test. TEST_SECURE_WS = False - TRACABLE = False @@ -524,6 +524,12 @@ class SockOptTest(unittest.TestCase): self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) s.close() +class UtilsTest(unittest.TestCase): + def testUtf8Validator(self): + state = validate_utf8(six.b('\xf0\x90\x80\x80')) + self.assertEqual(state, True) + state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited')) + self.assertEqual(state, False) if __name__ == "__main__": unittest.main()