improve API

This commit is contained in:
Tobias Oberstein
2014-01-30 17:51:08 +01:00
parent 30f82acd52
commit 66459232a0
10 changed files with 511 additions and 185 deletions

View File

@@ -26,7 +26,6 @@ publish: clean
test:
trial.py autobahn.wamp.tests
# trial.py autobahn.wamp.tests.test_interfaces
# trial.py autobahn.wamp.tests.test_message
# trial.py autobahn.wamp.tests.test_protocol
# trial.py autobahn.wamp.tests.test_protocol_peer

View File

@@ -292,6 +292,39 @@ class ICaller(ISession):
class IRegistration(Interface):
"""
Represents a registration.
"""
id = Attribute("The WAMP registration ID for this registration.")
active = Attribute("Flag indicating if registration is active.")
def unregister():
"""
Unregister this registration that was previously created from
:func:`autobahn.wamp.interfaces.ICallee.register`.
After a registration has been unregistered, calls won't get routed
to the endpoint any more.
This will return a deferred/future, that when resolved signals
successful unregistration.
If the unregistration fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp.error.ApplicationError`.
:param registration: The registration to unregister from.
:type registration: An instance of :class:`autobahn.wamp.types.Registration`
that was previously registered.
:returns: obj -- A deferred/future for the unregistration -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
"""
class ICallee(ISession):
"""
Interface for WAMP peers implementing role "Callee".
@@ -323,27 +356,12 @@ class ICallee(ISession):
"""
def unregister(registration):
"""
Unregister the endpoint registration that was previously registered.
After a registration has been unregistered, calls won't get routed
to the endpoint any more.
class IPublication(Interface):
"""
"""
id = Attribute("The WAMP publication ID for this publication.")
This will return a deferred/future, that when resolved signals
successful unregistration.
If the unregistration fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp.error.ApplicationError`.
:param registration: The registration to unregister from.
:type registration: An instance of :class:`autobahn.wamp.types.Registration`
that was previously registered.
:returns: obj -- A deferred/future for the unregistration -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
"""
@@ -356,20 +374,57 @@ class IPublisher(ISession):
"""
Publish an event to a topic.
This will return a deferred/future, that when resolved provides the publication ID
for the published event.
If `kwargs` contains an `options` keyword argument that is an instance of
:class:`autobahn.wamp.types.PublishOptions`, this will provide
specific options for the publish to perform.
If the publication fails, the returned deferred/future will be rejected with an instance
of :class:`autobahn.wamp.error.ApplicationError`.
If publication acknowledgement is requested via `options.acknowledgement == True`,
this function returns a Deferred/Future:
- if the publication succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.IPublication`.
- if the publication fails the Deferred/Future will reject with an instance
of :class:`autobahn.error.RuntimeError`.
:param topic: The URI of the topic to publish to, e.g. "com.myapp.mytopic1".
:type topic: str
:param payload: The payload for the event to be published.
:type payload: obj
:param options: Options for publishing.
:type options: None or an instance of :class:`autobahn.wamp.types.PublishOptions`
:param args: Arbitrary application payload for the event (positional arguments).
:type args: list
:param kwargs: Arbitrary application payload for the event (keyword arguments).
:type kwargs: dict
:returns: obj -- A deferred/future for the publication -
:returns: obj -- `None` for non-acknowledged publications or,
for non-acknowledged publications, an instance of
:class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
"""
class ISubscription(Interface):
"""
Represents a subscription.
"""
id = Attribute("The WAMP subscription ID for this subscription.")
active = Attribute("Flag indicating if subscription is active.")
def unsubscribe():
"""
Unsubscribe this subscription that was previously created from
:func:`autobahn.wamp.interfaces.ISubscriber.subscribe`.
After a subscription has been unsubscribed, events won't get
routed to the handler anymore.
This will return a deferred/future, that when resolved signals
successful unsubscription.
If the unsubscription fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp.error.ApplicationError`.
:returns: obj -- A deferred/future for the unsubscription -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
"""
@@ -389,9 +444,12 @@ class ISubscriber(ISession):
then `topic` must be provided and an instance of
:class:`twisted.internet.defer.Deferred` (when running on Twisted) or an instance
of :class:`asyncio.Future` (when running on asyncio) is returned.
If the subscription succeeds the Deferred/Future will resolve to an instance
of :class:`autobahn.wamp.types.Subscription`. If the subscription fails the
Deferred/Future will reject with an instance of :class:`autobahn.error.RuntimeError`.
If the subscription succeeds the Deferred/Future will resolve to an object
that implements :class:`autobahn.wamp.interfaces.ISubscription`.
If the subscription fails the Deferred/Future will reject with an instance
of :class:`autobahn.error.RuntimeError`.
If `handler` is an object, then each of the object's methods that are decorated
with :func:`autobahn.wamp.topic` are subscribed as event handlers, and a list of
@@ -413,31 +471,6 @@ class ISubscriber(ISession):
"""
def unsubscribe(subscription):
"""
Unsubscribe a subscription that was previously created with
:func:`autobahn.wamp.interfaces.ISubscriber.subscribe`.
After a subscription has been unsubscribed, watchers won't get notified
any more, and you cannot use the subscription anymore.
This will return a deferred/future, that when resolved signales
successful unsubscription.
If the unsubscription fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp.error.ApplicationError`.
:param subscription: The subscription to unscribe from.
:type subscription: An instance of :class:`autobahn.wamp.types.Subscription`
that was previously subscribed.
:returns: obj -- A (list of) Deferred(s)/Future(s) for the unsubscription(s) -
instance(s) of :class:`twisted.internet.defer.Deferred` (when
running under Twisted) or instance(s) of :class:`asyncio.Future`
(when running under asyncio).
"""
class IRouter(Interface):
"""

View File

@@ -23,9 +23,12 @@ from zope.interface import implementer
from twisted.internet.defer import Deferred, maybeDeferred
from autobahn.wamp.interfaces import ISession, \
IPublication, \
IPublisher, \
ISubscription, \
ISubscriber, \
ICaller, \
IRegistration, \
ICallee, \
ITransportHandler
@@ -44,6 +47,8 @@ from autobahn.wamp.dealer import Dealer
class Endpoint:
"""
"""
def __init__(self, fn, details_arg = None):
self.fn = fn
@@ -52,6 +57,8 @@ class Endpoint:
class Handler:
"""
"""
def __init__(self, fn, details_arg = None):
self.fn = fn
@@ -59,6 +66,55 @@ class Handler:
@implementer(IPublication)
class Publication:
"""
Object representing a publication.
This class implements :class:`autobahn.wamp.interfaces.IPublication`.
"""
def __init__(self, id):
self.id = id
@implementer(ISubscription)
class Subscription:
"""
Object representing a subscription.
This class implements :class:`autobahn.wamp.interfaces.ISubscription`.
"""
def __init__(self, session, id):
self._session = session
self.active = True
self.id = id
def unsubscribe(self):
"""
Implements :func:`autobahn.wamp.interfaces.ISubscription.unsubscribe`
"""
return self._session._unsubscribe(self)
@implementer(IRegistration)
class Registration:
"""
Object representing a registration.
This class implements :class:`autobahn.wamp.interfaces.IRegistration`.
"""
def __init__(self, session, id):
self._session = session
self.active = True
self.id = id
def unregister(self):
"""
Implements :func:`autobahn.wamp.interfaces.IRegistration.unregister`
"""
return self._session._unregister(self)
@implementer(ISession)
class WampBaseSession:
"""
@@ -342,7 +398,8 @@ class WampAppSession(WampBaseSession):
if msg.request in self._publish_reqs:
d, opts = self._publish_reqs.pop(msg.request)
d.callback(msg.publication)
p = Publication(msg.publication)
d.callback(p)
else:
raise ProtocolError("PUBLISHED received for non-pending request ID {}".format(msg.request))
@@ -354,7 +411,8 @@ class WampAppSession(WampBaseSession):
self._subscriptions[msg.subscription] = Handler(fn, options.details_arg)
else:
self._subscriptions[msg.subscription] = Handler(fn)
d.callback(msg.subscription)
s = Subscription(self, msg.subscription)
d.callback(s)
else:
raise ProtocolError("SUBSCRIBED received for non-pending request ID {}".format(msg.request))
@@ -362,8 +420,9 @@ class WampAppSession(WampBaseSession):
if msg.request in self._unsubscribe_reqs:
d, subscription = self._unsubscribe_reqs.pop(msg.request)
if subscription in self._subscriptions:
del self._subscriptions[subscription]
if subscription.id in self._subscriptions:
del self._subscriptions[subscription.id]
subscription.active = False
d.callback(None)
else:
raise ProtocolError("UNSUBSCRIBED received for non-pending request ID {}".format(msg.request))
@@ -502,7 +561,8 @@ class WampAppSession(WampBaseSession):
self._registrations[msg.registration] = Endpoint(fn, options.details_arg)
else:
self._registrations[msg.registration] = Endpoint(fn)
d.callback(msg.registration)
r = Registration(self, msg.registration)
d.callback(r)
else:
raise ProtocolError("REGISTERED received for non-pending request ID {}".format(msg.request))
@@ -510,8 +570,9 @@ class WampAppSession(WampBaseSession):
if msg.request in self._unregister_reqs:
d, registration = self._unregister_reqs.pop(msg.request)
if registration in self._registrations:
del self._registrations[registration]
if registration.id in self._registrations:
del self._registrations[registration.id]
registration.active = False
d.callback(None)
else:
raise ProtocolError("UNREGISTERED received for non-pending request ID {}".format(msg.request))
@@ -658,12 +719,13 @@ class WampAppSession(WampBaseSession):
return d
def unsubscribe(self, subscription):
def _unsubscribe(self, subscription):
"""
Implements :func:`autobahn.wamp.interfaces.ISubscriber.unsubscribe`
Called from :meth:`autobahn.wamp.protocol.Subscription.unsubscribe`
"""
assert(type(subscription) in [int, long])
assert(subscription in self._subscriptions)
assert(isinstance(subscription, Subscription))
assert(subscription.active)
assert(subscription.id in self._subscriptions)
if not self._transport:
raise exception.TransportLost()
@@ -673,7 +735,7 @@ class WampAppSession(WampBaseSession):
d = Deferred()
self._unsubscribe_reqs[request] = (d, subscription)
msg = message.Unsubscribe(request, subscription)
msg = message.Unsubscribe(request, subscription.id)
self._transport.send(msg)
return d
@@ -733,12 +795,13 @@ class WampAppSession(WampBaseSession):
return d
def unregister(self, registration):
def _unregister(self, registration):
"""
Implements :func:`autobahn.wamp.interfaces.ICallee.unregister`
Called from :meth:`autobahn.wamp.protocol.Registration.unregister`
"""
assert(type(registration) in [int, long])
assert(registration in self._registrations)
assert(isinstance(registration, Registration))
assert(registration.active)
assert(registration.id in self._registrations)
if not self._transport:
raise exception.TransportLost()
@@ -748,7 +811,7 @@ class WampAppSession(WampBaseSession):
d = Deferred()
self._unregister_reqs[request] = (d, registration)
msg = message.Unregister(request, registration)
msg = message.Unregister(request, registration.id)
self._transport.send(msg)
return d

View File

@@ -150,19 +150,19 @@ class TestPublisher(unittest.TestCase):
transport = MockTransport(handler)
publication = yield handler.publish('com.myapp.topic1', options = types.PublishOptions(acknowledge = True))
self.assertTrue(type(publication) in (int, long))
self.assertTrue(type(publication.id) in (int, long))
publication = yield handler.publish('com.myapp.topic1', 1, 2, 3, options = types.PublishOptions(acknowledge = True))
self.assertTrue(type(publication) in (int, long))
self.assertTrue(type(publication.id) in (int, long))
publication = yield handler.publish('com.myapp.topic1', 1, 2, 3, foo = 23, bar = 'hello', options = types.PublishOptions(acknowledge = True))
self.assertTrue(type(publication) in (int, long))
self.assertTrue(type(publication.id) in (int, long))
publication = yield handler.publish('com.myapp.topic1', options = types.PublishOptions(excludeMe = False, acknowledge = True))
self.assertTrue(type(publication) in (int, long))
self.assertTrue(type(publication.id) in (int, long))
publication = yield handler.publish('com.myapp.topic1', 1, 2, 3, foo = 23, bar = 'hello', options = types.PublishOptions(excludeMe = False, exclude = [100, 200, 300], acknowledge = True))
self.assertTrue(type(publication) in (int, long))
self.assertTrue(type(publication.id) in (int, long))
@inlineCallbacks
@@ -236,10 +236,10 @@ class TestPublisher(unittest.TestCase):
print "got event"
subscription = yield handler.subscribe(on_event, 'com.myapp.topic1')
self.assertTrue(type(subscription) in (int, long))
self.assertTrue(type(subscription.id) in (int, long))
subscription = yield handler.subscribe(on_event, 'com.myapp.topic1', options = types.SubscribeOptions(match = 'wildcard'))
self.assertTrue(type(subscription) in (int, long))
self.assertTrue(type(subscription.id) in (int, long))
@inlineCallbacks
@@ -251,7 +251,7 @@ class TestPublisher(unittest.TestCase):
print "got event"
subscription = yield handler.subscribe(on_event, 'com.myapp.topic1')
yield handler.unsubscribe(subscription)
yield subscription.unsubscribe()
@inlineCallbacks
@@ -263,10 +263,10 @@ class TestPublisher(unittest.TestCase):
print "got call"
registration = yield handler.register(on_call, 'com.myapp.procedure1')
self.assertTrue(type(registration) in (int, long))
self.assertTrue(type(registration.id) in (int, long))
registration = yield handler.register(on_call, 'com.myapp.procedure1', options = types.RegisterOptions(pkeys = [0, 1, 2]))
self.assertTrue(type(registration) in (int, long))
self.assertTrue(type(registration.id) in (int, long))
@inlineCallbacks
@@ -278,7 +278,7 @@ class TestPublisher(unittest.TestCase):
print "got call"
registration = yield handler.register(on_call, 'com.myapp.procedure1')
yield handler.unregister(registration)
yield registration.unregister()
@inlineCallbacks

View File

@@ -247,101 +247,3 @@ class CallResult:
def __str__(self):
return "CallResult(results = {}, kwresults = {})".format(self.results, self.kwresults)
class Registration:
"""
"""
def __init__(self, id, procedure, endpoint):
self._id = id
self._procedure = procedure
self._endpoint = endpoint
self._isActive = True
class Invocation:
"""
"""
def __init__(self, caller = None):
self.caller = caller
def progress(self, *args, **kwargs):
pass
class Subscription:
"""
"""
def __init__(self, id, topic):
self._id = id
self._topic = topic
self._watchers = []
self._isActive = True
def watch(self, watcher):
"""
Adds a watcher to the subscription.
If the given watcher is already watching, silently ignore the call. Otherwise
add the watcher (which must be a callable) to the list of watchers.
:param watcher: The watcher who should be notified upon receiving events on the
given subscription. This must be a callable which will get called
with the topic and event payload as arguments upon receiving of
events.
:type watcher: callable
"""
assert(self._isActive)
assert(callable(watcher))
if not watcher in self._watchers:
self._watchers.append(watcher)
def unwatch(self, watcher = None):
"""
Remove a watcher from the subscription.
If the given watcher is no watching, silently ignore the call. Otherwise
remote the watcher from the list of watchers.
:param watcher: The watcher who should be removed from the list of current watchers
or None to remove all watchers.
:type watcher: callable
"""
assert(self._isActive)
if watcher:
if watcher in self._watchers:
self._watchers.remove(watcher)
else:
self._watchers = []
def notify(self, event):
"""
Notify all current watcher for this subscription.
Watchers will be notified in the order how they were added to this subscription.
:param topic: The URI of the topic.
:type topic: str
:param event: The event (payload).
:type event: obj
"""
assert(self._isActive)
assert(isinstance(event, Event))
for watcher in self._watchers:
watcher(event)

View File

@@ -105,6 +105,23 @@ Router
Protocol
--------
.. autoclass:: autobahn.wamp.protocol.Publication
:show-inheritance:
:members: __init__
.. autoclass:: autobahn.wamp.protocol.Subscription
:show-inheritance:
:members: __init__,
unsubscribe
.. autoclass:: autobahn.wamp.protocol.Registration
:show-inheritance:
:members: __init__,
unregister
.. autoclass:: autobahn.wamp.protocol.WampBaseSession
:show-inheritance:
:members: define

View File

@@ -49,6 +49,11 @@ all:
@echo " client_pubsub_options_frontend"
@echo " client_pubsub_options_backend"
@echo ""
@echo "PubSub Unsubscribe:"
@echo " server_with_pubsub_unsubscribe_backend"
@echo " client_pubsub_unsubscribe_frontend"
@echo " client_pubsub_unsubscribe_backend"
@echo ""
server:
@@ -143,3 +148,13 @@ client_pubsub_options_frontend:
client_pubsub_options_backend:
PYTHONPATH=../../../../autobahn python client.py --component "pubsub.pubsuboptions.PubSubOptionsTestBackend"
server_with_pubsub_unsubscribe_backend:
PYTHONPATH=../../../../autobahn python server.py --component "pubsub.unsubscribe.UnsubscribeTestBackend"
client_pubsub_unsubscribe_frontend:
PYTHONPATH=../../../../autobahn python client.py --component "pubsub.unsubscribe.UnsubscribeTestFrontend"
client_pubsub_unsubscribe_backend:
PYTHONPATH=../../../../autobahn python client.py --component "pubsub.unsubscribe.UnsubscribeTestBackend"

View File

@@ -37,7 +37,7 @@ class PubSubOptionsTestBackend(WampAppSession):
while True:
publication = yield self.publish('com.myapp.topic1', counter,
options = PublishOptions(acknowledge = True, discloseMe = True))
print("Event published with publication ID {}".format(publication))
print("Event published with publication ID {}".format(publication.id))
counter += 1
yield sleep(1)
@@ -61,7 +61,7 @@ class PubSubOptionsTestFrontend(WampAppSession):
self.closeSession()
yield self.subscribe(on_event, 'com.myapp.topic1',
options = SubscribeOptions(details_arg = 'details'))
options = SubscribeOptions(details_arg = 'details'))
def onSessionClose(self, details):

View File

@@ -0,0 +1,80 @@
###############################################################################
##
## Copyright (C) 2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.wamp.protocol import WampAppSession
from autobahn.twisted.util import sleep
class UnsubscribeTestBackend(WampAppSession):
"""
An application component that publishes an event every second.
"""
@inlineCallbacks
def onSessionOpen(self, details):
counter = 0
while True:
self.publish('com.myapp.topic1', counter)
counter += 1
yield sleep(1)
class UnsubscribeTestFrontend(WampAppSession):
"""
An application component that subscribes and receives events.
After receiving 5 events, it unsubscribes, sleeps and then
resubscribes for another run. Then it stops.
"""
@inlineCallbacks
def test(self):
self.received = 0
@inlineCallbacks
def on_event(i):
print("Got event: {}".format(i))
self.received += 1
if self.received > 5:
self.runs += 1
if self.runs > 1:
self.closeSession()
else:
yield self.subscription.unsubscribe()
print("Unsubscribed .. continue in 2s ..")
reactor.callLater(2, self.test)
self.subscription = yield self.subscribe(on_event, 'com.myapp.topic1')
print("Subscribed with subscription ID {}".format(self.subscription.id))
@inlineCallbacks
def onSessionOpen(self, details):
self.runs = 0
yield self.test()
def onSessionClose(self, details):
reactor.stop()

View File

@@ -0,0 +1,217 @@
###############################################################################
##
## Copyright (C) 2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
from __future__ import absolute_import
#from twisted.trial import unittest
import unittest
from zope.interface import implementer
from autobahn.wamp.interfaces import *
from autobahn.wamp.types import *
from autobahn.wamp.exception import ApplicationError, ProtocolError
from twisted.internet.defer import Deferred, inlineCallbacks
import random
def newid():
return random.randint(0, 2**53)
@implementer(ISubscriber)
@implementer(IPublisher)
@implementer(ICallee)
@implementer(ICaller)
class MockSession:
def __init__(self):
self._subscriptions = {}
self._registrations = {}
def subscribe(self, topic, options = None):
assert(type(topic) == str)
assert(options is None or isinstance(options, SubscribeOptions))
if not self._subscriptions.has_key(topic):
self._subscriptions[topic] = Subscription(newid(), topic)
d = Deferred()
d.callback(self._subscriptions[topic])
return d
def unsubscribe(self, subscription):
assert(isinstance(subscription, Subscription))
assert(subscription._isActive)
assert(subscription._topic in self._subscriptions)
subscription._isActive = False
del self._subscriptions[subscription._topic]
d = Deferred()
d.callback(None)
return d
def publish(self, topic, payload = None, options = None):
assert(type(topic) == str)
assert(options is None or isinstance(options, PublishOptions))
d = Deferred()
if topic not in ["com.myapp.mytopic1"]:
d.errback(ApplicationError(ApplicationError.NOT_AUTHORIZED))
else:
id = newid()
if self._subscriptions.has_key(topic):
event = Event(topic, payload, id)
self._subscriptions[topic].notify(event)
d.callback(id)
return d
def register(self, procedure, endpoint, options = None):
assert(type(procedure) == str)
assert(options is None or isinstance(options, RegisterOptions))
if not self._registrations.has_key(procedure):
self._registrations[procedure] = Registration(newid(), procedure, endpoint)
d = Deferred()
d.callback(self._registrations[procedure])
return d
def unregister(self, registration):
assert(isinstance(registration, Registration))
assert(registration._isActive)
assert(registration._procedure in self._registrations)
registration._isActive = False
del self._registrations[registration._procedure]
d = Deferred()
d.callback(None)
return d
def call(self, procedure, *args, **kwargs):
assert(type(procedure) == str)
if 'options' in kwargs:
options = kwargs['options']
del kwargs['options']
assert(isinstance(options, CallOptions))
d = Deferred()
if procedure == "com.myapp.echo":
if len(args) != 1 or len(kwargs) != 0 or type(args[0]) != str:
d.errback(ApplicationError(ApplicationError.INVALID_ARGUMENT, "procedure takes exactly 1 positional argument of type string"))
else:
d.callback(args[0])
elif procedure == "com.myapp.complex":
d.callback(CallResult(23, 7, foo = "bar"))
elif self._registrations.has_key(procedure):
try:
res = self._registrations[procedure]._endpoint(*args, **kwargs)
except TypeError as err:
d.errback(ApplicationError(ApplicationError.INVALID_ARGUMENT, str(err)))
else:
d.callback(res)
else:
d.errback(ApplicationError(ApplicationError.NO_SUCH_PROCEDURE, "no procedure with URI {}".format(procedure)))
return d
@inlineCallbacks
def test_rpc(session):
def hello(msg):
return "You said {}. I say hello!".format(msg)
try:
reg1 = yield session.register("com.myapp.hello", hello)
print(reg1)
except ApplicationError as err:
print(err)
else:
res = yield session.call("com.myapp.hello", "foooo")
print (res)
yield session.unregister(reg1)
res = yield session.call("com.myapp.hello", "baaar")
print (res)
try:
# res = yield session.call("com.myapp.echo", "Hello, world!", 23)
# res = yield session.call("com.myapp.complex", "Hello, world!", 23)
res = yield session.call("com.myapp.complex", "Hello, world!", 23, options = CallOptions(timeout = 2))
print(res.results)
print(res.kwresults)
except ApplicationError as err:
print(err)
@inlineCallbacks
def test_pubsub(session):
try:
sub1 = yield session.subscribe("com.myapp.mytopic1", SubscribeOptions(match = 'prefix'))
print(sub1)
except ApplicationError as err:
print(err)
else:
def watcher1(event):
print("watcher1: publication {} on topic {} with payload {}".format(event.publication, event.topic, event.payload))
def watcher2(event):
print("watcher1: publication {} on topic {} with payload {}".format(event.publication, event.topic, event.payload))
sub1.watch(watcher1)
sub1.watch(watcher2)
session.publish("com.myapp.mytopic1", "Hello, world!")
sub1.unwatch(watcher1)
publicationId = yield session.publish("com.myapp.mytopic1", "Hello, world!")
print(publicationId)
session.publish("com.myapp.mytopic2", "Hello, world!")
class Publisher(unittest.TestCase):
def setUp(self):
self.session = MockSession()
def tearDown(self):
pass
@inlineCallbacks
def test_register(self):
def hello(msg):
return "You said {}. I say hello!".format(msg)
try:
reg1 = yield self.session.register("com.myapp.hello", hello)
except ApplicationError as err:
print(err)
if __name__ == '__main__':
unittest.main()