work on WAMPv2

This commit is contained in:
Tobias Oberstein
2014-01-06 01:32:51 +01:00
parent 043685b9a8
commit 698226c2d0
9 changed files with 351 additions and 16 deletions

1
.gitignore vendored
View File

@@ -10,3 +10,4 @@ __pycache__
dropin.cache
*.egg
*.tar.gz
_trial_temp

5
Makefile Normal file
View File

@@ -0,0 +1,5 @@
test:
PYTHONPATH=./autobahn trial.py autobahn/autobahn/wamp2/tests/test_interfaces.py
testp:
PYTHONPATH=./autobahn python autobahn/autobahn/wamp2/tests/test_interfaces.py

View File

@@ -54,6 +54,7 @@ class ApplicationError(Error):
NO_SUCH_REGISTRATION = "wamp.error.no_such_registration"
NO_SUCH_SUBSCRIPTION = "wamp.error.no_such_subscription"
NO_SUCH_PROCEDURE = "wamp.error.no_such_procedure"
CANCELED = "wamp.error.canceled"
def __init__(self, error, reason = None):
"""
@@ -72,7 +73,7 @@ class ApplicationError(Error):
class CallError(ApplicationError):
"""
Wrapper for WAMP remote procedure call errors.
Remote procedure call errors.
"""
def __init__(self, error, problem):
@@ -86,3 +87,16 @@ class CallError(ApplicationError):
"""
ApplicationError.__init__(self, error)
self.problem = problem
class CanceledError(ApplicationError):
"""
Error for canceled calls.
"""
def __init__(self):
"""
Constructor.
"""
ApplicationError.__init__(self, ApplicationError.CANCELED)

View File

