contrib: import tinyrpc library

https://pypi.python.org/pypi/tinyrpc/0.5
https://github.com/mbr/tinyrpc

Signed-off-by: Yoshihiro Kaneko <ykaneko0929@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yoshihiro Kaneko 2014-06-15 19:03:04 +09:00 committed by FUJITA Tomonori
parent dbb143f972
commit 78a9a20270
15 changed files with 1302 additions and 1 deletions

View File

@ -0,0 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .protocols import *
from .exc import *
from .client import *

View File

@ -0,0 +1,91 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .exc import RPCError
class RPCClient(object):
"""Client for making RPC calls to connected servers.
:param protocol: An :py:class:`~tinyrpc.RPCProtocol` instance.
:param transport: A :py:class:`~tinyrpc.transports.ClientTransport`
instance.
"""
def __init__(self, protocol, transport):
self.protocol = protocol
self.transport = transport
def _send_and_handle_reply(self, req):
# sends and waits for reply
reply = self.transport.send_message(req.serialize())
response = self.protocol.parse_reply(reply)
if hasattr(response, 'error'):
raise RPCError('Error calling remote procedure: %s' %\
response.error)
return response
def call(self, method, args, kwargs, one_way=False):
"""Calls the requested method and returns the result.
If an error occured, an :py:class:`~tinyrpc.exc.RPCError` instance
is raised.
:param method: Name of the method to call.
:param args: Arguments to pass to the method.
:param kwargs: Keyword arguments to pass to the method.
:param one_way: Whether or not a reply is desired.
"""
req = self.protocol.create_request(method, args, kwargs, one_way)
return self._send_and_handle_reply(req).result
def get_proxy(self, prefix='', one_way=False):
"""Convenience method for creating a proxy.
:param prefix: Passed on to :py:class:`~tinyrpc.client.RPCProxy`.
:param one_way: Passed on to :py:class:`~tinyrpc.client.RPCProxy`.
:return: :py:class:`~tinyrpc.client.RPCProxy` instance."""
return RPCProxy(self, prefix, one_way)
def batch_call(self, calls):
"""Experimental, use at your own peril."""
req = self.protocol.create_batch_request()
for call_args in calls:
req.append(self.protocol.create_request(*call_args))
return self._send_and_handle_reply(req)
class RPCProxy(object):
"""Create a new remote proxy object.
Proxies allow calling of methods through a simpler interface. See the
documentation for an example.
:param client: An :py:class:`~tinyrpc.client.RPCClient` instance.
:param prefix: Prefix to prepend to every method name.
:param one_way: Passed to every call of
:py:func:`~tinyrpc.client.call`.
"""
def __init__(self, client, prefix='', one_way=False):
self.client = client
self.prefix = prefix
self.one_way = one_way
def __getattr__(self, name):
"""Returns a proxy function that, when called, will call a function
name ``name`` on the client associated with the proxy.
"""
proxy_func = lambda *args, **kwargs: self.client.call(
self.prefix + name,
args,
kwargs,
one_way=self.one_way
)
return proxy_func

View File

