Spawn long-running processes in the driver agent

The OVN Octavia provider driver in the OvnProviderHelper
class caches these attributes:

    ovn_nbdb_api_for_events
    ovn_nb_idl_for_events
    ovn_nbdb_api

to not re-create things each time OVN IDL that is used for
handling events is called.

We should be using the Octavia Driver Provider Agent framework
instead to not have those long-running IDLs in the API process.

This change:
- Creates driver provider agent and registers its entry point
- While setting up the driver agent instance, start IDL that
  will handle events
- Stop caching ovn_nbdb_api, ovn_nb_idl_for_events and
  ovn_nbdb_api_for_events in the OvnProviderHelper class

Change-Id: I0034a48997bd6b95e1b51bfcbd56e8372b35e62f
Closes-bug: #1871355
This commit is contained in:
Brian Haley 2020-04-07 15:54:57 -04:00
parent 96c83ea224
commit c6cee92073
8 changed files with 169 additions and 96 deletions

View File

@ -13,6 +13,7 @@ OVN_NB_REMOTE=${OVN_NB_REMOTE:-$OVN_PROTO:$SERVICE_HOST:6641}
function _configure_provider_driver {
iniset ${OCTAVIA_CONF} api_settings enabled_provider_drivers ${OCTAVIA_PROVIDER_DRIVERS}
iniset ${OCTAVIA_CONF} driver_agent enabled_provider_agents ${OCTAVIA_PROVIDER_AGENTS}
iniset ${OCTAVIA_CONF} ovn ovn_nb_connection "$OVN_NB_REMOTE"
if is_service_enabled tls-proxy; then

View File

@ -1,5 +1,6 @@
OCTAVIA_DIR=${OCTAVIA_DIR:-"${DEST}/octavia"}
OCTAVIA_CONF_DIR=${OCTAVIA_CONF_DIR:-"/etc/octavia"}
OCTAVIA_PROVIDER_DRIVERS=${OCTAVIA_PROVIDER_DRIVERS:-"amphora:'The Octavia Amphora driver.',ovn:'Octavia OVN driver.'"}
OCTAVIA_PROVIDER_DRIVERS=${OCTAVIA_PROVIDER_DRIVERS:-"amphora:Amphora,ovn:OVN"}
OCTAVIA_PROVIDER_AGENTS=${OCTAVIA_PROVIDER_AGENTS:-"ovn"}
OVN_OCTAVIA_PROVIDER_DIR=$DEST/ovn-octavia-provider

View File

@ -0,0 +1,40 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from ovn_octavia_provider import driver
LOG = logging.getLogger(__name__)
OVN_EVENT_LOCK_NAME = "neutron_ovn_octavia_event_lock"
def OvnProviderAgent(exit_event):
helper = driver.OvnProviderHelper()
events = [driver.LogicalRouterPortEvent(helper),
driver.LogicalSwitchPortUpdateEvent(helper)]
# NOTE(mjozefcz): This API is only for handling OVSDB events!
ovn_nb_idl_for_events = driver.OvnNbIdlForLb(
event_lock_name=OVN_EVENT_LOCK_NAME)
ovn_nb_idl_for_events.notify_handler.watch_events(events)
ovn_nb_idl_for_events.start()
LOG.info('OVN provider agent has started.')
exit_event.wait()
LOG.info('OVN provider agent is exiting.')
ovn_nb_idl_for_events.notify_handler.unwatch_events(events)
ovn_nb_idl_for_events.stop()

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import loading as ks_loading
from oslo_config import cfg
from oslo_log import log as logging
@ -58,6 +59,8 @@ ovn_opts = [
]
cfg.CONF.register_opts(ovn_opts, group='ovn')
ks_loading.register_auth_conf_options(cfg.CONF, 'service_auth')
ks_loading.register_session_conf_options(cfg.CONF, 'service_auth')
def list_opts():

View File

@ -79,7 +79,6 @@ OVN_NATIVE_LB_PROTOCOLS = [constants.PROTOCOL_TCP,
constants.PROTOCOL_UDP, ]
OVN_NATIVE_LB_ALGORITHMS = [constants.LB_ALGORITHM_SOURCE_IP_PORT, ]
EXCEPTION_MSG = "Exception occurred during %s"
OVN_EVENT_LOCK_NAME = "neutron_ovn_octavia_event_lock"
class IPVersionsMixingNotSupportedError(
@ -109,8 +108,6 @@ def get_neutron_client():
class LogicalRouterPortEvent(row_event.RowEvent):
driver = None
def __init__(self, driver):
table = 'Logical_Router_Port'
events = (self.ROW_CREATE, self.ROW_DELETE)
@ -124,7 +121,7 @@ class LogicalRouterPortEvent(row_event.RowEvent):
'%(event)s, %(row)s',
{'event': event,
'row': row})
if not self.driver or row.gateway_chassis:
if row.gateway_chassis:
return
if event == self.ROW_CREATE:
self.driver.lb_create_lrp_assoc_handler(row)
@ -134,8 +131,6 @@ class LogicalRouterPortEvent(row_event.RowEvent):
class LogicalSwitchPortUpdateEvent(row_event.RowEvent):
driver = None
def __init__(self, driver):
table = 'Logical_Switch_Port'
events = (self.ROW_UPDATE,)
@ -145,11 +140,15 @@ class LogicalSwitchPortUpdateEvent(row_event.RowEvent):
self.driver = driver
def run(self, event, row, old):
LOG.debug('LogicalSwitchPortUpdateEvent logged, '
'%(event)s, %(row)s',
{'event': event,
'row': row})
# Get the neutron:port_name from external_ids and check if
# it's a vip port or not.
port_name = row.external_ids.get(
ovn_const.OVN_PORT_NAME_EXT_ID_KEY, '')
if self.driver and port_name.startswith(ovn_const.LB_VIP_PORT_PREFIX):
if port_name.startswith(ovn_const.LB_VIP_PORT_PREFIX):
# Handle port update only for vip ports created by
# this driver.
self.driver.vip_port_update_handler(row)
@ -199,21 +198,19 @@ class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
class OvnProviderHelper(object):
ovn_nbdb_api_for_events = None
ovn_nb_idl_for_events = None
ovn_nbdb_api = None
def __init__(self):
self.requests = queue.Queue()
self.helper_thread = threading.Thread(target=self.request_handler)
self.helper_thread.daemon = True
atexit.register(self.shutdown)
self._octavia_driver_lib = o_driver_lib.DriverLibrary()
self._check_and_set_ssl_files()
self._init_lb_actions()
self.events = [LogicalRouterPortEvent(self),
LogicalSwitchPortUpdateEvent(self)]
self.start()
# NOTE(mjozefcz): This API is only for handling octavia API requests.
self.ovn_nbdb = OvnNbIdlForLb()
self.ovn_nbdb_api = self.ovn_nbdb.start()
self.helper_thread.start()
def _init_lb_actions(self):
self._lb_request_func_maps = {
@ -253,8 +250,6 @@ class OvnProviderHelper(object):
def _check_and_set_ssl_files(self):
# TODO(reedip): Make ovsdb_monitor's _check_and_set_ssl_files() public
# This is a copy of ovsdb_monitor._check_and_set_ssl_files
if OvnProviderHelper.ovn_nbdb_api:
return
priv_key_file = ovn_conf.get_ovn_nb_private_key()
cert_file = ovn_conf.get_ovn_nb_certificate()
ca_cert_file = ovn_conf.get_ovn_nb_ca_cert()
@ -267,25 +262,10 @@ class OvnProviderHelper(object):
if ca_cert_file:
Stream.ssl_set_ca_cert_file(ca_cert_file)
def start(self):
# NOTE(mjozefcz): This API is only for handling octavia API requests.
if not OvnProviderHelper.ovn_nbdb_api:
OvnProviderHelper.ovn_nbdb_api = OvnNbIdlForLb().start()
# NOTE(mjozefcz): This API is only for handling OVSDB events!
if not OvnProviderHelper.ovn_nbdb_api_for_events:
OvnProviderHelper.ovn_nb_idl_for_events = OvnNbIdlForLb(
event_lock_name=OVN_EVENT_LOCK_NAME)
(OvnProviderHelper.ovn_nb_idl_for_events.notify_handler.
watch_events(self.events))
OvnProviderHelper.ovn_nbdb_api_for_events = (
OvnProviderHelper.ovn_nb_idl_for_events.start())
self.helper_thread.start()
def shutdown(self):
self.requests.put({'type': REQ_TYPE_EXIT})
self.helper_thread.join()
self.ovn_nb_idl_for_events.notify_handler.unwatch_events(self.events)
self.ovn_nbdb.stop()
@staticmethod
def _map_val(row, col, key):
@ -416,7 +396,7 @@ class OvnProviderHelper(object):
port_name = vip_lp.external_ids.get(ovn_const.OVN_PORT_NAME_EXT_ID_KEY)
lb_id = port_name[len(ovn_const.LB_VIP_PORT_PREFIX):]
try:
ovn_lbs = self._find_ovn_lbs(lb_id)
ovn_lbs = self._find_ovn_lbs_with_retry(lb_id)
except idlutils.RowNotFound:
LOG.debug("Loadbalancer %s not found!", lb_id)
return
@ -492,6 +472,14 @@ class OvnProviderHelper(object):
LOG.error(msg)
raise driver_exceptions.UpdateStatusError(msg)
@tenacity.retry(
retry=tenacity.retry_if_exception_type(idlutils.RowNotFound),
wait=tenacity.wait_exponential(),
stop=tenacity.stop_after_delay(10),
reraise=True)
def _find_ovn_lbs_with_retry(self, lb_id, protocol=None):
return self._find_ovn_lbs(lb_id, protocol=protocol)
def _find_ovn_lbs(self, lb_id, protocol=None):
"""Find the Loadbalancers in OVN with the given lb_id as its name
@ -1927,12 +1915,13 @@ class OvnProviderHelper(object):
class OvnProviderDriver(driver_base.ProviderDriver):
_ovn_helper = None
def __init__(self):
super(OvnProviderDriver, self).__init__()
if not OvnProviderDriver._ovn_helper:
OvnProviderDriver._ovn_helper = OvnProviderHelper()
self._ovn_helper = OvnProviderHelper()
def __del__(self):
self._ovn_helper.shutdown()
def _check_for_supported_protocols(self, protocol):
if protocol not in OVN_NATIVE_LB_PROTOCOLS:

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import copy
import multiprocessing as mp
from unittest import mock
from neutron.common import utils as n_utils
@ -29,25 +31,20 @@ from ovsdbapp.schema.ovn_northbound import impl_idl as idl_ovn
# NOTE(mjozefcz): We need base neutron functionals because we need
# mechanism driver and l3 plugin.
from neutron.tests.functional import base
from ovn_octavia_provider import agent as ovn_agent
from ovn_octavia_provider.common import constants as ovn_const
from ovn_octavia_provider import driver as ovn_driver
LR_REF_KEY_HEADER = 'neutron-'
class TestOctaviaOvnProviderDriver(
class TestOvnOctaviaBase(
base.TestOVNFunctionalBase,
base.BaseLoggingTestCase):
def setUp(self):
super(TestOctaviaOvnProviderDriver, self).setUp()
# ovn_driver.OvnProviderHelper.ovn_nbdb_api is a class variable.
# Set it to None, so that when a worker starts the 2nd test we don't
# use the old object.
super(TestOvnOctaviaBase, self).setUp()
idl_ovn.OvnNbApiIdlImpl.ovsdb_connection = None
ovn_driver.OvnProviderHelper.ovn_nbdb_api = None
ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None
ovn_driver.OvnProviderDriver._ovn_helper = None
# TODO(mjozefcz): Use octavia listeners to provide needed
# sockets and modify tests in order to verify if fake
# listener (status) has received valid value.
@ -70,7 +67,6 @@ class TestOctaviaOvnProviderDriver(
self.fake_neutron_client.delete_port.return_value = True
self._local_net_cache = {}
self._local_port_cache = {'ports': []}
self.addCleanup(self.ovn_driver._ovn_helper.shutdown)
self.core_plugin = directory.get_plugin()
def _mock_show_subnet(self, subnet_id):
@ -871,6 +867,10 @@ class TestOctaviaOvnProviderDriver(
self._wait_for_status_and_validate(lb_data, [expected_status])
class TestOvnOctaviaProviderDriver(
TestOvnOctaviaBase):
def test_loadbalancer(self):
lb_data = self._create_load_balancer_and_validate(
{'vip_network': 'vip_network',
@ -1126,6 +1126,66 @@ class TestOctaviaOvnProviderDriver(
self._create_listener_and_validate(lb_data)
self._delete_load_balancer_and_validate(lb_data)
def test_lb_listener_pool_workflow(self):
lb_data = self._create_load_balancer_and_validate(
{'vip_network': 'vip_network',
'cidr': '10.0.0.0/24'})
self._create_listener_and_validate(lb_data)
self._create_pool_and_validate(
lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id)
self._delete_pool_and_validate(
lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id)
self._delete_listener_and_validate(lb_data)
self._delete_load_balancer_and_validate(lb_data)
def test_lb_member_batch_update(self):
# Create a LoadBalancer
lb_data = self._create_load_balancer_and_validate(
{'vip_network': 'vip_network',
'cidr': '10.0.0.0/24'})
# Create a pool
self._create_pool_and_validate(lb_data, "p1")
pool_id = lb_data['pools'][0].pool_id
# Create Member-1 and associate it with lb_data
self._create_member_and_validate(
lb_data, pool_id, lb_data['vip_net_info'][1],
lb_data['vip_net_info'][0], '10.0.0.10')
# Create Member-2
m_member = self._create_member_model(pool_id,
lb_data['vip_net_info'][1],
'10.0.0.12')
# Update ovn's Logical switch reference
self._update_ls_refs(lb_data, lb_data['vip_net_info'][0])
lb_data['pools'][0].members.append(m_member)
# Add a new member to the LB
members = [m_member] + [lb_data['pools'][0].members[0]]
self._update_members_in_batch_and_validate(lb_data, pool_id, members)
# Deleting one member, while keeping the other member available
self._update_members_in_batch_and_validate(lb_data, pool_id,
[m_member])
self._delete_load_balancer_and_validate(lb_data)
class TestOvnOctaviaProviderAgent(TestOvnOctaviaBase):
def setUp(self):
super(TestOvnOctaviaProviderAgent, self).setUp()
self._initialize_ovn_da()
def _initialize_ovn_da(self):
# NOTE(mjozefcz): In theory this is separate process
# with IDL running, but to make it easier for now
# we can initialize this IDL here instead spawning
# another process.
da_helper = ovn_driver.OvnProviderHelper()
events = [ovn_driver.LogicalRouterPortEvent(da_helper),
ovn_driver.LogicalSwitchPortUpdateEvent(da_helper)]
ovn_nb_idl_for_events = ovn_driver.OvnNbIdlForLb(
event_lock_name='func_test')
ovn_nb_idl_for_events.notify_handler.watch_events(events)
ovn_nb_idl_for_events.start()
atexit.register(da_helper.shutdown)
def _test_lrp_event_handler(self, cascade=False):
# Create Network N1 on router R1 and LBA on N1
lba_data = self._create_load_balancer_and_validate(
@ -1241,45 +1301,6 @@ class TestOctaviaOvnProviderDriver(
LR_REF_KEY_HEADER + provider_net['network']['id']),
timeout=10)
def test_lb_listener_pool_workflow(self):
lb_data = self._create_load_balancer_and_validate(
{'vip_network': 'vip_network',
'cidr': '10.0.0.0/24'})
self._create_listener_and_validate(lb_data)
self._create_pool_and_validate(
lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id)
self._delete_pool_and_validate(
lb_data, "p1", listener_id=lb_data['listeners'][0].listener_id)
self._delete_listener_and_validate(lb_data)
self._delete_load_balancer_and_validate(lb_data)
def test_lb_member_batch_update(self):
# Create a LoadBalancer
lb_data = self._create_load_balancer_and_validate(
{'vip_network': 'vip_network',
'cidr': '10.0.0.0/24'})
# Create a pool
self._create_pool_and_validate(lb_data, "p1")
pool_id = lb_data['pools'][0].pool_id
# Create Member-1 and associate it with lb_data
self._create_member_and_validate(
lb_data, pool_id, lb_data['vip_net_info'][1],
lb_data['vip_net_info'][0], '10.0.0.10')
# Create Member-2
m_member = self._create_member_model(pool_id,
lb_data['vip_net_info'][1],
'10.0.0.12')
# Update ovn's Logical switch reference
self._update_ls_refs(lb_data, lb_data['vip_net_info'][0])
lb_data['pools'][0].members.append(m_member)
# Add a new member to the LB
members = [m_member] + [lb_data['pools'][0].members[0]]
self._update_members_in_batch_and_validate(lb_data, pool_id, members)
# Deleting one member, while keeping the other member available
self._update_members_in_batch_and_validate(lb_data, pool_id,
[m_member])
self._delete_load_balancer_and_validate(lb_data)
def test_fip_on_lb_vip(self):
"""This test checks if FIP on LB VIP is configured.
@ -1349,3 +1370,13 @@ class TestOctaviaOvnProviderDriver(
elif ls.name == provider_net:
# Make sure that LB1 is not added to provider net - e1 LS
self.assertListEqual([], ls.load_balancer)
def test_agent_exit(self):
exit_event = mp.Event()
agent = mp.Process(target=ovn_agent.OvnProviderAgent,
args=[exit_event])
agent.start()
self.assertTrue(agent.is_alive())
exit_event.set()
agent.join()
self.assertFalse(agent.is_alive())

View File

@ -25,6 +25,7 @@ from oslo_utils import uuidutils
from ovs.db import idl as ovs_idl
from ovsdbapp.backend.ovs_idl import idlutils
from ovn_octavia_provider import agent as ovn_agent
from ovn_octavia_provider.common import constants as ovn_const
from ovn_octavia_provider import driver as ovn_driver
from ovn_octavia_provider.tests.unit import fakes
@ -109,8 +110,8 @@ class TestOvnOctaviaBase(base.BaseTestCase):
self.vip_network_id = uuidutils.generate_uuid()
self.vip_port_id = uuidutils.generate_uuid()
self.vip_subnet_id = uuidutils.generate_uuid()
mock.patch(
"ovn_octavia_provider.driver.OvnNbIdlForLb").start()
ovn_nb_idl = mock.patch("ovn_octavia_provider.driver.OvnNbIdlForLb")
self.mock_ovn_nb_idl = ovn_nb_idl.start()
self.member_address = "192.168.2.149"
self.vip_address = '192.148.210.109'
self.vip_dict = {'vip_network_id': uuidutils.generate_uuid(),
@ -2551,7 +2552,7 @@ class TestOvnProviderHelper(TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.driver.OvnProviderHelper.'
'_find_ovn_lbs')
def test_vip_port_update_handler_lb_not_found(self, lb):
lb.side_effect = [idlutils.RowNotFound]
lb.side_effect = [idlutils.RowNotFound for _ in range(5)]
self.switch_port_event = ovn_driver.LogicalSwitchPortUpdateEvent(
self.helper)
port_name = '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX, 'foo')
@ -2772,8 +2773,7 @@ class TestOvnProviderHelper(TestOvnOctaviaBase):
self._test_handle_member_dvr_lb_fip(
net_cli, action=ovn_driver.REQ_INFO_MEMBER_DELETED)
@mock.patch.object(ovn_driver, 'atexit')
def test_ovsdb_connections(self, mock_atexit):
def test_ovsdb_connections(self):
ovn_driver.OvnProviderHelper.ovn_nbdb_api = None
ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None
prov_helper1 = ovn_driver.OvnProviderHelper()
@ -2786,10 +2786,6 @@ class TestOvnProviderHelper(TestOvnOctaviaBase):
prov_helper2.ovn_nbdb_api_for_events)
prov_helper2.shutdown()
prov_helper1.shutdown()
# Assert at_exit calls
mock_atexit.assert_has_calls([
mock.call.register(prov_helper1.shutdown),
mock.call.register(prov_helper2.shutdown)])
def test_create_vip_port_vip_selected(self):
expected_dict = {
@ -2945,3 +2941,13 @@ class TestOvnProviderHelper(TestOvnOctaviaBase):
fol.return_value = None
ret = self.helper.check_lb_protocol(self.listener_id, 'TCP')
self.assertFalse(ret)
class TestOvnProviderAgent(TestOvnOctaviaBase):
def test_exit(self):
mock_exit_event = mock.MagicMock()
mock_exit_event.is_set.side_effect = [False, False, False, False, True]
ovn_agent.OvnProviderAgent(mock_exit_event)
self.assertEqual(1, mock_exit_event.wait.call_count)
self.assertEqual(2, self.mock_ovn_nb_idl.call_count)

View File

@ -26,10 +26,12 @@ packages =
setup-hooks =
pbr.hooks.setup_hook
[entry_points]
octavia.api.drivers =
ovn = ovn_octavia_provider.driver:OvnProviderDriver
octavia.driver_agent.provider_agents =
ovn = ovn_octavia_provider.agent:OvnProviderAgent
oslo.config.opts =
octavia.api.drivers.ovn = ovn_octavia_provider.common.config:list_opts