oslo.messaging context must be a dict

oslo.messaging assumes the context is a dict not a RequestContext
and it assumes the payload in json serializable.

This patch ensures this.

Also it removes oslo.messaging mock on some tests and use real oslo.messaging
library with the fake driver.

Change-Id: Ie3c6083bbc4ec83de28e42bb10e7c50c7e135070
Closes-bug: #1275771
Closes-bug: #1317290
This commit is contained in:
Mehdi Abaakouk 2014-05-07 14:42:49 +02:00
parent 34db7d95c8
commit 6f9e46ba5c
9 changed files with 307 additions and 86 deletions

View File

@ -595,7 +595,9 @@ def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
notifier = messaging.get_notifier(publisher_id="ceilometer.api")
notifier.info(None, notification, payload)
# FIXME(sileht): perhaps we need to copy some infos from the
# pecan request headers like nova does
notifier.info(context.RequestContext(), notification, payload)
class OldSample(_Base):

View File

@ -15,9 +15,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
import oslo.messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common import jsonutils
TRANSPORT = None
NOTIFIER = None
@ -28,15 +33,51 @@ _ALIASES = {
}
class RequestContextSerializer(oslo.messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return ctxt.to_dict()
@staticmethod
def deserialize_context(ctxt):
return context.RequestContext(ctxt)
class JsonPayloadSerializer(oslo.messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
if url and url.startswith("fake://"):
# NOTE(sileht): oslo.messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
oslo.messaging.set_transport_defaults('ceilometer')
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
if not NOTIFIER:
NOTIFIER = oslo.messaging.Notifier(TRANSPORT)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
@ -52,15 +93,19 @@ def get_rpc_server(topic, endpoint):
"""Return a configured oslo.messaging rpc server."""
global TRANSPORT
target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet')
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
"""Return a configured oslo.messaging RPCClient."""
global TRANSPORT
target = oslo.messaging.Target(**kwargs)
return oslo.messaging.RPCClient(TRANSPORT, target)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
def get_notification_listener(targets, endpoints, url=None):
@ -83,9 +128,9 @@ def get_notifier(publisher_id):
def convert_to_old_notification_format(priority, ctxt, publisher_id,
event_type, payload, metadata):
#FIXME(sileht): temporary convert notification to old format
#to focus on oslo.messaging migration before refactoring the code to
#use the new oslo.messaging facilities
# FIXME(sileht): temporary convert notification to old format
# to focus on oslo.messaging migration before refactoring the code to
# use the new oslo.messaging facilities
notification = {'priority': priority,
'payload': payload,
'event_type': event_type,

View File

@ -142,7 +142,7 @@ class RPCPublisher(publisher.PublisherBase):
self.flush()
def flush(self):
#note(sileht):
# NOTE(sileht):
# IO of the rpc stuff in handled by eventlet,
# this is why the self.local_queue, is emptied before processing the
# queue and the remaining messages in the queue are added to
@ -164,7 +164,7 @@ class RPCPublisher(publisher.PublisherBase):
"dropping %d oldest samples") % count)
def _process_queue(self, queue, policy):
#note(sileht):
# NOTE(sileht):
# the behavior of rpc.cast call depends of rabbit_max_retries
# if rabbit_max_retries <= 0:
# it returns only if the msg has been sent on the amqp queue
@ -178,7 +178,7 @@ class RPCPublisher(publisher.PublisherBase):
context, topic, meters = queue[0]
try:
self.rpc_client.prepare(topic=topic).cast(
context.to_dict(), self.target, data=meters)
context, self.target, data=meters)
except oslo.messaging._drivers.common.RPCException:
samples = sum([len(m) for __, __, m in queue])
if policy == 'queue':

View File