@ -0,0 +1,201 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
from ..exc import *
def public(name=None):
"""Set RPC name on function.
This function decorator will set the ``_rpc_public_name`` attribute on a
function, causing it to be picked up if an instance of its parent class is
registered using
:py:func:`~tinyrpc.dispatch.RPCDispatcher.register_instance`.
``@public`` is a shortcut for ``@public()``.
:param name: The name to register the function with.
"""
# called directly with function
if callable(name):
f = name
f._rpc_public_name = f.__name__
return f
def _(f):
f._rpc_public_name = name or f.__name__
return f
return _
class RPCDispatcher(object):
"""Stores name-to-method mappings."""
def __init__(self):
self.method_map = {}
self.subdispatchers = {}
def add_subdispatch(self, dispatcher, prefix=''):
"""Adds a subdispatcher, possibly in its own namespace.
:param dispatcher: The dispatcher to add as a subdispatcher.
:param prefix: A prefix. All of the new subdispatchers methods will be
available as prefix + their original name.
"""
self.subdispatchers.setdefault(prefix, []).append(dispatcher)
def add_method(self, f, name=None):
"""Add a method to the dispatcher.
:param f: Callable to be added.
:param name: Name to register it with. If ``None``, ``f.__name__`` will
be used.
"""
assert callable(f), "method argument must be callable"
# catches a few programming errors that are
# commonly silently swallowed otherwise
if not name:
name = f.__name__
if name in self.method_map:
raise RPCError('Name %s already registered')
self.method_map[name] = f
def dispatch(self, request):
"""Fully handle request.
The dispatch method determines which method to call, calls it and
returns a response containing a result.
No exceptions will be thrown, rather, every exception will be turned
into a response using :py:func:`~tinyrpc.RPCRequest.error_respond`.
If a method isn't found, a :py:exc:`~tinyrpc.exc.MethodNotFoundError`
response will be returned. If any error occurs outside of the requested
method, a :py:exc:`~tinyrpc.exc.ServerError` without any error
information will be returend.
If the method is found and called but throws an exception, the
exception thrown is used as a response instead. This is the only case
in which information from the exception is possibly propagated back to
the client, as the exception is part of the requested method.
:py:class:`~tinyrpc.RPCBatchRequest` instances are handled by handling
all its children in order and collecting the results, then returning an
:py:class:`~tinyrpc.RPCBatchResponse` with the results.
:param request: An :py:func:`~tinyrpc.RPCRequest`.
:return: An :py:func:`~tinyrpc.RPCResponse`.
"""
if hasattr(request, 'create_batch_response'):
results = [self._dispatch(req) for req in request]
response = request.create_batch_response()
if response != None:
response.extend(results)
return response
else:
return self._dispatch(request)
def _dispatch(self, request):
try:
try:
method = self.get_method(request.method)
except KeyError as e:
return request.error_respond(MethodNotFoundError(e))
# we found the method
try:
result = method(*request.args, **request.kwargs)
except Exception as e:
# an error occured within the method, return it
return request.error_respond(e)
# respond with result
return request.respond(result)
except Exception as e:
# unexpected error, do not let client know what happened
return request.error_respond(ServerError())
def get_method(self, name):
"""Retrieve a previously registered method.
Checks if a method matching ``name`` has been registered.
If :py:func:`get_method` cannot find a method, every subdispatcher
with a prefix matching the method name is checked as well.
If a method isn't found, a :py:class:`KeyError` is thrown.
:param name: Callable to find.
:param return: The callable.
"""
if name in self.method_map:
return self.method_map[name]
for prefix, subdispatchers in self.subdispatchers.iteritems():
if name.startswith(prefix):
for sd in subdispatchers:
try:
return sd.get_method(name[len(prefix):])
except KeyError:
pass
raise KeyError(name)
def public(self, name=None):
"""Convenient decorator.
Allows easy registering of functions to this dispatcher. Example:
.. code-block:: python
dispatch = RPCDispatcher()
@dispatch.public
def foo(bar):
# ...
class Baz(object):
def not_exposed(self):
# ...
@dispatch.public(name='do_something')
def visible_method(arg1)
# ...
:param name: Name to register callable with
"""
if callable(name):
self.add_method(name)
return name
def _(f):
self.add_method(f, name=name)
return f
return _
def register_instance(self, obj, prefix=''):
"""Create new subdispatcher and register all public object methods on
it.
To be used in conjunction with the :py:func:`tinyrpc.dispatch.public`
decorator (*not* :py:func:`tinyrpc.dispatch.RPCDispatcher.public`).
:param obj: The object whose public methods should be made available.
:param prefix: A prefix for the new subdispatcher.
"""
dispatch = self.__class__()
for name, f in inspect.getmembers(
obj, lambda f: callable(f) and hasattr(f, '_rpc_public_name')
):
dispatch.add_method(f, f._rpc_public_name)
# add to dispatchers
self.add_subdispatch(dispatch, prefix)

