Remove duplicated logic about oslo notifier
In a previous patch, we added a duplicated module for RPC notifier. It seems not necessary at all. This patch removes it. This patch also removes some unnecessary logics in the common messaging module. Change-Id: I96e4ef9e7cdeb77b30f2f3379a6b02c6a70ea0aa
This commit is contained in:
parent
7a9f5f5fca
commit
4f3a25c031
|
@ -11,18 +11,19 @@
|
|||
# under the License.
|
||||
|
||||
import eventlet
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging
|
||||
from oslo_serialization import jsonutils
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from senlin.common import context
|
||||
|
||||
# An alias for the default serializer
|
||||
JsonPayloadSerializer = messaging.JsonPayloadSerializer
|
||||
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
|
||||
class RequestContextSerializer(oslo_messaging.Serializer):
|
||||
class RequestContextSerializer(messaging.Serializer):
|
||||
def __init__(self, base):
|
||||
self._base = base
|
||||
|
||||
|
@ -45,37 +46,32 @@ class RequestContextSerializer(oslo_messaging.Serializer):
|
|||
return context.RequestContext.from_dict(ctxt)
|
||||
|
||||
|
||||
class JsonPayloadSerializer(oslo_messaging.NoOpSerializer):
|
||||
@classmethod
|
||||
def serialize_entity(cls, context, entity):
|
||||
return jsonutils.to_primitive(entity, convert_instances=True)
|
||||
|
||||
|
||||
def setup(url=None, optional=False):
|
||||
"""Initialise the oslo_messaging layer."""
|
||||
global TRANSPORT, NOTIFIER
|
||||
|
||||
if url and url.startswith("fake://"):
|
||||
# NOTE(sileht): oslo_messaging fake driver uses time.sleep
|
||||
# NOTE: oslo_messaging fake driver uses time.sleep
|
||||
# for task switch, so we need to monkey_patch it
|
||||
eventlet.monkey_patch(time=True)
|
||||
|
||||
if not TRANSPORT:
|
||||
oslo_messaging.set_transport_defaults('senlin')
|
||||
messaging.set_transport_defaults('senlin')
|
||||
exmods = ['senlin.common.exception']
|
||||
try:
|
||||
TRANSPORT = oslo_messaging.get_transport(
|
||||
TRANSPORT = messaging.get_transport(
|
||||
cfg.CONF, url, allowed_remote_exmods=exmods)
|
||||
except oslo_messaging.InvalidTransportURL as e:
|
||||
except messaging.InvalidTransportURL as e:
|
||||
TRANSPORT = None
|
||||
if not optional or e.url:
|
||||
# NOTE(sileht): oslo_messaging is configured but unloadable
|
||||
# NOTE: oslo_messaging is configured but unloadable
|
||||
# so reraise the exception
|
||||
raise
|
||||
|
||||
if not NOTIFIER and TRANSPORT:
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||
topics=['versioned_notifications'])
|
||||
|
||||
|
||||
def cleanup():
|
||||
|
@ -83,25 +79,26 @@ def cleanup():
|
|||
global TRANSPORT, NOTIFIER
|
||||
if TRANSPORT:
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFIER = None
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
|
||||
def get_rpc_server(target, endpoint):
|
||||
"""Return a configured oslo_messaging rpc server."""
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
|
||||
executor='eventlet',
|
||||
serializer=serializer)
|
||||
return messaging.get_rpc_server(TRANSPORT, target, [endpoint],
|
||||
executor='eventlet',
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def get_rpc_client(**kwargs):
|
||||
"""Return a configured oslo_messaging RPCClient."""
|
||||
target = oslo_messaging.Target(**kwargs)
|
||||
target = messaging.Target(**kwargs)
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
return oslo_messaging.RPCClient(TRANSPORT, target,
|
||||
serializer=serializer)
|
||||
return messaging.RPCClient(TRANSPORT, target, serializer=serializer)
|
||||
|
||||
|
||||
def get_notifier(publisher_id):
|
||||
"""Return a configured oslo_messaging notifier."""
|
||||
global NOTIFIER
|
||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
||||
|
|
|
@ -10,9 +10,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from senlin.common import messaging
|
||||
from senlin.objects import base
|
||||
from senlin.objects import fields
|
||||
from senlin.rpc import notifier as rpc
|
||||
|
||||
|
||||
@base.SenlinObjectRegistry.register
|
||||
|
@ -100,7 +100,7 @@ class NotificationBase(base.SenlinObject):
|
|||
}
|
||||
|
||||
def _emit(self, context, event_type, publisher_id, payload):
|
||||
notifier = rpc.get_notifier(publisher_id)
|
||||
notifier = messaging.get_notifier(publisher_id)
|
||||
notify = getattr(notifier, self.priority)
|
||||
notify(context, event_type=event_type, payload=payload)
|
||||
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Senlin RPC Notifier."""
|
||||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from senlin.common import context as senlin_context
|
||||
from senlin.common import exception as senlin_exc
|
||||
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
|
||||
class JsonPayloadSerializer(messaging.NoOpSerializer):
|
||||
|
||||
@staticmethod
|
||||
def serialize_entity(context, entity):
|
||||
return jsonutils.to_primitive(entity, convert_instances=True)
|
||||
|
||||
|
||||
class RequestContextSerializer(messaging.Serializer):
|
||||
|
||||
def __init__(self, base):
|
||||
self._base = base
|
||||
|
||||
def serialize_entity(self, context, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.serialize_entity(context, entity)
|
||||
|
||||
def deserialize_entity(self, context, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.deserialize_entity(context, entity)
|
||||
|
||||
def serialize_context(self, context):
|
||||
return context.to_dict()
|
||||
|
||||
def deserialize_context(self, context):
|
||||
return senlin_context.RequestContext.from_dict(context)
|
||||
|
||||
|
||||
def init(conf):
|
||||
global TRANSPORT, NOTIFIER
|
||||
|
||||
exmods = [senlin_exc.__name__]
|
||||
TRANSPORT = messaging.get_notification_transport(
|
||||
conf, allowed_remote_exmods=exmods)
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||
topics=['versioned_notifications'])
|
||||
|
||||
|
||||
def cleanup():
|
||||
global TRANSPORT, NOTIFIER
|
||||
assert TRANSPORT is not None
|
||||
assert NOTIFIER is not None
|
||||
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
|
||||
def get_notifier(publisher_id):
|
||||
assert NOTIFIER is not None
|
||||
|
||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
|
@ -139,7 +139,7 @@ class TestNotificationBase(test_base.SenlinTestCase):
|
|||
actual_payload = mock_notify.call_args[1]['payload']
|
||||
self.assertJsonEqual(expected_payload, actual_payload)
|
||||
|
||||
@mock.patch('senlin.rpc.notifier.NOTIFIER')
|
||||
@mock.patch('senlin.common.messaging.NOTIFIER')
|
||||
def test_emit_notification(self, mock_notifier):
|
||||
|
||||
mock_context = mock.Mock()
|
||||
|
@ -152,7 +152,7 @@ class TestNotificationBase(test_base.SenlinTestCase):
|
|||
expected_event_type='test_object.update.start',
|
||||
expected_payload=self.expected_payload)
|
||||
|
||||
@mock.patch('senlin.rpc.notifier.NOTIFIER')
|
||||
@mock.patch('senlin.common.messaging.NOTIFIER')
|
||||
def test_emit_with_host_and_binary_as_publisher(self, mock_notifier):
|
||||
event_type = notification.EventType(
|
||||
object='test_object',
|
||||
|
@ -175,7 +175,7 @@ class TestNotificationBase(test_base.SenlinTestCase):
|
|||
expected_event_type='test_object.update',
|
||||
expected_payload=self.expected_payload)
|
||||
|
||||
@mock.patch('senlin.rpc.notifier.NOTIFIER')
|
||||
@mock.patch('senlin.common.messaging.NOTIFIER')
|
||||
def test_emit_event_type_without_phase(self, mock_notifier):
|
||||
noti = TestNotification(
|
||||
event_type=notification.EventType(
|
||||
|
@ -196,7 +196,7 @@ class TestNotificationBase(test_base.SenlinTestCase):
|
|||
expected_event_type='test_object.update',
|
||||
expected_payload=self.expected_payload)
|
||||
|
||||
@mock.patch('senlin.rpc.notifier.NOTIFIER')
|
||||
@mock.patch('senlin.common.messaging.NOTIFIER')
|
||||
def test_not_possible_to_emit_if_not_populated(self, mock_notifier):
|
||||
# create a non-populated payload
|
||||
payload = TestPayload(extra_field='test string')
|
||||
|
@ -216,7 +216,7 @@ class TestNotificationBase(test_base.SenlinTestCase):
|
|||
self.assertRaises(AssertionError, noti.emit, mock_context)
|
||||
self.assertFalse(mock_notifier.called)
|
||||
|
||||
@mock.patch('senlin.rpc.notifier.NOTIFIER')
|
||||
@mock.patch('senlin.common.messaging.NOTIFIER')
|
||||
def test_empty_schema(self, mock_notifier):
|
||||
# create a non-populated payload
|
||||
payload = TestPayloadEmptySchema(extra_field='test string')
|
||||
|
|
Loading…
Reference in New Issue