@@ -243,6 +243,54 @@ class ICallee(IPeerRole):
Interface for WAMP peers implementing role "Callee".
"""
def register(procedure, endpoint, options = None):
"""
Register an endpoint on a procedure to (subsequently) receive calls
calling that procedure.
This will return a deferred/future, that when resolved provides
an instance of :class:`autobahn.wamp2.types.Registration`.
If the registration fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp2.error.ApplicationError`.
:param procedure: The URI (or URI pattern) of the procedure to register for,
e.g. "com.myapp.myprocedure1".
:type procedure: str
:param endpoint: The endpoint called under the procedure.
:type endpoint: callable
:param options: Options for registering.
:type options: An instance of :class:`autobahn.wamp2.types.RegisterOptions`.
:returns: obj -- A deferred/future for the registration -
an instance of :class:`twisted.internet.defer.Deferred`
(when running under Twisted) or an instance of
:class:`asyncio.Future` (when running under asyncio).
"""
def unregister(registration):
"""
Unregister the endpoint registration that was previously registered.
After a registration has been unregistered, calls won't get routed
to the endpoint any more.
This will return a deferred/future, that when resolved signals
successful unregistration.
If the unregistration fails, the returned deferred/future will be rejected
with an instance of :class:`autobahn.wamp2.error.ApplicationError`.
:param registration: The registration to unregister from.
:type registration: An instance of :class:`autobahn.wamp2.types.Registration`
that was previously registered.
:returns: obj -- A deferred/future for the unregistration -
an instance of :class:`twisted.internet.defer.Deferred` (when running under Twisted)
or an instance of :class:`asyncio.Future` (when running under asyncio).
"""
class IPublisher(IPeerRole):

View File

@@ -0,0 +1,17 @@
###############################################################################
##
## Copyright (C) 2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################

View File

@@ -0,0 +1,224 @@
###############################################################################
##
## Copyright (C) 2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
from __future__ import absolute_import
#from twisted.trial import unittest
import unittest
from zope.interface import implementer
from autobahn.wamp2.interfaces import *
from autobahn.wamp2.types import *
from autobahn.wamp2.error import ApplicationError, ProtocolError
from twisted.internet.defer import Deferred, inlineCallbacks
import random
def newid():
return random.randint(0, 2**53)
@implementer(ISubscriber)
@implementer(IPublisher)
@implementer(ICallee)
@implementer(ICaller)
class MockSession:
def __init__(self):
self._subscriptions = {}
self._registrations = {}
def subscribe(self, topic, options = None):
assert(type(topic) == str)
assert(options is None or isinstance(options, SubscribeOptions))
if not self._subscriptions.has_key(topic):
self._subscriptions[topic] = Subscription(newid(), topic)
d = Deferred()
d.callback(self._subscriptions[topic])
return d
def unsubscribe(self, subscription):
assert(isinstance(subscription, Subscription))
assert(subscription._isActive)
assert(subscription._topic in self._subscriptions)
subscription._isActive = False
del self._subscriptions[subscription._topic]
d = Deferred()
d.callback(None)
return d
def publish(self, topic, payload = None, options = None):
assert(type(topic) == str)
assert(options is None or isinstance(options, PublishOptions))
d = Deferred()
if topic not in ["com.myapp.mytopic1"]:
d.errback(ApplicationError(ApplicationError.NOT_AUTHORIZED))
else:
id = newid()
if self._subscriptions.has_key(topic):
event = Event(topic, payload, id)
self._subscriptions[topic].notify(event)
d.callback(id)
return d
def register(self, procedure, endpoint, options = None):
assert(type(procedure) == str)
assert(options is None or isinstance(options, RegisterOptions))
if not self._registrations.has_key(procedure):
self._registrations[procedure] = Registration(newid(), procedure, endpoint)
d = Deferred()
d.callback(self._registrations[procedure])
return d
def unregister(self, registration):
assert(isinstance(registration, Registration))
assert(registration._isActive)
assert(registration._procedure in self._registrations)
registration._isActive = False
del self._registrations[registration._procedure]
d = Deferred()
d.callback(None)
return d
def call(self, procedure, *args, **kwargs):
assert(type(procedure) == str)
if 'options' in kwargs:
options = kwargs['options']
del kwargs['options']
assert(isinstance(options, CallOptions))
d = Deferred()
if procedure == "com.myapp.echo":
if len(args) != 1 or len(kwargs) != 0 or type(args[0]) != str:
d.errback(ApplicationError(ApplicationError.INVALID_ARGUMENT, "procedure takes exactly 1 positional argument of type string"))
else:
d.callback(args[0])
elif procedure == "com.myapp.complex":
d.callback(CallResult(23, 7, foo = "bar"))
elif self._registrations.has_key(procedure):
try:
res = self._registrations[procedure]._endpoint(*args, **kwargs)
except TypeError as err:
d.errback(ApplicationError(ApplicationError.INVALID_ARGUMENT, str(err)))
else:
d.callback(res)
else:
d.errback(ApplicationError(ApplicationError.NO_SUCH_PROCEDURE, "no procedure with URI {}".format(procedure)))
return d
@inlineCallbacks
def test_rpc(session):
def hello(msg):
return "You said {}. I say hello!".format(msg)
try:
reg1 = yield session.register("com.myapp.hello", hello)
print(reg1)
except ApplicationError as err:
print(err)
else:
res = yield session.call("com.myapp.hello", "foooo")
print (res)
yield session.unregister(reg1)
res = yield session.call("com.myapp.hello", "baaar")
print (res)
try:
# res = yield session.call("com.myapp.echo", "Hello, world!", 23)
# res = yield session.call("com.myapp.complex", "Hello, world!", 23)
res = yield session.call("com.myapp.complex", "Hello, world!", 23, options = CallOptions(timeout = 2))
print(res.results)
print(res.kwresults)
except ApplicationError as err:
print(err)
@inlineCallbacks
def test_pubsub(session):
try:
sub1 = yield session.subscribe("com.myapp.mytopic1", SubscribeOptions(match = 'prefix'))
print(sub1)
except ApplicationError as err:
print(err)
else:
def watcher1(event):
print("watcher1: publication {} on topic {} with payload {}".format(event.publication, event.topic, event.payload))
def watcher2(event):
print("watcher1: publication {} on topic {} with payload {}".format(event.publication, event.topic, event.payload))
sub1.watch(watcher1)
sub1.watch(watcher2)
session.publish("com.myapp.mytopic1", "Hello, world!")
sub1.unwatch(watcher1)
publicationId = yield session.publish("com.myapp.mytopic1", "Hello, world!")
print(publicationId)
session.publish("com.myapp.mytopic2", "Hello, world!")
class Publisher(unittest.TestCase):
def setUp(self):
self.session = MockSession()
def tearDown(self):
pass
@inlineCallbacks
def testPublish(self):
def hello(msg):
return "You said {}. I say hello!".format(msg)
try:
reg1 = yield self.session.register("com.myapp.hello", hello)
print(reg1)
except ApplicationError as err:
print(err)
@inlineCallbacks
def test_sample(self):
with self.assertRaises(ApplicationError):
yield self.session.register("com.myapp.hello", None)
if __name__ == '__main__':
unittest.main()

View File

@@ -22,6 +22,10 @@ from __future__ import absolute_import
class RegisterOptions:
"""
"""
def __init__(self,
match = None):
assert(match is None or (type(match) == str and match in ['exact', 'prefix', 'wildcard']))
self.match = match
@@ -101,12 +105,23 @@ class CallResult:
self.kwresults = kwresults
class Invocation:
"""
"""
def __init__(self, caller = None):
self.caller = caller
def progress(self, *args, **kwargs):
pass
class SubscribeOptions:
"""
"""
def __init__(self, match = None):
assert(match is None or (type(match) == str and match in ['exact', 'prefix', 'wildcard']))
self.match = match
@@ -217,8 +232,3 @@ class Event:
self.publication = publication
self.publisher = publisher
class InvocationDetails:
"""
"""

View File

@@ -187,7 +187,8 @@ def deleteTask(invocation = Invocation):
# .. and notify all but the caller
session.publish("com.myapp.task.{}.on_delete".format(taskId))
yield session.register("com.myapp.task..delete", deleteTask)
yield session.register("com.myapp.task..delete", deleteTask,
options = RegisterOptions(match = "wildcard"))
```
This endpoint can now be called
@@ -196,16 +197,16 @@ This endpoint can now be called
yield session.call("com.myapp.task.t130.delete")
```
### Progressive invocations
### Invocation producing progressive results
The following endpoint will produce progressive call results:
```python
def longop(n, invocation = Invocation):
for i in range(n):
invocation.progress(i)
yield sleep(1)
return n
for i in range(n):
invocation.progress(i)
yield sleep(1)
return n
yield session.register("com.myapp.longop", longop)
```

View File

@@ -88,10 +88,16 @@ class MockSession:
def call(self, procedure, *args, **kwargs):
assert(type(procedure) == str)
invocation = Invocation()
if 'options' in kwargs:
options = kwargs['options']
del kwargs['options']
assert(isinstance(options, CallOptions))
if options.discloseMe:
invocation.caller = newid()
if options.onProgress:
invocation.progress = options.onProgress
d = Deferred()
if procedure == "com.myapp.echo":
@@ -104,6 +110,7 @@ class MockSession:
elif self._registrations.has_key(procedure):
try:
kwargs['invocation'] = invocation
res = self._registrations[procedure]._endpoint(*args, **kwargs)
except TypeError as err:
d.errback(ApplicationError(ApplicationError.INVALID_ARGUMENT, str(err)))
@@ -115,24 +122,32 @@ class MockSession:
return d
import inspect
@inlineCallbacks
def test_rpc(session):
def hello(msg):
def hello(msg, invocation = Invocation):
for i in range(5):
invocation.progress(i)
return "You said {}. I say hello!".format(msg)
print inspect.getargspec(hello)
try:
reg1 = yield session.register("com.myapp.hello", hello)
print(reg1)
except ApplicationError as err:
print(err)
else:
res = yield session.call("com.myapp.hello", "foooo")
print (res)
def onProgress(i):
print("progress {}".format(i))
res = yield session.call("com.myapp.hello", "foooo", options = CallOptions(discloseMe = True, onProgress = onProgress))
print(res)
yield session.unregister(reg1)
res = yield session.call("com.myapp.hello", "baaar")
print (res)
print(res)
try:
# res = yield session.call("com.myapp.echo", "Hello, world!", 23)