Add event notifications for load balancers.

This patch creates tasks for load balancer notifications and adds them to the amphora loadbalancer create/delete/update flows.

Change-Id: I287d89cd83e91473f1375788c969521aa58ca567
This commit is contained in:
Spencer Harmon 2022-02-10 14:47:51 -06:00
parent 92038b24b6
commit 70257eb6b3
11 changed files with 385 additions and 28 deletions

View File

@ -0,0 +1,115 @@
===========================
Octavia Event Notifications
===========================
Octavia uses the oslo messaging notification system to send notifications for
certain events, such as "octavia.loadbalancer.create.end" after the completion
of a loadbalancer create operation.
Configuring oslo messaging for event notifications
==================================================
By default, the notifications driver in oslo_messaging is set to an empty
string; therefore, this option must be configured in order for notifications
to be sent. Valid options are defined in `oslo.messaging documentation
<https://docs.openstack.org/oslo.messaging/latest/configuration/opts.html#oslo-messaging-notifications>`__.
The example provided below is the format produced by the messagingv2 driver.
You may specify a custom list of topics on which to send notifications.
A topic is created for each notification level, with a dot and the level
appended to the value(s) specified in this list, e.g.: notifications.info,
octavia-notifications.info, etc..
Oslo messaging supports separate backends for RPC and notifications. If
different from the **[DEFAULT]** **transport_url** configuration, you
must specify the **transport_url** in the
**[oslo_messaging_notifications]** section of your *octavia.conf*
configuration.
.. code-block:: ini
[oslo_messaging_notifications]
driver = messagingv2
topics = octavia-notifications,notifications
transport_url = transport://user:pass@host1:port/virtual_host
Event Types
===========
Event types supported in Octavia are:
``'octavia.loadbalancer.update.end'``
``'octavia.loadbalancer.create.end'``
``'octavia.loadbalancer.delete.end'``
Example Notification
====================
The payload for an oslo.message notification for Octavia loadbalancer events
is the complete loadbalancer dict in json format.
The complete contents of an oslo.message notification for a loadbalancer
event in Octavia follows the format of the following example:
.. code-block:: json
{
"message_id": "d84a3800-06ca-410e-a1a3-b40a02306a97",
"publisher_id": null,
"event_type": "octavia.loadbalancer.create.end",
"priority": "INFO",
"payload": {
"enabled": true,
"availability_zone": null,
"created_at": "2022-04-22T23:02:14.000000",
"description": "",
"flavor_id": null,
"id": "8d4c8f66-7ac1-408e-82d5-59f6fcdea9ee",
"listeners": [],
"name": "my-octavia-loadbalancer",
"operating_status": "OFFLINE",
"pools": [],
"project_id": "qs59p6z696cp9cho8ze96edddvpfyvgz",
"provider": "amphora",
"provisioning_status": "PENDING_CREATE",
"tags": [],
"updated_at": null,
"vip": {
"ip_address": "192.168.100.2",
"network_id": "849b08a9-4397-4d6e-929d-90efc055ab8e",
"port_id": "303870a4-bbc3-428c-98dd-492f423869d9",
"qos_policy_id": null,
"subnet_id": "d59311ee-ed3a-42c0-ac97-cebf7945facc"
}
},
"timestamp": "2022-04-22 23:02:15.717375",
"_unique_id": "71f03f00c96342328f09dbd92fe0d398",
"_context_user": null,
"_context_tenant": "qs59p6z696cp9cho8ze96edddvpfyvgz",
"_context_system_scope": null,
"_context_project": "qs59p6z696cp9cho8ze96edddvpfyvgz",
"_context_domain": null,
"_context_user_domain": null,
"_context_project_domain": null,
"_context_is_admin": false,
"_context_read_only": false,
"_context_show_deleted": false,
"_context_auth_token": null,
"_context_request_id": "req-072bab53-1b9b-46fa-92b0-7f04305c31bf",
"_context_global_request_id": null,
"_context_resource_uuid": null,
"_context_roles": [],
"_context_user_identity": "- qs59p6z696cp9cho8ze96edddvpfyvgz - - -",
"_context_is_admin_project": true
}
Disabling Event Notifications
=============================
By default, event notifications are enabled (see configuring oslo messaging
section above for additional requirements). To disable this feature, use
the following setting in your Octavia configuration file:
.. code-block:: ini
[controller_worker]
event_notifications = False

View File

@ -55,6 +55,7 @@ Operator Reference
Octavia API Reference <https://docs.openstack.org/api-ref/load-balancer/>
../contributor/api/haproxy-amphora-api.rst
event-notifications.rst
.. only:: html

View File

