Revert "oslo.messaging context must be a dict"

This reverts commit 5df4cb53ae.

The commit in question appears to have broken the notification agent
as an unintended side-effect. Reverting pending further investigation
as the notification agent is the more crucial component.

Related-to: #1317290
Change-Id: If9b1d638b60b7029c67f45e2e9feda62b9fd042a
This commit is contained in:
Eoghan Glynn 2014-05-07 23:24:21 +01:00
parent 0a21107a6c
commit 36269031a7
5 changed files with 13 additions and 48 deletions

View File

@ -595,7 +595,7 @@ def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
notifier = messaging.get_notifier(publisher_id="ceilometer.api")
notifier.info(context.RequestContext(), notification, payload)
notifier.info(None, notification, payload)
class OldSample(_Base):

View File

@ -18,8 +18,6 @@
from oslo.config import cfg
import oslo.messaging
from ceilometer.openstack.common import context
TRANSPORT = None
NOTIFIER = None
@ -30,27 +28,6 @@ _ALIASES = {
}
class RequestContextSerializer(oslo.messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
def serialize_context(self, ctxt):
return ctxt.to_dict()
def deserialize_context(self, ctxt):
return context.RequestContext(ctxt)
def setup(url=None):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
@ -59,8 +36,7 @@ def setup(url=None):
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
if not NOTIFIER:
serializer = RequestContextSerializer(None)
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
NOTIFIER = oslo.messaging.Notifier(TRANSPORT)
def cleanup():
@ -76,19 +52,15 @@ def get_rpc_server(topic, endpoint):
"""Return a configured oslo.messaging rpc server."""
global TRANSPORT
target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
serializer = RequestContextSerializer(None)
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
executor='eventlet')
def get_rpc_client(**kwargs):
"""Return a configured oslo.messaging RPCClient."""
global TRANSPORT
target = oslo.messaging.Target(**kwargs)
serializer = RequestContextSerializer(None)
return oslo.messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
return oslo.messaging.RPCClient(TRANSPORT, target)
def get_notification_listener(targets, endpoints, url=None):
@ -99,10 +71,8 @@ def get_notification_listener(targets, endpoints, url=None):
_ALIASES)
else:
transport = TRANSPORT
serializer = RequestContextSerializer(None)
return oslo.messaging.get_notification_listener(
transport, targets, endpoints, executor='eventlet',
serializer=serializer)
transport, targets, endpoints, executor='eventlet')
def get_notifier(publisher_id):
@ -113,9 +83,9 @@ def get_notifier(publisher_id):
def convert_to_old_notification_format(priority, ctxt, publisher_id,
event_type, payload, metadata):
# FIXME(sileht): temporary convert notification to old format
# to focus on oslo.messaging migration before refactoring the code to
# use the new oslo.messaging facilities
#FIXME(sileht): temporary convert notification to old format
#to focus on oslo.messaging migration before refactoring the code to
#use the new oslo.messaging facilities
notification = {'priority': priority,
'payload': payload,
'event_type': event_type,

View File

@ -142,7 +142,7 @@ class RPCPublisher(publisher.PublisherBase):
self.flush()
def flush(self):
# NOTE(sileht):
#note(sileht):
# IO of the rpc stuff in handled by eventlet,
# this is why the self.local_queue, is emptied before processing the
# queue and the remaining messages in the queue are added to
@ -164,7 +164,7 @@ class RPCPublisher(publisher.PublisherBase):
"dropping %d oldest samples") % count)
def _process_queue(self, queue, policy):
# NOTE(sileht):
#note(sileht):
# the behavior of rpc.cast call depends of rabbit_max_retries
# if rabbit_max_retries <= 0:
# it returns only if the msg has been sent on the amqp queue
@ -178,7 +178,7 @@ class RPCPublisher(publisher.PublisherBase):
context, topic, meters = queue[0]
try:
self.rpc_client.prepare(topic=topic).cast(
context, self.target, data=meters)
context.to_dict(), self.target, data=meters)
except oslo.messaging._drivers.common.RPCException:
samples = sum([len(m) for __, __, m in queue])
if policy == 'queue':

View File

@ -23,7 +23,6 @@ import mock
from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
@ -31,8 +30,7 @@ from ceilometer.storage import models
class TestRPCAlarmNotifier(test.BaseTestCase):
def fake_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
def fake_cast(self, context, method, **args):
self.notified.append((method, args))
def setUp(self):
@ -132,8 +130,7 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
def fake_fanout_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
def fake_fanout_cast(self, context, method, **args):
self.notified.append((method, args))
def fake_prepare(self, fanout):

View File

@ -23,7 +23,6 @@ import datetime
import mock
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import timeutils
from ceilometer.tests.api.v2 import FunctionalTest
@ -33,7 +32,6 @@ from ceilometer.tests import db as tests_db
class TestPostSamples(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def fake_cast(self, ctxt, target, data):
self.assertIsInstance(ctxt, context.RequestContext)
for m in data:
del m['message_signature']
self.published.append(data)