work on long-poll

This commit is contained in:
Tobias Oberstein
2013-12-16 19:14:09 +01:00
parent f9ef1a610a
commit 25ca03376f
8 changed files with 267 additions and 172 deletions

View File

@@ -65,7 +65,7 @@ class Broker:
def remove(self, proto):
assert(proto in self._protos)
self._protos.remove(proto)
for subscribers in self._subscribers:
for subscriptionid, subscribers in self._subscribers.values():
subscribers.discard(proto)

View File

@@ -17,18 +17,21 @@
###############################################################################
import json
from collections import deque
from twisted.python import log
from twisted.web.resource import Resource, NoResource
## Each of the following 2 trigger a reactor import at module level
from twisted.web import http
from twisted.web.server import NOT_DONE_YET
from autobahn.util import newid
from twisted.web.resource import Resource, NoResource
from twisted.web.server import NOT_DONE_YET
from twisted.web import resource, http
from protocol import WampProtocol, parseSubprotocolIdentifier
from collections import deque
from protocol import WampProtocol
from twisted.internet import reactor
class WampHttpResourceSessionSend(Resource):
@@ -37,8 +40,14 @@ class WampHttpResourceSessionSend(Resource):
"""
def __init__(self, parent):
"""
"""
Resource.__init__(self)
self._parent = parent
self._debug = self._parent._parent._debug
self.reactor = self._parent.reactor
def render_POST(self, request):
"""
@@ -46,17 +55,19 @@ class WampHttpResourceSessionSend(Resource):
"""
payload = request.content.read()
try:
print "processing received WAMP message", payload
if self._debug:
log.msg("WAMP session data received (transport ID %s): %s" % (self._parent._transportid, payload))
self._parent.onMessage(payload, False)
except Exception, e:
request.setHeader('content-type', 'text/plain; charset=UTF-8')
request.setResponseCode(http.BAD_REQUEST)
return "could not unserialize WAMP message [%s]" % e
request.setResponseCode(http.NO_CONTENT)
self._parent._parent.setStandardHeaders(request)
request.setHeader('content-type', 'application/json; charset=utf-8')
self._parent._isalive = True
return ""
@@ -67,19 +78,26 @@ class WampHttpResourceSessionReceive(Resource):
"""
def __init__(self, parent):
"""
"""
Resource.__init__(self)
self._parent = parent
self._debug = self._parent._parent._debug
self.reactor = self._parent._parent.reactor
self._queue = deque()
self._request = None
self._killed = False
def printQueue():
print "send queue", self._parent._sessionid, self._queue
if not self._request:
print "no poll request"
reactor.callLater(1, printQueue)
printQueue()
if self._debug:
def logqueue():
if not self._killed:
log.msg("WAMP session send queue length (transport ID %s): %s" % (self._parent._transportid, len(self._queue)))
if not self._request:
log.msg("WAMP session has no XHR poll request (transport ID %s)" % self._parent._transportid)
self.reactor.callLater(1, logqueue)
logqueue()
def queue(self, data):
@@ -87,15 +105,28 @@ class WampHttpResourceSessionReceive(Resource):
self._trigger()
def _kill(self):
if self._request:
self._request.finish()
self._request = None
self._killed = True
def _trigger(self):
if self._request and len(self._queue):
## batched sending of queued messages
##
self._request.write('[')
while len(self._queue) > 0:
msg = self._queue.popleft()
self._request.write(msg)
if len(self._queue):
self._request.write(',')
self._request.write(']')
self._request.finish()
self._request = None
@@ -108,11 +139,13 @@ class WampHttpResourceSessionReceive(Resource):
self._request = request
def cancel(err):
print "cancelling, request gone"
if self._debug:
log.msg("WAMP session XHR poll request gone (transport ID %s" % self._parent._transportid)
self._request = None
request.notifyFinish().addErrback(cancel)
self._parent._isalive = True
self._trigger()
return NOT_DONE_YET
@@ -124,11 +157,24 @@ class WampHttpResourceSession(Resource, WampProtocol):
A Web resource representing an open WAMP session.
"""
def __init__(self, parent, sessionid):
def __init__(self, parent, transportid, serializer):
"""
Create a new Web resource representing a WAMP session.
:param parent: The WAMP Web base resource.
:type parent: Instance of WampHttpResource.
:param serializer: The WAMP serializer in use for this session.
:type serializer: An instance of WampSerializer.
"""
Resource.__init__(self)
self._parent = parent
self._sessionid = sessionid
self._serializer = parent._serializers[1]
self._debug = self._parent._debug
self.reactor = self._parent.reactor
self._transportid = transportid
self._serializer = serializer
self._send = WampHttpResourceSessionSend(self)
self._receive = WampHttpResourceSessionReceive(self)
@@ -136,11 +182,36 @@ class WampHttpResourceSession(Resource, WampProtocol):
self.putChild("send", self._send)
self.putChild("receive", self._receive)
killAfter = self._parent._killAfter
self._isalive = False
def killIfDead():
if not self._isalive:
if self._debug:
log.msg("killing inactive WAMP session (transport ID %s)" % self._transportid)
self.onClose(False, 5000, "Session inactive")
self._receive._kill()
del self._parent._transports[self._transportid]
else:
if self._debug:
log.msg("WAMP session still alive (transport ID %s)" % self._transportid)
self._isalive = False
self.reactor.callLater(killAfter, killIfDead)
self.reactor.callLater(killAfter, killIfDead)
if self._debug:
log.msg("WAMP session resource initialized (transport ID %s)" % self._transportid)
self.onOpen()
def sendMessage(self, bytes, isBinary):
print "send", bytes
if self._debug:
log.msg("WAMP session send bytes (transport ID %s): %s" % (self._transportid, bytes))
self._receive.queue(bytes)
@@ -151,25 +222,56 @@ class WampHttpResourceOpen(Resource):
"""
def __init__(self, parent):
"""
"""
Resource.__init__(self)
self._parent = parent
self._debug = self._parent._debug
self.reactor = self._parent.reactor
def _failRequest(self, request, msg):
request.setHeader('content-type', 'text/plain; charset=UTF-8')
request.setResponseCode(http.BAD_REQUEST)
return msg
def render_POST(self, request):
"""
Request to create a new WAMP session.
"""
self._parent.setStandardHeaders(request)
payload = request.content.read()
try:
options = json.loads(payload)
except Exception, e:
return
return self._failRequest(request, "could not parse WAMP session open request body [%s]" % e)
if type(options) != dict:
return self._failRequest(request, "invalid type for WAMP session open request [was '%s', expected dictionary]" % type(options))
if not options.has_key('protocols'):
return self._failRequest(request, "missing attribute 'protocols' in WAMP session open request")
protocol = None
for p in options['protocols']:
version, serializerId = parseSubprotocolIdentifier(p)
if version == 2 and serializerId in self._parent._serializers.keys():
serializer = self._parent._serializers[serializerId]
protocol = p
break
request.setHeader('content-type', 'application/json; charset=utf-8')
sessionid = newid()
transportid = newid()
self._parent._sessions[sessionid] = self._parent.protocol(self._parent, sessionid)
## create instance of WampHttpResourceSession or subclass thereof ..
##
self._parent._transports[transportid] = self._parent.protocol(self._parent, transportid, serializer)
ret = {'session': sessionid}
ret = {'transport': transportid, 'protocol': protocol}
return json.dumps(ret)
@@ -179,45 +281,84 @@ class WampHttpResource(Resource):
"""
A WAMP Web base resource.
"""
protocol = WampHttpResourceSession
def __init__(self, serializers, debug = False):
def __init__(self,
serializers = None,
timeout = 10,
killAfter = 30,
queueLimitBytes = 128 * 1024,
queueLimitMessages = 100,
debug = False,
reactor = None):
"""
Create new HTTP WAMP Web resource.
:param serializers: List of WAMP serializers.
:type serializers: List of WampSerializer objects.
:param timeout: XHR polling timeout in seconds.
:type timeout: int
:param killAfter: Kill WAMP session after inactivity in seconds.
:type killAfter: int
:param queueLimitBytes: Kill WAMP session after accumulation of this many bytes in send queue (XHR poll).
:type queueLimitBytes: int
:param queueLimitMessages: Kill WAMP session after accumulation of this many message in send queue (XHR poll).
:type queueLimitMessages: int
:param debug: Enable debug logging.
:type debug: bool
"""
## lazy import to avoid reactor install upon module import
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
Resource.__init__(self)
self._serializers = serializers
self._debug = debug
self._sessions = {
'foo': 23
}
#self._options = {
#}
#if options is not None:
# self._options.update(options)
self._timeout = timeout
self._killAfter = killAfter
self._queueLimitBytes = queueLimitBytes
self._queueLimitMessages = queueLimitMessages
print self.protocol
if serializers is None:
serializers = [WampJsonSerializer()]
self._serializers = {}
for ser in serializers:
self._serializers[ser.SERIALIZER_ID] = ser
self._transports = {}
## <Base URL>/open
##
self.putChild("open", WampHttpResourceOpen(self))
if self._debug:
log.msg("WampHttpResource initialized")
def getChild(self, name, request):
"""
Returns send/receive resource for transport.
print "getChild", name, request.postpath
if name not in self._sessions:
return NoResource("No WAMP session '%s'" % name)
<Base URL>/<Transport ID>/send
<Base URL>/<Transport ID>/receive
"""
if name not in self._transports:
return NoResource("No WAMP transport '%s'" % name)
if len(request.postpath) != 1 or request.postpath[0] not in ['send', 'receive']:
return NoResource("Invalid WAMP session operation '%s'" % request.postpath[0])
return NoResource("Invalid WAMP transport operation '%s'" % request.postpath[0])
sessionid = name
op = request.postpath[0]
return self._transports[name]
print sessionid, op
print self._sessions
res = self._sessions[sessionid]
print res
return res
def setStandardHeaders(self, request):
"""
Set standard HTTP response headers.
"""
origin = request.getHeader("Origin")
if origin is None or origin == "null":
origin = "*"
@@ -228,100 +369,3 @@ class WampHttpResource(Resource):
headers = request.getHeader('Access-Control-Request-Headers')
if headers is not None:
request.setHeader('Access-Control-Allow-Headers', headers)
# def putChild(self, path, child):
# child.parent = self
# Resource.putChild(self, path, child)
# # Just in case somebody wants to mess with these
# self._methods = {
# 'xhr': XHR,
# 'xhr_send': XHRSend,
# 'xhr_streaming': XHRStream,
# 'eventsource': EventSource,
# 'htmlfile': HTMLFile,
# 'jsonp': JSONP,
# 'jsonp_send': JSONPSend,
# }
# self._writeMethods = ('xhr_send','jsonp_send')
# # Static Resources
# self.putChild("info",Info())
# self.putChild("iframe.html",IFrame())
# self.putChild("websocket",RawWebSocket())
# # Since it's constant, we can declare the websocket handler up here
# self._websocket = WebSocket()
# self._websocket.parent = self
# def getChild(self, name, request):
# # Check if it is the greeting url
# if not name and not request.postpath:
# return self
# # Hacks to resove the iframe even when people are dumb
# if len(name) > 10 and name[:6] == "iframe" and name[-5:] == ".html":
# return self.children["iframe.html"]
# # Sessions must have 3 parts, name is already the first. Also, no periods in the loadbalancer
# if len(request.postpath) != 2 or "." in name or not name:
# return resource.NoResource("No such child resource.")
# # Extract session & request type. Discard load balancer
# session, name = request.postpath
# # No periods in the session
# if "." in session or not session:
# return resource.NoResource("No such child resource.")
# # Websockets are a special case
# if name == "websocket":
# return self._websocket
# # Reject invalid methods
# if name not in self._methods:
# return resource.NoResource("No such child resource.")
# # Reject writes to invalid sessions, unless just checking options
# if name in self._writeMethods and session not in self._sessions and request.method != "OPTIONS":
# return resource.NoResource("No such child resource.")
# # Generate session if doesn't exist, unless just checking options
# if session not in self._sessions and request.method != "OPTIONS":
# self._sessions[session] = Stub(self, session)
# # Delegate request to appropriate handler
# return self._methods[name](self, self._sessions[session] if request.method != "OPTIONS" else None)
# def setBaseHeaders(self, request, cookie=True):
# origin = request.getHeader("Origin")
# headers = request.getHeader('Access-Control-Request-Headers')
# if origin is None or origin == "null":
# origin = "*"
# request.setHeader('access-control-allow-origin', origin)
# request.setHeader('access-control-allow-credentials', 'true')
# request.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
# if headers is not None:
# request.setHeader('Access-Control-Allow-Headers', headers)
# if self._options["cookie_needed"] and cookie:
# cookie = request.getCookie("JSESSIONID") if request.getCookie("JSESSIONID") else "dummy"
# request.addCookie("JSESSIONID", cookie, path="/")
def render_GET(self, request):
# self.setBaseHeaders(request,False)
request.setHeader('content-type', 'text/plain; charset=UTF-8')
return "Welcome to SockJS!\n"
def render_POST(self, request):
#return "Hello"
try:
payload = request.content.read()
options = json.loads(payload)
except Exception, e:
return
print options
def finish():
request.write("Hello delayed")
request.finish()
def cancel(err, call):
print "cancelling, request gone"
call.cancel()
call = reactor.callLater(1, finish)
request.notifyFinish().addErrback(cancel, call)
return NOT_DONE_YET

