"""
websocket - WebSocket client library for Python

Copyright (C) 2010 Hiroki Ohtani(liris)

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""
import array
import os
import struct

import six


class ABNF(object):
    """
    ABNF frame class.

    Holds the header fields and payload of a single WebSocket frame and
    serializes them into the byte layout sent on the wire.

    see http://tools.ietf.org/html/rfc5234
    and http://tools.ietf.org/html/rfc6455#section-5.2
    """

    # operation code values.
    OPCODE_CONT = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xa

    # available operation code value tuple
    OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
               OPCODE_PING, OPCODE_PONG)

    # opcode human readable string
    OPCODE_MAP = {
        OPCODE_CONT: "cont",
        OPCODE_TEXT: "text",
        OPCODE_BINARY: "binary",
        OPCODE_CLOSE: "close",
        OPCODE_PING: "ping",
        OPCODE_PONG: "pong"
    }

    # data length thresholds used by format():
    #   length < LENGTH_7   -> length is stored in the second header byte
    #   length < LENGTH_16  -> marker 0x7e followed by a 16 bit length
    #   length < LENGTH_63  -> marker 0x7f followed by a 64 bit length
    #   length >= LENGTH_63 -> rejected with ValueError
    LENGTH_7 = 0x7e
    LENGTH_16 = 1 << 16
    LENGTH_63 = 1 << 63

    def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0,
                 opcode=OPCODE_TEXT, mask=1, data=""):
        """
        Constructor for ABNF.
        please check RFC for arguments.

        fin, rsv1, rsv2, rsv3: header flag bits. format() requires each
                               of them to be 0 or 1.

        opcode: operation code. format() requires one of ABNF.OPCODES.

        mask: 1 to mask the payload when formatting, 0 to send it as is.

        data: payload of the frame.
        """
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        # NOTE: this instance attribute shadows the static method ``mask``
        # on instances, so the method must be called as ``ABNF.mask(...)``.
        self.mask = mask
        self.data = data
        # Callable invoked as get_mask_key(4) by format() to obtain the
        # masking key. It can be replaced on the instance.
        self.get_mask_key = os.urandom

    def __str__(self):
        return "fin=" + str(self.fin) \
            + " opcode=" + str(self.opcode) \
            + " data=" + str(self.data)

    @staticmethod
    def create_frame(data, opcode, fin=1):
        """
        create frame to send text, binary and other data.

        data: data to send. This is string value(byte array).
            if opcode is OPCODE_TEXT and this value is unicode,
            data value is encoded into a UTF-8 byte string, automatically.

        opcode: operation code. please see OPCODE_XXX.

        fin: fin flag. if set to 0, create continue fragmentation.

        Returns a new ABNF instance with the mask bit set and all
        reserved bits cleared.
        """
        if opcode == ABNF.OPCODE_TEXT and isinstance(data, six.text_type):
            data = data.encode("utf-8")
        # mask must be set if send data from client
        return ABNF(fin, 0, 0, 0, opcode, 1, data)

    def format(self):
        """
        format this object to string(byte array) to send data to server.

        The result is the two header bytes, the extended length when the
        payload needs one, and then either the raw payload (mask is 0) or
        the 4 byte mask key followed by the masked payload (mask is 1).

        Raises ValueError when fin, rsv1, rsv2, rsv3 or mask is not 0 or 1,
        when opcode is not one of ABNF.OPCODES, or when the payload length
        is LENGTH_63 or more.
        """
        # mask is validated together with the other single-bit fields:
        # any other value would overflow the second header byte.
        header_bits = (self.fin, self.rsv1, self.rsv2, self.rsv3, self.mask)
        if any(bit not in (0, 1) for bit in header_bits):
            raise ValueError("not 0 or 1")
        if self.opcode not in ABNF.OPCODES:
            raise ValueError("Invalid OPCODE")
        length = len(self.data)
        if length >= ABNF.LENGTH_63:
            raise ValueError("data is too long")

        first_byte = (self.fin << 7
                      | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4
                      | self.opcode)
        mask_bit = self.mask << 7
        # "!" selects network byte order with standard sizes (no padding),
        # so each pack() call yields exactly the header bytes listed.
        if length < ABNF.LENGTH_7:
            frame_header = struct.pack("!BB", first_byte, mask_bit | length)
        elif length < ABNF.LENGTH_16:
            frame_header = struct.pack("!BBH", first_byte, mask_bit | 0x7e,
                                       length)
        else:
            frame_header = struct.pack("!BBQ", first_byte, mask_bit | 0x7f,
                                       length)

        if not self.mask:
            # The header is a byte string; a text payload (including the
            # constructor default "") must be converted before joining.
            return frame_header + ABNF._to_bytes(self.data)

        mask_key = self.get_mask_key(4)
        return frame_header + self._get_masked(mask_key)

    def _get_masked(self, mask_key):
        """
        Return mask_key (as bytes) followed by the payload masked with it.
        """
        # Normalize the key once so the bytes written into the frame are
        # exactly the bytes used for the XOR.
        key = ABNF._to_bytes(mask_key)
        # Called through the class: self.mask is the header bit, not the
        # static method.
        return key + ABNF.mask(key, self.data)

    @staticmethod
    def _to_bytes(value):
        """
        Return text encoded as latin-1 bytes; other values are returned
        unchanged.

        latin-1 maps every character to exactly one byte, so the length
        computed from a text value equals the length of the result.
        Characters above U+00FF raise UnicodeEncodeError.
        """
        if isinstance(value, six.text_type):
            return value.encode("latin-1")
        return value

    @staticmethod
    def mask(mask_key, data):
        """
        mask or unmask data. Just do xor for each byte

        mask_key: 4 byte string(byte).

        data: data to mask/unmask.

        Returns the transformed data as a byte string. Applying the same
        key twice gives back the original bytes.
        """
        key_bytes = array.array("B", ABNF._to_bytes(mask_key))
        payload = array.array("B", ABNF._to_bytes(data))
        # Elements are overwritten in place; the array never changes size,
        # so iterating it while assigning is safe.
        for index, byte in enumerate(payload):
            payload[index] = byte ^ key_bytes[index % 4]

        if six.PY3:
            return payload.tobytes()
        else:
            return payload.tostring()