parent
931f79d25e
commit
b6e7f71474
@ -3,6 +3,11 @@
|
||||
import websocket
|
||||
import json
|
||||
import traceback
|
||||
import six
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
SERVER = 'ws://127.0.0.1:8642'
|
||||
AGENT = 'py-websockets-cleint'
|
||||
@ -30,8 +35,8 @@ for case in range(1, count+1):
|
||||
success += 1
|
||||
except Exception as e:
|
||||
failed += 1
|
||||
print("[Faield] Test Case: " + str(case))
|
||||
print(traceback.format_exc())
|
||||
# print("[Faield] Test Case: " + str(case))
|
||||
#print(traceback.format_exc())
|
||||
finally:
|
||||
ws.close()
|
||||
|
||||
|
@ -57,7 +57,7 @@ import logging
|
||||
# websocket modules
|
||||
from ._exceptions import *
|
||||
from ._abnf import ABNF
|
||||
from ._utils import NoLock
|
||||
from ._utils import NoLock, validate_utf8
|
||||
|
||||
"""
|
||||
websocket python client.
|
||||
@ -732,6 +732,8 @@ class WebSocket(object):
|
||||
data = self._cont_data
|
||||
self._cont_data = None
|
||||
frame.data = data[1]
|
||||
if not self.fire_cont_frame and data[0] == ABNF.OPCODE_TEXT and not validate_utf8(frame.data):
|
||||
raise UnicodeDecodeError("cannot decode: " + repr(frame.data))
|
||||
return [data[0], frame]
|
||||
|
||||
elif frame.opcode == ABNF.OPCODE_CLOSE:
|
||||
|
@ -19,6 +19,7 @@ Copyright (C) 2010 Hiroki Ohtani(liris)
|
||||
|
||||
"""
|
||||
|
||||
import six
|
||||
|
||||
class NoLock(object):
|
||||
def __enter__(self):
|
||||
@ -27,3 +28,57 @@ class NoLock(object):
|
||||
def __exit__(self,type, value, traceback):
|
||||
pass
|
||||
|
||||
|
||||
# UTF-8 validator
|
||||
# python implementation of http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
|
||||
|
||||
UTF8_ACCEPT = 0
|
||||
UTF8_REJECT=12
|
||||
|
||||
_UTF8D = [
|
||||
# The first part of the table maps bytes to character classes that
|
||||
# to reduce the size of the transition table and create bitmasks.
|
||||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
|
||||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
|
||||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
|
||||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
|
||||
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,
|
||||
7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,
|
||||
8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
|
||||
10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8,
|
||||
|
||||
# The second part is a transition table that maps a combination
|
||||
# of a state of the automaton and a character class to a state.
|
||||
0,12,24,36,60,96,84,12,12,12,48,72, 12,12,12,12,12,12,12,12,12,12,12,12,
|
||||
12, 0,12,12,12,12,12, 0,12, 0,12,12, 12,24,12,12,12,12,12,24,12,24,12,12,
|
||||
12,12,12,12,12,12,12,24,12,12,12,12, 12,24,12,12,12,12,12,12,12,24,12,12,
|
||||
12,12,12,12,12,12,12,36,12,36,12,12, 12,36,12,12,12,12,12,36,12,36,12,12,
|
||||
12,36,12,12,12,12,12,12,12,12,12,12, ]
|
||||
|
||||
def _decode(state, codep, ch):
|
||||
tp = _UTF8D[ch]
|
||||
|
||||
codep = (ch & 0x3f ) | (codep << 6) if (state != UTF8_ACCEPT) else (0xff >> tp) & (ch)
|
||||
state = _UTF8D[256 + state + tp]
|
||||
|
||||
return state, codep;
|
||||
|
||||
def validate_utf8(utfbytes):
|
||||
"""
|
||||
validate utf8 byte string.
|
||||
utfbytes: utf byte string to check.
|
||||
return value: if valid utf8 string, return true. Otherwise, return false.
|
||||
"""
|
||||
state = UTF8_ACCEPT
|
||||
codep = 0
|
||||
for i in utfbytes:
|
||||
if six.PY2:
|
||||
i = ord(i)
|
||||
state, codep = _decode(state, codep, i)
|
||||
if state == UTF8_REJECT:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
|
@ -25,6 +25,7 @@ import uuid
|
||||
# websocket-client
|
||||
import websocket as ws
|
||||
from websocket._core import _parse_url, _create_sec_websocket_key
|
||||
from websocket._utils import validate_utf8
|
||||
|
||||
|
||||
# Skip test to access the internet.
|
||||
@ -32,7 +33,6 @@ TEST_WITH_INTERNET = False
|
||||
|
||||
# Skip Secure WebSocket test.
|
||||
TEST_SECURE_WS = False
|
||||
|
||||
TRACABLE = False
|
||||
|
||||
|
||||
@ -524,6 +524,12 @@ class SockOptTest(unittest.TestCase):
|
||||
self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
|
||||
s.close()
|
||||
|
||||
class UtilsTest(unittest.TestCase):
|
||||
def testUtf8Validator(self):
|
||||
state = validate_utf8(six.b('\xf0\x90\x80\x80'))
|
||||
self.assertEqual(state, True)
|
||||
state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'))
|
||||
self.assertEqual(state, False)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user