View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class RPCError(Exception):
"""Base class for all excetions thrown by :py:mod:`tinyrpc`."""
class BadRequestError(RPCError):
"""Base class for all errors that caused the processing of a request to
abort before a request object could be instantiated."""
def error_respond(self):
"""Create :py:class:`~tinyrpc.RPCErrorResponse` to respond the error.
:return: A error responce instance or ``None``, if the protocol decides
to drop the error silently."""
raise RuntimeError('Not implemented')
class BadReplyError(RPCError):
"""Base class for all errors that caused processing of a reply to abort
before it could be turned in a response object."""
class InvalidRequestError(BadRequestError):
"""A request made was malformed (i.e. violated the specification) and could
not be parsed."""
class InvalidReplyError(BadReplyError):
"""A reply received was malformed (i.e. violated the specification) and
could not be parsed into a response."""
class MethodNotFoundError(RPCError):
"""The desired method was not found."""
class ServerError(RPCError):
"""An internal error in the RPC system occured."""

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python
from ..exc import *
class RPCRequest(object):
unique_id = None
"""A unique ID to remember the request by. Protocol specific, may or
may not be set. This value should only be set by
:py:func:`~tinyrpc.RPCProtocol.create_request`.
The ID allows client to receive responses out-of-order and still allocate
them to the correct request.
Only supported if the parent protocol has
:py:attr:`~tinyrpc.RPCProtocol.supports_out_of_order` set to ``True``.
"""
method = None
"""The name of the method to be called."""
args = []
"""The positional arguments of the method call."""
kwargs = {}
"""The keyword arguments of the method call."""
def error_respond(self, error):
"""Creates an error response.
Create a response indicating that the request was parsed correctly,
but an error has occured trying to fulfill it.
:param error: An exception or a string describing the error.
:return: A response or ``None`` to indicate that no error should be sent
out.
"""
raise NotImplementedError()
def respond(self, result):
"""Create a response.
Call this to return the result of a successful method invocation.
This creates and returns an instance of a protocol-specific subclass of
:py:class:`~tinyrpc.RPCResponse`.
:param result: Passed on to new response instance.
:return: A response or ``None`` to indicate this request does not expect a
response.
"""
raise NotImplementedError()
def serialize(self):
"""Returns a serialization of the request.
:return: A string to be passed on to a transport.
"""
raise NotImplementedError()
class RPCBatchRequest(list):
"""Multiple requests batched together.
A batch request is a subclass of :py:class:`list`. Protocols that support
multiple requests in a single message use this to group them together.
Handling a batch requests is done in any order, responses must be gathered
in a batch response and be in the same order as their respective requests.
Any item of a batch request is either a request or a subclass of
:py:class:`~tinyrpc.BadRequestError`, which indicates that there has been
an error in parsing the request.
"""
def create_batch_response(self):
"""Creates a response suitable for responding to this request.
:return: An :py:class:`~tinyrpc.RPCBatchResponse` or ``None``, if no
response is expected."""
raise NotImplementedError()
def serialize(self):
raise NotImplementedError()
class RPCResponse(object):
"""RPC call response class.
Base class for all deriving responses.
Has an attribute ``result`` containing the result of the RPC call, unless
an error occured, in which case an attribute ``error`` will contain the
error message."""
unique_id = None
def serialize(self):
"""Returns a serialization of the response.
:return: A reply to be passed on to a transport.
"""
raise NotImplementedError()
class RPCErrorResponse(RPCResponse):
pass
class RPCBatchResponse(list):
"""Multiple response from a batch request. See
:py:class:`~tinyrpc.RPCBatchRequest` on how to handle.
Items in a batch response need to be
:py:class:`~tinyrpc.RPCResponse` instances or None, meaning no reply should
generated for the request.
"""
def serialize(self):
"""Returns a serialization of the batch response."""
raise NotImplementedError()
class RPCProtocol(object):
"""Base class for all protocol implementations."""
supports_out_of_order = False
"""If true, this protocol can receive responses out of order correctly.
Note that this usually depends on the generation of unique_ids, the
generation of these may or may not be thread safe, depending on the
protocol. Ideally, only once instance of RPCProtocol should be used per
client."""
def create_request(self, method, args=None, kwargs=None, one_way=False):
"""Creates a new RPCRequest object.
It is up to the implementing protocol whether or not ``args``,
``kwargs``, one of these, both at once or none of them are supported.
:param method: The method name to invoke.
:param args: The positional arguments to call the method with.
:param kwargs: The keyword arguments to call the method with.
:param one_way: The request is an update, i.e. it does not expect a
reply.
:return: A new :py:class:`~tinyrpc.RPCRequest` instance.
"""
raise NotImplementedError()
def parse_request(self, data):
"""Parses a request given as a string and returns an
:py:class:`RPCRequest` instance.
:return: An instanced request.
"""
raise NotImplementedError()
def parse_reply(self, data):
"""Parses a reply and returns an :py:class:`RPCResponse` instance.
:return: An instanced response.
"""
raise NotImplementedError()
class RPCBatchProtocol(RPCProtocol):
def create_batch_request(self, requests=None):
"""Create a new :py:class:`tinyrpc.RPCBatchRequest` object.
:param requests: A list of requests.
"""
raise NotImplementedError()