@ -363,6 +363,11 @@
# amphora_delete_retries = 5
# amphora_delete_retry_interval = 5
# Change to False to disable octavia event notifications.
# See oslo_messaging_notifications section for additional
# requirements.
# event_notifications = True
[task_flow]
# TaskFlow engine options are:
# - serial: Runs all tasks on a single thread.
@ -696,6 +701,25 @@
# specified, we fall back to the same configuration used for RPC.
# transport_url =
[oslo_messaging_notifications]
# This section comes from openstack/oslo.messaging and is used for
# event notifications, e.g. octavia.loadbalancer.create.end.
# The default value for driver is None. Without setting this value,
# topics will not be created and notifications will not be sent.
# driver = messagingv2
# The value for topics is a list, and the default topic created is
# called notifications. Topics in this list will be created automatically
# by oslo.messaging. You can change the topic or add additional ones if
# needed
# topics = notifications,
# oslo.messaging supports separate transport for notifications. If you
# would like to use a different transport, you may set this option.
# By default, the same transport URL as rpc messages will be used.
# transport_url =
[driver_agent]
# status_socket_path = /var/run/octavia/status.sock
# stats_socket_path = /var/run/octavia/stats.sock

View File

@ -527,6 +527,7 @@ controller_worker_opts = [
help=_('Number of times an amphora delete should be retried.')),
cfg.IntOpt('amphora_delete_retry_interval', default=5,
help=_('Time, in seconds, between amphora delete retries.')),
cfg.BoolOpt('event_notifications', default=True),
]
task_flow_opts = [

View File

@ -10,26 +10,32 @@
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
def init():
global TRANSPORT
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
TRANSPORT = create_transport(get_transport_url())
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(cfg.CONF)
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT)
def cleanup():
global TRANSPORT
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
if TRANSPORT is not None:
TRANSPORT.cleanup()
TRANSPORT = None
if NOTIFICATION_TRANSPORT is not None:
NOTIFICATION_TRANSPORT.cleanup()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def get_transport_url(url_str=None):
@ -38,8 +44,8 @@ def get_transport_url(url_str=None):
def get_client(target, version_cap=None, serializer=None,
call_monitor_timeout=None):
if TRANSPORT is None:
init()
assert TRANSPORT is not None, _("'TRANSPORT' must not be None")
return messaging.RPCClient(TRANSPORT,
target,
@ -51,8 +57,7 @@ def get_client(target, version_cap=None, serializer=None,
def get_server(target, endpoints, executor='threading',
access_policy=dispatcher.DefaultRPCAccessPolicy,
serializer=None):
if TRANSPORT is None:
init()
assert TRANSPORT is not None, _("'TRANSPORT' must not be None")
return messaging.get_rpc_server(TRANSPORT,
target,
@ -62,5 +67,11 @@ def get_server(target, endpoints, executor='threading',
access_policy=access_policy)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None, _("'NOTIFIER' must not be None")
return NOTIFIER.prepare()
def create_transport(url):
return messaging.get_rpc_transport(cfg.CONF, url=url)

View File

@ -31,6 +31,7 @@ from octavia.controller.worker.v2.tasks import compute_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import network_tasks
from octavia.controller.worker.v2.tasks import notification_tasks
from octavia.db import repositories as repo
CONF = cfg.CONF
@ -93,6 +94,13 @@ class LoadBalancerFlows(object):
if listeners:
lb_create_flow.add(*self._create_listeners_flow())
if CONF.controller_worker.event_notifications:
lb_create_flow.add(
notification_tasks.SendCreateNotification(
requires=constants.LOADBALANCER
)
)
return lb_create_flow
def _create_single_topology(self):
@ -309,6 +317,9 @@ class LoadBalancerFlows(object):
requires=constants.LOADBALANCER))
delete_LB_flow.add(database_tasks.DecrementLoadBalancerQuota(
requires=constants.PROJECT_ID))
if CONF.controller_worker.event_notifications:
delete_LB_flow.add(notification_tasks.SendDeleteNotification(
requires=constants.LOADBALANCER))
return delete_LB_flow
def get_cascade_delete_load_balancer_flow(self, lb, listeners, pools):
@ -336,6 +347,12 @@ class LoadBalancerFlows(object):
requires=[constants.LOADBALANCER, constants.UPDATE_DICT]))
update_LB_flow.add(database_tasks.MarkLBActiveInDB(
requires=constants.LOADBALANCER))
if CONF.controller_worker.event_notifications:
update_LB_flow.add(
notification_tasks.SendUpdateNotification(
requires=constants.LOADBALANCER
)
)
return update_LB_flow

View File

