Merge "Add object (de)serialization support to cells"
This commit is contained in:
commit
9d02587d3c
@ -36,6 +36,7 @@ from nova.consoleauth import rpcapi as consoleauth_rpcapi
|
||||
from nova import context
|
||||
from nova.db import base
|
||||
from nova import exception
|
||||
from nova.objects import base as objects_base
|
||||
from nova.objects import instance as instance_obj
|
||||
from nova.openstack.common import excutils
|
||||
from nova.openstack.common import importutils
|
||||
@ -163,6 +164,7 @@ class _BaseMessage(object):
|
||||
# Each sub-class should set this when the message is inited
|
||||
self.next_hops = []
|
||||
self.resp_queue = None
|
||||
self.serializer = objects_base.NovaObjectSerializer()
|
||||
|
||||
def __repr__(self):
|
||||
_dict = self._to_dict()
|
||||
@ -305,6 +307,11 @@ class _BaseMessage(object):
|
||||
_dict = self._to_dict()
|
||||
# Convert context to dict.
|
||||
_dict['ctxt'] = _dict['ctxt'].to_dict()
|
||||
# NOTE(comstud): 'method_kwargs' needs special serialization
|
||||
# because it may contain objects.
|
||||
method_kwargs = _dict['method_kwargs']
|
||||
for k, v in method_kwargs.items():
|
||||
method_kwargs[k] = self.serializer.serialize_entity(self.ctxt, v)
|
||||
return jsonutils.dumps(_dict)
|
||||
|
||||
def source_is_us(self):
|
||||
@ -1065,6 +1072,7 @@ class MessageRunner(object):
|
||||
self.our_name = CONF.cells.name
|
||||
for msg_type, cls in _CELL_MESSAGE_TYPE_TO_METHODS_CLS.iteritems():
|
||||
self.methods_by_type[msg_type] = cls(self)
|
||||
self.serializer = objects_base.NovaObjectSerializer()
|
||||
|
||||
def _process_message_locally(self, message):
|
||||
"""Message processing will call this when its determined that
|
||||
@ -1124,10 +1132,16 @@ class MessageRunner(object):
|
||||
another cell.
|
||||
"""
|
||||
message_dict = jsonutils.loads(json_message)
|
||||
message_type = message_dict.pop('message_type')
|
||||
# Need to convert context back.
|
||||
ctxt = message_dict['ctxt']
|
||||
message_dict['ctxt'] = context.RequestContext.from_dict(ctxt)
|
||||
# NOTE(comstud): We also need to re-serialize any objects that
|
||||
# exist in 'method_kwargs'.
|
||||
method_kwargs = message_dict['method_kwargs']
|
||||
for k, v in method_kwargs.items():
|
||||
method_kwargs[k] = self.serializer.deserialize_entity(
|
||||
message_dict['ctxt'], v)
|
||||
message_type = message_dict.pop('message_type')
|
||||
message_cls = _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS[message_type]
|
||||
return message_cls(self, **message_dict)
|
||||
|
||||
|
@ -82,6 +82,9 @@ class CellsRPCDriver(driver.BaseCellsDriver):
|
||||
"""
|
||||
topic_base = CONF.cells.rpc_driver_queue_base
|
||||
proxy_manager = InterCellRPCDispatcher(msg_runner)
|
||||
# NOTE(comstud): We do not need to use the object serializer
|
||||
# on this because object serialization is taken care for us in
|
||||
# the messaging module.
|
||||
dispatcher = rpc_dispatcher.RpcDispatcher([proxy_manager])
|
||||
for msg_type in msg_runner.get_message_types():
|
||||
topic = '%s.%s' % (topic_base, msg_type)
|
||||
|
@ -25,6 +25,7 @@ messging module.
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova import exception
|
||||
from nova.objects import base as objects_base
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common.rpc import proxy as rpc_proxy
|
||||
@ -77,6 +78,7 @@ class CellsAPI(rpc_proxy.RpcProxy):
|
||||
CONF.upgrade_levels.cells)
|
||||
super(CellsAPI, self).__init__(topic=CONF.cells.topic,
|
||||
default_version=self.BASE_RPC_API_VERSION,
|
||||
serializer=objects_base.NovaObjectSerializer(),
|
||||
version_cap=version_cap)
|
||||
|
||||
def cast_compute_api_method(self, ctxt, cell_name, method,
|
||||
|
@ -23,6 +23,7 @@ from nova.compute import vm_states
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.objects import base as objects_base
|
||||
from nova.objects import instance as instance_obj
|
||||
from nova.openstack.common import rpc
|
||||
from nova.openstack.common import timeutils
|
||||
@ -260,6 +261,49 @@ class CellsMessageClassesTestCase(test.TestCase):
|
||||
self.assertEqual(method_kwargs, call_info['kwargs'])
|
||||
self.assertEqual(target_cell, call_info['routing_path'])
|
||||
|
||||
def test_child_targeted_message_with_object(self):
|
||||
target_cell = 'api-cell!child-cell1'
|
||||
method = 'our_fake_method'
|
||||
direction = 'down'
|
||||
|
||||
call_info = {}
|
||||
|
||||
class CellsMsgingTestObject(objects_base.NovaObject):
|
||||
"""Test object. We just need 1 field in order to test
|
||||
that this gets serialized properly.
|
||||
"""
|
||||
fields = {'test': str}
|
||||
|
||||
test_obj = CellsMsgingTestObject()
|
||||
test_obj.test = 'meow'
|
||||
|
||||
method_kwargs = dict(obj=test_obj, arg1=1, arg2=2)
|
||||
|
||||
def our_fake_method(message, **kwargs):
|
||||
call_info['context'] = message.ctxt
|
||||
call_info['routing_path'] = message.routing_path
|
||||
call_info['kwargs'] = kwargs
|
||||
|
||||
fakes.stub_tgt_method(self, 'child-cell1', 'our_fake_method',
|
||||
our_fake_method)
|
||||
|
||||
tgt_message = messaging._TargetedMessage(self.msg_runner,
|
||||
self.ctxt, method,
|
||||
method_kwargs, direction,
|
||||
target_cell)
|
||||
tgt_message.process()
|
||||
|
||||
self.assertEqual(self.ctxt, call_info['context'])
|
||||
self.assertEqual(target_cell, call_info['routing_path'])
|
||||
self.assertEqual(3, len(call_info['kwargs']))
|
||||
self.assertEqual(1, call_info['kwargs']['arg1'])
|
||||
self.assertEqual(2, call_info['kwargs']['arg2'])
|
||||
# Verify we get a new object with what we expect.
|
||||
obj = call_info['kwargs']['obj']
|
||||
self.assertTrue(isinstance(obj, CellsMsgingTestObject))
|
||||
self.assertNotEqual(id(test_obj), id(obj))
|
||||
self.assertEqual(test_obj.test, obj.test)
|
||||
|
||||
def test_grandchild_targeted_message(self):
|
||||
target_cell = 'api-cell!child-cell2!grandchild-cell1'
|
||||
method = 'our_fake_method'
|
||||
|
@ -101,7 +101,7 @@ class CellsRPCDriverTestCase(test.TestCase):
|
||||
msg_runner = fakes.get_message_runner('api-cell')
|
||||
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
|
||||
message = messaging._TargetedMessage(msg_runner,
|
||||
self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=False)
|
||||
self.ctxt, 'fake', {}, 'down', cell_state, fanout=False)
|
||||
|
||||
call_info = {}
|
||||
|
||||
@ -139,7 +139,7 @@ class CellsRPCDriverTestCase(test.TestCase):
|
||||
msg_runner = fakes.get_message_runner('api-cell')
|
||||
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
|
||||
message = messaging._TargetedMessage(msg_runner,
|
||||
self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=True)
|
||||
self.ctxt, 'fake', {}, 'down', cell_state, fanout=True)
|
||||
|
||||
call_info = {}
|
||||
|
||||
@ -179,7 +179,7 @@ class CellsRPCDriverTestCase(test.TestCase):
|
||||
msg_runner = fakes.get_message_runner('api-cell')
|
||||
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
|
||||
message = messaging._BroadcastMessage(msg_runner,
|
||||
self.ctxt, 'fake', 'fake', 'down', fanout=True)
|
||||
self.ctxt, 'fake', {}, 'down', fanout=True)
|
||||
message.message_type = 'fake-message-type'
|
||||
|
||||
call_info = {}
|
||||
@ -198,7 +198,7 @@ class CellsRPCDriverTestCase(test.TestCase):
|
||||
msg_runner = fakes.get_message_runner('api-cell')
|
||||
dispatcher = rpc_driver.InterCellRPCDispatcher(msg_runner)
|
||||
message = messaging._BroadcastMessage(msg_runner,
|
||||
self.ctxt, 'fake', 'fake', 'down', fanout=True)
|
||||
self.ctxt, 'fake', {}, 'down', fanout=True)
|
||||
|
||||
call_info = {}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user