View File

@ -0,0 +1,291 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .. import RPCBatchProtocol, RPCRequest, RPCResponse, RPCErrorResponse,\
InvalidRequestError, MethodNotFoundError, ServerError,\
InvalidReplyError, RPCError, RPCBatchRequest, RPCBatchResponse
import json
class FixedErrorMessageMixin(object):
def __init__(self, *args, **kwargs):
if not args:
args = [self.message]
super(FixedErrorMessageMixin, self).__init__(*args, **kwargs)
def error_respond(self):
response = JSONRPCErrorResponse()
response.error = self.message
response.unique_id = None
response._jsonrpc_error_code = self.jsonrpc_error_code
return response
class JSONRPCParseError(FixedErrorMessageMixin, InvalidRequestError):
jsonrpc_error_code = -32700
message = 'Parse error'
class JSONRPCInvalidRequestError(FixedErrorMessageMixin, InvalidRequestError):
jsonrpc_error_code = -32600
message = 'Invalid Request'
class JSONRPCMethodNotFoundError(FixedErrorMessageMixin, MethodNotFoundError):
jsonrpc_error_code = -32601
message = 'Method not found'
class JSONRPCInvalidParamsError(FixedErrorMessageMixin, InvalidRequestError):
jsonrpc_error_code = -32602
message = 'Invalid params'
class JSONRPCInternalError(FixedErrorMessageMixin, InvalidRequestError):
jsonrpc_error_code = -32603
message = 'Internal error'
class JSONRPCServerError(FixedErrorMessageMixin, InvalidRequestError):
jsonrpc_error_code = -32000
message = ''
class JSONRPCSuccessResponse(RPCResponse):
def _to_dict(self):
return {
'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
'id': self.unique_id,
'result': self.result,
}
def serialize(self):
return json.dumps(self._to_dict())
class JSONRPCErrorResponse(RPCErrorResponse):
def _to_dict(self):
return {
'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
'id': self.unique_id,
'error': {
'message': str(self.error),
'code': self._jsonrpc_error_code,
}
}
def serialize(self):
return json.dumps(self._to_dict())
def _get_code_and_message(error):
assert isinstance(error, (Exception, basestring))
if isinstance(error, Exception):
if hasattr(error, 'jsonrpc_error_code'):
code = error.jsonrpc_error_code
msg = str(error)
elif isinstance(error, InvalidRequestError):
code = JSONRPCInvalidRequestError.jsonrpc_error_code
msg = JSONRPCInvalidRequestError.message
elif isinstance(error, MethodNotFoundError):
code = JSONRPCMethodNotFoundError.jsonrpc_error_code
msg = JSONRPCMethodNotFoundError.message
else:
# allow exception message to propagate
code = JSONRPCServerError.jsonrpc_error_code
msg = str(error)
else:
code = -32000
msg = error
return code, msg
class JSONRPCRequest(RPCRequest):
def error_respond(self, error):
if not self.unique_id:
return None
response = JSONRPCErrorResponse()
code, msg = _get_code_and_message(error)
response.error = msg
response.unique_id = self.unique_id
response._jsonrpc_error_code = code
return response
def respond(self, result):
response = JSONRPCSuccessResponse()
if not self.unique_id:
return None
response.result = result
response.unique_id = self.unique_id
return response
def _to_dict(self):
jdata = {
'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
'method': self.method,
}
if self.args:
jdata['params'] = self.args
if self.kwargs:
jdata['params'] = self.kwargs
if self.unique_id != None:
jdata['id'] = self.unique_id
return jdata
def serialize(self):
return json.dumps(self._to_dict())
class JSONRPCBatchRequest(RPCBatchRequest):
def create_batch_response(self):
if self._expects_response():
return JSONRPCBatchResponse()
def _expects_response(self):
for request in self:
if isinstance(request, Exception):
return True
if request.unique_id != None:
return True
return False
def serialize(self):
return json.dumps([req._to_dict() for req in self])
class JSONRPCBatchResponse(RPCBatchResponse):
def serialize(self):
return json.dumps([resp._to_dict() for resp in self if resp != None])
class JSONRPCProtocol(RPCBatchProtocol):
"""JSONRPC protocol implementation.
Currently, only version 2.0 is supported."""
JSON_RPC_VERSION = "2.0"
_ALLOWED_REPLY_KEYS = sorted(['id', 'jsonrpc', 'error', 'result'])
_ALLOWED_REQUEST_KEYS = sorted(['id', 'jsonrpc', 'method', 'params'])
def __init__(self, *args, **kwargs):
super(JSONRPCProtocol, self).__init__(*args, **kwargs)
self._id_counter = 0
def _get_unique_id(self):
self._id_counter += 1
return self._id_counter
def create_batch_request(self, requests=None):
return JSONRPCBatchRequest(requests or [])
def create_request(self, method, args=None, kwargs=None, one_way=False):
if args and kwargs:
raise InvalidRequestError('Does not support args and kwargs at '\
'the same time')
request = JSONRPCRequest()
if not one_way:
request.unique_id = self._get_unique_id()
request.method = method
request.args = args
request.kwargs = kwargs
return request
def parse_reply(self, data):
try:
rep = json.loads(data)
except Exception as e:
raise InvalidReplyError(e)
for k in rep.iterkeys():
if not k in self._ALLOWED_REPLY_KEYS:
raise InvalidReplyError('Key not allowed: %s' % k)
if not 'jsonrpc' in rep:
raise InvalidReplyError('Missing jsonrpc (version) in response.')
if rep['jsonrpc'] != self.JSON_RPC_VERSION:
raise InvalidReplyError('Wrong JSONRPC version')
if not 'id' in rep:
raise InvalidReplyError('Missing id in response')
if ('error' in rep) == ('result' in rep):
raise InvalidReplyError(
'Reply must contain exactly one of result and error.'
)
if 'error' in rep:
response = JSONRPCErrorResponse()
error = rep['error']
response.error = error['message']
response._jsonrpc_error_code = error['code']
else:
response = JSONRPCSuccessResponse()
response.result = rep.get('result', None)
response.unique_id = rep['id']
return response
def parse_request(self, data):
try:
req = json.loads(data)
except Exception as e:
raise JSONRPCParseError()
if isinstance(req, list):
# batch request
requests = JSONRPCBatchRequest()
for subreq in req:
try:
requests.append(self._parse_subrequest(subreq))
except RPCError as e:
requests.append(e)
except Exception as e:
requests.append(JSONRPCInvalidRequestError())
if not requests:
raise JSONRPCInvalidRequestError()
return requests
else:
return self._parse_subrequest(req)
def _parse_subrequest(self, req):
for k in req.iterkeys():
if not k in self._ALLOWED_REQUEST_KEYS:
raise JSONRPCInvalidRequestError()
if req.get('jsonrpc', None) != self.JSON_RPC_VERSION:
raise JSONRPCInvalidRequestError()
if not isinstance(req['method'], basestring):
raise JSONRPCInvalidRequestError()
request = JSONRPCRequest()
request.method = str(req['method'])
request.unique_id = req.get('id', None)
params = req.get('params', None)
if params != None:
if isinstance(params, list):
request.args = req['params']
elif isinstance(params, dict):
request.kwargs = req['params']
else:
raise JSONRPCInvalidParamsError()
return request

