From f65ecadfaaba12d494610c33c0408fd64ed0f737 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 23 Jun 2015 05:33:56 +0900 Subject: [PATCH] eventlet has already supported WebSocket(RFC6455) in stable releases (>= 0.15) Signed-off-by: Satoshi Kobayashi Signed-off-by: FUJITA Tomonori --- ryu/contrib/_eventlet/__init__.py | 0 ryu/contrib/_eventlet/websocket.py | 656 ----------------------------- ryu/lib/hub.py | 2 +- tools/pip-requires | 2 +- 4 files changed, 2 insertions(+), 658 deletions(-) delete mode 100644 ryu/contrib/_eventlet/__init__.py delete mode 100644 ryu/contrib/_eventlet/websocket.py diff --git a/ryu/contrib/_eventlet/__init__.py b/ryu/contrib/_eventlet/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/ryu/contrib/_eventlet/websocket.py b/ryu/contrib/_eventlet/websocket.py deleted file mode 100644 index 4a07c8d8..00000000 --- a/ryu/contrib/_eventlet/websocket.py +++ /dev/null @@ -1,656 +0,0 @@ -import base64 -import codecs -import collections -import errno -from random import Random -import string -import struct -import sys -import time -from socket import error as SocketError - -try: - from hashlib import md5, sha1 -except ImportError: #pragma NO COVER - from md5 import md5 - from sha import sha as sha1 - -import eventlet -from eventlet import semaphore -from eventlet import wsgi -from eventlet.green import socket -from eventlet.support import get_errno - -# Python 2's utf8 decoding is more lenient than we'd like -# In order to pass autobahn's testsuite we need stricter validation -# if available... -for _mod in ('wsaccel.utf8validator', 'autobahn.utf8validator'): - # autobahn has it's own python-based validator. in newest versions - # this prefers to use wsaccel, a cython based implementation, if available. - # wsaccel may also be installed w/out autobahn, or with a earlier version. - try: - utf8validator = __import__(_mod, {}, {}, ['']) - except ImportError: - utf8validator = None - else: - break - -ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE)) - -__all__ = ["WebSocketWSGI", "WebSocket"] -PROTOCOL_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' -VALID_CLOSE_STATUS = (range(1000, 1004) - + range(1007, 1012) - # 3000-3999: reserved for use by libraries, frameworks, - # and applications - + range(3000, 4000) - # 4000-4999: reserved for private use and thus can't - # be registered - + range(4000, 5000)) - - -class BadRequest(Exception): - def __init__(self, status='400 Bad Request', body=None, headers=None): - super(Exception, self).__init__() - self.status = status - self.body = body - self.headers = headers - - -class WebSocketWSGI(object): - """Wraps a websocket handler function in a WSGI application. - - Use it like this:: - - @websocket.WebSocketWSGI - def my_handler(ws): - from_browser = ws.wait() - ws.send("from server") - - The single argument to the function will be an instance of - :class:`WebSocket`. To close the socket, simply return from the - function. Note that the server will log the websocket request at - the time of closure. - """ - def __init__(self, handler): - self.handler = handler - self.protocol_version = None - self.support_legacy_versions = True - self.supported_protocols = [] - self.origin_checker = None - - @classmethod - def configured(cls, - handler=None, - supported_protocols=None, - origin_checker=None, - support_legacy_versions=False): - def decorator(handler): - inst = cls(handler) - inst.support_legacy_versions = support_legacy_versions - inst.origin_checker = origin_checker - if supported_protocols: - inst.supported_protocols = supported_protocols - return inst - if handler is None: - return decorator - return decorator(handler) - - def __call__(self, environ, start_response): - http_connection_parts = [ - part.strip() - for part in environ.get('HTTP_CONNECTION', '').lower().split(',')] - if not ('upgrade' in http_connection_parts and - environ.get('HTTP_UPGRADE', '').lower() == 'websocket'): - # need to check a few more things here for true compliance - start_response('400 Bad Request', [('Connection', 'close')]) - return [] - - try: - if 'HTTP_SEC_WEBSOCKET_VERSION' in environ: - ws = self._handle_hybi_request(environ) - elif self.support_legacy_versions: - ws = self._handle_legacy_request(environ) - else: - raise BadRequest() - except BadRequest as e: - status = e.status - body = e.body or '' - headers = e.headers or [] - start_response(status, - [('Connection', 'close'), ] + headers) - return [body] - - try: - self.handler(ws) - except socket.error as e: - if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS: - raise - # Make sure we send the closing frame - ws._send_closing_frame(True) - # use this undocumented feature of eventlet.wsgi to ensure that it - # doesn't barf on the fact that we didn't call start_response - return wsgi.ALREADY_HANDLED - - def _handle_legacy_request(self, environ): - sock = environ['eventlet.input'].get_socket() - - if 'HTTP_SEC_WEBSOCKET_KEY1' in environ: - self.protocol_version = 76 - if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ: - raise BadRequest() - else: - self.protocol_version = 75 - - if self.protocol_version == 76: - key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1']) - key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2']) - # There's no content-length header in the request, but it has 8 - # bytes of data. - environ['wsgi.input'].content_length = 8 - key3 = environ['wsgi.input'].read(8) - key = struct.pack(">II", key1, key2) + key3 - response = md5(key).digest() - - # Start building the response - scheme = 'ws' - if environ.get('wsgi.url_scheme') == 'https': - scheme = 'wss' - location = '%s://%s%s%s' % ( - scheme, - environ.get('HTTP_HOST'), - environ.get('SCRIPT_NAME'), - environ.get('PATH_INFO') - ) - qs = environ.get('QUERY_STRING') - if qs is not None: - location += '?' + qs - if self.protocol_version == 75: - handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - "Upgrade: WebSocket\r\n" - "Connection: Upgrade\r\n" - "WebSocket-Origin: %s\r\n" - "WebSocket-Location: %s\r\n\r\n" % ( - environ.get('HTTP_ORIGIN'), - location)) - elif self.protocol_version == 76: - handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n" - "Upgrade: WebSocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Origin: %s\r\n" - "Sec-WebSocket-Protocol: %s\r\n" - "Sec-WebSocket-Location: %s\r\n" - "\r\n%s" % ( - environ.get('HTTP_ORIGIN'), - environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'), - location, - response)) - else: #pragma NO COVER - raise ValueError("Unknown WebSocket protocol version.") - sock.sendall(handshake_reply) - return WebSocket(sock, environ, self.protocol_version) - - def _handle_hybi_request(self, environ): - sock = environ['eventlet.input'].get_socket() - hybi_version = environ['HTTP_SEC_WEBSOCKET_VERSION'] - if hybi_version not in ('8', '13', ): - raise BadRequest(status='426 Upgrade Required', - headers=[('Sec-WebSocket-Version', '8, 13')]) - self.protocol_version = int(hybi_version) - if 'HTTP_SEC_WEBSOCKET_KEY' not in environ: - # That's bad. - raise BadRequest() - origin = environ.get( - 'HTTP_ORIGIN', - (environ.get('HTTP_SEC_WEBSOCKET_ORIGIN', '') - if self.protocol_version <= 8 else '')) - if self.origin_checker is not None: - if not self.origin_checker(environ.get('HTTP_HOST'), origin): - raise BadRequest(status='403 Forbidden') - protocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', None) - negotiated_protocol = None - if protocols: - for p in (i.strip() for i in protocols.split(',')): - if p in self.supported_protocols: - negotiated_protocol = p - break - #extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS', None) - #if extensions: - # extensions = [i.strip() for i in extensions.split(',')] - - key = environ['HTTP_SEC_WEBSOCKET_KEY'] - response = base64.b64encode(sha1(key + PROTOCOL_GUID).digest()) - handshake_reply = ["HTTP/1.1 101 Switching Protocols", - "Upgrade: websocket", - "Connection: Upgrade", - "Sec-WebSocket-Accept: %s" % (response, )] - if negotiated_protocol: - handshake_reply.append("Sec-WebSocket-Protocol: %s" - % (negotiated_protocol, )) - sock.sendall('\r\n'.join(handshake_reply) + '\r\n\r\n') - return RFC6455WebSocket(sock, environ, self.protocol_version, - protocol=negotiated_protocol) - - def _extract_number(self, value): - """ - Utility function which, given a string like 'g98sd 5[]221@1', will - return 9852211. Used to parse the Sec-WebSocket-Key headers. - """ - out = "" - spaces = 0 - for char in value: - if char in string.digits: - out += char - elif char == " ": - spaces += 1 - return int(out) / spaces - -class WebSocket(object): - """A websocket object that handles the details of - serialization/deserialization to the socket. - - The primary way to interact with a :class:`WebSocket` object is to - call :meth:`send` and :meth:`wait` in order to pass messages back - and forth with the browser. Also available are the following - properties: - - path - The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient. - protocol - The value of the Websocket-Protocol header. - origin - The value of the 'Origin' header. - environ - The full WSGI environment for this request. - - """ - def __init__(self, sock, environ, version=76): - """ - :param socket: The eventlet socket - :type socket: :class:`eventlet.greenio.GreenSocket` - :param environ: The wsgi environment - :param version: The WebSocket spec version to follow (default is 76) - """ - self.socket = sock - self.origin = environ.get('HTTP_ORIGIN') - self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL') - self.path = environ.get('PATH_INFO') - self.environ = environ - self.version = version - self.websocket_closed = False - self._buf = "" - self._msgs = collections.deque() - self._sendlock = semaphore.Semaphore() - - @staticmethod - def _pack_message(message): - """Pack the message inside ``00`` and ``FF`` - - As per the dataframing section (5.3) for the websocket spec - """ - if isinstance(message, unicode): - message = message.encode('utf-8') - elif not isinstance(message, str): - message = str(message) - packed = "\x00%s\xFF" % message - return packed - - def _parse_messages(self): - """ Parses for messages in the buffer *buf*. It is assumed that - the buffer contains the start character for a message, but that it - may contain only part of the rest of the message. - - Returns an array of messages, and the buffer remainder that - didn't contain any full messages.""" - msgs = [] - end_idx = 0 - buf = self._buf - while buf: - frame_type = ord(buf[0]) - if frame_type == 0: - # Normal message. - end_idx = buf.find("\xFF") - if end_idx == -1: #pragma NO COVER - break - msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) - buf = buf[end_idx+1:] - elif frame_type == 255: - # Closing handshake. - assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf - self.websocket_closed = True - break - else: - raise ValueError("Don't understand how to parse this type of message: %r" % buf) - self._buf = buf - return msgs - - def send(self, message): - """Send a message to the browser. - - *message* should be convertable to a string; unicode objects should be - encodable as utf-8. Raises socket.error with errno of 32 - (broken pipe) if the socket has already been closed by the client.""" - packed = self._pack_message(message) - # if two greenthreads are trying to send at the same time - # on the same socket, sendlock prevents interleaving and corruption - self._sendlock.acquire() - try: - self.socket.sendall(packed) - finally: - self._sendlock.release() - - def wait(self): - """Waits for and deserializes messages. - - Returns a single message; the oldest not yet processed. If the client - has already closed the connection, returns None. This is different - from normal socket behavior because the empty string is a valid - websocket message.""" - while not self._msgs: - # Websocket might be closed already. - if self.websocket_closed: - return None - # no parsed messages, must mean buf needs more data - delta = self.socket.recv(8096) - if delta == '': - return None - self._buf += delta - msgs = self._parse_messages() - self._msgs.extend(msgs) - return self._msgs.popleft() - - def _send_closing_frame(self, ignore_send_errors=False): - """Sends the closing frame to the client, if required.""" - if self.version == 76 and not self.websocket_closed: - try: - self.socket.sendall("\xff\x00") - except SocketError: - # Sometimes, like when the remote side cuts off the connection, - # we don't care about this. - if not ignore_send_errors: #pragma NO COVER - raise - self.websocket_closed = True - - def close(self): - """Forcibly close the websocket; generally it is preferable to - return from the handler method.""" - self._send_closing_frame() - self.socket.shutdown(True) - self.socket.close() - - -class ConnectionClosedError(Exception): - pass - - -class FailedConnectionError(Exception): - def __init__(self, status, message): - super(FailedConnectionError, self).__init__(status, message) - self.message = message - self.status = status - - -class ProtocolError(ValueError): - pass - - -class RFC6455WebSocket(WebSocket): - def __init__(self, sock, environ, version=13, protocol=None, client=False): - super(RFC6455WebSocket, self).__init__(sock, environ, version) - self.iterator = self._iter_frames() - self.client = client - self.protocol = protocol - - class UTF8Decoder(object): - def __init__(self): - if utf8validator: - self.validator = utf8validator.Utf8Validator() - else: - self.validator = None - decoderclass = codecs.getincrementaldecoder('utf8') - self.decoder = decoderclass() - - def reset(self): - if self.validator: - self.validator.reset() - self.decoder.reset() - - def decode(self, data, final=False): - if self.validator: - valid, eocp, c_i, t_i = self.validator.validate(data) - if not valid: - raise ValueError('Data is not valid unicode') - return self.decoder.decode(data, final) - - def _get_bytes(self, numbytes): - data = '' - while len(data) < numbytes: - d = self.socket.recv(numbytes - len(data)) - if not d: - raise ConnectionClosedError() - data = data + d - return data - - class Message(object): - def __init__(self, opcode, decoder=None): - self.decoder = decoder - self.data = [] - self.finished = False - self.opcode = opcode - - def push(self, data, final=False): - if self.decoder: - data = self.decoder.decode(data, final=final) - self.finished = final - self.data.append(data) - - def getvalue(self): - return ''.join(self.data) - - @staticmethod - def _apply_mask(data, mask, length=None, offset=0): - if length is None: - length = len(data) - cnt = range(length) - return ''.join(chr(ord(data[i]) ^ mask[(offset + i) % 4]) for i in cnt) - - def _handle_control_frame(self, opcode, data): - if opcode == 8: # connection close - if not data: - status = 1000 - elif len(data) > 1: - status = struct.unpack_from('!H', data)[0] - if not status or status not in VALID_CLOSE_STATUS: - raise FailedConnectionError( - 1002, - "Unexpected close status code.") - try: - data = self.UTF8Decoder().decode(data[2:], True) - except (UnicodeDecodeError, ValueError): - raise FailedConnectionError( - 1002, - "Close message data should be valid UTF-8.") - else: - status = 1002 - self.close(close_data=(status, '')) - raise ConnectionClosedError() - elif opcode == 9: # ping - self.send(data, control_code=0xA) - elif opcode == 0xA: # pong - pass - else: - raise FailedConnectionError( - 1002, "Unknown control frame received.") - - def _iter_frames(self): - fragmented_message = None - try: - while True: - message = self._recv_frame(message=fragmented_message) - if message.opcode & 8: - self._handle_control_frame( - message.opcode, message.getvalue()) - continue - if fragmented_message and message is not fragmented_message: - raise RuntimeError('Unexpected message change.') - fragmented_message = message - if message.finished: - data = fragmented_message.getvalue() - fragmented_message = None - yield data - except FailedConnectionError: - exc_typ, exc_val, exc_tb = sys.exc_info() - self.close(close_data=(exc_val.status, exc_val.message)) - except ConnectionClosedError: - return - except Exception: - self.close(close_data=(1011, 'Internal Server Error')) - raise - - def _recv_frame(self, message=None): - recv = self._get_bytes - header = recv(2) - a, b = struct.unpack('!BB', header) - finished = a >> 7 == 1 - rsv123 = a >> 4 & 7 - if rsv123: - # must be zero - raise FailedConnectionError( - 1002, - "RSV1, RSV2, RSV3: MUST be 0 unless an extension is" - " negotiated that defines meanings for non-zero values.") - opcode = a & 15 - if opcode not in (0, 1, 2, 8, 9, 0xA): - raise FailedConnectionError(1002, "Unknown opcode received.") - masked = b & 128 == 128 - if not masked and not self.client: - raise FailedConnectionError(1002, "A client MUST mask all frames" - " that it sends to the server") - length = b & 127 - if opcode & 8: - if not finished: - raise FailedConnectionError(1002, "Control frames must not" - " be fragmented.") - if length > 125: - raise FailedConnectionError( - 1002, - "All control frames MUST have a payload length of 125" - " bytes or less") - elif opcode and message: - raise FailedConnectionError( - 1002, - "Received a non-continuation opcode within" - " fragmented message.") - elif not opcode and not message: - raise FailedConnectionError( - 1002, - "Received continuation opcode with no previous" - " fragments received.") - if length == 126: - length = struct.unpack('!H', recv(2))[0] - elif length == 127: - length = struct.unpack('!Q', recv(8))[0] - if masked: - mask = struct.unpack('!BBBB', recv(4)) - received = 0 - if not message or opcode & 8: - decoder = self.UTF8Decoder() if opcode == 1 else None - message = self.Message(opcode, decoder=decoder) - if not length: - message.push('', final=finished) - else: - while received < length: - d = self.socket.recv(length - received) - if not d: - raise ConnectionClosedError() - dlen = len(d) - if masked: - d = self._apply_mask(d, mask, length=dlen, offset=received) - received = received + dlen - try: - message.push(d, final=finished) - except (UnicodeDecodeError, ValueError): - raise FailedConnectionError( - 1007, "Text data must be valid utf-8") - return message - - @staticmethod - def _pack_message(message, masked=False, - continuation=False, final=True, control_code=None): - is_text = False - if isinstance(message, unicode): - message = message.encode('utf-8') - is_text = True - length = len(message) - if not length: - # no point masking empty data - masked = False - if control_code: - if control_code not in (8, 9, 0xA): - raise ProtocolError('Unknown control opcode.') - if continuation or not final: - raise ProtocolError('Control frame cannot be a fragment.') - if length > 125: - raise ProtocolError('Control frame data too large (>125).') - header = struct.pack('!B', control_code | 1 << 7) - else: - opcode = 0 if continuation else (1 if is_text else 2) - header = struct.pack('!B', opcode | (1 << 7 if final else 0)) - lengthdata = 1 << 7 if masked else 0 - if length > 65535: - lengthdata = struct.pack('!BQ', lengthdata | 127, length) - elif length > 125: - lengthdata = struct.pack('!BH', lengthdata | 126, length) - else: - lengthdata = struct.pack('!B', lengthdata | length) - if masked: - # NOTE: RFC6455 states: - # A server MUST NOT mask any frames that it sends to the client - rand = Random(time.time()) - mask = map(rand.getrandbits, (8, ) * 4) - message = RFC6455WebSocket._apply_mask(message, mask, length) - maskdata = struct.pack('!BBBB', *mask) - else: - maskdata = '' - return ''.join((header, lengthdata, maskdata, message)) - - def wait(self): - for i in self.iterator: - return i - - def _send(self, frame): - self._sendlock.acquire() - try: - self.socket.sendall(frame) - finally: - self._sendlock.release() - - def send(self, message, **kw): - kw['masked'] = self.client - payload = self._pack_message(message, **kw) - self._send(payload) - - def _send_closing_frame(self, ignore_send_errors=False, close_data=None): - if self.version in (8, 13) and not self.websocket_closed: - if close_data is not None: - status, msg = close_data - if isinstance(msg, unicode): - msg = msg.encode('utf-8') - data = struct.pack('!H', status) + msg - else: - data = '' - try: - self.send(data, control_code=8) - except SocketError: - # Sometimes, like when the remote side cuts off the connection, - # we don't care about this. - if not ignore_send_errors: # pragma NO COVER - raise - self.websocket_closed = True - - def close(self, close_data=None): - """Forcibly close the websocket; generally it is preferable to - return from the handler method.""" - self._send_closing_frame(close_data=close_data) - self.socket.shutdown(socket.SHUT_WR) - self.socket.close() diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index 836b9242..56211479 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -32,7 +32,7 @@ if HUB_TYPE == 'eventlet': import eventlet.semaphore import eventlet.timeout import eventlet.wsgi - from ryu.contrib._eventlet import websocket + from eventlet import websocket import greenlet import ssl import socket diff --git a/tools/pip-requires b/tools/pip-requires index 96eba22c..567600f1 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,4 +1,4 @@ -eventlet +eventlet>=0.15 msgpack-python>=0.3.0 # RPC library, BGP speaker(net_cntl) netaddr oslo.config>=1.6.0