oslo.messaging context must be a dict

oslo.messaging assume the context is a dict not a RequestContext
This patch ensure this.

Closes-bug: #1275771

Change-Id: Iae58d40f171d1d5780b74b747c26a74cca05b459
This commit is contained in:
Mehdi Abaakouk 2014-05-07 14:42:49 +02:00
parent dbeb6260d3
commit 5df4cb53ae
5 changed files with 48 additions and 13 deletions

View File

@ -595,7 +595,7 @@ def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
notifier = messaging.get_notifier(publisher_id="ceilometer.api")
notifier.info(None, notification, payload)
notifier.info(context.RequestContext(), notification, payload)
class OldSample(_Base):

View File

@ -18,6 +18,8 @@
from oslo.config import cfg
import oslo.messaging
from ceilometer.openstack.common import context
TRANSPORT = None
NOTIFIER = None
@ -28,6 +30,27 @@ _ALIASES = {
}
class RequestContextSerializer(oslo.messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
def serialize_context(self, ctxt):
return ctxt.to_dict()
def deserialize_context(self, ctxt):
return context.RequestContext(ctxt)
def setup(url=None):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
@ -36,7 +59,8 @@ def setup(url=None):
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
if not NOTIFIER:
NOTIFIER = oslo.messaging.Notifier(TRANSPORT)
serializer = RequestContextSerializer(None)
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
@ -52,15 +76,19 @@ def get_rpc_server(topic, endpoint):
"""Return a configured oslo.messaging rpc server."""
global TRANSPORT
target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
serializer = RequestContextSerializer(None)
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet')
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
"""Return a configured oslo.messaging RPCClient."""
global TRANSPORT
target = oslo.messaging.Target(**kwargs)
return oslo.messaging.RPCClient(TRANSPORT, target)
serializer = RequestContextSerializer(None)
return oslo.messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
def get_notification_listener(targets, endpoints, url=None):
@ -71,8 +99,10 @@ def get_notification_listener(targets, endpoints, url=None):
_ALIASES)
else:
transport = TRANSPORT
serializer = RequestContextSerializer(None)
return oslo.messaging.get_notification_listener(
transport, targets, endpoints, executor='eventlet')
transport, targets, endpoints, executor='eventlet',
serializer=serializer)
def get_notifier(publisher_id):
@ -83,9 +113,9 @@ def get_notifier(publisher_id):
def convert_to_old_notification_format(priority, ctxt, publisher_id,
event_type, payload, metadata):
#FIXME(sileht): temporary convert notification to old format
#to focus on oslo.messaging migration before refactoring the code to
#use the new oslo.messaging facilities
# FIXME(sileht): temporary convert notification to old format
# to focus on oslo.messaging migration before refactoring the code to
# use the new oslo.messaging facilities
notification = {'priority': priority,
'payload': payload,
'event_type': event_type,

View File

@ -142,7 +142,7 @@ class RPCPublisher(publisher.PublisherBase):
self.flush()
def flush(self):
#note(sileht):
# NOTE(sileht):
# IO of the rpc stuff in handled by eventlet,
# this is why the self.local_queue, is emptied before processing the
# queue and the remaining messages in the queue are added to
@ -164,7 +164,7 @@ class RPCPublisher(publisher.PublisherBase):
"dropping %d oldest samples") % count)
def _process_queue(self, queue, policy):
#note(sileht):
# NOTE(sileht):
# the behavior of rpc.cast call depends of rabbit_max_retries
# if rabbit_max_retries <= 0:
# it returns only if the msg has been sent on the amqp queue
@ -178,7 +178,7 @@ class RPCPublisher(publisher.PublisherBase):
context, topic, meters = queue[0]
try:
self.rpc_client.prepare(topic=topic).cast(
context.to_dict(), self.target, data=meters)
context, self.target, data=meters)
except oslo.messaging._drivers.common.RPCException:
samples = sum([len(m) for __, __, m in queue])
if policy == 'queue':

View File

@ -23,6 +23,7 @@ import mock
from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
@ -30,7 +31,8 @@ from ceilometer.storage import models
class TestRPCAlarmNotifier(test.BaseTestCase):
def fake_cast(self, context, method, **args):
def fake_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
self.notified.append((method, args))
def setUp(self):
@ -130,7 +132,8 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
def fake_fanout_cast(self, context, method, **args):
def fake_fanout_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
self.notified.append((method, args))
def fake_prepare(self, fanout):

View File

@ -23,6 +23,7 @@ import datetime
import mock
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import timeutils
from ceilometer.tests.api.v2 import FunctionalTest
@ -32,6 +33,7 @@ from ceilometer.tests import db as tests_db
class TestPostSamples(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def fake_cast(self, ctxt, target, data):
self.assertIsInstance(ctxt, context.RequestContext)
for m in data:
del m['message_signature']
self.published.append(data)