View File

@@ -42,6 +42,19 @@ from message import WampMessageHello, \
from error import WampProtocolError
def parseSubprotocolIdentifier(subprotocol):
try:
s = subprotocol.split('.')
if s[0] != "wamp":
raise Exception("invalid protocol %s" % s[0])
version = int(s[1])
serializerId = s[2]
return version, serializerId
except:
return None, None
class WampProtocol:
def sendWampMessage(self, msg):
@@ -117,6 +130,8 @@ class WampProtocol:
def onClose(self, wasClean, code, reason):
self.setBroker(None)
self.setDealer(None)
self.onSessionClose()

View File

@@ -30,30 +30,17 @@ from autobahn.websocket import WebSocketServerProtocol, \
from autobahn.websocket import HttpException
from autobahn.httpstatus import HTTP_STATUS_CODE_BAD_REQUEST
from protocol import WampProtocol
from protocol import WampProtocol, parseSubprotocolIdentifier
from serializer import WampJsonSerializer, WampMsgPackSerializer
def _parse_subprotocol(subprotocol):
try:
s = subprotocol.split('.')
if s[0] != "wamp":
raise Exception()
version = int(s[1])
serializerId = s[2]
return version, serializerId
except:
return None, None
class WampWebSocketServerProtocol(WampProtocol, WebSocketServerProtocol):
def onConnect(self, connectionRequest):
headers = {}
for subprotocol in connectionRequest.protocols:
version, serializerId = _parse_subprotocol(subprotocol)
version, serializerId = parseSubprotocolIdentifier(subprotocol)
if version == 2 and serializerId in self.factory._serializers.keys():
self._serializer = self.factory._serializers[serializerId]
return subprotocol, headers
@@ -69,7 +56,7 @@ class WampWebSocketClientProtocol(WampProtocol, WebSocketClientProtocol):
if connectionResponse.protocol not in self.factory.protocols:
raise Exception("Server does not speak any of the WebSocket subprotocols we requested (%s)." % ', '.join(self.factory.protocols))
version, serializerId = _parse_subprotocol(connectionResponse.protocol)
version, serializerId = parseSubprotocolIdentifier(connectionResponse.protocol)
self._serializer = self.factory._serializers[serializerId]

