""" websocket - WebSocket client library for Python Copyright (C) 2010 Hiroki Ohtani(liris) This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ import six import array import struct import os class ABNF(object): """ ABNF frame class. see http://tools.ietf.org/html/rfc5234 and http://tools.ietf.org/html/rfc6455#section-5.2 """ # operation code values. OPCODE_CONT = 0x0 OPCODE_TEXT = 0x1 OPCODE_BINARY = 0x2 OPCODE_CLOSE = 0x8 OPCODE_PING = 0x9 OPCODE_PONG = 0xa # available operation code value tuple OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG) # opcode human readable string OPCODE_MAP = { OPCODE_CONT: "cont", OPCODE_TEXT: "text", OPCODE_BINARY: "binary", OPCODE_CLOSE: "close", OPCODE_PING: "ping", OPCODE_PONG: "pong" } # data length threashold. LENGTH_7 = 0x7e LENGTH_16 = 1 << 16 LENGTH_63 = 1 << 63 def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0, opcode=OPCODE_TEXT, mask=1, data=""): """ Constructor for ABNF. please check RFC for arguments. """ self.fin = fin self.rsv1 = rsv1 self.rsv2 = rsv2 self.rsv3 = rsv3 self.opcode = opcode self.mask = mask self.data = data self.get_mask_key = os.urandom def __str__(self): return "fin=" + str(self.fin) \ + " opcode=" + str(self.opcode) \ + " data=" + str(self.data) @staticmethod def create_frame(data, opcode, fin=1): """ create frame to send text, binary and other data. data: data to send. This is string value(byte array). if opcode is OPCODE_TEXT and this value is uniocde, data value is conveted into unicode string, automatically. opcode: operation code. please see OPCODE_XXX. fin: fin flag. if set to 0, create continue fragmentation. """ if opcode == ABNF.OPCODE_TEXT and isinstance(data, six.text_type): data = data.encode("utf-8") # mask must be set if send data from client return ABNF(fin, 0, 0, 0, opcode, 1, data) def format(self): """ format this object to string(byte array) to send data to server. """ if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]): raise ValueError("not 0 or 1") if self.opcode not in ABNF.OPCODES: raise ValueError("Invalid OPCODE") length = len(self.data) if length >= ABNF.LENGTH_63: raise ValueError("data is too long") frame_header = chr(self.fin << 7 | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 | self.opcode) if length < ABNF.LENGTH_7: frame_header += chr(self.mask << 7 | length) frame_header = six.b(frame_header) elif length < ABNF.LENGTH_16: frame_header += chr(self.mask << 7 | 0x7e) frame_header = six.b(frame_header) frame_header += struct.pack("!H", length) else: frame_header += chr(self.mask << 7 | 0x7f) frame_header = six.b(frame_header) frame_header += struct.pack("!Q", length) if not self.mask: return frame_header + self.data else: mask_key = self.get_mask_key(4) return frame_header + self._get_masked(mask_key) def _get_masked(self, mask_key): s = ABNF.mask(mask_key, self.data) if isinstance(mask_key, six.text_type): mask_key = mask_key.encode('utf-8') return mask_key + s @staticmethod def mask(mask_key, data): """ mask or unmask data. Just do xor for each byte mask_key: 4 byte string(byte). data: data to mask/unmask. """ if isinstance(mask_key, six.text_type): mask_key = six.b(mask_key) if isinstance(data, six.text_type): data = six.b(data) _m = array.array("B", mask_key) _d = array.array("B", data) for i in range(len(_d)): _d[i] ^= _m[i % 4] if six.PY3: return _d.tobytes() else: return _d.tostring()