started with refactoring of WebSocket for asyncio

This commit is contained in:
Tobias Oberstein
2013-12-22 17:15:09 +01:00
parent 3d0dd1fc8f
commit bdc36899e5
5 changed files with 370 additions and 265 deletions

View File

@@ -17,39 +17,264 @@
###############################################################################
from __future__ import absolute_import
import twisted.internet.protocol
from twisted.internet.defer import maybeDeferred
from twisted.python import log
from autobahn.websocket2 import protocol
from autobahn import websocket
class WebSocketServerProtocol(twisted.internet.protocol.Protocol, protocol.WebSocketServerProtocol):
class WebSocketServerProtocol(websocket.WebSocketServerProtocol, twisted.internet.protocol.Protocol):
"""
"""
def connectionMade(self):
peername = str(self.transport.getPeer())
print('connection from {}'.format(peername))
## the peer we are connected to
peer = self.transport.getPeer()
try:
self.peer = "%s:%d" % (peer.host, peer.port)
except:
## eg Unix Domain sockets don't have host/port
self.peer = str(peer)
def dataReceived(self, data):
self._onData(data)
#print('data received: {}'.format(data.decode()))
#self.transport.write(data)
#self.transport.loseConnection()
websocket.WebSocketServerProtocol.connectionMade(self)
## Set "Nagle"
try:
self.transport.setTcpNoDelay(self.tcpNoDelay)
except:
## eg Unix Domain sockets throw Errno 22 on this
pass
# peername = str(self.transport.getPeer())
# print('connection from {}'.format(peername))
# def dataReceived(self, data):
# self._onData(data)
# #print('data received: {}'.format(data.decode()))
# #self.transport.write(data)
# #self.transport.loseConnection()
# def connectionLost(self, reason):
# pass
def _closeConnection(self, abort = False):
if abort:
self.transport.abortConnection()
else:
self.transport.loseConnection()
def _run_onConnect(self, connectionRequest):
## onConnect() will return the selected subprotocol or None
## or a pair (protocol, headers) or raise an HttpException
##
res = maybeDeferred(self.onConnect, connectionRequest)
res.addCallback(self._processHandshake_buildResponse)
res.addErrback(self._processHandshake_failed)
def registerProducer(self, producer, streaming):
"""
Register a Twisted producer with this protocol.
Modes: Hybi, Hixie
:param producer: A Twisted push or pull producer.
:type producer: object
:param streaming: Producer type.
:type streaming: bool
"""
self.transport.registerProducer(producer, streaming)
class WebSocketServerFactory(twisted.internet.protocol.Factory, protocol.WebSocketServerFactory):
protocol = WebSocketServerProtocol
class WebSocketServerFactory(websocket.WebSocketServerFactory, twisted.internet.protocol.ServerFactory):
"""
"""
def __init__(self, reactor = None):
def __init__(self, *args, **kwargs):
#twisted.internet.protocol.ServerFactory.__init__(self)
websocket.WebSocketServerFactory.__init__(self, *args, **kwargs)
## lazy import to avoid reactor install upon module import
if reactor is None:
if kwargs.has_key('reactor'):
self.reactor = kwargs['reactor']
else:
from twisted.internet import reactor
self._reactor = reactor
self.reactor = reactor
def buildProtocol(self, addr):
proto = self.protocol()
proto.factory = self
return proto
def _log(self, msg):
log.msg(msg)
def _callLater(self, delay, fun):
return self.reactor.callLater(delay, fun)
#protocol = WebSocketServerProtocol
# def __init__(self, reactor = None):
# ## lazy import to avoid reactor install upon module import
# if reactor is None:
# from twisted.internet import reactor
# self._reactor = reactor
# def buildProtocol(self, addr):
# proto = self.protocol()
# proto.factory = self
# return proto
class WebSocketClientProtocol(websocket.WebSocketClientProtocol, twisted.internet.protocol.Protocol):
"""
"""
def connectionMade(self):
## the peer we are connected to
peer = self.transport.getPeer()
try:
self.peer = "%s:%d" % (peer.host, peer.port)
except:
## eg Unix Domain sockets don't have host/port
self.peer = str(peer)
websocket.WebSocketClientProtocol.connectionMade(self)
## Set "Nagle"
try:
self.transport.setTcpNoDelay(self.tcpNoDelay)
except:
## eg Unix Domain sockets throw Errno 22 on this
pass
def registerProducer(self, producer, streaming):
"""
Register a Twisted producer with this protocol.
Modes: Hybi, Hixie
:param producer: A Twisted push or pull producer.
:type producer: object
:param streaming: Producer type.
:type streaming: bool
"""
self.transport.registerProducer(producer, streaming)
def _closeConnection(self, abort = False):
if abort:
self.transport.abortConnection()
else:
self.transport.loseConnection()
class WebSocketClientFactory(websocket.WebSocketClientFactory, twisted.internet.protocol.ClientFactory):
"""
"""
def __init__(self, *args, **kwargs):
#twisted.internet.protocol.ClientFactory.__init__(self)
websocket.WebSocketClientFactory.__init__(self, *args, **kwargs)
## lazy import to avoid reactor install upon module import
if kwargs.has_key('reactor'):
self.reactor = kwargs['reactor']
else:
from twisted.internet import reactor
self.reactor = reactor
def _log(self, msg):
log.msg(msg)
def _callLater(self, delay, fun):
return self.reactor.callLater(delay, fun)
def connectWS(factory, contextFactory = None, timeout = 30, bindAddress = None):
"""
Establish WebSocket connection to a server. The connection parameters like target
host, port, resource and others are provided via the factory.
:param factory: The WebSocket protocol factory to be used for creating client protocol instances.
:type factory: An :class:`autobahn.websocket.WebSocketClientFactory` instance.
:param contextFactory: SSL context factory, required for secure WebSocket connections ("wss").
:type contextFactory: A `twisted.internet.ssl.ClientContextFactory <http://twistedmatrix.com/documents/current/api/twisted.internet.ssl.ClientContextFactory.html>`_ instance.
:param timeout: Number of seconds to wait before assuming the connection has failed.
:type timeout: int
:param bindAddress: A (host, port) tuple of local address to bind to, or None.
:type bindAddress: tuple
:returns: obj -- An object which implements `twisted.interface.IConnector <http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IConnector.html>`_.
"""
## lazy import to avoid reactor install upon module import
if hasattr(factory, 'reactor'):
reactor = factory.reactor
else:
from twisted.internet import reactor
if factory.proxy is not None:
if factory.isSecure:
raise Exception("WSS over explicit proxies not implemented")
else:
conn = reactor.connectTCP(factory.proxy['host'], factory.proxy['port'], factory, timeout, bindAddress)
else:
if factory.isSecure:
if contextFactory is None:
# create default client SSL context factory when none given
from twisted.internet import ssl
contextFactory = ssl.ClientContextFactory()
conn = reactor.connectSSL(factory.host, factory.port, factory, contextFactory, timeout, bindAddress)
else:
conn = reactor.connectTCP(factory.host, factory.port, factory, timeout, bindAddress)
return conn
def listenWS(factory, contextFactory = None, backlog = 50, interface = ''):
"""
Listen for incoming WebSocket connections from clients. The connection parameters like
listening port and others are provided via the factory.
:param factory: The WebSocket protocol factory to be used for creating server protocol instances.
:type factory: An :class:`autobahn.websocket.WebSocketServerFactory` instance.
:param contextFactory: SSL context factory, required for secure WebSocket connections ("wss").
:type contextFactory: A twisted.internet.ssl.ContextFactory.
:param backlog: Size of the listen queue.
:type backlog: int
:param interface: The interface (derived from hostname given) to bind to, defaults to '' (all).
:type interface: str
:returns: obj -- An object that implements `twisted.interface.IListeningPort <http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IListeningPort.html>`_.
"""
## lazy import to avoid reactor install upon module import
if hasattr(factory, 'reactor'):
reactor = factory.reactor
else:
from twisted.internet import reactor
if factory.isSecure:
if contextFactory is None:
raise Exception("Secure WebSocket listen requested, but no SSL context factory given")
listener = reactor.listenSSL(factory.port, factory, contextFactory, backlog, interface)
else:
listener = reactor.listenTCP(factory.port, factory, backlog, interface)
return listener