@ -19,30 +19,38 @@
import uuid
from ceilometerclient.v2 import alarms
import mock
import eventlet
from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import messaging
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
class TestRPCAlarmNotifier(test.BaseTestCase):
def fake_cast(self, context, method, **args):
self.notified.append((method, args))
class FakeNotifier(object):
def __init__(self):
self.rpc = messaging.get_rpc_server("alarm_notifier", self)
self.notified = []
def start(self, expected_length):
self.expected_length = expected_length
self.rpc.start()
def notify_alarm(self, context, data):
self.notified.append(data)
if len(self.notified) == self.expected_length:
self.rpc.stop()
class TestRPCAlarmNotifier(test.BaseTestCase):
def setUp(self):
super(TestRPCAlarmNotifier, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.notified = []
self.notifier_server = FakeNotifier()
self.notifier = rpc_alarm.RPCAlarmNotifier()
self.useFixture(mockpatch.PatchObject(
self.notifier.client, 'cast',
side_effect=self.fake_cast))
self.alarms = [
alarms.Alarm(None, info={
'name': 'instance_running_hot',
@ -83,29 +91,36 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
self.assertEqual('alarm_notifier', topic)
def test_notify_alarm(self):
self.notifier_server.start(2)
previous = ['alarm', 'ok']
for i, a in enumerate(self.alarms):
self.notifier.notify(a, previous[i], "what? %d" % i,
{'fire': '%d' % i})
self.assertEqual(2, len(self.notified))
self.notifier_server.rpc.wait()
self.assertEqual(2, len(self.notifier_server.notified))
for i, a in enumerate(self.alarms):
actions = getattr(a, models.Alarm.ALARM_ACTIONS_MAP[a.state])
self.assertEqual('notify_alarm', self.notified[i][0])
self.assertEqual(self.alarms[i].alarm_id,
self.notified[i][1]["data"]["alarm_id"])
self.assertEqual(actions, self.notified[i][1]["data"]["actions"])
self.notifier_server.notified[i]["alarm_id"])
self.assertEqual(actions,
self.notifier_server.notified[i]["actions"])
self.assertEqual(previous[i],
self.notified[i][1]["data"]["previous"])
self.notifier_server.notified[i]["previous"])
self.assertEqual(self.alarms[i].state,
self.notified[i][1]["data"]["current"])
self.notifier_server.notified[i]["current"])
self.assertEqual("what? %d" % i,
self.notified[i][1]["data"]["reason"])
self.notifier_server.notified[i]["reason"])
self.assertEqual({'fire': '%d' % i},
self.notified[i][1]["data"]["reason_data"])
self.notifier_server.notified[i]["reason_data"])
def test_notify_non_string_reason(self):
self.notifier_server.start(1)
self.notifier.notify(self.alarms[0], 'ok', 42, {})
reason = self.notified[0][1]['data']['reason']
self.notifier_server.rpc.wait()
reason = self.notifier_server.notified[0]['reason']
self.assertIsInstance(reason, basestring)
def test_notify_no_actions(self):
@ -126,52 +141,99 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
'my_instance'}
})
self.notifier.notify(alarm, 'alarm', "what?", {})
self.assertEqual(0, len(self.notified))
self.assertEqual(0, len(self.notifier_server.notified))
class FakeCoordinator(object):
def __init__(self):
self.rpc = messaging.get_rpc_server(
"alarm_partition_coordination", self)
self.notified = []
def presence(self, context, data):
self._record('presence', data)
def allocate(self, context, data):
self._record('allocate', data)
def assign(self, context, data):
self._record('assign', data)
def _record(self, method, data):
self.notified.append((method, data))
self.rpc.stop()
class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
def fake_fanout_cast(self, context, method, **args):
self.notified.append((method, args))
def fake_prepare(self, fanout):
self.assertTrue(fanout)
cctxt = mock.Mock()
cctxt.cast.side_effect = self.fake_fanout_cast
return cctxt
def setUp(self):
super(TestRPCAlarmPartitionCoordination, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.notified = []
self.coordinator_server = FakeCoordinator()
self.coordinator_server.rpc.start()
eventlet.sleep() # must be sure that fanout queue is created
self.ordination = rpc_alarm.RPCAlarmPartitionCoordination()
self.useFixture(mockpatch.PatchObject(
self.ordination.client, 'prepare',
side_effect=self.fake_prepare))
self.alarms = [mock.MagicMock(), mock.MagicMock()]
self.alarms = [
alarms.Alarm(None, info={
'name': 'instance_running_hot',
'meter_name': 'cpu_util',
'comparison_operator': 'gt',
'threshold': 80.0,
'evaluation_periods': 5,
'statistic': 'avg',
'state': 'ok',
'ok_actions': ['http://host:8080/path'],
'user_id': 'foobar',
'project_id': 'snafu',
'period': 60,
'alarm_id': str(uuid.uuid4()),
'matching_metadata':{'resource_id':
'my_instance'}
}),
alarms.Alarm(None, info={
'name': 'group_running_idle',
'meter_name': 'cpu_util',
'comparison_operator': 'le',
'threshold': 10.0,
'statistic': 'max',
'evaluation_periods': 4,
'state': 'insufficient data',
'insufficient_data_actions': ['http://other_host/path'],
'user_id': 'foobar',
'project_id': 'snafu',
'period': 300,
'alarm_id': str(uuid.uuid4()),
'matching_metadata':{'metadata.user_metadata.AS':
'my_group'}
}),
]
def test_ordination_presence(self):
id = uuid.uuid4()
id = str(uuid.uuid4())
priority = float(timeutils.utcnow().strftime('%s.%f'))
self.ordination.presence(id, priority)
method, args = self.notified[0]
self.assertEqual(id, args['data']['uuid'])
self.assertEqual(priority, args['data']['priority'])
self.coordinator_server.rpc.wait()
method, args = self.coordinator_server.notified[0]
self.assertEqual(id, args['uuid'])
self.assertEqual(priority, args['priority'])
self.assertEqual('presence', method)
def test_ordination_assign(self):
id = uuid.uuid4()
id = str(uuid.uuid4())
self.ordination.assign(id, self.alarms)
method, args = self.notified[0]
self.assertEqual(id, args['data']['uuid'])
self.assertEqual(2, len(args['data']['alarms']))
self.coordinator_server.rpc.wait()
method, args = self.coordinator_server.notified[0]
self.assertEqual(id, args['uuid'])
self.assertEqual(2, len(args['alarms']))
self.assertEqual('assign', method)
def test_ordination_allocate(self):
id = uuid.uuid4()
id = str(uuid.uuid4())
self.ordination.allocate(id, self.alarms)
method, args = self.notified[0]
self.assertEqual(id, args['data']['uuid'])
self.assertEqual(2, len(args['data']['alarms']))
self.coordinator_server.rpc.wait()
method, args = self.coordinator_server.notified[0]
self.assertEqual(id, args['uuid'])
self.assertEqual(2, len(args['alarms']))
self.assertEqual('allocate', method)

View File

@ -19,6 +19,7 @@
"""
from oslo.config import cfg
import oslo.messaging.conffixture
import pecan
import pecan.testing
@ -38,9 +39,11 @@ class FunctionalTest(db_test_base.TestBase):
PATH_PREFIX = ''
def setUp(self):
super(FunctionalTest, self).setUp()
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
self.CONF.set_override("notification_driver", "messaging")
messaging.setup("fake://")
self.addCleanup(messaging.cleanup)
super(FunctionalTest, self).setUp()
self.CONF.set_override("auth_version", "v2.0",
group=OPT_GROUP_NAME)
self.CONF.set_override("policy_file",

View File

@ -26,7 +26,7 @@ import logging
import uuid
import mock
import oslo.messaging.conffixture
from six import moves
from ceilometer import messaging
@ -1844,21 +1844,40 @@ class TestAlarms(FunctionalTest,
}
}
with mock.patch.object(messaging, 'get_notifier') as get_notifier:
notifier = get_notifier.return_value
endpoint = mock.MagicMock()
target = oslo.messaging.Target(topic="notifications",
exchange="ceilometer")
listener = messaging.get_notification_listener([target],
[endpoint])
listener.start()
endpoint.info.side_effect = lambda *args: listener.stop()
self.post_json('/alarms', params=json, headers=self.auth_headers)
listener.wait()
self.post_json('/alarms', params=json, headers=self.auth_headers)
get_notifier.assert_called_once_with(publisher_id='ceilometer.api')
class PayloadMatcher(object):
def __eq__(self, payload):
return payload['detail']['name'] == 'sent_notification' and \
payload['type'] == 'creation' and \
payload['detail']['rule']['meter_name'] == 'ameter' and \
set(['alarm_id', 'detail', 'event_id', 'on_behalf_of',
'project_id', 'timestamp',
'user_id']).issubset(payload.keys())
calls = notifier.info.call_args_list
self.assertEqual(1, len(calls))
args, _ = calls[0]
context, event_type, payload = args
self.assertEqual('alarm.creation', event_type)
self.assertEqual('sent_notification', payload['detail']['name'])
self.assertTrue(set(['alarm_id', 'detail', 'event_id', 'on_behalf_of',
'project_id', 'timestamp', 'type',
'user_id']).issubset(payload.keys()))
endpoint.info.assert_called_once_with(
{'instance_uuid': None,
'domain': None,
'project_domain': None,
'auth_token': None,
'is_admin': False,
'user': None,
'tenant': None,
'read_only': False,
'show_deleted': False,
'user_identity': '- - - - -',
'request_id': mock.ANY,
'user_domain': None},
'ceilometer.api', 'alarm.creation',
PayloadMatcher(), mock.ANY)
def test_alarm_sends_notification(self):
# Hit the AlarmController (with alarm_id supplied) ...

View File

@ -26,6 +26,7 @@ import oslo.messaging
import oslo.messaging._drivers.common
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import test
@ -95,25 +96,36 @@ class TestPublish(test.BaseTestCase):
def setUp(self):
super(TestPublish, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.published = []
def test_published(self):
def test_published_no_mock(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://'))
cast_context = mock.MagicMock()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
prepare.return_value = cast_context
publisher.publish_samples(mock.MagicMock(),
self.test_data)
prepare.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic)
cast_context.cast.assert_called_once_with(
mock.ANY, 'record_metering_data', data=mock.ANY)
endpoint = mock.MagicMock(['record_metering_data'])
collector = messaging.get_rpc_server(
self.CONF.publisher_rpc.metering_topic, endpoint)
endpoint.record_metering_data.side_effect = \
lambda *args, **kwds: collector.stop()
collector.start()
eventlet.sleep()
publisher.publish_samples(context.RequestContext(),
self.test_data)
collector.wait()
class Matcher(object):
@staticmethod
def __eq__(data):
for i, sample in enumerate(data):
if sample['counter_name'] != self.test_data[i].name:
return False
return True
endpoint.record_metering_data.assert_called_once_with(
mock.ANY, data=Matcher())
def test_publish_target(self):
publisher = rpc.RPCPublisher(

View File

@ -25,7 +25,9 @@ from stevedore import extension
from ceilometer import collector
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer.tests import base as tests_base
@ -41,10 +43,11 @@ class TestCollector(tests_base.BaseTestCase):
super(TestCollector, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override("connection", "log://", group='database')
self.srv = collector.CollectorService()
self.CONF.publisher.metering_secret = 'not-so-secret'
self.CONF.set_override('metering_secret', 'not-so-secret',
group='publisher')
self.counter = sample.Sample(
name='foobar',
type='bad',
@ -53,7 +56,7 @@ class TestCollector(tests_base.BaseTestCase):
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp='NOW!',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={},
).as_dict()
@ -66,12 +69,14 @@ class TestCollector(tests_base.BaseTestCase):
user_id=u'test',
project_id=u'test',
resource_id=u'test_run_tasks',
timestamp=u'NOW!',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={u'name': [([u'TestPublish'])]},
source=u'testsource',
),
'not-so-secret')
self.srv = collector.CollectorService()
def _make_test_manager(self, plugin):
return extension.ExtensionManager.make_test_instance([
extension.Extension(
@ -202,3 +207,18 @@ class TestCollector(tests_base.BaseTestCase):
self.assertTrue(utils.verify_signature(
mock_dispatcher.method_calls[0][1][0],
"not-so-secret"))
@mock.patch('ceilometer.storage.impl_log.LOG')
def test_collector_no_mock(self, mylog):
self.CONF.set_override('udp_address', '', group='collector')
self.srv.start()
mylog.info.side_effect = lambda *args: self.srv.stop()
client = messaging.get_rpc_client(version='1.0')
cclient = client.prepare(topic='metering')
cclient.cast(context.RequestContext(),
'record_metering_data', data=[self.utf8_msg])
self.srv.rpc_server.wait()
mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1')

View File

@ -20,11 +20,15 @@
import mock
import oslo.messaging
import oslo.messaging.conffixture
from stevedore import extension
import yaml
from ceilometer.compute.notifications import instance
from ceilometer import messaging
from ceilometer import notification
from ceilometer.openstack.common import context
from ceilometer.openstack.common import fileutils
from ceilometer.openstack.common.fixture import config
from ceilometer.tests import base as tests_base
@ -158,3 +162,57 @@ class TestNotification(tests_base.BaseTestCase):
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0]
self.assertEqual(1, len(list(event_endpoint.dispatcher_manager)))
class TestRealNotification(tests_base.BaseTestCase):
def setUp(self):
super(TestRealNotification, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
pipeline = yaml.dump([{
'name': 'test_pipeline',
'interval': 5,
'counters': ['*'],
'transformers': [],
'publishers': ['test://'],
}])
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("notification_driver", "messaging")
self.CONF.set_override("control_exchange", "nova")
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.srv = notification.NotificationService()
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
self.srv.start()
fake_publisher = fake_publisher_cls.return_value
fake_publisher.publish_samples.side_effect = \
lambda *args: self.srv.stop()
notifier = messaging.get_notifier("compute.vagrant-precise")
notifier.info(context.RequestContext(), 'compute.instance.create.end',
TEST_NOTICE_PAYLOAD)
self.srv.listeners[0].wait()
class SamplesMatcher(object):
def __eq__(self, samples):
for s in samples:
if s.resource_id != "9f9d01b9-4a58-4271-9e27-398b21ab20d1":
return False
return True
fake_publisher.publish_samples.assert_has_calls([
mock.call(mock.ANY, SamplesMatcher()),
mock.call(mock.ANY, SamplesMatcher()),
mock.call(mock.ANY, SamplesMatcher()),
mock.call(mock.ANY, SamplesMatcher()),
mock.call(mock.ANY, SamplesMatcher()),
])