From c6cee9207349a12e499cbc81fe0e5d4d5bfa015c Mon Sep 17 00:00:00 2001 From: Brian Haley <bhaley@redhat.com> Date: Tue, 7 Apr 2020 15:54:57 -0400 Subject: [PATCH] Spawn long-running processes in the driver agent The OVN Octavia provider driver in the OvnProviderHelper class caches these attributes: ovn_nbdb_api_for_events ovn_nb_idl_for_events ovn_nbdb_api to not re-create things each time OVN IDL that is used for handling events is called. We should be using the Octavia Driver Provider Agent framework instead to not have those long-running IDLs in the API process. This change: - Creates driver provider agent and registers its entry point - While setting up the driver agent instance, start IDL that will handle events - Stop caching ovn_nbdb_api, ovn_nb_idl_for_events and ovn_nbdb_api_for_events in the OvnProviderHelper class Change-Id: I0034a48997bd6b95e1b51bfcbd56e8372b35e62f Closes-bug: #1871355 --- devstack/plugin.sh | 1 + devstack/settings | 3 +- ovn_octavia_provider/agent.py | 40 ++++++ ovn_octavia_provider/common/config.py | 3 + ovn_octavia_provider/driver.py | 63 ++++----- .../tests/functional/test_driver.py | 127 +++++++++++------- .../tests/unit/test_driver.py | 24 ++-- setup.cfg | 4 +- 8 files changed, 169 insertions(+), 96 deletions(-) create mode 100644 ovn_octavia_provider/agent.py diff --git a/devstack/plugin.sh b/devstack/plugin.sh index d5b79910..11f06fec 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -13,6 +13,7 @@ OVN_NB_REMOTE=${OVN_NB_REMOTE:-$OVN_PROTO:$SERVICE_HOST:6641} function _configure_provider_driver { iniset ${OCTAVIA_CONF} api_settings enabled_provider_drivers ${OCTAVIA_PROVIDER_DRIVERS} + iniset ${OCTAVIA_CONF} driver_agent enabled_provider_agents ${OCTAVIA_PROVIDER_AGENTS} iniset ${OCTAVIA_CONF} ovn ovn_nb_connection "$OVN_NB_REMOTE" if is_service_enabled tls-proxy; then diff --git a/devstack/settings b/devstack/settings index f88898f8..ee3a7f56 100644 --- a/devstack/settings +++ b/devstack/settings @@ -1,5 +1,6 @@ OCTAVIA_DIR=${OCTAVIA_DIR:-"${DEST}/octavia"} OCTAVIA_CONF_DIR=${OCTAVIA_CONF_DIR:-"/etc/octavia"} -OCTAVIA_PROVIDER_DRIVERS=${OCTAVIA_PROVIDER_DRIVERS:-"amphora:'The Octavia Amphora driver.',ovn:'Octavia OVN driver.'"} +OCTAVIA_PROVIDER_DRIVERS=${OCTAVIA_PROVIDER_DRIVERS:-"amphora:Amphora,ovn:OVN"} +OCTAVIA_PROVIDER_AGENTS=${OCTAVIA_PROVIDER_AGENTS:-"ovn"} OVN_OCTAVIA_PROVIDER_DIR=$DEST/ovn-octavia-provider diff --git a/ovn_octavia_provider/agent.py b/ovn_octavia_provider/agent.py new file mode 100644 index 00000000..9c7a9de1 --- /dev/null +++ b/ovn_octavia_provider/agent.py @@ -0,0 +1,40 @@ +# Copyright 2020 Red Hat, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from ovn_octavia_provider import driver + +LOG = logging.getLogger(__name__) + +OVN_EVENT_LOCK_NAME = "neutron_ovn_octavia_event_lock" + + +def OvnProviderAgent(exit_event): + + helper = driver.OvnProviderHelper() + events = [driver.LogicalRouterPortEvent(helper), + driver.LogicalSwitchPortUpdateEvent(helper)] + + # NOTE(mjozefcz): This API is only for handling OVSDB events! + ovn_nb_idl_for_events = driver.OvnNbIdlForLb( + event_lock_name=OVN_EVENT_LOCK_NAME) + ovn_nb_idl_for_events.notify_handler.watch_events(events) + ovn_nb_idl_for_events.start() + + LOG.info('OVN provider agent has started.') + exit_event.wait() + LOG.info('OVN provider agent is exiting.') + ovn_nb_idl_for_events.notify_handler.unwatch_events(events) + ovn_nb_idl_for_events.stop() diff --git a/ovn_octavia_provider/common/config.py b/ovn_octavia_provider/common/config.py index d9e42fef..0109e1ca 100644 --- a/ovn_octavia_provider/common/config.py +++ b/ovn_octavia_provider/common/config.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +from keystoneauth1 import loading as ks_loading from oslo_config import cfg from oslo_log import log as logging @@ -58,6 +59,8 @@ ovn_opts = [ ] cfg.CONF.register_opts(ovn_opts, group='ovn') +ks_loading.register_auth_conf_options(cfg.CONF, 'service_auth') +ks_loading.register_session_conf_options(cfg.CONF, 'service_auth') def list_opts(): diff --git a/ovn_octavia_provider/driver.py b/ovn_octavia_provider/driver.py index 170357dc..eda20ab0 100644 --- a/ovn_octavia_provider/driver.py +++ b/ovn_octavia_provider/driver.py @@ -79,7 +79,6 @@ OVN_NATIVE_LB_PROTOCOLS = [constants.PROTOCOL_TCP, constants.PROTOCOL_UDP, ] OVN_NATIVE_LB_ALGORITHMS = [constants.LB_ALGORITHM_SOURCE_IP_PORT, ] EXCEPTION_MSG = "Exception occurred during %s" -OVN_EVENT_LOCK_NAME = "neutron_ovn_octavia_event_lock" class IPVersionsMixingNotSupportedError( @@ -109,8 +108,6 @@ def get_neutron_client(): class LogicalRouterPortEvent(row_event.RowEvent): - driver = None - def __init__(self, driver): table = 'Logical_Router_Port' events = (self.ROW_CREATE, self.ROW_DELETE) @@ -124,7 +121,7 @@ class LogicalRouterPortEvent(row_event.RowEvent): '%(event)s, %(row)s', {'event': event, 'row': row}) - if not self.driver or row.gateway_chassis: + if row.gateway_chassis: return if event == self.ROW_CREATE: self.driver.lb_create_lrp_assoc_handler(row) @@ -134,8 +131,6 @@ class LogicalRouterPortEvent(row_event.RowEvent): class LogicalSwitchPortUpdateEvent(row_event.RowEvent): - driver = None - def __init__(self, driver): table = 'Logical_Switch_Port' events = (self.ROW_UPDATE,) @@ -145,11 +140,15 @@ class LogicalSwitchPortUpdateEvent(row_event.RowEvent): self.driver = driver def run(self, event, row, old): + LOG.debug('LogicalSwitchPortUpdateEvent logged, ' + '%(event)s, %(row)s', + {'event': event, + 'row': row}) # Get the neutron:port_name from external_ids and check if # it's a vip port or not. port_name = row.external_ids.get( ovn_const.OVN_PORT_NAME_EXT_ID_KEY, '') - if self.driver and port_name.startswith(ovn_const.LB_VIP_PORT_PREFIX): + if port_name.startswith(ovn_const.LB_VIP_PORT_PREFIX): # Handle port update only for vip ports created by # this driver. self.driver.vip_port_update_handler(row) @@ -199,21 +198,19 @@ class OvnNbIdlForLb(ovsdb_monitor.OvnIdl): class OvnProviderHelper(object): - ovn_nbdb_api_for_events = None - ovn_nb_idl_for_events = None - ovn_nbdb_api = None - def __init__(self): self.requests = queue.Queue() self.helper_thread = threading.Thread(target=self.request_handler) self.helper_thread.daemon = True - atexit.register(self.shutdown) self._octavia_driver_lib = o_driver_lib.DriverLibrary() self._check_and_set_ssl_files() self._init_lb_actions() - self.events = [LogicalRouterPortEvent(self), - LogicalSwitchPortUpdateEvent(self)] - self.start() + + # NOTE(mjozefcz): This API is only for handling octavia API requests. + self.ovn_nbdb = OvnNbIdlForLb() + self.ovn_nbdb_api = self.ovn_nbdb.start() + + self.helper_thread.start() def _init_lb_actions(self): self._lb_request_func_maps = { @@ -253,8 +250,6 @@ class OvnProviderHelper(object): def _check_and_set_ssl_files(self): # TODO(reedip): Make ovsdb_monitor's _check_and_set_ssl_files() public # This is a copy of ovsdb_monitor._check_and_set_ssl_files - if OvnProviderHelper.ovn_nbdb_api: - return priv_key_file = ovn_conf.get_ovn_nb_private_key() cert_file = ovn_conf.get_ovn_nb_certificate() ca_cert_file = ovn_conf.get_ovn_nb_ca_cert() @@ -267,25 +262,10 @@ class OvnProviderHelper(object): if ca_cert_file: Stream.ssl_set_ca_cert_file(ca_cert_file) - def start(self): - # NOTE(mjozefcz): This API is only for handling octavia API requests. - if not OvnProviderHelper.ovn_nbdb_api: - OvnProviderHelper.ovn_nbdb_api = OvnNbIdlForLb().start() - - # NOTE(mjozefcz): This API is only for handling OVSDB events! - if not OvnProviderHelper.ovn_nbdb_api_for_events: - OvnProviderHelper.ovn_nb_idl_for_events = OvnNbIdlForLb( - event_lock_name=OVN_EVENT_LOCK_NAME) - (OvnProviderHelper.ovn_nb_idl_for_events.notify_handler. - watch_events(self.events)) - OvnProviderHelper.ovn_nbdb_api_for_events = ( - OvnProviderHelper.ovn_nb_idl_for_events.start()) - self.helper_thread.start() - def shutdown(self): self.requests.put({'type': REQ_TYPE_EXIT}) self.helper_thread.join() - self.ovn_nb_idl_for_events.notify_handler.unwatch_events(self.events) + self.ovn_nbdb.stop() @staticmethod def _map_val(row, col, key): @@ -416,7 +396,7 @@ class OvnProviderHelper(object): port_name = vip_lp.external_ids.get(ovn_const.OVN_PORT_NAME_EXT_ID_KEY) lb_id = port_name[len(ovn_const.LB_VIP_PORT_PREFIX):] try: - ovn_lbs = self._find_ovn_lbs(lb_id) + ovn_lbs = self._find_ovn_lbs_with_retry(lb_id) except idlutils.RowNotFound: LOG.debug("Loadbalancer %s not found!", lb_id) return @@ -492,6 +472,14 @@ class OvnProviderHelper(object): LOG.error(msg) raise driver_exceptions.UpdateStatusError(msg) + @tenacity.retry( + retry=tenacity.retry_if_exception_type(idlutils.RowNotFound), + wait=tenacity.wait_exponential(), + stop=tenacity.stop_after_delay(10), + reraise=True) + def _find_ovn_lbs_with_retry(self, lb_id, protocol=None): + return self._find_ovn_lbs(lb_id, protocol=protocol) + def _find_ovn_lbs(self, lb_id, protocol=None): """Find the Loadbalancers in OVN with the given lb_id as its name @@ -1927,12 +1915,13 @@ class OvnProviderHelper(object): class OvnProviderDriver(driver_base.ProviderDriver): - _ovn_helper = None def __init__(self): super(OvnProviderDriver, self).__init__() - if not OvnProviderDriver._ovn_helper: - OvnProviderDriver._ovn_helper = OvnProviderHelper() + self._ovn_helper = OvnProviderHelper() + + def __del__(self): + self._ovn_helper.shutdown() def _check_for_supported_protocols(self, protocol): if protocol not in OVN_NATIVE_LB_PROTOCOLS: diff --git a/ovn_octavia_provider/tests/functional/test_driver.py b/ovn_octavia_provider/tests/functional/test_driver.py index 89c9f9da..ac6891d3 100644 --- a/ovn_octavia_provider/tests/functional/test_driver.py +++ b/ovn_octavia_provider/tests/functional/test_driver.py @@ -13,7 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. +import atexit import copy +import multiprocessing as mp from unittest import mock from neutron.common import utils as n_utils @@ -29,25 +31,20 @@ from ovsdbapp.schema.ovn_northbound import impl_idl as idl_ovn # NOTE(mjozefcz): We need base neutron functionals because we need # mechanism driver and l3 plugin. from neutron.tests.functional import base +from ovn_octavia_provider import agent as ovn_agent from ovn_octavia_provider.common import constants as ovn_const from ovn_octavia_provider import driver as ovn_driver LR_REF_KEY_HEADER = 'neutron-' -class TestOctaviaOvnProviderDriver( +class TestOvnOctaviaBase( base.TestOVNFunctionalBase, base.BaseLoggingTestCase): def setUp(self): - super(TestOctaviaOvnProviderDriver, self).setUp() - # ovn_driver.OvnProviderHelper.ovn_nbdb_api is a class variable. - # Set it to None, so that when a worker starts the 2nd test we don't - # use the old object. + super(TestOvnOctaviaBase, self).setUp() idl_ovn.OvnNbApiIdlImpl.ovsdb_connection = None - ovn_driver.OvnProviderHelper.ovn_nbdb_api = None - ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None - ovn_driver.OvnProviderDriver._ovn_helper = None # TODO(mjozefcz): Use octavia listeners to provide needed # sockets and modify tests in order to verify if fake # listener (status) has received valid value. @@ -70,7 +67,6 @@ class TestOctaviaOvnProviderDriver( self.fake_neutron_client.delete_port.return_value = True self._local_net_cache = {} self._local_port_cache = {'ports': []} - self.addCleanup(self.ovn_driver._ovn_helper.shutdown) self.core_plugin = directory.get_plugin() def _mock_show_subnet(self, subnet_id): @@ -871,6 +867,10 @@ class TestOctaviaOvnProviderDriver( self._wait_for_status_and_validate(lb_data, [expected_status]) + +class TestOvnOctaviaProviderDriver( + TestOvnOctaviaBase): + def test_loadbalancer(self): lb_data = self._create_load_balancer_and_validate( {'vip_network': 'vip_network', @@ -1126,6 +1126,66 @@ class TestOctaviaOvnProviderDriver( self._create_listener_and_validate(lb_data) self._delete_load_balancer_and_validate(lb_data) + def test_lb_listener_pool_workflow(self): + lb_data = self._create_load_balancer_and_validate( + {'vip_network': 'vip_network', + 'cidr': '10.0.0.0/24'}) + self._create_listener_and_validate(lb_data) + self._create_pool_and_validate( + lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id) + self._delete_pool_and_validate( + lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id) + self._delete_listener_and_validate(lb_data) + self._delete_load_balancer_and_validate(lb_data) + + def test_lb_member_batch_update(self): + # Create a LoadBalancer + lb_data = self._create_load_balancer_and_validate( + {'vip_network': 'vip_network', + 'cidr': '10.0.0.0/24'}) + # Create a pool + self._create_pool_and_validate(lb_data, "p1") + pool_id = lb_data['pools'][0].pool_id + # Create Member-1 and associate it with lb_data + self._create_member_and_validate( + lb_data, pool_id, lb_data['vip_net_info'][1], + lb_data['vip_net_info'][0], '10.0.0.10') + # Create Member-2 + m_member = self._create_member_model(pool_id, + lb_data['vip_net_info'][1], + '10.0.0.12') + # Update ovn's Logical switch reference + self._update_ls_refs(lb_data, lb_data['vip_net_info'][0]) + lb_data['pools'][0].members.append(m_member) + # Add a new member to the LB + members = [m_member] + [lb_data['pools'][0].members[0]] + self._update_members_in_batch_and_validate(lb_data, pool_id, members) + # Deleting one member, while keeping the other member available + self._update_members_in_batch_and_validate(lb_data, pool_id, + [m_member]) + self._delete_load_balancer_and_validate(lb_data) + + +class TestOvnOctaviaProviderAgent(TestOvnOctaviaBase): + + def setUp(self): + super(TestOvnOctaviaProviderAgent, self).setUp() + self._initialize_ovn_da() + + def _initialize_ovn_da(self): + # NOTE(mjozefcz): In theory this is separate process + # with IDL running, but to make it easier for now + # we can initialize this IDL here instead spawning + # another process. + da_helper = ovn_driver.OvnProviderHelper() + events = [ovn_driver.LogicalRouterPortEvent(da_helper), + ovn_driver.LogicalSwitchPortUpdateEvent(da_helper)] + ovn_nb_idl_for_events = ovn_driver.OvnNbIdlForLb( + event_lock_name='func_test') + ovn_nb_idl_for_events.notify_handler.watch_events(events) + ovn_nb_idl_for_events.start() + atexit.register(da_helper.shutdown) + def _test_lrp_event_handler(self, cascade=False): # Create Network N1 on router R1 and LBA on N1 lba_data = self._create_load_balancer_and_validate( @@ -1241,45 +1301,6 @@ class TestOctaviaOvnProviderDriver( LR_REF_KEY_HEADER + provider_net['network']['id']), timeout=10) - def test_lb_listener_pool_workflow(self): - lb_data = self._create_load_balancer_and_validate( - {'vip_network': 'vip_network', - 'cidr': '10.0.0.0/24'}) - self._create_listener_and_validate(lb_data) - self._create_pool_and_validate( - lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id) - self._delete_pool_and_validate( - lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id) - self._delete_listener_and_validate(lb_data) - self._delete_load_balancer_and_validate(lb_data) - - def test_lb_member_batch_update(self): - # Create a LoadBalancer - lb_data = self._create_load_balancer_and_validate( - {'vip_network': 'vip_network', - 'cidr': '10.0.0.0/24'}) - # Create a pool - self._create_pool_and_validate(lb_data, "p1") - pool_id = lb_data['pools'][0].pool_id - # Create Member-1 and associate it with lb_data - self._create_member_and_validate( - lb_data, pool_id, lb_data['vip_net_info'][1], - lb_data['vip_net_info'][0], '10.0.0.10') - # Create Member-2 - m_member = self._create_member_model(pool_id, - lb_data['vip_net_info'][1], - '10.0.0.12') - # Update ovn's Logical switch reference - self._update_ls_refs(lb_data, lb_data['vip_net_info'][0]) - lb_data['pools'][0].members.append(m_member) - # Add a new member to the LB - members = [m_member] + [lb_data['pools'][0].members[0]] - self._update_members_in_batch_and_validate(lb_data, pool_id, members) - # Deleting one member, while keeping the other member available - self._update_members_in_batch_and_validate(lb_data, pool_id, - [m_member]) - self._delete_load_balancer_and_validate(lb_data) - def test_fip_on_lb_vip(self): """This test checks if FIP on LB VIP is configured. @@ -1349,3 +1370,13 @@ class TestOctaviaOvnProviderDriver( elif ls.name == provider_net: # Make sure that LB1 is not added to provider net - e1 LS self.assertListEqual([], ls.load_balancer) + + def test_agent_exit(self): + exit_event = mp.Event() + agent = mp.Process(target=ovn_agent.OvnProviderAgent, + args=[exit_event]) + agent.start() + self.assertTrue(agent.is_alive()) + exit_event.set() + agent.join() + self.assertFalse(agent.is_alive()) diff --git a/ovn_octavia_provider/tests/unit/test_driver.py b/ovn_octavia_provider/tests/unit/test_driver.py index f1432819..a80a65da 100644 --- a/ovn_octavia_provider/tests/unit/test_driver.py +++ b/ovn_octavia_provider/tests/unit/test_driver.py @@ -25,6 +25,7 @@ from oslo_utils import uuidutils from ovs.db import idl as ovs_idl from ovsdbapp.backend.ovs_idl import idlutils +from ovn_octavia_provider import agent as ovn_agent from ovn_octavia_provider.common import constants as ovn_const from ovn_octavia_provider import driver as ovn_driver from ovn_octavia_provider.tests.unit import fakes @@ -109,8 +110,8 @@ class TestOvnOctaviaBase(base.BaseTestCase): self.vip_network_id = uuidutils.generate_uuid() self.vip_port_id = uuidutils.generate_uuid() self.vip_subnet_id = uuidutils.generate_uuid() - mock.patch( - "ovn_octavia_provider.driver.OvnNbIdlForLb").start() + ovn_nb_idl = mock.patch("ovn_octavia_provider.driver.OvnNbIdlForLb") + self.mock_ovn_nb_idl = ovn_nb_idl.start() self.member_address = "192.168.2.149" self.vip_address = '192.148.210.109' self.vip_dict = {'vip_network_id': uuidutils.generate_uuid(), @@ -2551,7 +2552,7 @@ class TestOvnProviderHelper(TestOvnOctaviaBase): @mock.patch('ovn_octavia_provider.driver.OvnProviderHelper.' '_find_ovn_lbs') def test_vip_port_update_handler_lb_not_found(self, lb): - lb.side_effect = [idlutils.RowNotFound] + lb.side_effect = [idlutils.RowNotFound for _ in range(5)] self.switch_port_event = ovn_driver.LogicalSwitchPortUpdateEvent( self.helper) port_name = '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX, 'foo') @@ -2772,8 +2773,7 @@ class TestOvnProviderHelper(TestOvnOctaviaBase): self._test_handle_member_dvr_lb_fip( net_cli, action=ovn_driver.REQ_INFO_MEMBER_DELETED) - @mock.patch.object(ovn_driver, 'atexit') - def test_ovsdb_connections(self, mock_atexit): + def test_ovsdb_connections(self): ovn_driver.OvnProviderHelper.ovn_nbdb_api = None ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None prov_helper1 = ovn_driver.OvnProviderHelper() @@ -2786,10 +2786,6 @@ class TestOvnProviderHelper(TestOvnOctaviaBase): prov_helper2.ovn_nbdb_api_for_events) prov_helper2.shutdown() prov_helper1.shutdown() - # Assert at_exit calls - mock_atexit.assert_has_calls([ - mock.call.register(prov_helper1.shutdown), - mock.call.register(prov_helper2.shutdown)]) def test_create_vip_port_vip_selected(self): expected_dict = { @@ -2945,3 +2941,13 @@ class TestOvnProviderHelper(TestOvnOctaviaBase): fol.return_value = None ret = self.helper.check_lb_protocol(self.listener_id, 'TCP') self.assertFalse(ret) + + +class TestOvnProviderAgent(TestOvnOctaviaBase): + + def test_exit(self): + mock_exit_event = mock.MagicMock() + mock_exit_event.is_set.side_effect = [False, False, False, False, True] + ovn_agent.OvnProviderAgent(mock_exit_event) + self.assertEqual(1, mock_exit_event.wait.call_count) + self.assertEqual(2, self.mock_ovn_nb_idl.call_count) diff --git a/setup.cfg b/setup.cfg index f2604c5e..32588cb7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,10 +26,12 @@ packages = setup-hooks = pbr.hooks.setup_hook - [entry_points] octavia.api.drivers = ovn = ovn_octavia_provider.driver:OvnProviderDriver +octavia.driver_agent.provider_agents = + ovn = ovn_octavia_provider.agent:OvnProviderAgent + oslo.config.opts = octavia.api.drivers.ovn = ovn_octavia_provider.common.config:list_opts