Short-circuit notifications when not enabled
When running Cinder with notifications disabled we are still doing all the work and calls to Oslo message notifier and it's there, at the very last moment when it's going to send the message that it checks that there's no actual extension loaded in the driver manager and skips the actual send of the data. This is considerably wasteful considering that for some of the notifications we are actually querying the DB to get data, for example volume attachments and glance metadata information when notifying about volume usage. This patch proposes short-circuiting notification methods as much as possible to optimize code execution when Cinder has no notification transport mechanism configured, as is the case when deployed as a standalone SDS service. Closes-Bug: #1660303 Change-Id: I77f655d3ef90088ce71304da5d4ea7b543991e90
This commit is contained in:
@@ -47,6 +47,7 @@ class QoSSpecsController(wsgi.Controller):
|
||||
_view_builder_class = view_qos_specs.ViewBuilder
|
||||
|
||||
@staticmethod
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_qos_specs_error(context, method, payload):
|
||||
rpc.get_notifier('QoSSpecs').error(context,
|
||||
method,
|
||||
|
||||
@@ -39,12 +39,14 @@ class VolumeTypesManageController(wsgi.Controller):
|
||||
|
||||
_view_builder_class = views_types.ViewBuilder
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_volume_type_error(self, context, method, err,
|
||||
volume_type=None, id=None, name=None):
|
||||
payload = dict(
|
||||
volume_types=volume_type, name=name, id=id, error_message=err)
|
||||
rpc.get_notifier('volumeType').error(context, method, payload)
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_volume_type_info(self, context, method, volume_type):
|
||||
payload = dict(volume_types=volume_type)
|
||||
rpc.get_notifier('volumeType').info(context, method, payload)
|
||||
|
||||
@@ -42,12 +42,14 @@ class GroupTypesController(wsgi.Controller):
|
||||
}
|
||||
policy.enforce(context, 'group:group_types_manage', target)
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_group_type_error(self, context, method, err,
|
||||
group_type=None, id=None, name=None):
|
||||
payload = dict(
|
||||
group_types=group_type, name=name, id=id, error_message=err)
|
||||
rpc.get_notifier('groupType').error(context, method, payload)
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_group_type_info(self, context, method, group_type):
|
||||
payload = dict(group_types=group_type)
|
||||
rpc.get_notifier('groupType').info(context, method, payload)
|
||||
|
||||
@@ -22,6 +22,7 @@ from oslo_utils import timeutils
|
||||
from cinder.i18n import _LW
|
||||
from cinder import objects
|
||||
from cinder import rpc
|
||||
from cinder import utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
@@ -182,15 +183,19 @@ class ImageVolumeCache(object):
|
||||
|
||||
return True
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_cache_hit(self, context, image_id, host):
|
||||
self._notify_cache_action(context, image_id, host, 'hit')
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_cache_miss(self, context, image_id, host):
|
||||
self._notify_cache_action(context, image_id, host, 'miss')
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_cache_eviction(self, context, image_id, host):
|
||||
self._notify_cache_action(context, image_id, host, 'evict')
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_cache_action(self, context, image_id, host, action):
|
||||
data = {
|
||||
'image_id': image_id,
|
||||
|
||||
+11
-3
@@ -38,6 +38,7 @@ import cinder.exception
|
||||
from cinder.i18n import _, _LE, _LI
|
||||
from cinder import objects
|
||||
from cinder.objects import base
|
||||
from cinder import utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
@@ -74,9 +75,15 @@ def init(conf):
|
||||
allowed_remote_exmods=exmods,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
|
||||
serializer = RequestContextSerializer(messaging.JsonPayloadSerializer())
|
||||
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
|
||||
serializer=serializer)
|
||||
# get_notification_transport has loaded oslo_messaging_notifications config
|
||||
# group, so we can now check if notifications are actually enabled.
|
||||
if conf.oslo_messaging_notifications.transport_url:
|
||||
json_serializer = messaging.JsonPayloadSerializer()
|
||||
serializer = RequestContextSerializer(json_serializer)
|
||||
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
|
||||
serializer=serializer)
|
||||
else:
|
||||
NOTIFIER = utils.DO_NOTHING
|
||||
|
||||
|
||||
def initialized():
|
||||
@@ -164,6 +171,7 @@ def get_server(target, endpoints, serializer=None):
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def get_notifier(service=None, host=None, publisher_id=None):
|
||||
assert NOTIFIER is not None
|
||||
if not publisher_id:
|
||||
|
||||
@@ -99,6 +99,7 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
|
||||
LOG.error(_LE("Failed to run task %(name)s: %(cause)s"),
|
||||
{'cause': cause, 'name': self.name})
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _notify_failure(self, context, request_spec, cause):
|
||||
"""When scheduling fails send out an event that it failed."""
|
||||
payload = {
|
||||
|
||||
@@ -51,6 +51,7 @@ from cinder import service
|
||||
from cinder.tests import fixtures as cinder_fixtures
|
||||
from cinder.tests.unit import conf_fixture
|
||||
from cinder.tests.unit import fake_notifier
|
||||
from cinder.tests.unit import utils as test_utils
|
||||
from cinder.volume import utils
|
||||
|
||||
|
||||
@@ -233,6 +234,8 @@ class TestCase(testtools.TestCase):
|
||||
coordination.COORDINATOR.start()
|
||||
self.addCleanup(coordination.COORDINATOR.stop)
|
||||
|
||||
test_utils.set_normal_rpc_notifier(self)
|
||||
|
||||
def _restore_obj_registry(self):
|
||||
objects_base.CinderObjectRegistry._registry._obj_classes = \
|
||||
self._base_test_obj_backup
|
||||
|
||||
@@ -28,8 +28,8 @@ FakeMessage = collections.namedtuple('Message',
|
||||
|
||||
class FakeNotifier(object):
|
||||
|
||||
def __init__(self, transport, publisher_id, serializer=None, driver=None,
|
||||
topic=None, retry=None):
|
||||
def __init__(self, transport, publisher_id=None, serializer=None,
|
||||
driver=None, topic=None, retry=None):
|
||||
self.transport = transport
|
||||
self.publisher_id = publisher_id
|
||||
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
|
||||
|
||||
@@ -83,3 +83,18 @@ class RPCAPITestCase(test.TestCase):
|
||||
|
||||
self.assertFalse(get_min_obj.called)
|
||||
self.assertFalse(get_min_rpc.called)
|
||||
|
||||
@mock.patch('oslo_messaging.JsonPayloadSerializer', wraps=True)
|
||||
def test_init_no_notifications(self, serializer_mock):
|
||||
self.override_config('transport_url', '',
|
||||
group='oslo_messaging_notifications')
|
||||
rpc.init(test.CONF)
|
||||
self.assertEqual(rpc.utils.DO_NOTHING, rpc.NOTIFIER)
|
||||
serializer_mock.assert_not_called()
|
||||
|
||||
@mock.patch.object(rpc, 'messaging')
|
||||
def test_init_notifications(self, messaging_mock):
|
||||
rpc.init(test.CONF)
|
||||
self.assertTrue(messaging_mock.JsonPayloadSerializer.called)
|
||||
self.assertTrue(messaging_mock.Notifier.called)
|
||||
self.assertEqual(rpc.NOTIFIER, messaging_mock.Notifier.return_value)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
@@ -1413,3 +1414,36 @@ class TestValidateInteger(test.TestCase):
|
||||
self.assertRaises(webob.exc.HTTPBadRequest,
|
||||
utils.validate_integer,
|
||||
value, 'limit', min_value=-1, max_value=(2 ** 31))
|
||||
|
||||
|
||||
class TestNotificationShortCircuit(test.TestCase):
|
||||
def test_do_nothing_getter(self):
|
||||
"""Test any attribute will always return the same instance (self)."""
|
||||
donothing = utils.DoNothing()
|
||||
self.assertIs(donothing, donothing.anyname)
|
||||
|
||||
def test_do_nothing_caller(self):
|
||||
"""Test calling the object will always return the same instance."""
|
||||
donothing = utils.DoNothing()
|
||||
self.assertIs(donothing, donothing())
|
||||
|
||||
def test_do_nothing_json_serializable(self):
|
||||
"""Test calling the object will always return the same instance."""
|
||||
donothing = utils.DoNothing()
|
||||
self.assertEqual('""', json.dumps(donothing))
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def _decorated_method(self):
|
||||
return mock.sentinel.success
|
||||
|
||||
def test_if_notification_enabled_when_enabled(self):
|
||||
"""Test method is called when notifications are enabled."""
|
||||
result = self._decorated_method()
|
||||
self.assertEqual(mock.sentinel.success, result)
|
||||
|
||||
def test_if_notification_enabled_when_disabled(self):
|
||||
"""Test method is not called when notifications are disabled."""
|
||||
self.override_config('transport_url', '',
|
||||
group='oslo_messaging_notifications')
|
||||
result = self._decorated_method()
|
||||
self.assertEqual(utils.DO_NOTHING, result)
|
||||
|
||||
@@ -28,6 +28,7 @@ from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.objects import fields
|
||||
from cinder import rpc
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
|
||||
|
||||
@@ -503,3 +504,13 @@ def create_populated_cluster(ctxt, num_services, num_down_svcs=0, **values):
|
||||
for i in range(num_services)
|
||||
]
|
||||
return cluster, svcs
|
||||
|
||||
|
||||
def set_normal_rpc_notifier(test_case):
|
||||
"""Instead of shortcircuiting notifications, user Oslo's notifier."""
|
||||
test_case.override_config('transport_url', 'fake_transport',
|
||||
group='oslo_messaging_notifications')
|
||||
json_serializer = rpc.messaging.JsonPayloadSerializer()
|
||||
serializer = rpc.RequestContextSerializer(json_serializer)
|
||||
rpc.NOTIFIER = rpc.messaging.Notifier(rpc.NOTIFICATION_TRANSPORT,
|
||||
serializer=serializer)
|
||||
|
||||
@@ -1072,3 +1072,28 @@ def validate_dictionary_string_length(specs):
|
||||
def service_expired_time(with_timezone=False):
|
||||
return (timeutils.utcnow(with_timezone=with_timezone) -
|
||||
datetime.timedelta(seconds=CONF.service_down_time))
|
||||
|
||||
|
||||
class DoNothing(str):
|
||||
"""Class that literrally does nothing.
|
||||
|
||||
We inherit from str in case it's called with json.dumps.
|
||||
"""
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self
|
||||
|
||||
|
||||
DO_NOTHING = DoNothing()
|
||||
|
||||
|
||||
def if_notifications_enabled(f):
|
||||
"""Calls decorated method only if notifications are enabled."""
|
||||
@functools.wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
if CONF.oslo_messaging_notifications.transport_url:
|
||||
return f(*args, **kwargs)
|
||||
return DO_NOTHING
|
||||
return wrapped
|
||||
|
||||
@@ -123,6 +123,7 @@ def _usage_from_backup(backup, **kw):
|
||||
return usage_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_volume_usage(context, volume, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -137,6 +138,7 @@ def notify_about_volume_usage(context, volume, event_suffix,
|
||||
usage_info)
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_backup_usage(context, backup, event_suffix,
|
||||
extra_usage_info=None,
|
||||
host=None):
|
||||
@@ -183,6 +185,7 @@ def _usage_from_snapshot(snapshot, **extra_usage_info):
|
||||
return usage_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_snapshot_usage(context, snapshot, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -214,6 +217,7 @@ def _usage_from_capacity(capacity, **extra_usage_info):
|
||||
return capacity_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_capacity_usage(context, capacity, suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -229,6 +233,7 @@ def notify_about_capacity_usage(context, capacity, suffix,
|
||||
usage_info)
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_replication_usage(context, volume, suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -245,6 +250,7 @@ def notify_about_replication_usage(context, volume, suffix,
|
||||
usage_info)
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_replication_error(context, volume, suffix,
|
||||
extra_error_info=None, host=None):
|
||||
if not host:
|
||||
@@ -274,6 +280,7 @@ def _usage_from_consistencygroup(group_ref, **kw):
|
||||
return usage_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_consistencygroup_usage(context, group, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -305,6 +312,7 @@ def _usage_from_group(group_ref, **kw):
|
||||
return usage_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_group_usage(context, group, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -351,6 +359,7 @@ def _usage_from_group_snapshot(group_snapshot, **kw):
|
||||
return usage_info
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
@@ -368,6 +377,7 @@ def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
|
||||
usage_info)
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_group_snapshot_usage(context, group_snapshot, event_suffix,
|
||||
extra_usage_info=None, host=None):
|
||||
if not host:
|
||||
|
||||
@@ -31,6 +31,7 @@ from cinder import exception
|
||||
from cinder.i18n import _, _LE
|
||||
from cinder import quota
|
||||
from cinder import rpc
|
||||
from cinder import utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
@@ -186,6 +187,7 @@ def is_public_volume_type(context, volume_type_id):
|
||||
return volume_type['is_public']
|
||||
|
||||
|
||||
@utils.if_notifications_enabled
|
||||
def notify_about_volume_type_access_usage(context,
|
||||
volume_type_id,
|
||||
project_id,
|
||||
|
||||
Reference in New Issue
Block a user