Sync RPC serializer changes from Oslo
Pulls in the serializer object and changes to the dispatcher and proxy implementations to support it. Needed for bp/rpc-support-for-objects Change-Id: I349de94c80a183f244cbb10009b6922996cc4d82
This commit is contained in:
@@ -84,6 +84,7 @@ minimum version that supports the new parameter should be specified.
|
||||
"""
|
||||
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
from nova.openstack.common.rpc import serializer as rpc_serializer
|
||||
|
||||
|
||||
class RpcDispatcher(object):
|
||||
@@ -93,16 +94,38 @@ class RpcDispatcher(object):
|
||||
contains a list of underlying managers that have an API_VERSION attribute.
|
||||
"""
|
||||
|
||||
def __init__(self, callbacks):
|
||||
def __init__(self, callbacks, serializer=None):
|
||||
"""Initialize the rpc dispatcher.
|
||||
|
||||
:param callbacks: List of proxy objects that are an instance
|
||||
of a class with rpc methods exposed. Each proxy
|
||||
object should have an RPC_API_VERSION attribute.
|
||||
:param serializer: The Serializer object that will be used to
|
||||
deserialize arguments before the method call and
|
||||
to serialize the result after it returns.
|
||||
"""
|
||||
self.callbacks = callbacks
|
||||
if serializer is None:
|
||||
serializer = rpc_serializer.NoOpSerializer()
|
||||
self.serializer = serializer
|
||||
super(RpcDispatcher, self).__init__()
|
||||
|
||||
def _deserialize_args(self, context, kwargs):
|
||||
"""Helper method called to deserialize args before dispatch.
|
||||
|
||||
This calls our serializer on each argument, returning a new set of
|
||||
args that have been deserialized.
|
||||
|
||||
:param context: The request context
|
||||
:param kwargs: The arguments to be deserialized
|
||||
:returns: A new set of deserialized args
|
||||
"""
|
||||
new_kwargs = dict()
|
||||
for argname, arg in kwargs.iteritems():
|
||||
new_kwargs[argname] = self.serializer.deserialize_entity(context,
|
||||
arg)
|
||||
return new_kwargs
|
||||
|
||||
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
||||
"""Dispatch a message based on a requested version.
|
||||
|
||||
@@ -145,7 +168,9 @@ class RpcDispatcher(object):
|
||||
if not hasattr(proxyobj, method):
|
||||
continue
|
||||
if is_compatible:
|
||||
return getattr(proxyobj, method)(ctxt, **kwargs)
|
||||
kwargs = self._deserialize_args(ctxt, kwargs)
|
||||
result = getattr(proxyobj, method)(ctxt, **kwargs)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
if had_compatible:
|
||||
raise AttributeError("No such RPC function '%s'" % method)
|
||||
|
||||
@@ -24,6 +24,7 @@ For more information about rpc API version numbers, see:
|
||||
|
||||
from nova.openstack.common import rpc
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
from nova.openstack.common.rpc import serializer as rpc_serializer
|
||||
|
||||
|
||||
class RpcProxy(object):
|
||||
@@ -38,7 +39,8 @@ class RpcProxy(object):
|
||||
# The default namespace, which can be overriden in a subclass.
|
||||
RPC_API_NAMESPACE = None
|
||||
|
||||
def __init__(self, topic, default_version, version_cap=None):
|
||||
def __init__(self, topic, default_version, version_cap=None,
|
||||
serializer=None):
|
||||
"""Initialize an RpcProxy.
|
||||
|
||||
:param topic: The topic to use for all messages.
|
||||
@@ -47,10 +49,15 @@ class RpcProxy(object):
|
||||
basis.
|
||||
:param version_cap: Optionally cap the maximum version used for sent
|
||||
messages.
|
||||
:param serializer: Optionaly (de-)serialize entities with a
|
||||
provided helper.
|
||||
"""
|
||||
self.topic = topic
|
||||
self.default_version = default_version
|
||||
self.version_cap = version_cap
|
||||
if serializer is None:
|
||||
serializer = rpc_serializer.NoOpSerializer()
|
||||
self.serializer = serializer
|
||||
super(RpcProxy, self).__init__()
|
||||
|
||||
def _set_version(self, msg, vers):
|
||||
@@ -77,6 +84,22 @@ class RpcProxy(object):
|
||||
return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
|
||||
**kwargs)
|
||||
|
||||
def _serialize_msg_args(self, context, kwargs):
|
||||
"""Helper method called to serialize message arguments.
|
||||
|
||||
This calls our serializer on each argument, returning a new
|
||||
set of args that have been serialized.
|
||||
|
||||
:param context: The request context
|
||||
:param kwargs: The arguments to serialize
|
||||
:returns: A new set of serialized arguments
|
||||
"""
|
||||
new_kwargs = dict()
|
||||
for argname, arg in kwargs.iteritems():
|
||||
new_kwargs[argname] = self.serializer.serialize_entity(context,
|
||||
arg)
|
||||
return new_kwargs
|
||||
|
||||
def call(self, context, msg, topic=None, version=None, timeout=None):
|
||||
"""rpc.call() a remote method.
|
||||
|
||||
@@ -92,9 +115,11 @@ class RpcProxy(object):
|
||||
:returns: The return value from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
real_topic = self._get_topic(topic)
|
||||
try:
|
||||
return rpc.call(context, real_topic, msg, timeout)
|
||||
result = rpc.call(context, real_topic, msg, timeout)
|
||||
return self.serializer.deserialize_entity(context, result)
|
||||
except rpc.common.Timeout as exc:
|
||||
raise rpc.common.Timeout(
|
||||
exc.info, real_topic, msg.get('method'))
|
||||
@@ -115,9 +140,11 @@ class RpcProxy(object):
|
||||
from the remote method as they arrive.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
real_topic = self._get_topic(topic)
|
||||
try:
|
||||
return rpc.multicall(context, real_topic, msg, timeout)
|
||||
result = rpc.multicall(context, real_topic, msg, timeout)
|
||||
return self.serializer.deserialize_entity(context, result)
|
||||
except rpc.common.Timeout as exc:
|
||||
raise rpc.common.Timeout(
|
||||
exc.info, real_topic, msg.get('method'))
|
||||
@@ -135,6 +162,7 @@ class RpcProxy(object):
|
||||
remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
rpc.cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast(self, context, msg, topic=None, version=None):
|
||||
@@ -150,6 +178,7 @@ class RpcProxy(object):
|
||||
from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
rpc.fanout_cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def cast_to_server(self, context, server_params, msg, topic=None,
|
||||
@@ -168,6 +197,7 @@ class RpcProxy(object):
|
||||
return values.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
|
||||
@@ -186,5 +216,6 @@ class RpcProxy(object):
|
||||
return values.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
msg['args'] = self._serialize_msg_args(context, msg['args'])
|
||||
rpc.fanout_cast_to_server(context, server_params,
|
||||
self._get_topic(topic), msg)
|
||||
|
||||
52
nova/openstack/common/rpc/serializer.py
Normal file
52
nova/openstack/common/rpc/serializer.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Provides the definition of an RPC serialization handler"""
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class Serializer(object):
|
||||
"""Generic (de-)serialization definition base class"""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize_entity(self, context, entity):
|
||||
"""Serialize something to primitive form.
|
||||
|
||||
:param context: Security context
|
||||
:param entity: Entity to be serialized
|
||||
:returns: Serialized form of entity
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def deserialize_entity(self, context, entity):
|
||||
"""Deserialize something from primitive form.
|
||||
|
||||
:param context: Security context
|
||||
:param entity: Primitive to be deserialized
|
||||
:returns: Deserialized form of entity
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class NoOpSerializer(Serializer):
|
||||
"""A serializer that does nothing"""
|
||||
|
||||
def serialize_entity(self, context, entity):
|
||||
return entity
|
||||
|
||||
def deserialize_entity(self, context, entity):
|
||||
return entity
|
||||
Reference in New Issue
Block a user