refs #117 - improve close frame.

This commit is contained in:
liris
2014-10-15 14:53:29 +09:00
parent b6e7f71474
commit 158663b84f
4 changed files with 51 additions and 18 deletions

View File

@@ -22,6 +22,7 @@ failed = 0
for case in range(1, count+1):
url = SERVER + '/runCase?case={}&agent={}'.format(case, AGENT)
status = websocket.STATUS_NORMAL
try:
ws = websocket.create_connection(url)
opcode, msg = ws.recv_data()
@@ -33,12 +34,13 @@ for case in range(1, count+1):
except UnicodeDecodeError:
# this case is ok.
success += 1
status = websocket.STATUS_PROTOCOL_ERROR
except Exception as e:
failed += 1
# print("[Faield] Test Case: " + str(case))
#print(traceback.format_exc())
status = websocket.STATUS_PROTOCOL_ERROR
print(traceback.format_exc())
finally:
ws.close()
ws.close(status)
print("Ran {} test cases. success: {}, faield: {}".format(case, success, failed))
url = SERVER + '/updateReports?agent={}'.format(AGENT)

View File

@@ -23,8 +23,33 @@ import array
import struct
import os
from ._exceptions import *
from ._utils import validate_utf8
# closing frame status codes.
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_TLS_HANDSHAKE_ERROR = 1015
VALID_CLOSE_STATUS = (
STATUS_NORMAL,
STATUS_GOING_AWAY,
STATUS_PROTOCOL_ERROR,
STATUS_UNSUPPORTED_DATA_TYPE,
STATUS_INVALID_PAYLOAD,
STATUS_POLICY_VIOLATION,
STATUS_MESSAGE_TOO_BIG,
STATUS_INVALID_EXTENSION,
STATUS_UNEXPECTED_CONDITION,
)
class ABNF(object):
"""
@@ -85,6 +110,21 @@ class ABNF(object):
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketException("Invalid close frame.")
if l > 2 and not validate_utf8(self.data[2:]):
raise WebSocketException("Invalid close frame.")
code = 256*six.byte2int(self.data[0]) + six.byte2int(self.data[1])
if not self._is_valid_close_status(code):
raise WebSocketException("Invalid close opcode.")
def _is_valid_close_status(self, code):
return code in VALID_CLOSE_STATUS or (3000 <= code <5000)
def __str__(self):
return "fin=" + str(self.fin) \
+ " opcode=" + str(self.opcode) \

View File

@@ -56,7 +56,7 @@ import logging
# websocket modules
from ._exceptions import *
from ._abnf import ABNF
from ._abnf import *
from ._utils import NoLock, validate_utf8
"""
@@ -71,19 +71,6 @@ Please see http://tools.ietf.org/html/rfc6455 for protocol.
# websocket supported version.
VERSION = 13
# closing frame status codes.
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_TLS_HANDSHAKE_ERROR = 1015
DEFAULT_SOCKET_OPTION = [(socket.SOL_TCP, socket.TCP_NODELAY, 1),]
if hasattr(socket, "SO_KEEPALIVE"):
@@ -733,7 +720,7 @@ class WebSocket(object):
self._cont_data = None
frame.data = data[1]
if not self.fire_cont_frame and data[0] == ABNF.OPCODE_TEXT and not validate_utf8(frame.data):
raise UnicodeDecodeError("cannot decode: " + repr(frame.data))
raise WebSocketException("cannot decode: " + repr(frame.data))
return [data[0], frame]
elif frame.opcode == ABNF.OPCODE_CLOSE:
@@ -742,6 +729,8 @@ class WebSocket(object):
elif frame.opcode == ABNF.OPCODE_PING:
if len(frame.data) < 126:
self.pong(frame.data)
else:
raise WebSocketException("Protocol Error")
if control_frame:
return (frame.opcode, frame)
elif frame.opcode == ABNF.OPCODE_PONG:

View File

@@ -530,6 +530,8 @@ class UtilsTest(unittest.TestCase):
self.assertEqual(state, True)
state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'))
self.assertEqual(state, False)
state = validate_utf8(six.b(''))
self.assertEqual(state, True)
if __name__ == "__main__":
unittest.main()