@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log as logging
from taskflow import task
from octavia.common import constants # noqa H306
from octavia.common import context
from octavia.common import rpc
LOG = logging.getLogger(__name__)
class BaseNotificationTask(task.Task):
event_type = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._rpc_notifier = rpc.get_notifier()
def execute(self, loadbalancer):
ctx = context.Context(project_id=loadbalancer[constants.PROJECT_ID])
LOG.debug(f"Sending rpc notification: {self.event_type} "
f"{loadbalancer[constants.LOADBALANCER_ID]}")
self._rpc_notifier.info(
ctx,
self.event_type,
loadbalancer
)
class SendUpdateNotification(BaseNotificationTask):
event_type = 'octavia.loadbalancer.update.end'
class SendCreateNotification(BaseNotificationTask):
event_type = 'octavia.loadbalancer.create.end'
class SendDeleteNotification(BaseNotificationTask):
event_type = 'octavia.loadbalancer.delete.end'

View File

@ -26,6 +26,10 @@ from octavia.controller.worker.v2.flows import load_balancer_flows
import octavia.tests.unit.base as base
class MockNOTIFIER(mock.MagicMock):
info = mock.MagicMock()
# NOTE: We patch the get_network_driver for all the calls so we don't
# inadvertently make real calls.
@mock.patch('octavia.common.utils.get_network_driver')
@ -40,7 +44,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.conf.config(group="nova", enable_anti_affinity=False)
self.LBFlow = load_balancer_flows.LoadBalancerFlows()
def test_get_create_load_balancer_flow(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_load_balancer_flow(self, mock_get_net_driver,
mock_notifier):
amp_flow = self.LBFlow.get_create_load_balancer_flow(
constants.TOPOLOGY_SINGLE)
self.assertIsInstance(amp_flow, flow.Flow)
@ -50,8 +57,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_active_standby_load_balancer_flow(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
amp_flow = self.LBFlow.get_create_load_balancer_flow(
constants.TOPOLOGY_ACTIVE_STANDBY)
self.assertIsInstance(amp_flow, flow.Flow)
@ -61,8 +70,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_anti_affinity_active_standby_load_balancer_flow(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
self.conf.config(group="nova", enable_anti_affinity=True)
self._LBFlow = load_balancer_flows.LoadBalancerFlows()
@ -77,13 +88,18 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.conf.config(group="nova", enable_anti_affinity=False)
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_bogus_topology_load_balancer_flow(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
self.assertRaises(exceptions.InvalidTopology,
self.LBFlow.get_create_load_balancer_flow,
'BOGUS')
def test_get_delete_load_balancer_flow(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_delete_load_balancer_flow(self, mock_get_net_driver,
mock_notifier):
lb_mock = mock.Mock()
listener_mock = mock.Mock()
listener_mock.id = '123'
@ -100,11 +116,14 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(0, len(lb_flow.provides))
self.assertEqual(3, len(lb_flow.requires))
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
@mock.patch('octavia.db.api.get_session', return_value=mock.MagicMock())
def test_get_delete_load_balancer_flow_cascade(self, mock_session,
mock_get_lb,
mock_get_net_driver):
mock_get_net_driver,
mock_notifier):
lb_mock = mock.Mock()
listener_mock = mock.Mock()
listener_mock.id = '123'
@ -141,7 +160,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(1, len(lb_flow.provides))
self.assertEqual(4, len(lb_flow.requires))
def test_get_update_load_balancer_flow(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_update_load_balancer_flow(self, mock_get_net_driver,
mock_notifier):
lb_flow = self.LBFlow.get_update_load_balancer_flow()
@ -153,7 +175,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(0, len(lb_flow.provides))
self.assertEqual(3, len(lb_flow.requires))
def test_get_post_lb_amp_association_flow(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_post_lb_amp_association_flow(self, mock_get_net_driver,
mock_notifier):
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
'123', constants.TOPOLOGY_SINGLE)
@ -200,8 +225,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(2, len(amp_flow.requires), amp_flow.requires)
self.assertEqual(4, len(amp_flow.provides), amp_flow.provides)
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_load_balancer_flows_single_listeners(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
create_flow = (
self.LBFlow.get_create_load_balancer_flow(
constants.TOPOLOGY_SINGLE, True
@ -229,8 +256,10 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(13, len(create_flow.provides),
create_flow.provides)
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_create_load_balancer_flows_active_standby_listeners(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
create_flow = (
self.LBFlow.get_create_load_balancer_flow(
constants.TOPOLOGY_ACTIVE_STANDBY, True
@ -299,10 +328,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(12, len(failover_flow.provides),
failover_flow.provides)
def test_get_failover_LB_flow_no_amps_single(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_no_amps_single(self, mock_get_net_driver,
mock_notifier):
self._test_get_failover_LB_flow_single([])
def test_get_failover_LB_flow_one_amp_single(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_one_amp_single(self, mock_get_net_driver,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_STANDALONE,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -310,8 +345,11 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_single([amphora_dict])
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_one_bogus_amp_single(self,
mock_get_net_driver):
mock_get_net_driver,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: 'bogus',
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -319,7 +357,10 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_single([amphora_dict])
def test_get_failover_LB_flow_two_amp_single(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_two_amp_single(self, mock_get_net_driver,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid()}
amphora2_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_STANDALONE,
@ -362,10 +403,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(12, len(failover_flow.provides),
failover_flow.provides)
def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver,
mock_notifier):
self._test_get_failover_LB_flow_no_amps_act_stdby([])
def test_get_failover_LB_flow_one_amps_act_stdby(self, amphorae):
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_one_amps_act_stdby(self, amphorae,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_MASTER,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -373,8 +420,11 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict])
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_two_amps_act_stdby(self,
mock_get_net_driver):
mock_get_net_driver,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_MASTER,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -389,8 +439,11 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict,
amphora2_dict])
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_three_amps_act_stdby(self,
mock_get_net_driver):
mock_get_net_driver,
mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_MASTER,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -409,8 +462,10 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby(
[amphora_dict, amphora2_dict, amphora3_dict])
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_two_amps_bogus_act_stdby(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: 'bogus',
constants.COMPUTE_ID: uuidutils.generate_uuid(),
@ -425,8 +480,10 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict,
amphora2_dict])
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
def test_get_failover_LB_flow_two_amps_standalone_act_stdby(
self, mock_get_net_driver):
self, mock_get_net_driver, mock_notifier):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_STANDALONE,
constants.COMPUTE_ID: uuidutils.generate_uuid(),

View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
import octavia # noqa H306
from octavia.common import constants
from octavia.controller.worker.v2.tasks import notification_tasks
import octavia.tests.unit.base as base
class MockNOTIFIER(mock.MagicMock):
info = mock.MagicMock()
@mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER)
@mock.patch('octavia.common.context.Context', new_callable=mock.MagicMock)
@mock.patch('octavia.api.v2.types.load_balancer.LoadBalancerFullResponse.'
'from_data_model',
new_callable=mock.MagicMock)
class TestNotificationTasks(base.TestCase):
def test_update_notification_execute(self, *args):
noti = notification_tasks.SendUpdateNotification()
id = 1
lb = {constants.PROJECT_ID: id,
constants.LOADBALANCER_ID: id}
noti.execute(lb)
octavia.common.context.Context.assert_called_with(project_id=id)
call_args, call_kwargs = octavia.common.rpc.NOTIFIER.info.call_args
self.assertEqual('octavia.loadbalancer.update.end', call_args[1])
def test_create_notification(self, *args):
noti = notification_tasks.SendCreateNotification()
id = 2
lb = {constants.PROJECT_ID: id,
constants.LOADBALANCER_ID: id}
noti.execute(lb)
octavia.common.context.Context.assert_called_with(project_id=id)
call_args, call_kwargs = octavia.common.rpc.NOTIFIER.info.call_args
self.assertEqual('octavia.loadbalancer.create.end', call_args[1])
def test_delete_notification(self, *args):
noti = notification_tasks.SendDeleteNotification()
id = 3
lb = {constants.PROJECT_ID: id,
constants.LOADBALANCER_ID: id}
noti.execute(lb)
octavia.common.context.Context.assert_called_with(project_id=id)
call_args, call_kwargs = octavia.common.rpc.NOTIFIER.info.call_args
self.assertEqual('octavia.loadbalancer.delete.end', call_args[1])

View File

@ -0,0 +1,13 @@
---
features:
- |
Octavia now supports oslo.message notifications for loadbalancer create,
delete, and update operations.
upgrade:
- |
A new option is provided in the oslo_messaging namespace to disable
event_notifications.
other:
- |
Admin documentation page has been added to explain the available events,
the notification format, and how to disable event notifications.

View File

@ -23,6 +23,7 @@ from taskflow import engines
from octavia.api.drivers import utils
from octavia.common import constants
from octavia.common import rpc
from octavia.tests.common import data_model_helpers as dmh
@ -64,6 +65,12 @@ def generate(flow_list, output_directory):
get_flow_method(amp1, 2))
elif (current_tuple[1] == 'LoadBalancerFlows' and
current_tuple[2] == 'get_create_load_balancer_flow'):
class fake_notifier:
def prepare(self):
pass
rpc.NOTIFIER = fake_notifier()
rpc.TRANSPORT = "fake"
rpc.NOTIFICATION_TRANSPORT = "fake"
current_engine = engines.load(
get_flow_method(
constants.TOPOLOGY_ACTIVE_STANDBY))