blazar/climate/tests/notification/test_notifier.py
Cristian A Sanchez 1d1ad49da0 Sends notifications at lease events
Climate now uses oslo.notify to send notifications when
CRUD lease operations are executed and when relevant lease
events happen (start_lease, before_end_lease, end_lease).
This implementation also adds a new event type, before_end_lease,
which is sent when a lease is about to end.
Moreover, a new oslo.notify wrapper class was also added.

Change-Id: I861540e5ec5d1309ded2de56b2135bae5b59dad9
Implements: blueprint notifications
2014-04-21 12:11:13 -03:00

82 lines
2.9 KiB
Python

# Copyright 2014 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from climate.notification import notifier as notification
from climate import tests
CONF = cfg.CONF
class FakeNotifier(object):
    """Minimal stand-in for the object returned by ``messaging.Notifier``.

    It only provides an ``info`` attribute for the tests to patch.
    """

    def info(self):
        """Do nothing and return ``None``."""
        return None
class NotifierTestCase(tests.TestCase):
    """Tests for the notification module's init/cleanup and Notifier class.

    ``messaging.Notifier`` and ``messaging.get_transport`` are patched in
    ``setUp`` so the tests only observe calls made on the fakes.
    """

    def setUp(self):
        """Patch oslo.messaging, initialise notifications, build a Notifier."""
        super(NotifierTestCase, self).setUp()

        self.group = 'notifications'
        CONF.set_override('publisher_id', 'lease-service', self.group)

        # Fake Oslo notifier
        self.fake_notifier = self.patch(messaging, 'Notifier')
        self.fake_notifier.return_value = FakeNotifier()
        # Keep the patched factory as well as the transport it returns, so
        # test_init can assert on the get_transport() call itself.
        self.fake_get_transport = self.patch(messaging, 'get_transport')
        self.fake_transport = self.fake_get_transport.return_value

        self.info_method = self.patch(FakeNotifier, 'info')

        self.context = {'user_id': 1, 'token': 'aabbcc'}
        self.payload = {'id': 1, 'name': 'Lease1', 'start-date': 'now'}

        # init() keeps module globals that are already set (see
        # test_init_called_twice_returns_same_instance), so drop anything a
        # previous test left behind; otherwise init() would not touch the
        # fakes created above and the assertions would be order-dependent.
        notification.NOTIFIER = None
        notification.TRANSPORT = None
        notification.init()
        self.notifier = notification.Notifier()

    def test_notify_with_wrong_level(self):
        """A level of 'wrong' is expected to be sent through ``info``."""
        self.notifier._notify(self.context, 'wrong', 'event', self.payload)

        self.info_method.assert_called_once_with(self.context,
                                                 'event', self.payload)

    def test_send_lease_event(self):
        """send_lease_notification forwards context, event and payload."""
        self.notifier.send_lease_notification(self.context, self.payload,
                                              'start')

        self.info_method.assert_called_once_with(self.context,
                                                 'start',
                                                 self.payload)

    def test_cleanup(self):
        """cleanup() cleans the transport and clears both module globals."""
        notification.cleanup()

        self.fake_transport.cleanup.assert_called_once_with()
        self.assertIsNone(notification.NOTIFIER)
        self.assertIsNone(notification.TRANSPORT)

    def test_init(self):
        """init() builds one transport and one notifier bound to it."""
        # ``called_once`` / ``called_once_with`` are not mock assertions;
        # on a mock they are auto-created attributes that never fail.
        # Use real assertions, and check the get_transport factory rather
        # than the transport object it returns (which is never called).
        self.assertEqual(1, self.fake_get_transport.call_count)
        self.fake_notifier.assert_called_once_with(
            self.fake_transport, publisher_id='lease-service')

    def test_init_called_twice_returns_same_instance(self):
        """A second init() leaves the existing notifier and transport."""
        prev_notifier = notification.NOTIFIER
        prev_transport = notification.TRANSPORT

        notification.init()

        self.assertIs(prev_notifier, notification.NOTIFIER)
        self.assertIs(prev_transport, notification.TRANSPORT)