View File

@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# FIXME: needs unittests
# FIXME: needs checks for out-of-order, concurrency, etc as attributes
from tinyrpc.exc import RPCError
class RPCServer(object):
"""High level RPC server.
:param transport: The :py:class:`~tinyrpc.transports.RPCTransport` to use.
:param protocol: The :py:class:`~tinyrpc.RPCProtocol` to use.
:param dispatcher: The :py:class:`~tinyrpc.dispatch.RPCDispatcher` to use.
"""
def __init__(self, transport, protocol, dispatcher):
self.transport = transport
self.protocol = protocol
self.dispatcher = dispatcher
def serve_forever(self):
"""Handle requests forever.
Starts the server loop in which the transport will be polled for a new
message.
After a new message has arrived,
:py:func:`~tinyrpc.server.RPCServer._spawn` is called with a handler
function and arguments to handle the request.
The handler function will try to decode the message using the supplied
protocol, if that fails, an error response will be sent. After decoding
the message, the dispatcher will be asked to handle the resultung
request and the return value (either an error or a result) will be sent
back to the client using the transport.
After calling :py:func:`~tinyrpc.server.RPCServer._spawn`, the server
will fetch the next message and repeat.
"""
while True:
context, message = self.transport.receive_message()
# assuming protocol is threadsafe and dispatcher is theadsafe, as
# long as its immutable
def handle_message(context, message):
try:
request = self.protocol.parse_request(message)
except RPCError as e:
response = e.error_respond()
else:
response = self.dispatcher.dispatch(request)
# send reply
self.transport.send_reply(context, response.serialize())
self._spawn(handle_message, context, message)
def _spawn(self, func, *args, **kwargs):
"""Spawn a handler function.
This function is overridden in subclasses to provide concurrency.
In the base implementation, it simply calls the supplied function
``func`` with ``*args`` and ``**kwargs``. This results in a
single-threaded, single-process, synchronous server.
:param func: A callable to call.
:param args: Arguments to ``func``.
:param kwargs: Keyword arguments to ``func``.
"""
func(*args, **kwargs)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import gevent
from . import RPCServer
class RPCServerGreenlets(RPCServer):
# documentation in docs because of dependencies
def _spawn(self, func, *args, **kwargs):
gevent.spawn(func, *args, **kwargs)