View File

@@ -16,10 +16,10 @@
##
###############################################################################
from __future__ import absolute_import
__all__ = ["createWsUrl",
"parseWsUrl",
"connectWS",
"listenWS",
"HttpException",
"ConnectionRequest",
@@ -60,20 +60,17 @@ from collections import deque
from zope.interface import implementer
from twisted.internet import protocol
from twisted.internet.defer import maybeDeferred
from twisted.python import log
from autobahn._version import __version__
from _version import __version__
from autobahn.interfaces import IWebSocketChannel, \
IWebSocketChannelFrameApi, \
IWebSocketChannelStreamingApi
from interfaces import IWebSocketChannel, \
IWebSocketChannelFrameApi, \
IWebSocketChannelStreamingApi
from utf8validator import Utf8Validator
from xormasker import XorMaskerNull, createXorMasker
from httpstatus import *
from util import Stopwatch
from compress import *
from autobahn.utf8validator import Utf8Validator
from autobahn.xormasker import XorMaskerNull, createXorMasker
from autobahn.httpstatus import *
from autobahn.util import Stopwatch
from autobahn.compress import *
def createWsUrl(hostname, port = None, isSecure = False, path = None, params = None):
@@ -160,78 +157,6 @@ def parseWsUrl(url):
def connectWS(factory, contextFactory = None, timeout = 30, bindAddress = None):
"""
Establish WebSocket connection to a server. The connection parameters like target
host, port, resource and others are provided via the factory.
:param factory: The WebSocket protocol factory to be used for creating client protocol instances.
:type factory: An :class:`autobahn.websocket.WebSocketClientFactory` instance.
:param contextFactory: SSL context factory, required for secure WebSocket connections ("wss").
:type contextFactory: A `twisted.internet.ssl.ClientContextFactory <http://twistedmatrix.com/documents/current/api/twisted.internet.ssl.ClientContextFactory.html>`_ instance.
:param timeout: Number of seconds to wait before assuming the connection has failed.
:type timeout: int
:param bindAddress: A (host, port) tuple of local address to bind to, or None.
:type bindAddress: tuple
:returns: obj -- An object which implements `twisted.interface.IConnector <http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IConnector.html>`_.
"""
## lazy import to avoid reactor install upon module import
if hasattr(factory, 'reactor'):
reactor = factory.reactor
else:
from twisted.internet import reactor
if factory.proxy is not None:
if factory.isSecure:
raise Exception("WSS over explicit proxies not implemented")
else:
conn = reactor.connectTCP(factory.proxy['host'], factory.proxy['port'], factory, timeout, bindAddress)
else:
if factory.isSecure:
if contextFactory is None:
# create default client SSL context factory when none given
from twisted.internet import ssl
contextFactory = ssl.ClientContextFactory()
conn = reactor.connectSSL(factory.host, factory.port, factory, contextFactory, timeout, bindAddress)
else:
conn = reactor.connectTCP(factory.host, factory.port, factory, timeout, bindAddress)
return conn
def listenWS(factory, contextFactory = None, backlog = 50, interface = ''):
"""
Listen for incoming WebSocket connections from clients. The connection parameters like
listening port and others are provided via the factory.
:param factory: The WebSocket protocol factory to be used for creating server protocol instances.
:type factory: An :class:`autobahn.websocket.WebSocketServerFactory` instance.
:param contextFactory: SSL context factory, required for secure WebSocket connections ("wss").
:type contextFactory: A twisted.internet.ssl.ContextFactory.
:param backlog: Size of the listen queue.
:type backlog: int
:param interface: The interface (derived from hostname given) to bind to, defaults to '' (all).
:type interface: str
:returns: obj -- An object that implements `twisted.interface.IListeningPort <http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IListeningPort.html>`_.
"""
## lazy import to avoid reactor install upon module import
if hasattr(factory, 'reactor'):
reactor = factory.reactor
else:
from twisted.internet import reactor
if factory.isSecure:
if contextFactory is None:
raise Exception("Secure WebSocket listen requested, but no SSL context factory given")
listener = reactor.listenSSL(factory.port, factory, contextFactory, backlog, interface)
else:
listener = reactor.listenTCP(factory.port, factory, backlog, interface)
return listener
class TrafficStats:
def __init__(self):
@@ -366,14 +291,12 @@ class ConnectionRequest:
provided in :meth:`autobahn.websocket.WebSocketServerProtocol.onConnect` when a WebSocket
client establishes a connection to a WebSocket server.
"""
def __init__(self, peer, peerstr, headers, host, path, params, version, origin, protocols, extensions):
def __init__(self, peer, headers, host, path, params, version, origin, protocols, extensions):
"""
Constructor.
:param peer: IP address/port of the connecting client.
:type peer: object
:param peerstr: IP address/port of the connecting client as string.
:type peerstr: str
:param peer: Descriptor of the connecting client (eg IP address/port in case of TCP transports).
:type peer: str
:param headers: HTTP headers from opening handshake request.
:type headers: dict
:param host: Host from opening handshake HTTP header.
@@ -392,7 +315,6 @@ class ConnectionRequest:
:type extensions: array of strings
"""
self.peer = peer
self.peerstr = peerstr
self.headers = headers
self.host = host
self.path = path
@@ -405,7 +327,6 @@ class ConnectionRequest:
def __json__(self):
return {'peer': self.peer,
'peerstr': self.peerstr,
'headers': self.headers,
'host': self.host,
'path': self.path,
@@ -427,14 +348,12 @@ class ConnectionResponse():
provided in :meth:`autobahn.websocket.WebSocketClientProtocol.onConnect` when a WebSocket
client has established a connection to a WebSocket server.
"""
def __init__(self, peer, peerstr, headers, version, protocol, extensions):
def __init__(self, peer, headers, version, protocol, extensions):
"""
Constructor.
:param peer: IP address/port of the connected server.
:type peer: object
:param peerstr: IP address/port of the connected server as string.
:type peerstr: str
:param peer: Descriptor of the connected server (e.g. IP address/port in case of TCP transport).
:type peer: str
:param headers: HTTP headers from opening handshake response.
:type headers: dict
:param version: The WebSocket protocol version that is spoken.
@@ -445,7 +364,6 @@ class ConnectionResponse():
:type extensions: array of strings
"""
self.peer = peer
self.peerstr = peerstr
self.headers = headers
self.version = version
self.protocol = protocol
@@ -454,7 +372,6 @@ class ConnectionResponse():
def __json__(self):
return {'peer': self.peer,
'peerstr': self.peerstr,
'headers': self.headers,
'version': self.version,
'protocol': self.protocol,
@@ -571,7 +488,7 @@ class Timings:
@implementer(IWebSocketChannel)
@implementer(IWebSocketChannelFrameApi)
@implementer(IWebSocketChannelStreamingApi)
class WebSocketProtocol(protocol.Protocol):
class WebSocketProtocol:
"""
A Twisted Protocol class for WebSocket. This class is used by both WebSocket
client and server protocol version. It is unusable standalone, for example
@@ -757,7 +674,7 @@ class WebSocketProtocol(protocol.Protocol):
Modes: Hybi, Hixie
"""
if self.debugCodePaths:
log.msg("WebSocketProtocol.onOpen")
self.factory._log("WebSocketProtocol.onOpen")
def onMessageBegin(self, opcode):
@@ -884,7 +801,7 @@ class WebSocketProtocol(protocol.Protocol):
:type binary: bool
"""
if self.debug:
log.msg("WebSocketProtocol.onMessage")
self.factory._log("WebSocketProtocol.onMessage")
def onPing(self, payload):
@@ -898,7 +815,7 @@ class WebSocketProtocol(protocol.Protocol):
:type payload: str
"""
if self.debug:
log.msg("WebSocketProtocol.onPing")
self.factory._log("WebSocketProtocol.onPing")
if self.state == WebSocketProtocol.STATE_OPEN:
self.sendPong(payload)
@@ -913,7 +830,7 @@ class WebSocketProtocol(protocol.Protocol):
:param payload: Payload of Pong, when there was any. Can be arbitrary, up to 125 octets.
"""
if self.debug:
log.msg("WebSocketProtocol.onPong")
self.factory._log("WebSocketProtocol.onPong")
def onClose(self, wasClean, code, reason):
@@ -943,7 +860,7 @@ class WebSocketProtocol(protocol.Protocol):
s += "self.localCloseReason=%s\n" % self.localCloseReason
s += "self.remoteCloseCode=%s\n" % self.remoteCloseCode
s += "self.remoteCloseReason=%s\n" % self.remoteCloseReason
log.msg(s)
self.factory._log(s)
def onCloseFrame(self, code, reasonRaw):
@@ -965,7 +882,7 @@ class WebSocketProtocol(protocol.Protocol):
:type reason: str
"""
if self.debugCodePaths:
log.msg("WebSocketProtocol.onCloseFrame")
self.factory._log("WebSocketProtocol.onCloseFrame")
self.remoteCloseCode = code
self.remoteCloseReason = reasonRaw
@@ -995,7 +912,7 @@ class WebSocketProtocol(protocol.Protocol):
##
if self.closeHandshakeTimeoutCall is not None:
if self.debugCodePaths:
log.msg("closeHandshakeTimeoutCall.cancel")
self.factory._log("closeHandshakeTimeoutCall.cancel")
self.closeHandshakeTimeoutCall.cancel()
self.closeHandshakeTimeoutCall = None
@@ -1008,7 +925,7 @@ class WebSocketProtocol(protocol.Protocol):
## When we are a client, the server should drop the TCP
## If that doesn't happen, we do. And that will set wasClean = False.
if self.serverConnectionDropTimeout > 0:
self.serverConnectionDropTimeoutCall = self.factory.reactor.callLater(self.serverConnectionDropTimeout, self.onServerConnectionDropTimeout)
self.serverConnectionDropTimeoutCall = self.factory._callLater(self.serverConnectionDropTimeout, self.onServerConnectionDropTimeout)
elif self.state == WebSocketProtocol.STATE_OPEN:
## The peer initiates a closing handshake, so we reply
@@ -1050,14 +967,14 @@ class WebSocketProtocol(protocol.Protocol):
self.serverConnectionDropTimeoutCall = None
if self.state != WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("onServerConnectionDropTimeout")
self.factory._log("onServerConnectionDropTimeout")
self.wasClean = False
self.wasNotCleanReason = "server did not drop TCP connection (in time)"
self.wasServerConnectionDropTimeout = True
self.dropConnection(abort = True)
else:
if self.debugCodePaths:
log.msg("skipping onServerConnectionDropTimeout since connection is already closed")
self.factory._log("skipping onServerConnectionDropTimeout since connection is already closed")
def onOpenHandshakeTimeout(self):
@@ -1071,20 +988,20 @@ class WebSocketProtocol(protocol.Protocol):
self.openHandshakeTimeoutCall = None
if self.state in [WebSocketProtocol.STATE_CONNECTING, WebSocketProtocol.STATE_PROXY_CONNECTING]:
if self.debugCodePaths:
log.msg("onOpenHandshakeTimeout fired")
self.factory._log("onOpenHandshakeTimeout fired")
self.wasClean = False
self.wasNotCleanReason = "peer did not finish (in time) the opening handshake"
self.wasOpenHandshakeTimeout = True
self.dropConnection(abort = True)
elif self.state == WebSocketProtocol.STATE_OPEN:
if self.debugCodePaths:
log.msg("skipping onOpenHandshakeTimeout since WebSocket connection is open (opening handshake already finished)")
self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is open (opening handshake already finished)")
elif self.state == WebSocketProtocol.STATE_CLOSING:
if self.debugCodePaths:
log.msg("skipping onOpenHandshakeTimeout since WebSocket connection is closing")
self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is closing")
elif self.state == WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("skipping onOpenHandshakeTimeout since WebSocket connection already closed")
self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection already closed")
else:
# should not arrive here
raise Exception("logic error")
@@ -1101,14 +1018,14 @@ class WebSocketProtocol(protocol.Protocol):
self.closeHandshakeTimeoutCall = None
if self.state != WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("onCloseHandshakeTimeout fired")
self.factory._log("onCloseHandshakeTimeout fired")
self.wasClean = False
self.wasNotCleanReason = "peer did not respond (in time) in closing handshake"
self.wasCloseHandshakeTimeout = True
self.dropConnection(abort = True)
else:
if self.debugCodePaths:
log.msg("skipping onCloseHandshakeTimeout since connection is already closed")
self.factory._log("skipping onCloseHandshakeTimeout since connection is already closed")
def dropConnection(self, abort = False):
@@ -1122,17 +1039,14 @@ class WebSocketProtocol(protocol.Protocol):
"""
if self.state != WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("dropping connection")
self.factory._log("dropping connection")
self.droppedByMe = True
self.state = WebSocketProtocol.STATE_CLOSED
if abort:
self.transport.abortConnection()
else:
self.transport.loseConnection()
self._closeConnection(abort)
else:
if self.debugCodePaths:
log.msg("skipping dropConnection since connection is already closed")
self.factory._log("skipping dropConnection since connection is already closed")
def failConnection(self, code = CLOSE_STATUS_CODE_GOING_AWAY, reason = "Going Away"):
@@ -1146,7 +1060,7 @@ class WebSocketProtocol(protocol.Protocol):
"""
if self.state != WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("Failing connection : %s - %s" % (code, reason))
self.factory._log("Failing connection : %s - %s" % (code, reason))
self.failedByMe = True
if self.failByDrop:
## brutally drop the TCP connection
@@ -1159,10 +1073,10 @@ class WebSocketProtocol(protocol.Protocol):
self.sendCloseFrame(code = code, reasonUtf8 = reason.encode("UTF-8"), isReply = False)
else:
if self.debugCodePaths:
log.msg("skipping failConnection since connection is already closing")
self.factory._log("skipping failConnection since connection is already closing")
else:
if self.debugCodePaths:
log.msg("skipping failConnection since connection is already closed")
self.factory._log("skipping failConnection since connection is already closed")
def protocolViolation(self, reason):
@@ -1180,7 +1094,7 @@ class WebSocketProtocol(protocol.Protocol):
:returns: bool -- True, when any further processing should be discontinued.
"""
if self.debugCodePaths:
log.msg("Protocol violation : %s" % reason)
self.factory._log("Protocol violation : %s" % reason)
self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_PROTOCOL_ERROR, reason)
if self.failByDrop:
return True
@@ -1207,7 +1121,7 @@ class WebSocketProtocol(protocol.Protocol):
:returns: bool -- True, when any further processing should be discontinued.
"""
if self.debugCodePaths:
log.msg("Invalid payload : %s" % reason)
self.factory._log("Invalid payload : %s" % reason)
self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_INVALID_PAYLOAD, reason)
if self.failByDrop:
return True
@@ -1254,8 +1168,8 @@ class WebSocketProtocol(protocol.Protocol):
configAttrLog.append((configAttr, getattr(self, configAttr), configAttrSource))
if self.debug:
#log.msg(configAttrLog)
log.msg("\n" + pformat(configAttrLog))
#self.factory._log(configAttrLog)
self.factory._log("\n" + pformat(configAttrLog))
## permessage-compress extension
self._perMessageCompress = None
@@ -1267,21 +1181,6 @@ class WebSocketProtocol(protocol.Protocol):
## Traffic stats
self.trafficStats = TrafficStats()
## Set "Nagle"
try:
self.transport.setTcpNoDelay(self.tcpNoDelay)
except:
## eg Unix Domain sockets throw Errno 22 on this
pass
## the peer we are connected to
self.peer = self.transport.getPeer()
try:
self.peerstr = "%s:%d" % (self.peer.host, self.peer.port)
except:
## eg Unix Domain sockets don't have host/port
self.peerstr = str(self.peer)
## initial state
if not self.factory.isServer and self.factory.proxy is not None:
self.state = WebSocketProtocol.STATE_PROXY_CONNECTING
@@ -1355,7 +1254,7 @@ class WebSocketProtocol(protocol.Protocol):
# set opening handshake timeout handler
if self.openHandshakeTimeout > 0:
self.openHandshakeTimeoutCall = self.factory.reactor.callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout)
self.openHandshakeTimeoutCall = self.factory._callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout)
def connectionLost(self, reason):
@@ -1368,7 +1267,7 @@ class WebSocketProtocol(protocol.Protocol):
##
if not self.factory.isServer and self.serverConnectionDropTimeoutCall is not None:
if self.debugCodePaths:
log.msg("serverConnectionDropTimeoutCall.cancel")
self.factory._log("serverConnectionDropTimeoutCall.cancel")
self.serverConnectionDropTimeoutCall.cancel()
self.serverConnectionDropTimeoutCall = None
@@ -1387,7 +1286,7 @@ class WebSocketProtocol(protocol.Protocol):
Modes: Hybi, Hixie
"""
log.msg("RX Octets from %s : octets = %s" % (self.peerstr, binascii.b2a_hex(data)))
self.factory._log("RX Octets from %s : octets = %s" % (self.peer, binascii.b2a_hex(data)))
def logTxOctets(self, data, sync):
@@ -1396,7 +1295,7 @@ class WebSocketProtocol(protocol.Protocol):
Modes: Hybi, Hixie
"""
log.msg("TX Octets to %s : sync = %s, octets = %s" % (self.peerstr, sync, binascii.b2a_hex(data)))
self.factory._log("TX Octets to %s : sync = %s, octets = %s" % (self.peer, sync, binascii.b2a_hex(data)))
def logRxFrame(self, frameHeader, payload):
@@ -1406,7 +1305,7 @@ class WebSocketProtocol(protocol.Protocol):
Modes: Hybi
"""
data = ''.join(payload)
info = (self.peerstr,
info = (self.peer,
frameHeader.fin,
frameHeader.rsv,
frameHeader.opcode,
@@ -1414,7 +1313,7 @@ class WebSocketProtocol(protocol.Protocol):
frameHeader.length,
data if frameHeader.opcode == 1 else binascii.b2a_hex(data))
log.msg("RX Frame from %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, payload = %s" % info)
self.factory._log("RX Frame from %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, payload = %s" % info)
def logTxFrame(self, frameHeader, payload, repeatLength, chopsize, sync):
@@ -1423,7 +1322,7 @@ class WebSocketProtocol(protocol.Protocol):
Modes: Hybi
"""
info = (self.peerstr,
info = (self.peer,
frameHeader.fin,
frameHeader.rsv,
frameHeader.opcode,
@@ -1434,7 +1333,7 @@ class WebSocketProtocol(protocol.Protocol):
sync,
payload if frameHeader.opcode == 1 else binascii.b2a_hex(payload))
log.msg("TX Frame to %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, repeat_length = %s, chopsize = %s, sync = %s, payload = %s" % info)
self.factory._log("TX Frame to %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, repeat_length = %s, chopsize = %s, sync = %s, payload = %s" % info)
def dataReceived(self, data):
@@ -1493,7 +1392,7 @@ class WebSocketProtocol(protocol.Protocol):
## ignore any data received after WS was closed
##
if self.debugCodePaths:
log.msg("received data in STATE_CLOSED")
self.factory._log("received data in STATE_CLOSED")
## should not arrive here (invalid state)
##
@@ -1519,20 +1418,6 @@ class WebSocketProtocol(protocol.Protocol):
raise Exception("must implement handshake (client or server) in derived class")
def registerProducer(self, producer, streaming):
"""
Register a Twisted producer with this protocol.
Modes: Hybi, Hixie
:param producer: A Twisted push or pull producer.
:type producer: object
:param streaming: Producer type.
:type streaming: bool
"""
self.transport.registerProducer(producer, streaming)
def _trigger(self):
"""
Trigger sending stuff from send queue (which is only used for chopped/synched writes).
@@ -1567,13 +1452,13 @@ class WebSocketProtocol(protocol.Protocol):
self.logTxOctets(e[0], e[1])
else:
if self.debugCodePaths:
log.msg("skipped delayed write, since connection is closed")
self.factory._log("skipped delayed write, since connection is closed")
# we need to reenter the reactor to make the latter
# reenter the OS network stack, so that octets
# can get on the wire. Note: this is a "heuristic",
# since there is no (easy) way to really force out
# octets from the OS network stack to wire.
self.factory.reactor.callLater(WebSocketProtocol._QUEUED_WRITE_DELAY, self._send)
self.factory._callLater(WebSocketProtocol._QUEUED_WRITE_DELAY, self._send)
else:
self.triggered = False
@@ -2017,7 +1902,7 @@ class WebSocketProtocol(protocol.Protocol):
if self._isMessageCompressed:
compressedLen = len(payload)
if self.debug:
log.msg("RX compressed [%d]: %s" % (compressedLen, binascii.b2a_hex(payload)))
self.factory._log("RX compressed [%d]: %s" % (compressedLen, binascii.b2a_hex(payload)))
payload = self._perMessageCompress.decompressMessageData(payload)
uncompressedLen = len(payload)
@@ -2074,7 +1959,7 @@ class WebSocketProtocol(protocol.Protocol):
return False
if self.debug:
log.msg("Traffic statistics:\n" + str(self.trafficStats))
self.factory._log("Traffic statistics:\n" + str(self.trafficStats))
if self.state == WebSocketProtocol.STATE_OPEN:
self.trafficStats.incomingWebSocketMessages += 1
@@ -2283,11 +2168,11 @@ class WebSocketProtocol(protocol.Protocol):
"""
if self.state == WebSocketProtocol.STATE_CLOSING:
if self.debugCodePaths:
log.msg("ignoring sendCloseFrame since connection is closing")
self.factory._log("ignoring sendCloseFrame since connection is closing")
elif self.state == WebSocketProtocol.STATE_CLOSED:
if self.debugCodePaths:
log.msg("ignoring sendCloseFrame since connection already closed")
self.factory._log("ignoring sendCloseFrame since connection already closed")
elif self.state in [WebSocketProtocol.STATE_PROXY_CONNECTING, WebSocketProtocol.STATE_CONNECTING]:
raise Exception("cannot close a connection not yet connected")
@@ -2315,7 +2200,7 @@ class WebSocketProtocol(protocol.Protocol):
## drop connection when timeout on receiving close handshake reply
if self.closedByMe and self.closeHandshakeTimeout > 0:
self.closeHandshakeTimeoutCall = self.factory.reactor.callLater(self.closeHandshakeTimeout, self.onCloseHandshakeTimeout)
self.closeHandshakeTimeoutCall = self.factory._callLater(self.closeHandshakeTimeout, self.onCloseHandshakeTimeout)
else:
raise Exception("logic error")
@@ -2742,7 +2627,7 @@ class WebSocketProtocol(protocol.Protocol):
i += pfs
if self.debug:
log.msg("Traffic statistics:\n" + str(self.trafficStats))
self.factory._log("Traffic statistics:\n" + str(self.trafficStats))
def _parseExtensionsHeader(self, header, removeQuotes = True):
@@ -2957,7 +2842,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
WebSocketProtocol.connectionMade(self)
self.factory.countConnections += 1
if self.debug:
log.msg("connection accepted from peer %s" % self.peerstr)
self.factory._log("connection accepted from peer %s" % self.peer)
def connectionLost(self, reason):
@@ -2970,7 +2855,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
WebSocketProtocol.connectionLost(self, reason)
self.factory.countConnections -= 1
if self.debug:
log.msg("connection from %s lost" % self.peerstr)
self.factory._log("connection from %s lost" % self.peer)
def processProxyConnect(self):
@@ -2995,7 +2880,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
self.http_request_data = self.data[:end_of_header + 4]
if self.debug:
log.msg("received HTTP request:\n\n%s\n\n" % self.http_request_data)
self.factory._log("received HTTP request:\n\n%s\n\n" % self.http_request_data)
## extract HTTP status line and headers
##
@@ -3004,8 +2889,8 @@ class WebSocketServerProtocol(WebSocketProtocol):
## validate WebSocket opening handshake client request
##
if self.debug:
log.msg("received HTTP status line in opening handshake : %s" % str(self.http_status_line))
log.msg("received HTTP headers in opening handshake : %s" % str(self.http_headers))
self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line))
self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers))
## HTTP Request line : METHOD, VERSION
##
@@ -3087,15 +2972,15 @@ class WebSocketServerProtocol(WebSocketProtocol):
if self.http_request_params.has_key('after') and len(self.http_request_params['after']) > 0:
after = int(self.http_request_params['after'][0])
if self.debugCodePaths:
log.msg("HTTP Upgrade header missing : render server status page and meta-refresh-redirecting to %s after %d seconds" % (url, after))
self.factory._log("HTTP Upgrade header missing : render server status page and meta-refresh-redirecting to %s after %d seconds" % (url, after))
self.sendServerStatus(url, after)
else:
if self.debugCodePaths:
log.msg("HTTP Upgrade header missing : 303-redirecting to %s" % url)
self.factory._log("HTTP Upgrade header missing : 303-redirecting to %s" % url)
self.sendRedirect(url)
else:
if self.debugCodePaths:
log.msg("HTTP Upgrade header missing : render server status page")
self.factory._log("HTTP Upgrade header missing : render server status page")
self.sendServerStatus()
self.dropConnection(abort = False)
return
@@ -3125,14 +3010,14 @@ class WebSocketServerProtocol(WebSocketProtocol):
##
if not self.http_headers.has_key("sec-websocket-version"):
if self.debugCodePaths:
log.msg("Hixie76 protocol detected")
self.factory._log("Hixie76 protocol detected")
if self.allowHixie76:
version = 0
else:
return self.failHandshake("WebSocket connection denied - Hixie76 protocol mode disabled.")
else:
if self.debugCodePaths:
log.msg("Hybi protocol detected")
self.factory._log("Hybi protocol detected")
if http_headers_cnt["sec-websocket-version"] > 1:
return self.failHandshake("HTTP Sec-WebSocket-Version header appears more than once in opening handshake request")
try:
@@ -3240,7 +3125,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
else:
key3 = self.data[end_of_header + 4:end_of_header + 4 + 8]
if self.debug:
log.msg("received HTTP request body containing key3 for Hixie-76: %s" % key3)
self.factory._log("received HTTP request body containing key3 for Hixie-76: %s" % key3)
## Ok, got complete HS input, remember rest (if any)
##
@@ -3249,6 +3134,12 @@ class WebSocketServerProtocol(WebSocketProtocol):
else:
self.data = self.data[end_of_header + 4:]
## store WS key
##
if self.websocket_version == 0:
self._wskey = (key1, key2, key3)
else:
self._wskey = key
## WebSocket handshake validated => produce opening handshake response
@@ -3257,7 +3148,6 @@ class WebSocketServerProtocol(WebSocketProtocol):
## may return a protocol from the protocols provided by client or None.
##
self.connectionRequest = ConnectionRequest(self.peer,
self.peerstr,
self.http_headers,
self.http_request_host,
self.http_request_path,
@@ -3267,19 +3157,10 @@ class WebSocketServerProtocol(WebSocketProtocol):
self.websocket_protocols,
self.websocket_extensions)
## onConnect() will return the selected subprotocol or None
## or a pair (protocol, headers) or raise an HttpException
##
protocol = maybeDeferred(self.onConnect, self.connectionRequest)
if self.websocket_version == 0:
key = key1, key2, key3
protocol.addCallback(self._processHandshake_buildResponse, key)
protocol.addErrback(self._processHandshake_failed)
self._run_onConnect(self.connectionRequest)
def _processHandshake_buildResponse(self, res, key):
def _processHandshake_buildResponse(self, res):
"""
Callback for Deferred returned by self.onConnect. Generates the response for the handshake.
"""
@@ -3299,7 +3180,9 @@ class WebSocketServerProtocol(WebSocketProtocol):
self.websocket_protocol_in_use = protocol
if self.websocket_version == 0:
key1, key2, key3 = key
key1, key2, key3 = self._wskey
else:
key = self._wskey
## extensions effectively in use for this connection
@@ -3317,7 +3200,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
for (extension, params) in self.websocket_extensions:
if self.debug:
log.msg("parsed WebSocket extension '%s' with params '%s'" % (extension, params))
self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params))
## process permessage-compress extension
##
@@ -3333,7 +3216,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
else:
if self.debug:
log.msg("client requested '%s' extension we don't support or which is not activated" % extension)
self.factory._log("client requested '%s' extension we don't support or which is not activated" % extension)
## handle permessage-compress offers by the client
##
@@ -3346,7 +3229,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
extensionResponse.append(accept.getExtensionString())
else:
if self.debug:
log.msg("client request permessage-compress extension, but we did not accept any offer [%s]" % pmceOffers)
self.factory._log("client request permessage-compress extension, but we did not accept any offer [%s]" % pmceOffers)
## build response to complete WebSocket handshake
@@ -3378,15 +3261,15 @@ class WebSocketServerProtocol(WebSocketProtocol):
response += "Sec-WebSocket-Origin: %s\x0d\x0a" % str(self.websocket_origin)
if self.debugCodePaths:
log.msg('factory isSecure = %s port = %s' % (self.factory.isSecure, self.factory.externalPort))
self.factory._log('factory isSecure = %s port = %s' % (self.factory.isSecure, self.factory.externalPort))
if (self.factory.isSecure and self.factory.externalPort != 443) or ((not self.factory.isSecure) and self.factory.externalPort != 80):
if self.debugCodePaths:
log.msg('factory running on non-default port')
self.factory._log('factory running on non-default port')
response_port = ':' + str(self.factory.externalPort)
else:
if self.debugCodePaths:
log.msg('factory running on default port')
self.factory._log('factory running on default port')
response_port = ''
## FIXME: check this! But see below ..
@@ -3429,7 +3312,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
response_body = ''
if self.debug:
log.msg("sending HTTP response:\n\n%s%s\n\n" % (response, binascii.b2a_hex(response_body)))
self.factory._log("sending HTTP response:\n\n%s%s\n\n" % (response, binascii.b2a_hex(response_body)))
## save and send out opening HS data
##
@@ -3444,7 +3327,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
##
if self.openHandshakeTimeoutCall is not None:
if self.debugCodePaths:
log.msg("openHandshakeTimeoutCall.cancel")
self.factory._log("openHandshakeTimeoutCall.cancel")
self.openHandshakeTimeoutCall.cancel()
self.openHandshakeTimeoutCall = None
@@ -3475,7 +3358,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
if failure.check(HttpException):
return self.failHandshake(e.reason, e.code)
else:
log.msg("Exception raised in onConnect() - %s" % str(e))
self.factory._log("Exception raised in onConnect() - %s" % str(e))
return self.failHandshake("Internal Server Error", HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR[0])
@@ -3485,7 +3368,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
error response and then drop the connection.
"""
if self.debug:
log.msg("failing WebSocket opening handshake ('%s')" % reason)
self.factory._log("failing WebSocket opening handshake ('%s')" % reason)
self.sendHttpErrorResponse(code, reason, responseHeaders)
self.dropConnection(abort = False)
@@ -3571,7 +3454,7 @@ class WebSocketServerProtocol(WebSocketProtocol):
class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory):
class WebSocketServerFactory(WebSocketFactory):
"""
A Twisted factory for WebSocket server protocols.
"""
@@ -3594,8 +3477,7 @@ class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory):
headers = {},
externalPort = None,
debug = False,
debugCodePaths = False,
reactor = None):
debugCodePaths = False):
"""
Create instance of WebSocket server factory.
@@ -3618,11 +3500,6 @@ class WebSocketServerFactory(protocol.ServerFactory, WebSocketFactory):
:param debugCodePaths: Debug code paths mode (default: False).
:type debugCodePaths: bool
"""
## lazy import to avoid reactor install upon module import
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
self.debug = debug
self.debugCodePaths = debugCodePaths
@@ -3877,7 +3754,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
"""
WebSocketProtocol.connectionMade(self)
if self.debug:
log.msg("connection to %s established" % self.peerstr)
self.factory._log("connection to %s established" % self.peer)
if not self.factory.isServer and self.factory.proxy is not None:
## start by doing a HTTP/CONNECT for explicit proxies
@@ -3896,7 +3773,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
"""
WebSocketProtocol.connectionLost(self, reason)
if self.debug:
log.msg("connection to %s lost" % self.peerstr)
self.factory._log("connection to %s lost" % self.peer)
def startProxyConnect(self):
@@ -3911,7 +3788,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
request += "\x0d\x0a"
if self.debug:
log.msg(request)
self.factory._log(request)
self.sendData(request)
@@ -3927,7 +3804,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
http_response_data = self.data[:end_of_header + 4]
if self.debug:
log.msg("received HTTP response:\n\n%s\n\n" % http_response_data)
self.factory._log("received HTTP response:\n\n%s\n\n" % http_response_data)
## extract HTTP status line and headers
##
@@ -3936,8 +3813,8 @@ class WebSocketClientProtocol(WebSocketProtocol):
## validate proxy connect response
##
if self.debug:
log.msg("received HTTP status line for proxy connect request : %s" % str(http_status_line))
log.msg("received HTTP headers for proxy connect request : %s" % str(http_headers))
self.factory._log("received HTTP status line for proxy connect request : %s" % str(http_status_line))
self.factory._log("received HTTP headers for proxy connect request : %s" % str(http_headers))
## Response Line
##
@@ -3994,7 +3871,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
connection.
"""
if self.debug:
log.msg("failing proxy connect ('%s')" % reason)
self.factory._log("failing proxy connect ('%s')" % reason)
self.dropConnection(abort = True)
@@ -4112,7 +3989,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
self.http_request_data = request
if self.debug:
log.msg(self.http_request_data)
self.factory._log(self.http_request_data)
self.sendData(self.http_request_data)
@@ -4128,7 +4005,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
self.http_response_data = self.data[:end_of_header + 4]
if self.debug:
log.msg("received HTTP response:\n\n%s\n\n" % self.http_response_data)
self.factory._log("received HTTP response:\n\n%s\n\n" % self.http_response_data)
## extract HTTP status line and headers
##
@@ -4137,8 +4014,8 @@ class WebSocketClientProtocol(WebSocketProtocol):
## validate WebSocket opening handshake server response
##
if self.debug:
log.msg("received HTTP status line in opening handshake : %s" % str(self.http_status_line))
log.msg("received HTTP headers in opening handshake : %s" % str(self.http_headers))
self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line))
self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers))
## Response Line
##
@@ -4229,7 +4106,7 @@ class WebSocketClientProtocol(WebSocketProtocol):
for (extension, params) in websocket_extensions:
if self.debug:
log.msg("parsed WebSocket extension '%s' with params '%s'" % (extension, params))
self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params))
## process permessage-compress extension
##
@@ -4304,7 +4181,6 @@ class WebSocketClientProtocol(WebSocketProtocol):
## by server
try:
connectionResponse = ConnectionResponse(self.peer,
self.peerstr,
self.http_headers,
None, # FIXME
self.websocket_protocol_in_use,
@@ -4335,12 +4211,12 @@ class WebSocketClientProtocol(WebSocketProtocol):
connection.
"""
if self.debug:
log.msg("failing WebSocket opening handshake ('%s')" % reason)
self.factory._log("failing WebSocket opening handshake ('%s')" % reason)
self.dropConnection(abort = True)
class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory):
class WebSocketClientFactory(WebSocketFactory):
"""
A Twisted factory for WebSocket client protocols.
"""
@@ -4364,8 +4240,7 @@ class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory):
headers = {},
proxy = None,
debug = False,
debugCodePaths = False,
reactor = None):
debugCodePaths = False):
"""
Create instance of WebSocket client factory.
@@ -4390,11 +4265,6 @@ class WebSocketClientFactory(protocol.ClientFactory, WebSocketFactory):
:param debugCodePaths: Debug code paths mode (default: False).
:type debugCodePaths: bool
"""
## lazy import to avoid reactor install upon module import
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
self.debug = debug
self.debugCodePaths = debugCodePaths

View File

@@ -0,0 +1,2 @@
client:
PYTHONPATH=../../../autobahn python client.py ws://127.0.0.1:9000 debug

View File

@@ -21,9 +21,9 @@ import sys
from twisted.internet import reactor
from twisted.python import log
from autobahn.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS
from autobahn.twisted.websocket import WebSocketClientProtocol, \
WebSocketClientFactory, \
connectWS
class EchoClientProtocol(WebSocketClientProtocol):

View File

@@ -20,6 +20,9 @@ from autobahn.twisted.websocket import WebSocketServerProtocol, \
WebSocketServerFactory
#from autobahn.websocket import WebSocketServerProtocol, \
# WebSocketServerFactory
class MyServerProtocol(WebSocketServerProtocol):
@@ -27,8 +30,8 @@ class MyServerProtocol(WebSocketServerProtocol):
print("message received")
self.sendMessage(payload)
def onConnect(self, connectionRequest):
return None
#def onConnect(self, connectionRequest):
# return None
class MyServerFactory(WebSocketServerFactory):
@@ -39,11 +42,16 @@ class MyServerFactory(WebSocketServerFactory):
if __name__ == '__main__':
import sys
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
from twisted.python import log
factory = MyServerFactory()
log.startLogging(sys.stdout)
endpoint = TCP4ServerEndpoint(reactor, 8888)
factory = MyServerFactory("ws://localhost:9000", debug = True)
endpoint = TCP4ServerEndpoint(reactor, 9000)
endpoint.listen(factory)
reactor.run()