This commit is contained in:
Tobias Oberstein
2012-01-25 17:13:43 +01:00
parent 3bfa865128
commit 178167f9ca

View File

@@ -245,29 +245,29 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
WebSocketServerProtocol.connectionLost(self, reason)
def _getPubHandler(self, topicuri):
def _getPubHandler(self, topicUri):
## Longest matching prefix based resolution of (full) topic URI to
## publication handler.
## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch)
##
for i in xrange(len(topicuri), -1, -1):
tt = topicuri[:i]
for i in xrange(len(topicUri), -1, -1):
tt = topicUri[:i]
if self.pubHandlers.has_key(tt):
h = self.pubHandlers[tt]
return (tt, topicuri[i:], h[0], h[1], h[2])
return (tt, topicUri[i:], h[0], h[1], h[2])
return None
def _getSubHandler(self, topicuri):
def _getSubHandler(self, topicUri):
## Longest matching prefix based resolution of (full) topic URI to
## subscription handler.
## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch)
##
for i in xrange(len(topicuri), -1, -1):
tt = topicuri[:i]
for i in xrange(len(topicUri), -1, -1):
tt = topicUri[:i]
if self.subHandlers.has_key(tt):
h = self.subHandlers[tt]
return (tt, topicuri[i:], h[0], h[1], h[2])
return (tt, topicUri[i:], h[0], h[1], h[2])
return None
@@ -407,7 +407,7 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
log.msg("registered remote procedure on %s" % uri)
def dispatch(self, topicuri, event, exclude = [], eligible = None):
def dispatch(self, topicUri, event, exclude = [], eligible = None):
"""
Dispatch an event for a topic to all clients subscribed to
and authorized for that topic.
@@ -418,8 +418,8 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
(subscribers - excluded) & eligible
:param topicuri: URI of topic to publish event to.
:type topicuri: str
:param topicUri: URI of topic to publish event to.
:type topicUri: str
:param event: Event to dispatch.
:type event: obj
:param exclude: Optional list of clients (protocol instances) to exclude.
@@ -427,7 +427,7 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
:param eligible: Optional list of clients (protocol instances) eligible at all (or None for all).
:type eligible: list of obj
"""
self.factory._dispatchEvent(topicuri, event, exclude, eligible)
self.factory._dispatchEvent(topicUri, event, exclude, eligible)
def _callProcedure(self, uri, arg = None):
@@ -487,7 +487,7 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
elif leargs == 1:
if type(eargs[0]) not in [str, unicode]:
raise Exception("invalid type %s for errorDesc" % str(type(eargs[0])))
raise Exception("invalid type %s for errorDesc" % type(eargs[0]))
erroruri = WampProtocol.ERROR_URI_GENERIC
errordesc = eargs[0]
errordetails = None
@@ -497,10 +497,10 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
elif leargs in [2, 3]:
if type(eargs[0]) not in [str, unicode]:
raise Exception("invalid type %s for errorUri" % str(type(eargs[0])))
raise Exception("invalid type %s for errorUri" % type(eargs[0]))
erroruri = eargs[0]
if type(eargs[1]) not in [str, unicode]:
raise Exception("invalid type %s for errorDesc" % str(type(eargs[1])))
raise Exception("invalid type %s for errorDesc" % type(eargs[1]))
errordesc = eargs[1]
if leargs > 2:
errordetails = eargs[2] # this must be JSON serializable .. if not, we get exception later in sendMessage
@@ -557,15 +557,15 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
## Subscribe Message
##
elif obj[0] == WampProtocol.MESSAGE_TYPEID_SUBSCRIBE:
topicuri = self.prefixes.resolveOrPass(obj[1])
h = self._getSubHandler(topicuri)
topicUri = self.prefixes.resolveOrPass(obj[1])
h = self._getSubHandler(topicUri)
if h:
## either exact match or prefix match allowed
if h[1] == "" or h[4]:
## direct topic
if h[2] is None and h[3] is None:
self.factory._subscribeClient(self, topicuri)
self.factory._subscribeClient(self, topicUri)
## topic handled by subscription handler
else:
@@ -580,28 +580,28 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
## only subscribe client if handler did return True
if a:
self.factory._subscribeClient(self, topicuri)
self.factory._subscribeClient(self, topicUri)
except:
if self.debugWamp:
log.msg("execption during topic subscription handler")
else:
if self.debugWamp:
log.msg("topic %s matches only by prefix and prefix match disallowed" % topicuri)
log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri)
else:
if self.debugWamp:
log.msg("no topic / subscription handler registered for %s" % topicuri)
log.msg("no topic / subscription handler registered for %s" % topicUri)
## Unsubscribe Message
##
elif obj[0] == WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE:
topicuri = self.prefixes.resolveOrPass(obj[1])
self.factory._unsubscribeClient(self, topicuri)
topicUri = self.prefixes.resolveOrPass(obj[1])
self.factory._unsubscribeClient(self, topicUri)
## Publish Message
##
elif obj[0] == WampProtocol.MESSAGE_TYPEID_PUBLISH:
topicuri = self.prefixes.resolveOrPass(obj[1])
h = self._getPubHandler(topicuri)
topicUri = self.prefixes.resolveOrPass(obj[1])
h = self._getPubHandler(topicUri)
if h:
## either exact match or prefix match allowed
if h[1] == "" or h[4]:
@@ -620,7 +620,7 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
## direct topic
if h[2] is None and h[3] is None:
self.factory._dispatchEvent(topicuri, event, exclude)
self.factory._dispatchEvent(topicUri, event, exclude)
## topic handled by publication handler
else:
@@ -635,16 +635,16 @@ class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
## only dispatch event if handler did return event
if e:
self.factory._dispatchEvent(topicuri, e, exclude)
self.factory._dispatchEvent(topicUri, e, exclude)
except:
if self.debugWamp:
log.msg("execption during topic publication handler")
else:
if self.debugWamp:
log.msg("topic %s matches only by prefix and prefix match disallowed" % topicuri)
log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri)
else:
if self.debugWamp:
log.msg("no topic / publication handler registered for %s" % topicuri)
log.msg("no topic / publication handler registered for %s" % topicUri)
## Define prefix to be used in CURIEs
##
@@ -675,25 +675,25 @@ class WampServerFactory(WebSocketServerFactory):
self.debugWamp = debugWamp
def _subscribeClient(self, proto, topicuri):
def _subscribeClient(self, proto, topicUri):
## Internal method called from proto to subscribe client for topic.
if self.debugWamp:
log.msg("subscribed peer %s for topic %s" % (proto.peerstr, topicuri))
log.msg("subscribed peer %s for topic %s" % (proto.peerstr, topicUri))
if not self.subscriptions.has_key(topicuri):
self.subscriptions[topicuri] = set()
self.subscriptions[topicuri].add(proto)
if not self.subscriptions.has_key(topicUri):
self.subscriptions[topicUri] = set()
self.subscriptions[topicUri].add(proto)
def _unsubscribeClient(self, proto, topicuri = None):
def _unsubscribeClient(self, proto, topicUri = None):
## Internal method called from proto to unsubscribe client from topic.
if topicuri:
if self.subscriptions.has_key(topicuri):
self.subscriptions[topicuri].discard(proto)
if topicUri:
if self.subscriptions.has_key(topicUri):
self.subscriptions[topicUri].discard(proto)
if self.debugWamp:
log.msg("unsubscribed peer %s from topic %s" % (proto.peerstr, topicuri))
log.msg("unsubscribed peer %s from topic %s" % (proto.peerstr, topicUri))
else:
for t in self.subscriptions:
self.subscriptions[t].discard(proto)
@@ -701,13 +701,13 @@ class WampServerFactory(WebSocketServerFactory):
log.msg("unsubscribed peer %s from all topics" % (proto.peerstr))
def _dispatchEvent(self, topicuri, event, exclude = [], eligible = None):
def _dispatchEvent(self, topicUri, event, exclude = [], eligible = None):
"""
Internal method called from proto to publish an received event
to all peers subscribed to the event topic.
:param topicuri: Topic to publish event to.
:type topicuri: str
:param topicUri: Topic to publish event to.
:type topicUri: str
:param event: Event to publish (must be JSON serializable).
:type event: obj
:param exclude: List of protocol instances to exclude from receivers.
@@ -720,13 +720,13 @@ class WampServerFactory(WebSocketServerFactory):
receivers, and requested = number of (subscribers - excluded) & eligible.
"""
if self.debugWamp:
log.msg("publish event %s for topicuri %s" % (str(event), topicuri))
log.msg("publish event %s for topicUri %s" % (str(event), topicUri))
d = Deferred()
if self.subscriptions.has_key(topicuri) and len(self.subscriptions[topicuri]) > 0:
if self.subscriptions.has_key(topicUri) and len(self.subscriptions[topicUri]) > 0:
o = [WampProtocol.MESSAGE_TYPEID_EVENT, topicuri, event]
o = [WampProtocol.MESSAGE_TYPEID_EVENT, topicUri, event]
try:
msg = json.dumps(o)
if self.debugWamp:
@@ -741,9 +741,9 @@ class WampServerFactory(WebSocketServerFactory):
## However, see http://twistedmatrix.com/trac/ticket/1396
if eligible is not None:
subscrbs = set(eligible) & self.subscriptions[topicuri]
subscrbs = set(eligible) & self.subscriptions[topicUri]
else:
subscrbs = self.subscriptions[topicuri]
subscrbs = self.subscriptions[topicUri]
if len(exclude) > 0:
recvs = subscrbs - set(exclude)
@@ -958,10 +958,10 @@ class WampClientProtocol(WebSocketClientProtocol, WampProtocol):
self._protocolError("invalid type for topicid in event message")
return
unresolvedTopicUri = str(obj[1])
topicuri = self.prefixes.resolveOrPass(unresolvedTopicUri)
if self.subscriptions.has_key(topicuri):
topicUri = self.prefixes.resolveOrPass(unresolvedTopicUri)
if self.subscriptions.has_key(topicUri):
event = obj[2]
self.subscriptions[topicuri](topicuri, event)
self.subscriptions[topicUri](topicUri, event)
elif msgtype == WampProtocol.MESSAGE_TYPEID_WELCOME:
if len(obj) != 2:
self._protocolError("event message invalid length")
@@ -1034,27 +1034,27 @@ class WampClientProtocol(WebSocketClientProtocol, WampProtocol):
self.sendMessage(json.dumps(msg))
def publish(self, topicuri, event, excludeMe = True):
def publish(self, topicUri, event, excludeMe = True):
"""
Publish an event under a topic URI. The latter may be abbreviated using a
CURIE which has been previously defined using prefix(). The event must
be JSON serializable.
:param topicuri: The topic URI or CURIE.
:type topicuri: str
:param topicUri: The topic URI or CURIE.
:type topicUri: str
:param event: Event to be published (must be JSON serializable) or None.
:type event: value
:param excludeMe: When True, don't deliver the published event to myself (when I'm subscribed). Default: True.
:type excludeMe: bool
"""
if type(topicuri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % str(type(topicUri)))
if type(topicUri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
if type(excludeMe) != bool:
raise Exception("invalid type for parameter 'excludeMe' - must be bool (was %s)" % str(type(excludeMe)))
raise Exception("invalid type for parameter 'excludeMe' - must be bool (was %s)" % type(excludeMe))
msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicuri, event, excludeMe]
msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, excludeMe]
try:
o = json.dumps(msg)
@@ -1064,45 +1064,45 @@ class WampClientProtocol(WebSocketClientProtocol, WampProtocol):
self.sendMessage(o)
def subscribe(self, topicuri, handler):
def subscribe(self, topicUri, handler):
"""
Subscribe to topic. When already subscribed, will overwrite the handler.
:param topicuri: URI or CURIE of topic to subscribe to.
:type topicuri: str
:param topicUri: URI or CURIE of topic to subscribe to.
:type topicUri: str
:param handler: Event handler to be invoked upon receiving events for topic.
:type handler: Python callable, will be called as in <callable>(eventUri, event).
"""
if type(topicuri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % str(type(topicUri)))
if type(topicUri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
if type(handler) not in [types.FunctionType, types.MethodType, types.BuiltinFunctionType, types.BuiltinMethodType]:
raise Exception("invalid type for parameter 'handler' - must be a callable (was %s)" % str(type(handler)))
raise Exception("invalid type for parameter 'handler' - must be a callable (was %s)" % type(handler))
turi = self.prefixes.resolveOrPass(topicuri)
turi = self.prefixes.resolveOrPass(topicUri)
if not self.subscriptions.has_key(turi):
msg = [WampProtocol.MESSAGE_TYPEID_SUBSCRIBE, topicuri]
msg = [WampProtocol.MESSAGE_TYPEID_SUBSCRIBE, topicUri]
o = json.dumps(msg)
self.sendMessage(o)
self.subscriptions[turi] = handler
def unsubscribe(self, topicuri):
def unsubscribe(self, topicUri):
"""
Unsubscribe from topic. Will do nothing when currently not subscribed to the topic.
:param topicuri: URI or CURIE of topic to unsubscribe from.
:type topicuri: str
:param topicUri: URI or CURIE of topic to unsubscribe from.
:type topicUri: str
"""
if type(topicuri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % str(type(topicUri)))
if type(topicUri) not in [unicode, str]:
raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
turi = self.prefixes.resolveOrPass(topicuri)
turi = self.prefixes.resolveOrPass(topicUri)
if self.subscriptions.has_key(turi):
msg = [WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE, topicuri]
msg = [WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE, topicUri]
o = json.dumps(msg)
self.sendMessage(o)
del self.subscriptions[topicuri]
del self.subscriptions[turi]
class WampClientFactory(WebSocketClientFactory):