View File

@ -0,0 +1,115 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gevent
import zmq.green as zmq
from logbook import Logger
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.dispatch import RPCDispatcher
from tinyrpc import RPCError, ServerError, MethodNotFoundError
class Server(object):
def __init__(transport, protocol, dispatcher):
self.transport = transport
self.protocol = protocol
self.dispatcher = dispatcher
def run(self):
while True:
try:
context, message = self.transport.receive_message()
except Exception as e:
self.exception(e)
continue
# assuming protocol is threadsafe and dispatcher is theadsafe, as long
# as its immutable
self.handle_client(context, message)
def handle_client(self, context, message):
try:
request = self.protocol.parse_request(message)
except RPCError as e:
self.exception(e)
response = e.error_respond()
else:
response = dispatcher.dispatch(request)
# send reply
reply = response.serialize()
self.transport.send_reply(context, reply)
class ConcurrentServerMixin(object):
def handle_client(self, context, message):
self.spawn(
super(ConcurrentServer, self).handle_client, context, message
)
class ZmqRouterTransport(object):
def __init__(self, socket):
self.socket = socket
def receive_message(self):
msg = socket.recv_multipart()
return msg[:-1], [-1]
def send_reply(self, context, reply):
self.send_multipart(context + [reply])
class GeventConcurrencyMixin(ConcurrentServerMixin):
def spawn(self, func, *args, **kwargs):
gevent.spawn(func, *args, **kwargs)
def rpc_server(socket, protocol, dispatcher):
log = Logger('rpc_server')
log.debug('starting up...')
while True:
try:
message = socket.recv_multipart()
except Exception as e:
log.warning('Failed to receive message from client, ignoring...')
log.exception(e)
continue
log.debug('Received message %s from %r' % (message[-1], message[0]))
# assuming protocol is threadsafe and dispatcher is theadsafe, as long
# as its immutable
def handle_client(message):
try:
request = protocol.parse_request(message[-1])
except RPCError as e:
log.exception(e)
response = e.error_respond()
else:
response = dispatcher.dispatch(request)
log.debug('Response okay: %r' % response)
# send reply
message[-1] = response.serialize()
log.debug('Replying %s to %r' % (message[-1], message[0]))
socket.send_multipart(message)
gevent.spawn(handle_client, message)
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
socket.bind("tcp://127.0.0.1:12345")
dispatcher = RPCDispatcher()
@dispatcher.public
def throw_up():
return 'asad'
raise Exception('BLARGH')
rpc_server(socket, JSONRPCProtocol(), dispatcher)

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class ServerTransport(object):
"""Base class for all server transports."""
def receive_message(self):
"""Receive a message from the transport.
Blocks until another message has been received. May return a context
opaque to clients that should be passed on
:py:func:`~tinyrpc.transport.Transport.send_reply` to identify the
client later on.
:return: A tuple consisting of ``(context, message)``.
"""
raise NotImplementedError()
def send_reply(self, context, reply):
"""Sends a reply to a client.
The client is usually identified by passing ``context`` as returned
from the original
:py:func:`~tinyrpc.transport.Transport.receive_message` call.
Messages must be strings, it is up to the sender to convert the
beforehand. A non-string value raises a :py:exc:`TypeError`.
:param context: A context returned by
:py:func:`~tinyrpc.transport.Transport.receive_message`.
:param reply: A string to send back as the reply.
"""
raise NotImplementedError
class ClientTransport(object):
"""Base class for all client transports."""
def send_message(self, message, expect_reply=True):
"""Send a message to the server and possibly receive a reply.
Sends a message to the connected server.
Messages must be strings, it is up to the sender to convert the
beforehand. A non-string value raises a :py:exc:`TypeError`.
This function will block until one reply has been received.
:param message: A string to send.
:return: A string containing the server reply.
"""
raise NotImplementedError

