diff --git a/autobahn/autobahn/twisted/websocket.py b/autobahn/autobahn/twisted/websocket.py index 4601cb04..8365e4e7 100644 --- a/autobahn/autobahn/twisted/websocket.py +++ b/autobahn/autobahn/twisted/websocket.py @@ -17,39 +17,264 @@ ############################################################################### from __future__ import absolute_import + import twisted.internet.protocol +from twisted.internet.defer import maybeDeferred +from twisted.python import log -from autobahn.websocket2 import protocol +from autobahn import websocket -class WebSocketServerProtocol(twisted.internet.protocol.Protocol, protocol.WebSocketServerProtocol): +class WebSocketServerProtocol(websocket.WebSocketServerProtocol, twisted.internet.protocol.Protocol): + """ + """ def connectionMade(self): - peername = str(self.transport.getPeer()) - print('connection from {}'.format(peername)) + ## the peer we are connected to + peer = self.transport.getPeer() + try: + self.peer = "%s:%d" % (peer.host, peer.port) + except: + ## eg Unix Domain sockets don't have host/port + self.peer = str(peer) - def dataReceived(self, data): - self._onData(data) - #print('data received: {}'.format(data.decode())) - #self.transport.write(data) - #self.transport.loseConnection() + websocket.WebSocketServerProtocol.connectionMade(self) + + ## Set "Nagle" + try: + self.transport.setTcpNoDelay(self.tcpNoDelay) + except: + ## eg Unix Domain sockets throw Errno 22 on this + pass + + + # peername = str(self.transport.getPeer()) + # print('connection from {}'.format(peername)) + + # def dataReceived(self, data): + # self._onData(data) + # #print('data received: {}'.format(data.decode())) + # #self.transport.write(data) + # #self.transport.loseConnection() + + # def connectionLost(self, reason): + # pass + + def _closeConnection(self, abort = False): + if abort: + self.transport.abortConnection() + else: + self.transport.loseConnection() + + + def _run_onConnect(self, connectionRequest): + + ## onConnect() will return the selected subprotocol or None + ## or a pair (protocol, headers) or raise an HttpException + ## + res = maybeDeferred(self.onConnect, connectionRequest) + res.addCallback(self._processHandshake_buildResponse) + res.addErrback(self._processHandshake_failed) + + + def registerProducer(self, producer, streaming): + """ + Register a Twisted producer with this protocol. + + Modes: Hybi, Hixie + + :param producer: A Twisted push or pull producer. + :type producer: object + :param streaming: Producer type. + :type streaming: bool + """ + self.transport.registerProducer(producer, streaming) -class WebSocketServerFactory(twisted.internet.protocol.Factory, protocol.WebSocketServerFactory): - protocol = WebSocketServerProtocol +class WebSocketServerFactory(websocket.WebSocketServerFactory, twisted.internet.protocol.ServerFactory): + """ + """ - def __init__(self, reactor = None): + def __init__(self, *args, **kwargs): + + #twisted.internet.protocol.ServerFactory.__init__(self) + + websocket.WebSocketServerFactory.__init__(self, *args, **kwargs) ## lazy import to avoid reactor install upon module import - if reactor is None: + if kwargs.has_key('reactor'): + self.reactor = kwargs['reactor'] + else: from twisted.internet import reactor - self._reactor = reactor + self.reactor = reactor - def buildProtocol(self, addr): - proto = self.protocol() - proto.factory = self - return proto + def _log(self, msg): + log.msg(msg) + + + def _callLater(self, delay, fun): + return self.reactor.callLater(delay, fun) + + + #protocol = WebSocketServerProtocol + + # def __init__(self, reactor = None): + + # ## lazy import to avoid reactor install upon module import + # if reactor is None: + # from twisted.internet import reactor + # self._reactor = reactor + + + # def buildProtocol(self, addr): + # proto = self.protocol() + # proto.factory = self + # return proto + + +class WebSocketClientProtocol(websocket.WebSocketClientProtocol, twisted.internet.protocol.Protocol): + """ + """ + + def connectionMade(self): + ## the peer we are connected to + peer = self.transport.getPeer() + try: + self.peer = "%s:%d" % (peer.host, peer.port) + except: + ## eg Unix Domain sockets don't have host/port + self.peer = str(peer) + + websocket.WebSocketClientProtocol.connectionMade(self) + + ## Set "Nagle" + try: + self.transport.setTcpNoDelay(self.tcpNoDelay) + except: + ## eg Unix Domain sockets throw Errno 22 on this + pass + + + + def registerProducer(self, producer, streaming): + """ + Register a Twisted producer with this protocol. + + Modes: Hybi, Hixie + + :param producer: A Twisted push or pull producer. + :type producer: object + :param streaming: Producer type. + :type streaming: bool + """ + self.transport.registerProducer(producer, streaming) + + + def _closeConnection(self, abort = False): + if abort: + self.transport.abortConnection() + else: + self.transport.loseConnection() + + + +class WebSocketClientFactory(websocket.WebSocketClientFactory, twisted.internet.protocol.ClientFactory): + """ + """ + + def __init__(self, *args, **kwargs): + + #twisted.internet.protocol.ClientFactory.__init__(self) + + websocket.WebSocketClientFactory.__init__(self, *args, **kwargs) + + ## lazy import to avoid reactor install upon module import + if kwargs.has_key('reactor'): + self.reactor = kwargs['reactor'] + else: + from twisted.internet import reactor + self.reactor = reactor + + + def _log(self, msg): + log.msg(msg) + + + def _callLater(self, delay, fun): + return self.reactor.callLater(delay, fun) + + + +def connectWS(factory, contextFactory = None, timeout = 30, bindAddress = None): + """ + Establish WebSocket connection to a server. The connection parameters like target + host, port, resource and others are provided via the factory. + + :param factory: The WebSocket protocol factory to be used for creating client protocol instances. + :type factory: An :class:`autobahn.websocket.WebSocketClientFactory` instance. + :param contextFactory: SSL context factory, required for secure WebSocket connections ("wss"). + :type contextFactory: A `twisted.internet.ssl.ClientContextFactory `_ instance. + :param timeout: Number of seconds to wait before assuming the connection has failed. + :type timeout: int + :param bindAddress: A (host, port) tuple of local address to bind to, or None. + :type bindAddress: tuple + + :returns: obj -- An object which implements `twisted.interface.IConnector `_. + """ + ## lazy import to avoid reactor install upon module import + if hasattr(factory, 'reactor'): + reactor = factory.reactor + else: + from twisted.internet import reactor + + if factory.proxy is not None: + if factory.isSecure: + raise Exception("WSS over explicit proxies not implemented") + else: + conn = reactor.connectTCP(factory.proxy['host'], factory.proxy['port'], factory, timeout, bindAddress) + else: + if factory.isSecure: + if contextFactory is None: + # create default client SSL context factory when none given + from twisted.internet import ssl + contextFactory = ssl.ClientContextFactory() + conn = reactor.connectSSL(factory.host, factory.port, factory, contextFactory, timeout, bindAddress) + else: + conn = reactor.connectTCP(factory.host, factory.port, factory, timeout, bindAddress) + return conn + + + +def listenWS(factory, contextFactory = None, backlog = 50, interface = ''): + """ + Listen for incoming WebSocket connections from clients. The connection parameters like + listening port and others are provided via the factory. + + :param factory: The WebSocket protocol factory to be used for creating server protocol instances. + :type factory: An :class:`autobahn.websocket.WebSocketServerFactory` instance. + :param contextFactory: SSL context factory, required for secure WebSocket connections ("wss"). + :type contextFactory: A twisted.internet.ssl.ContextFactory. + :param backlog: Size of the listen queue. + :type backlog: int + :param interface: The interface (derived from hostname given) to bind to, defaults to '' (all). + :type interface: str + + :returns: obj -- An object that implements `twisted.interface.IListeningPort `_. + """ + ## lazy import to avoid reactor install upon module import + if hasattr(factory, 'reactor'): + reactor = factory.reactor + else: + from twisted.internet import reactor + + if factory.isSecure: + if contextFactory is None: + raise Exception("Secure WebSocket listen requested, but no SSL context factory given") + listener = reactor.listenSSL(factory.port, factory, contextFactory, backlog, interface) + else: + listener = reactor.listenTCP(factory.port, factory, backlog, interface) + return listener diff --git a/autobahn/autobahn/websocket.py b/autobahn/autobahn/websocket.py index d20680ff..122bcfd2 100644 --- a/autobahn/autobahn/websocket.py +++ b/autobahn/autobahn/websocket.py @@ -16,10 +16,10 @@ ## ############################################################################### +from __future__ import absolute_import + __all__ = ["createWsUrl", "parseWsUrl", - "connectWS", - "listenWS", "HttpException", "ConnectionRequest", @@ -60,20 +60,17 @@ from collections import deque from zope.interface import implementer -from twisted.internet import protocol -from twisted.internet.defer import maybeDeferred -from twisted.python import log +from autobahn._version import __version__ -from _version import __version__ +from autobahn.interfaces import IWebSocketChannel, \ + IWebSocketChannelFrameApi, \ + IWebSocketChannelStreamingApi -from interfaces import IWebSocketChannel, \ - IWebSocketChannelFrameApi, \ - IWebSocketChannelStreamingApi -from utf8validator import Utf8Validator -from xormasker import XorMaskerNull, createXorMasker -from httpstatus import * -from util import Stopwatch -from compress import * +from autobahn.utf8validator import Utf8Validator +from autobahn.xormasker import XorMaskerNull, createXorMasker +from autobahn.httpstatus import * +from autobahn.util import Stopwatch +from autobahn.compress import * def createWsUrl(hostname, port = None, isSecure = False, path = None, params = None): @@ -160,78 +157,6 @@ def parseWsUrl(url): -def connectWS(factory, contextFactory = None, timeout = 30, bindAddress = None): - """ - Establish WebSocket connection to a server. The connection parameters like target - host, port, resource and others are provided via the factory. - - :param factory: The WebSocket protocol factory to be used for creating client protocol instances. - :type factory: An :class:`autobahn.websocket.WebSocketClientFactory` instance. - :param contextFactory: SSL context factory, required for secure WebSocket connections ("wss"). - :type contextFactory: A `twisted.internet.ssl.ClientContextFactory `_ instance. - :param timeout: Number of seconds to wait before assuming the connection has failed. - :type timeout: int - :param bindAddress: A (host, port) tuple of local address to bind to, or None. - :type bindAddress: tuple - - :returns: obj -- An object which implements `twisted.interface.IConnector `_. - """ - ## lazy import to avoid reactor install upon module import - if hasattr(factory, 'reactor'): - reactor = factory.reactor - else: - from twisted.internet import reactor - - if factory.proxy is not None: - if factory.isSecure: - raise Exception("WSS over explicit proxies not implemented") - else: - conn = reactor.connectTCP(factory.proxy['host'], factory.proxy['port'], factory, timeout, bindAddress) - else: - if factory.isSecure: - if contextFactory is None: - # create default client SSL context factory when none given - from twisted.internet import ssl - contextFactory = ssl.ClientContextFactory() - conn = reactor.connectSSL(factory.host, factory.port, factory, contextFactory, timeout, bindAddress) - else: - conn = reactor.connectTCP(factory.host, factory.port, factory, timeout, bindAddress) - return conn - - - -def listenWS(factory, contextFactory = None, backlog = 50, interface = ''): - """ - Listen for incoming WebSocket connections from clients. The connection parameters like - listening port and others are provided via the factory. - - :param factory: The WebSocket protocol factory to be used for creating server protocol instances. - :type factory: An :class:`autobahn.websocket.WebSocketServerFactory` instance. - :param contextFactory: SSL context factory, required for secure WebSocket connections ("wss"). - :type contextFactory: A twisted.internet.ssl.ContextFactory. - :param backlog: Size of the listen queue. - :type backlog: int - :param interface: The interface (derived from hostname given) to bind to, defaults to '' (all). - :type interface: str - - :returns: obj -- An object that implements `twisted.interface.IListeningPort `_. - """ - ## lazy import to avoid reactor install upon module import - if hasattr(factory, 'reactor'): - reactor = factory.reactor - else: - from twisted.internet import reactor - - if factory.isSecure: - if contextFactory is None: - raise Exception("Secure WebSocket listen requested, but no SSL context factory given") - listener = reactor.listenSSL(factory.port, factory, contextFactory, backlog, interface) - else: - listener = reactor.listenTCP(factory.port, factory, backlog, interface) - return listener - - - class TrafficStats: def __init__(self): @@ -366,14 +291,12 @@ class ConnectionRequest: provided in :meth:`autobahn.websocket.WebSocketServerProtocol.onConnect` when a WebSocket client establishes a connection to a WebSocket server. """ - def __init__(self, peer, peerstr, headers, host, path, params, version, origin, protocols, extensions): + def __init__(self, peer, headers, host, path, params, version, origin, protocols, extensions): """ Constructor. - :param peer: IP address/port of the connecting client. - :type peer: object - :param peerstr: IP address/port of the connecting client as string. - :type peerstr: str + :param peer: Descriptor of the connecting client (eg IP address/port in case of TCP transports). + :type peer: str :param headers: HTTP headers from opening handshake request. :type headers: dict :param host: Host from opening handshake HTTP header. @@ -392,7 +315,6 @@ class ConnectionRequest: :type extensions: array of strings """ self.peer = peer - self.peerstr = peerstr self.headers = headers self.host = host self.path = path @@ -405,7 +327,6 @@ class ConnectionRequest: def __json__(self): return {'peer': self.peer, - 'peerstr': self.peerstr, 'headers': self.headers, 'host': self.host, 'path': self.path, @@ -427,14 +348,12 @@ class ConnectionResponse(): provided in :meth:`autobahn.websocket.WebSocketClientProtocol.onConnect` when a WebSocket client has established a connection to a WebSocket server. """ - def __init__(self, peer, peerstr, headers, version, protocol, extensions): + def __init__(self, peer, headers, version, protocol, extensions): """ Constructor. - :param peer: IP address/port of the connected server. - :type peer: object - :param peerstr: IP address/port of the connected server as string. - :type peerstr: str + :param peer: Descriptor of the connected server (e.g. IP address/port in case of TCP transport). + :type peer: str :param headers: HTTP headers from opening handshake response. :type headers: dict :param version: The WebSocket protocol version that is spoken. @@ -445,7 +364,6 @@ class ConnectionResponse(): :type extensions: array of strings """ self.peer = peer - self.peerstr = peerstr self.headers = headers self.version = version self.protocol = protocol @@ -454,7 +372,6 @@ class ConnectionResponse(): def __json__(self): return {'peer': self.peer, - 'peerstr': self.peerstr, 'headers': self.headers, 'version': self.version, 'protocol': self.protocol, @@ -571,7 +488,7 @@ class Timings: @implementer(IWebSocketChannel) @implementer(IWebSocketChannelFrameApi) @implementer(IWebSocketChannelStreamingApi) -class WebSocketProtocol(protocol.Protocol): +class WebSocketProtocol: """ A Twisted Protocol class for WebSocket. This class is used by both WebSocket client and server protocol version. It is unusable standalone, for example @@ -757,7 +674,7 @@ class WebSocketProtocol(protocol.Protocol): Modes: Hybi, Hixie """ if self.debugCodePaths: - log.msg("WebSocketProtocol.onOpen") + self.factory._log("WebSocketProtocol.onOpen") def onMessageBegin(self, opcode): @@ -884,7 +801,7 @@ class WebSocketProtocol(protocol.Protocol): :type binary: bool """ if self.debug: - log.msg("WebSocketProtocol.onMessage") + self.factory._log("WebSocketProtocol.onMessage") def onPing(self, payload): @@ -898,7 +815,7 @@ class WebSocketProtocol(protocol.Protocol): :type payload: str """ if self.debug: - log.msg("WebSocketProtocol.onPing") + self.factory._log("WebSocketProtocol.onPing") if self.state == WebSocketProtocol.STATE_OPEN: self.sendPong(payload) @@ -913,7 +830,7 @@ class WebSocketProtocol(protocol.Protocol): :param payload: Payload of Pong, when there was any. Can be arbitrary, up to 125 octets. """ if self.debug: - log.msg("WebSocketProtocol.onPong") + self.factory._log("WebSocketProtocol.onPong") def onClose(self, wasClean, code, reason): @@ -943,7 +860,7 @@ class WebSocketProtocol(protocol.Protocol): s += "self.localCloseReason=%s\n" % self.localCloseReason s += "self.remoteCloseCode=%s\n" % self.remoteCloseCode s += "self.remoteCloseReason=%s\n" % self.remoteCloseReason - log.msg(s) + self.factory._log(s) def onCloseFrame(self, code, reasonRaw): @@ -965,7 +882,7 @@ class WebSocketProtocol(protocol.Protocol): :type reason: str """ if self.debugCodePaths: - log.msg("WebSocketProtocol.onCloseFrame") + self.factory._log("WebSocketProtocol.onCloseFrame") self.remoteCloseCode = code self.remoteCloseReason = reasonRaw @@ -995,7 +912,7 @@ class WebSocketProtocol(protocol.Protocol): ## if self.closeHandshakeTimeoutCall is not None: if self.debugCodePaths: - log.msg("closeHandshakeTimeoutCall.cancel") + self.factory._log("closeHandshakeTimeoutCall.cancel") self.closeHandshakeTimeoutCall.cancel() self.closeHandshakeTimeoutCall = None @@ -1008,7 +925,7 @@ class WebSocketProtocol(protocol.Protocol): ## When we are a client, the server should drop the TCP ## If that doesn't happen, we do. And that will set wasClean = False. if self.serverConnectionDropTimeout > 0: - self.serverConnectionDropTimeoutCall = self.factory.reactor.callLater(self.serverConnectionDropTimeout, self.onServerConnectionDropTimeout) + self.serverConnectionDropTimeoutCall = self.factory._callLater(self.serverConnectionDropTimeout, self.onServerConnectionDropTimeout) elif self.state == WebSocketProtocol.STATE_OPEN: ## The peer initiates a closing handshake, so we reply @@ -1050,14 +967,14 @@ class WebSocketProtocol(protocol.Protocol): self.serverConnectionDropTimeoutCall = None if self.state != WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("onServerConnectionDropTimeout") + self.factory._log("onServerConnectionDropTimeout") self.wasClean = False self.wasNotCleanReason = "server did not drop TCP connection (in time)" self.wasServerConnectionDropTimeout = True self.dropConnection(abort = True) else: if self.debugCodePaths: - log.msg("skipping onServerConnectionDropTimeout since connection is already closed") + self.factory._log("skipping onServerConnectionDropTimeout since connection is already closed") def onOpenHandshakeTimeout(self): @@ -1071,20 +988,20 @@ class WebSocketProtocol(protocol.Protocol): self.openHandshakeTimeoutCall = None if self.state in [WebSocketProtocol.STATE_CONNECTING, WebSocketProtocol.STATE_PROXY_CONNECTING]: if self.debugCodePaths: - log.msg("onOpenHandshakeTimeout fired") + self.factory._log("onOpenHandshakeTimeout fired") self.wasClean = False self.wasNotCleanReason = "peer did not finish (in time) the opening handshake" self.wasOpenHandshakeTimeout = True self.dropConnection(abort = True) elif self.state == WebSocketProtocol.STATE_OPEN: if self.debugCodePaths: - log.msg("skipping onOpenHandshakeTimeout since WebSocket connection is open (opening handshake already finished)") + self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is open (opening handshake already finished)") elif self.state == WebSocketProtocol.STATE_CLOSING: if self.debugCodePaths: - log.msg("skipping onOpenHandshakeTimeout since WebSocket connection is closing") + self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is closing") elif self.state == WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("skipping onOpenHandshakeTimeout since WebSocket connection already closed") + self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection already closed") else: # should not arrive here raise Exception("logic error") @@ -1101,14 +1018,14 @@ class WebSocketProtocol(protocol.Protocol): self.closeHandshakeTimeoutCall = None if self.state != WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("onCloseHandshakeTimeout fired") + self.factory._log("onCloseHandshakeTimeout fired") self.wasClean = False self.wasNotCleanReason = "peer did not respond (in time) in closing handshake" self.wasCloseHandshakeTimeout = True self.dropConnection(abort = True) else: if self.debugCodePaths: - log.msg("skipping onCloseHandshakeTimeout since connection is already closed") + self.factory._log("skipping onCloseHandshakeTimeout since connection is already closed") def dropConnection(self, abort = False): @@ -1122,17 +1039,14 @@ class WebSocketProtocol(protocol.Protocol): """ if self.state != WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("dropping connection") + self.factory._log("dropping connection") self.droppedByMe = True self.state = WebSocketProtocol.STATE_CLOSED - if abort: - self.transport.abortConnection() - else: - self.transport.loseConnection() + self._closeConnection(abort) else: if self.debugCodePaths: - log.msg("skipping dropConnection since connection is already closed") + self.factory._log("skipping dropConnection since connection is already closed") def failConnection(self, code = CLOSE_STATUS_CODE_GOING_AWAY, reason = "Going Away"): @@ -1146,7 +1060,7 @@ class WebSocketProtocol(protocol.Protocol): """ if self.state != WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("Failing connection : %s - %s" % (code, reason)) + self.factory._log("Failing connection : %s - %s" % (code, reason)) self.failedByMe = True if self.failByDrop: ## brutally drop the TCP connection @@ -1159,10 +1073,10 @@ class WebSocketProtocol(protocol.Protocol): self.sendCloseFrame(code = code, reasonUtf8 = reason.encode("UTF-8"), isReply = False) else: if self.debugCodePaths: - log.msg("skipping failConnection since connection is already closing") + self.factory._log("skipping failConnection since connection is already closing") else: if self.debugCodePaths: - log.msg("skipping failConnection since connection is already closed") + self.factory._log("skipping failConnection since connection is already closed") def protocolViolation(self, reason): @@ -1180,7 +1094,7 @@ class WebSocketProtocol(protocol.Protocol): :returns: bool -- True, when any further processing should be discontinued. """ if self.debugCodePaths: - log.msg("Protocol violation : %s" % reason) + self.factory._log("Protocol violation : %s" % reason) self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_PROTOCOL_ERROR, reason) if self.failByDrop: return True @@ -1207,7 +1121,7 @@ class WebSocketProtocol(protocol.Protocol): :returns: bool -- True, when any further processing should be discontinued. """ if self.debugCodePaths: - log.msg("Invalid payload : %s" % reason) + self.factory._log("Invalid payload : %s" % reason) self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_INVALID_PAYLOAD, reason) if self.failByDrop: return True @@ -1254,8 +1168,8 @@ class WebSocketProtocol(protocol.Protocol): configAttrLog.append((configAttr, getattr(self, configAttr), configAttrSource)) if self.debug: - #log.msg(configAttrLog) - log.msg("\n" + pformat(configAttrLog)) + #self.factory._log(configAttrLog) + self.factory._log("\n" + pformat(configAttrLog)) ## permessage-compress extension self._perMessageCompress = None @@ -1267,21 +1181,6 @@ class WebSocketProtocol(protocol.Protocol): ## Traffic stats self.trafficStats = TrafficStats() - ## Set "Nagle" - try: - self.transport.setTcpNoDelay(self.tcpNoDelay) - except: - ## eg Unix Domain sockets throw Errno 22 on this - pass - - ## the peer we are connected to - self.peer = self.transport.getPeer() - try: - self.peerstr = "%s:%d" % (self.peer.host, self.peer.port) - except: - ## eg Unix Domain sockets don't have host/port - self.peerstr = str(self.peer) - ## initial state if not self.factory.isServer and self.factory.proxy is not None: self.state = WebSocketProtocol.STATE_PROXY_CONNECTING @@ -1355,7 +1254,7 @@ class WebSocketProtocol(protocol.Protocol): # set opening handshake timeout handler if self.openHandshakeTimeout > 0: - self.openHandshakeTimeoutCall = self.factory.reactor.callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout) + self.openHandshakeTimeoutCall = self.factory._callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout) def connectionLost(self, reason): @@ -1368,7 +1267,7 @@ class WebSocketProtocol(protocol.Protocol): ## if not self.factory.isServer and self.serverConnectionDropTimeoutCall is not None: if self.debugCodePaths: - log.msg("serverConnectionDropTimeoutCall.cancel") + self.factory._log("serverConnectionDropTimeoutCall.cancel") self.serverConnectionDropTimeoutCall.cancel() self.serverConnectionDropTimeoutCall = None @@ -1387,7 +1286,7 @@ class WebSocketProtocol(protocol.Protocol): Modes: Hybi, Hixie """ - log.msg("RX Octets from %s : octets = %s" % (self.peerstr, binascii.b2a_hex(data))) + self.factory._log("RX Octets from %s : octets = %s" % (self.peer, binascii.b2a_hex(data))) def logTxOctets(self, data, sync): @@ -1396,7 +1295,7 @@ class WebSocketProtocol(protocol.Protocol): Modes: Hybi, Hixie """ - log.msg("TX Octets to %s : sync = %s, octets = %s" % (self.peerstr, sync, binascii.b2a_hex(data))) + self.factory._log("TX Octets to %s : sync = %s, octets = %s" % (self.peer, sync, binascii.b2a_hex(data))) def logRxFrame(self, frameHeader, payload): @@ -1406,7 +1305,7 @@ class WebSocketProtocol(protocol.Protocol): Modes: Hybi """ data = ''.join(payload) - info = (self.peerstr, + info = (self.peer, frameHeader.fin, frameHeader.rsv, frameHeader.opcode, @@ -1414,7 +1313,7 @@ class WebSocketProtocol(protocol.Protocol): frameHeader.length, data if frameHeader.opcode == 1 else binascii.b2a_hex(data)) - log.msg("RX Frame from %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, payload = %s" % info) + self.factory._log("RX Frame from %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, payload = %s" % info) def logTxFrame(self, frameHeader, payload, repeatLength, chopsize, sync): @@ -1423,7 +1322,7 @@ class WebSocketProtocol(protocol.Protocol): Modes: Hybi """ - info = (self.peerstr, + info = (self.peer, frameHeader.fin, frameHeader.rsv, frameHeader.opcode, @@ -1434,7 +1333,7 @@ class WebSocketProtocol(protocol.Protocol): sync, payload if frameHeader.opcode == 1 else binascii.b2a_hex(payload)) - log.msg("TX Frame to %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, repeat_length = %s, chopsize = %s, sync = %s, payload = %s" % info) + self.factory._log("TX Frame to %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, repeat_length = %s, chopsize = %s, sync = %s, payload = %s" % info) def dataReceived(self, data): @@ -1493,7 +1392,7 @@ class WebSocketProtocol(protocol.Protocol): ## ignore any data received after WS was closed ## if self.debugCodePaths: - log.msg("received data in STATE_CLOSED") + self.factory._log("received data in STATE_CLOSED") ## should not arrive here (invalid state) ## @@ -1519,20 +1418,6 @@ class WebSocketProtocol(protocol.Protocol): raise Exception("must implement handshake (client or server) in derived class") - def registerProducer(self, producer, streaming): - """ - Register a Twisted producer with this protocol. - - Modes: Hybi, Hixie - - :param producer: A Twisted push or pull producer. - :type producer: object - :param streaming: Producer type. - :type streaming: bool - """ - self.transport.registerProducer(producer, streaming) - - def _trigger(self): """ Trigger sending stuff from send queue (which is only used for chopped/synched writes). @@ -1567,13 +1452,13 @@ class WebSocketProtocol(protocol.Protocol): self.logTxOctets(e[0], e[1]) else: if self.debugCodePaths: - log.msg("skipped delayed write, since connection is closed") + self.factory._log("skipped delayed write, since connection is closed") # we need to reenter the reactor to make the latter # reenter the OS network stack, so that octets # can get on the wire. Note: this is a "heuristic", # since there is no (easy) way to really force out # octets from the OS network stack to wire. - self.factory.reactor.callLater(WebSocketProtocol._QUEUED_WRITE_DELAY, self._send) + self.factory._callLater(WebSocketProtocol._QUEUED_WRITE_DELAY, self._send) else: self.triggered = False @@ -2017,7 +1902,7 @@ class WebSocketProtocol(protocol.Protocol): if self._isMessageCompressed: compressedLen = len(payload) if self.debug: - log.msg("RX compressed [%d]: %s" % (compressedLen, binascii.b2a_hex(payload))) + self.factory._log("RX compressed [%d]: %s" % (compressedLen, binascii.b2a_hex(payload))) payload = self._perMessageCompress.decompressMessageData(payload) uncompressedLen = len(payload) @@ -2074,7 +1959,7 @@ class WebSocketProtocol(protocol.Protocol): return False if self.debug: - log.msg("Traffic statistics:\n" + str(self.trafficStats)) + self.factory._log("Traffic statistics:\n" + str(self.trafficStats)) if self.state == WebSocketProtocol.STATE_OPEN: self.trafficStats.incomingWebSocketMessages += 1 @@ -2283,11 +2168,11 @@ class WebSocketProtocol(protocol.Protocol): """ if self.state == WebSocketProtocol.STATE_CLOSING: if self.debugCodePaths: - log.msg("ignoring sendCloseFrame since connection is closing") + self.factory._log("ignoring sendCloseFrame since connection is closing") elif self.state == WebSocketProtocol.STATE_CLOSED: if self.debugCodePaths: - log.msg("ignoring sendCloseFrame since connection already closed") + self.factory._log("ignoring sendCloseFrame since connection already closed") elif self.state in [WebSocketProtocol.STATE_PROXY_CONNECTING, WebSocketProtocol.STATE_CONNECTING]: raise Exception("cannot close a connection not yet connected") @@ -2315,7 +2200,7 @@ class WebSocketProtocol(protocol.Protocol): ## drop connection when timeout on receiving close handshake reply if self.closedByMe and self.closeHandshakeTimeout > 0: - self.closeHandshakeTimeoutCall = self.factory.reactor.callLater(self.closeHandshakeTimeout, self.onCloseHandshakeTimeout) + self.closeHandshakeTimeoutCall = self.factory._callLater(self.closeHandshakeTimeout, self.onCloseHandshakeTimeout) else: raise Exception("logic error") @@ -2742,7 +2627,7 @@ class WebSocketProtocol(protocol.Protocol): i += pfs if self.debug: - log.msg("Traffic statistics:\n" + str(self.trafficStats)) + self.factory._log("Traffic statistics:\n" + str(self.trafficStats)) def _parseExtensionsHeader(self, header, removeQuotes = True): @@ -2957,7 +2842,7 @@ class WebSocketServerProtocol(WebSocketProtocol): WebSocketProtocol.connectionMade(self) self.factory.countConnections += 1 if self.debug: - log.msg("connection accepted from peer %s" % self.peerstr) + self.factory._log("connection accepted from peer %s" % self.peer) def connectionLost(self, reason): @@ -2970,7 +2855,7 @@ class WebSocketServerProtocol(WebSocketProtocol): WebSocketProtocol.connectionLost(self, reason) self.factory.countConnections -= 1 if self.debug: - log.msg("connection from %s lost" % self.peerstr) + self.factory._log("connection from %s lost" % self.peer) def processProxyConnect(self): @@ -2995,7 +2880,7 @@ class WebSocketServerProtocol(WebSocketProtocol): self.http_request_data = self.data[:end_of_header + 4] if self.debug: - log.msg("received HTTP request:\n\n%s\n\n" % self.http_request_data) + self.factory._log("received HTTP request:\n\n%s\n\n" % self.http_request_data) ## extract HTTP status line and headers ## @@ -3004,8 +2889,8 @@ class WebSocketServerProtocol(WebSocketProtocol): ## validate WebSocket opening handshake client request ## if self.debug: - log.msg("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) - log.msg("received HTTP headers in opening handshake : %s" % str(self.http_headers)) + self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) + self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers)) ## HTTP Request line : METHOD, VERSION ## @@ -3087,15 +2972,15 @@ class WebSocketServerProtocol(WebSocketProtocol): if self.http_request_params.has_key('after') and len(self.http_request_params['after']) > 0: after = int(self.http_request_params['after'][0]) if self.debugCodePaths: - log.msg("HTTP Upgrade header missing : render server status page and meta-refresh-redirecting to %s after %d seconds" % (url, after)) + self.factory._log("HTTP Upgrade header missing : render server status page and meta-refresh-redirecting to %s after %d seconds" % (url, after)) self.sendServerStatus(url, after) else: if self.debugCodePaths: - log.msg("HTTP Upgrade header missing : 303-redirecting to %s" % url) + self.factory._log("HTTP Upgrade header missing : 303-redirecting to %s" % url) self.sendRedirect(url) else: if self.debugCodePaths: - log.msg("HTTP Upgrade header missing : render server status page") + self.factory._log("HTTP Upgrade header missing : render server status page") self.sendServerStatus() self.dropConnection(abort = False) return @@ -3125,14 +3010,14 @@ class WebSocketServerProtocol(WebSocketProtocol): ## if not self.http_headers.has_key("sec-websocket-version"): if self.debugCodePaths: - log.msg("Hixie76 protocol detected") + self.factory._log("Hixie76 protocol detected") if self.allowHixie76: version = 0 else: return self.failHandshake("WebSocket connection denied - Hixie76 protocol mode disabled.") else: if self.debugCodePaths: - log.msg("Hybi protocol detected") + self.factory._log("Hybi protocol detected") if http_headers_cnt["sec-websocket-version"] > 1: return self.failHandshake("HTTP Sec-WebSocket-Version header appears more than once in opening handshake request") try: @@ -3240,7 +3125,7 @@ class WebSocketServerProtocol(WebSocketProtocol): else: key3 = self.data[end_of_header + 4:end_of_header + 4 + 8] if self.debug: - log.msg("received HTTP request body containing key3 for Hixie-76: %s" % key3) + self.factory._log("received HTTP request body containing key3 for Hixie-76: %s" % key3) ## Ok, got complete HS input, remember rest (if any) ## @@ -3249,6 +3134,12 @@ class WebSocketServerProtocol(WebSocketProtocol): else: self.data = self.data[end_of_header + 4:] + ## store WS key + ## + if self.websocket_version == 0: + self._wskey = (key1, key2, key3) + else: + self._wskey = key ## WebSocket handshake validated => produce opening handshake response @@ -3257,7 +3148,6 @@ class WebSocketServerProtocol(WebSocketProtocol): ## may return a protocol from the protocols provided by client or None. ## self.connectionRequest = ConnectionRequest(self.peer, - self.peerstr, self.http_headers, self.http_request_host, self.http_request_path, @@ -3267,19 +3157,10 @@ class WebSocketServerProtocol(WebSocketProtocol): self.websocket_protocols, self.websocket_extensions) - ## onConnect() will return the selected subprotocol or None - ## or a pair (protocol, headers) or raise an HttpException - ## - protocol = maybeDeferred(self.onConnect, self.connectionRequest) - - if self.websocket_version == 0: - key = key1, key2, key3 - - protocol.addCallback(self._processHandshake_buildResponse, key) - protocol.addErrback(self._processHandshake_failed) + self._run_onConnect(self.connectionRequest) - def _processHandshake_buildResponse(self, res, key): + def _processHandshake_buildResponse(self, res): """ Callback for Deferred returned by self.onConnect. Generates the response for the handshake. """ @@ -3299,7 +3180,9 @@ class WebSocketServerProtocol(WebSocketProtocol): self.websocket_protocol_in_use = protocol if self.websocket_version == 0: - key1, key2, key3 = key + key1, key2, key3 = self._wskey + else: + key = self._wskey ## extensions effectively in use for this connection @@ -3317,7 +3200,7 @@ class WebSocketServerProtocol(WebSocketProtocol): for (extension, params) in self.websocket_extensions: if self.debug: - log.msg("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) + self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) ## process permessage-compress extension ## @@ -3333,7 +3216,7 @@ class WebSocketServerProtocol(WebSocketProtocol): else: if self.debug: - log.msg("client requested '%s' extension we don't support or which is not activated" % extension) + self.factory._log("client requested '%s' extension we don't support or which is not activated" % extension) ## handle permessage-compress offers by the client ## @@ -3346,7 +3229,7 @@ class WebSocketServerProtocol(WebSocketProtocol): extensionResponse.append(accept.getExtensionString()) else: if self.debug: - log.msg("client request permessage-compress extension, but we did not accept any offer [%s]" % pmceOffers) + self.factory._log("client request permessage-compress extension, but we did not accept any offer [%s]" % pmceOffers) ## build response to complete WebSocket handshake @@ -3378,15 +3261,15 @@ class WebSocketServerProtocol(WebSocketProtocol): response += "Sec-WebSocket-Origin: %s\x0d\x0a" % str(self.websocket_origin) if self.debugCodePaths: - log.msg('factory isSecure = %s port = %s' % (self.factory.isSecure, self.factory.externalPort)) + self.factory._log('factory isSecure = %s port = %s' % (self.factory.isSecure, self.factory.externalPort)) if (self.factory.isSecure and self.factory.externalPort != 443) or ((not self.factory.isSecure) and self.factory.externalPort != 80): if self.debugCodePaths: - log.msg('factory running on non-default port') + self.factory._log('factory running on non-default port') response_port = ':' + str(self.factory.externalPort) else: if self.debugCodePaths: - log.msg('factory running on default port') + self.factory._log('factory running on default port') response_port = '' ## FIXME: check this! But see below .. @@ -3429,7 +3312,7 @@ class WebSocketServerProtocol(WebSocketProtocol): response_body = '' if self.debug: - log.msg("sending HTTP response:\n\n%s%s\n\n" % (response, binascii.b2a_hex(response_body))) + self.factory._log("sending HTTP response:\n\n%s%s\n\n" % (response, binascii.b2a_hex(response_body))) ## save and send out opening HS data ## @@ -3444,7 +3327,7 @@ class WebSocketServerProtocol(WebSocketProtocol): ## if self.openHandshakeTimeoutCall is not None: if self.debugCodePaths: - log.msg("openHandshakeTimeoutCall.cancel") + self.factory._log("openHandshakeTimeoutCall.cancel") self.openHandshakeTimeoutCall.cancel() self.openHandshakeTimeoutCall = None @@ -3475,7 +3358,7 @@ class WebSocketServerProtocol(WebSocketProtocol): if failure.check(HttpException): return self.failHandshake(e.reason, e.code) else: - log.msg("Exception raised in onConnect() - %s" % str(e)) + self.factory._log("Exception raised in onConnect() - %s" % str(e)) return self.failHandshake("Internal Server Error", HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR[0]) @@ -3485,7 +3368,7 @@ class WebSocketServerProtocol(WebSocketProtocol): error response and then drop the connection. """ if self.debug: - log.msg("failing WebSocket opening handshake ('%s')" % reason) + self.factory._log("failing WebSocket opening handshake ('%s')" % reason) self.sendHttpErrorResponse(code, reason, responseHeaders) self.dropConnection(abort = False) @@ -3571,7 +3454,7 @@ class WebSocketServerProtocol(WebSocketProtocol): -class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory): +class WebSocketServerFactory(WebSocketFactory): """ A Twisted factory for WebSocket server protocols. """ @@ -3594,8 +3477,7 @@ class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory): headers = {}, externalPort = None, debug = False, - debugCodePaths = False, - reactor = None): + debugCodePaths = False): """ Create instance of WebSocket server factory. @@ -3618,11 +3500,6 @@ class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory): :param debugCodePaths: Debug code paths mode (default: False). :type debugCodePaths: bool """ - ## lazy import to avoid reactor install upon module import - if reactor is None: - from twisted.internet import reactor - self.reactor = reactor - self.debug = debug self.debugCodePaths = debugCodePaths @@ -3877,7 +3754,7 @@ class WebSocketClientProtocol(WebSocketProtocol): """ WebSocketProtocol.connectionMade(self) if self.debug: - log.msg("connection to %s established" % self.peerstr) + self.factory._log("connection to %s established" % self.peer) if not self.factory.isServer and self.factory.proxy is not None: ## start by doing a HTTP/CONNECT for explicit proxies @@ -3896,7 +3773,7 @@ class WebSocketClientProtocol(WebSocketProtocol): """ WebSocketProtocol.connectionLost(self, reason) if self.debug: - log.msg("connection to %s lost" % self.peerstr) + self.factory._log("connection to %s lost" % self.peer) def startProxyConnect(self): @@ -3911,7 +3788,7 @@ class WebSocketClientProtocol(WebSocketProtocol): request += "\x0d\x0a" if self.debug: - log.msg(request) + self.factory._log(request) self.sendData(request) @@ -3927,7 +3804,7 @@ class WebSocketClientProtocol(WebSocketProtocol): http_response_data = self.data[:end_of_header + 4] if self.debug: - log.msg("received HTTP response:\n\n%s\n\n" % http_response_data) + self.factory._log("received HTTP response:\n\n%s\n\n" % http_response_data) ## extract HTTP status line and headers ## @@ -3936,8 +3813,8 @@ class WebSocketClientProtocol(WebSocketProtocol): ## validate proxy connect response ## if self.debug: - log.msg("received HTTP status line for proxy connect request : %s" % str(http_status_line)) - log.msg("received HTTP headers for proxy connect request : %s" % str(http_headers)) + self.factory._log("received HTTP status line for proxy connect request : %s" % str(http_status_line)) + self.factory._log("received HTTP headers for proxy connect request : %s" % str(http_headers)) ## Response Line ## @@ -3994,7 +3871,7 @@ class WebSocketClientProtocol(WebSocketProtocol): connection. """ if self.debug: - log.msg("failing proxy connect ('%s')" % reason) + self.factory._log("failing proxy connect ('%s')" % reason) self.dropConnection(abort = True) @@ -4112,7 +3989,7 @@ class WebSocketClientProtocol(WebSocketProtocol): self.http_request_data = request if self.debug: - log.msg(self.http_request_data) + self.factory._log(self.http_request_data) self.sendData(self.http_request_data) @@ -4128,7 +4005,7 @@ class WebSocketClientProtocol(WebSocketProtocol): self.http_response_data = self.data[:end_of_header + 4] if self.debug: - log.msg("received HTTP response:\n\n%s\n\n" % self.http_response_data) + self.factory._log("received HTTP response:\n\n%s\n\n" % self.http_response_data) ## extract HTTP status line and headers ## @@ -4137,8 +4014,8 @@ class WebSocketClientProtocol(WebSocketProtocol): ## validate WebSocket opening handshake server response ## if self.debug: - log.msg("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) - log.msg("received HTTP headers in opening handshake : %s" % str(self.http_headers)) + self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) + self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers)) ## Response Line ## @@ -4229,7 +4106,7 @@ class WebSocketClientProtocol(WebSocketProtocol): for (extension, params) in websocket_extensions: if self.debug: - log.msg("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) + self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) ## process permessage-compress extension ## @@ -4304,7 +4181,6 @@ class WebSocketClientProtocol(WebSocketProtocol): ## by server try: connectionResponse = ConnectionResponse(self.peer, - self.peerstr, self.http_headers, None, # FIXME self.websocket_protocol_in_use, @@ -4335,12 +4211,12 @@ class WebSocketClientProtocol(WebSocketProtocol): connection. """ if self.debug: - log.msg("failing WebSocket opening handshake ('%s')" % reason) + self.factory._log("failing WebSocket opening handshake ('%s')" % reason) self.dropConnection(abort = True) -class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory): +class WebSocketClientFactory(WebSocketFactory): """ A Twisted factory for WebSocket client protocols. """ @@ -4364,8 +4240,7 @@ class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory): headers = {}, proxy = None, debug = False, - debugCodePaths = False, - reactor = None): + debugCodePaths = False): """ Create instance of WebSocket client factory. @@ -4390,11 +4265,6 @@ class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory): :param debugCodePaths: Debug code paths mode (default: False). :type debugCodePaths: bool """ - ## lazy import to avoid reactor install upon module import - if reactor is None: - from twisted.internet import reactor - self.reactor = reactor - self.debug = debug self.debugCodePaths = debugCodePaths diff --git a/examples/websocket/echo/Makefile b/examples/websocket/echo/Makefile new file mode 100644 index 00000000..045067f5 --- /dev/null +++ b/examples/websocket/echo/Makefile @@ -0,0 +1,2 @@ +client: + PYTHONPATH=../../../autobahn python client.py ws://127.0.0.1:9000 debug diff --git a/examples/websocket/echo/client.py b/examples/websocket/echo/client.py index 18f1f263..590721ea 100644 --- a/examples/websocket/echo/client.py +++ b/examples/websocket/echo/client.py @@ -21,9 +21,9 @@ import sys from twisted.internet import reactor from twisted.python import log -from autobahn.websocket import WebSocketClientFactory, \ - WebSocketClientProtocol, \ - connectWS +from autobahn.twisted.websocket import WebSocketClientProtocol, \ + WebSocketClientFactory, \ + connectWS class EchoClientProtocol(WebSocketClientProtocol): diff --git a/examples/websocket2/server_twisted.py b/examples/websocket2/server_twisted.py index 70aa7aeb..8eb14ad0 100644 --- a/examples/websocket2/server_twisted.py +++ b/examples/websocket2/server_twisted.py @@ -20,6 +20,9 @@ from autobahn.twisted.websocket import WebSocketServerProtocol, \ WebSocketServerFactory +#from autobahn.websocket import WebSocketServerProtocol, \ +# WebSocketServerFactory + class MyServerProtocol(WebSocketServerProtocol): @@ -27,8 +30,8 @@ class MyServerProtocol(WebSocketServerProtocol): print("message received") self.sendMessage(payload) - def onConnect(self, connectionRequest): - return None + #def onConnect(self, connectionRequest): + # return None class MyServerFactory(WebSocketServerFactory): @@ -39,11 +42,16 @@ class MyServerFactory(WebSocketServerFactory): if __name__ == '__main__': + import sys + from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor + from twisted.python import log - factory = MyServerFactory() + log.startLogging(sys.stdout) - endpoint = TCP4ServerEndpoint(reactor, 8888) + factory = MyServerFactory("ws://localhost:9000", debug = True) + + endpoint = TCP4ServerEndpoint(reactor, 9000) endpoint.listen(factory) reactor.run()