Add generic serialization support
Copy across Dan Smith's work in commit 93ee6e3.
This commit is contained in:
parent
952f82e197
commit
4ede9a9178
@ -23,6 +23,7 @@ from oslo.config import cfg
|
||||
from openstack.common.gettextutils import _
|
||||
from openstack.common import local
|
||||
from openstack.common import log as logging
|
||||
from openstack.common.messaging import serializer as msg_serializer
|
||||
from openstack.common.messaging import target
|
||||
from openstack.common.messaging import _utils as utils
|
||||
|
||||
@ -49,28 +50,35 @@ class RpcVersionCapError(Exception):
|
||||
|
||||
class _CallContext(object):
|
||||
|
||||
def __init__(self, transport, target,
|
||||
def __init__(self, transport, target, serializer,
|
||||
timeout=None, check_for_lock=None, version_cap=None):
|
||||
self.conf = transport.conf
|
||||
|
||||
self.transport = transport
|
||||
self.target = target
|
||||
self.serializer = serializer
|
||||
self.timeout = timeout
|
||||
self.check_for_lock = check_for_lock
|
||||
self.version_cap = version_cap
|
||||
|
||||
super(_CallContext, self).__init__()
|
||||
|
||||
def _make_message(self, method, args):
|
||||
msg = dict(method=method, args=args)
|
||||
def _make_message(self, ctxt, method, args):
|
||||
msg = dict(method=method)
|
||||
|
||||
msg['args'] = dict()
|
||||
for argname, arg in args.iteritems():
|
||||
msg['args'][argname] = self.serializer.serialize_entity(ctxt, arg)
|
||||
|
||||
if self.target.namespace is not None:
|
||||
msg['namespace'] = self.target.namespace
|
||||
if self.target.version is not None:
|
||||
msg['version'] = self.target.version
|
||||
|
||||
return msg
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
msg = self._make_message(method, kwargs)
|
||||
msg = self._make_message(ctxt, method, kwargs)
|
||||
self.transport._send(target, ctxt, msg)
|
||||
|
||||
def _check_for_lock(self):
|
||||
@ -91,7 +99,7 @@ class _CallContext(object):
|
||||
version_cap=self.version_cap)
|
||||
|
||||
def call(self, ctxt, method, **kwargs):
|
||||
msg = self._make_message(method, kwargs)
|
||||
msg = self._make_message(ctxt, method, kwargs)
|
||||
|
||||
timeout = self.timeout
|
||||
if self.timeout is None:
|
||||
@ -102,14 +110,16 @@ class _CallContext(object):
|
||||
if self.version_cap:
|
||||
self._check_version_cap(msg.get('version'))
|
||||
|
||||
return self.transport._send(self.target, ctxt, msg,
|
||||
result = self.transport._send(self.target, ctxt, msg,
|
||||
wait_for_reply=True, timeout=timeout)
|
||||
return self.serializer.deserialize_entity(ctxt, result)
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
|
||||
def __init__(self, transport, target,
|
||||
timeout=None, check_for_lock=None, version_cap=None):
|
||||
timeout=None, check_for_lock=None,
|
||||
version_cap=None, serializer=None):
|
||||
self.conf = transport.conf
|
||||
self.conf.register_opts(_client_opts)
|
||||
|
||||
@ -118,6 +128,7 @@ class RPCClient(object):
|
||||
self.timeout = timeout
|
||||
self.check_for_lock = check_for_lock
|
||||
self.version_cap = version_cap
|
||||
self.serializer = serializer or msg_serialier.NoOpSerializer()
|
||||
|
||||
super(RPCClient, self).__init__()
|
||||
|
||||
@ -135,7 +146,9 @@ class RPCClient(object):
|
||||
version_cap = self.version_cap
|
||||
|
||||
return _CallContext(self.transport, target,
|
||||
timeout, check_for_lock, version_cap)
|
||||
self.serializer,
|
||||
timeout, check_for_lock,
|
||||
version_cap)
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
self.prepare().cast(ctxt, method, **kwargs)
|
||||
|
@ -20,6 +20,7 @@ from openstack.common.gettextutils import _
|
||||
from openstack.common import log as logging
|
||||
from openstack.common.messaging import _server as server
|
||||
from openstack.common.messaging import _utils as utils
|
||||
from openstack.common.messaging import serializer as msg_serializer
|
||||
|
||||
_LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -49,14 +50,22 @@ class UnsupportedVersion(RPCDispatcherError):
|
||||
class RPCDispatcher(object):
|
||||
"Pass messages to the API objects for processing."
|
||||
|
||||
def __init__(self, endpoints):
|
||||
def __init__(self, endpoints, serializer=None):
|
||||
self.endpoints = endpoints
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
|
||||
@staticmethod
|
||||
def _is_compatible(endpoint, version):
|
||||
endpoint_version = endpoint.target.version or '1.0'
|
||||
return utils.version_is_compatible(endpoint_version, version)
|
||||
|
||||
def _dispatch(self, endpoint, method, ctxt, args):
|
||||
new_args = dict()
|
||||
for argname, arg in args.iteritems():
|
||||
new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
|
||||
result = getattr(endpoint, method)(ctxt, **new_args)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
def __call__(self, ctxt, message):
|
||||
method = message.get('method')
|
||||
args = message.get('args', {})
|
||||
@ -71,7 +80,7 @@ class RPCDispatcher(object):
|
||||
is_compatible = self._is_compatible(endpoint, version)
|
||||
|
||||
if is_compatible and hasattr(endpoint, method):
|
||||
return getattr(endpoint, method)(ctxt, **args)
|
||||
return self._dispatch(endpoint, method, ctxt, args)
|
||||
|
||||
found_compatible = found_compatible or is_compatible
|
||||
|
||||
|
52
openstack/common/messaging/serializer.py
Normal file
52
openstack/common/messaging/serializer.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Provides the definition of a message serialization handler"""
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class Serializer(object):
|
||||
"""Generic (de-)serialization definition base class"""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize_entity(self, context, entity):
|
||||
"""Serialize something to primitive form.
|
||||
|
||||
:param context: Security context
|
||||
:param entity: Entity to be serialized
|
||||
:returns: Serialized form of entity
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def deserialize_entity(self, context, entity):
|
||||
"""Deserialize something from primitive form.
|
||||
|
||||
:param context: Security context
|
||||
:param entity: Primitive to be deserialized
|
||||
:returns: Deserialized form of entity
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class NoOpSerializer(Serializer):
|
||||
"""A serializer that does nothing"""
|
||||
|
||||
def serialize_entity(self, context, entity):
|
||||
return entity
|
||||
|
||||
def deserialize_entity(self, context, entity):
|
||||
return entity
|
Loading…
Reference in New Issue
Block a user