View File

@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
import threading
import requests
from . import ServerTransport, ClientTransport
class HttpPostClientTransport(ClientTransport):
"""HTTP POST based client transport.
Requires :py:mod:`requests`. Submits messages to a server using the body of
an ``HTTP`` ``POST`` request. Replies are taken from the responses body.
:param endpoint: The URL to send ``POST`` data to.
:param kwargs: Additional parameters for :py:func:`requests.post`.
"""
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.request_kwargs = kwargs
def send_message(self, message, expect_reply=True):
if not isinstance(message, str):
raise TypeError('str expected')
r = requests.post(self.endpoint, data=message, **self.request_kwargs)
if expect_reply:
return r.content

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
import struct
import threading
from SocketServer import TCPServer, BaseRequestHandler, ThreadingMixIn
from . import RPCRequestResponseServer
def _read_length_prefixed_msg(sock, prefix_format='!I'):
prefix_bytes = struct.calcsize(prefix_format)
sock.recv(prefix_bytes)
def _read_n_bytes(sock, n):
buf = []
while n > 0:
data = sock.recv(n)
n -= len(data)
buf.append(data)
return ''.join(buf)
def create_length_prefixed_tcp_handler():
queue = Queue()
class LengthPrefixedTcpHandler(BaseRequestHandler):
def handle(self):
#msg = _read_length_prefixed_msg(self.request)
# this will run inside a new thread
self.request.send("hello\n")
while True:
b = _read_n_bytes(self.request, 10)
self.request.send("you sent: %s" % b)
queue.put(b)
return queue, LengthPrefixedTcpHandler
def tcp_test_main():
class Server(ThreadingMixIn, TCPServer):
pass
queue, Handler = create_length_prefixed_tcp_handler()
server = Server(('localhost', 12345), Handler)
server.allow_reuse_address = True
server.serve_forever()

View File