View File

@@ -7,18 +7,18 @@
window.onload = function() {
console.log("test");
var options = {'subprotocols': ['wamp.2.msgpack', 'wamp.2.json']};
var options = {'protocols': ['wamp.2.msgpack', 'wamp.2.json']};
httppost('http://127.0.0.1:8080/wamp/open', options).then(
function (res) {
console.log("ok", res);
console.log(res.session);
console.log(res.transport);
httppost('http://127.0.0.1:8080/wamp/' + res.session + '/send', [1, "ksjdfsdf"]);
httppost('http://127.0.0.1:8080/wamp/' + res.transport + '/send', [1, "ksjdfsdf"]);
var msg = [3, "kshdfsdf", "http://example.com/myEvent1"];
httppost('http://127.0.0.1:8080/wamp/' + res.session + '/send', msg).then(
httppost('http://127.0.0.1:8080/wamp/' + res.transport + '/send', msg).then(
function (res) {
console.log("ok 2", res);
@@ -29,7 +29,7 @@
);
function poll() {
httppost('http://127.0.0.1:8080/wamp/' + res.session + '/receive').then(
httppost('http://127.0.0.1:8080/wamp/' + res.transport + '/receive').then(
function (res) {
console.log("poll ok", res);
poll();

View File

@@ -27,17 +27,43 @@ from autobahn.wamp2.http import WampHttpResourceSession, \
WampHttpResource
from autobahn.wamp2.protocol import WampProtocol
class MyWampSession(WampProtocol):
def __init__(self, broker):
self._broker = broker
def onSessionOpen(self):
self.setBroker(self._broker)
class MyWampSessionFactory:
def __init__(self):
self._broker = Broker()
def createSession(self):
session = MyWampSession(self._broker)
return session
class MyPubSubResourceSession(WampHttpResourceSession):
def onSessionOpen(self):
self.setBroker(self._parent._broker)
def onSessionClose(self):
print "SESSION CLOSED"
class MyPubSubResource(WampHttpResource):
protocol = MyPubSubResourceSession
def __init__(self, serializers, broker, debug = False):
def __init__(self, serializers, broker, debug = True):
WampHttpResource.__init__(self, serializers = serializers, debug = debug)
self._broker = broker
@@ -76,14 +102,16 @@ if __name__ == '__main__':
broker = Broker()
serializers = [WampMsgPackSerializer(), WampJsonSerializer()]
jsonSerializer = WampJsonSerializer()
serializers = [WampMsgPackSerializer(), jsonSerializer]
wampfactory = PubSubServerFactory("ws://localhost:9000", serializers, broker, debug = False)
wampserver = serverFromString(reactor, "tcp:9000")
wampserver.listen(wampfactory)
wampResource = MyPubSubResource(serializers, broker)
wampResource = MyPubSubResource([jsonSerializer], broker)
root = File("longpoll")
root.putChild("wamp", wampResource)

View File

@@ -4,13 +4,13 @@ function httppost(url, data) {
req.onreadystatechange = function (evt) {
console.log("onreadystatechange", evt, req.readyState);
/* console.log("onreadystatechange", evt, req.readyState);
console.log(req.readyState);
console.log(req.response);
console.log(req.responseText);
console.log(req.responseType);
*/
if (req.readyState === 4) {
if (req.status === 200) {

21
examples/wamp2/test2.py Normal file
View File

@@ -0,0 +1,21 @@
# http://docs.python.org/2/library/inspect.html
# https://github.com/dotcloud/zerorpc-python/blob/master/zerorpc/decorators.py
import inspect
def add2(a, b = 3, *args, **kwargs):
"""
Adds 2 numbers.
:param a: First number.
:type a: int
:param b: Second number.
:type b: int
:returns: int -- The sum of both numbers.
"""
return a + b
print inspect.getargspec(add2)
print inspect.cleandoc(add2.__doc__)