@ -0,0 +1,90 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import Queue
from werkzeug.wrappers import Response, Request
from . import ServerTransport
class WsgiServerTransport(ServerTransport):
"""WSGI transport.
Requires :py:mod:`werkzeug`.
Due to the nature of WSGI, this transport has a few pecularities: It must
be run in a thread, greenlet or some other form of concurrent execution
primitive.
This is due to
:py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.handle` blocking
while waiting for a call to
:py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.send_reply`.
The parameter ``queue_class`` must be used to supply a proper queue class
for the chosen concurrency mechanism (i.e. when using :py:mod:`gevent`,
set it to :py:class:`gevent.queue.Queue`).
:param max_content_length: The maximum request content size allowed. Should
be set to a sane value to prevent DoS-Attacks.
:param queue_class: The Queue class to use.
:param allow_origin: The ``Access-Control-Allow-Origin`` header. Defaults
to ``*`` (so change it if you need actual security).
"""
def __init__(self, max_content_length=4096, queue_class=Queue.Queue,
allow_origin='*'):
self._queue_class = queue_class
self.messages = queue_class()
self.max_content_length = max_content_length
self.allow_origin = allow_origin
def receive_message(self):
return self.messages.get()
def send_reply(self, context, reply):
if not isinstance(reply, str):
raise TypeError('str expected')
context.put(reply)
def handle(self, environ, start_response):
"""WSGI handler function.
The transport will serve a request by reading the message and putting
it into an internal buffer. It will then block until another
concurrently running function sends a reply using
:py:func:`~tinyrpc.transports.WsgiServerTransport.send_reply`.
The reply will then be sent to the client being handled and handle will
return.
"""
request = Request(environ)
request.max_content_length = self.max_content_length
access_control_headers = {
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Origin': self.allow_origin,
'Access-Control-Allow-Headers': \
'Content-Type, X-Requested-With, Accept, Origin'
}
if request.method == 'OPTIONS':
response = Response(headers=access_control_headers)
elif request.method == 'POST':
# message is encoded in POST, read it...
msg = request.stream.read()
# create new context
context = self._queue_class()
self.messages.put((context, msg))
# ...and send the reply
response = Response(context.get(), headers=access_control_headers)
else:
# nothing else supported at the moment
response = Response('Only POST supported', 405)
return response(environ, start_response)

View File

@ -0,0 +1,76 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import # needed for zmq import
import zmq
from . import ServerTransport, ClientTransport
class ZmqServerTransport(ServerTransport):
"""Server transport based on a :py:const:`zmq.ROUTER` socket.
:param socket: A :py:const:`zmq.ROUTER` socket instance, bound to an
endpoint.
"""
def __init__(self, socket):
self.socket = socket
def receive_message(self):
msg = self.socket.recv_multipart()
return msg[:-1], msg[-1]
def send_reply(self, context, reply):
self.socket.send_multipart(context + [reply])
@classmethod
def create(cls, zmq_context, endpoint):
"""Create new server transport.
Instead of creating the socket yourself, you can call this function and
merely pass the :py:class:`zmq.core.context.Context` instance.
By passing a context imported from :py:mod:`zmq.green`, you can use
green (gevent) 0mq sockets as well.
:param zmq_context: A 0mq context.
:param endpoint: The endpoint clients will connect to.
"""
socket = zmq_context.socket(zmq.ROUTER)
socket.bind(endpoint)
return cls(socket)
class ZmqClientTransport(ClientTransport):
"""Client transport based on a :py:const:`zmq.REQ` socket.
:param socket: A :py:const:`zmq.REQ` socket instance, connected to the
server socket.
"""
def __init__(self, socket):
self.socket = socket
def send_message(self, message, expect_reply=True):
self.socket.send(message)
if expect_reply:
return self.socket.recv()
@classmethod
def create(cls, zmq_context, endpoint):
"""Create new client transport.
Instead of creating the socket yourself, you can call this function and
merely pass the :py:class:`zmq.core.context.Context` instance.
By passing a context imported from :py:mod:`zmq.green`, you can use
green (gevent) 0mq sockets as well.
:param zmq_context: A 0mq context.
:param endpoint: The endpoint the server is bound to.
"""
socket = zmq_context.socket(zmq.REQ)
socket.connect(endpoint)
return cls(socket)

View File

@ -7,4 +7,3 @@ paramiko
routes
six>=1.4.0
webob>=1.0.8
tinyrpc