Add Local IP L2 extension skeleton

OVS agent part of Local IP feature was divided into
2 parts to make it easier for reviewers:

1. This patch adds agent extension skeleton and sets
server <-> agent RPC communication mechanism via
push notifications of LocalIPAssociation objects
create/delete. It also shows how the extension would
treat those changes. It may be called extension "frontend".

2. Agent extension flows patch (next one) - deals with OVS
flows and can be called extension "backend".

Partial-Bug: #1930200
Change-Id: I31cb4062b6a21b71c739ab202c60aa7002e4d36e
This commit is contained in:
Oleg Bondarev 2021-10-25 14:52:08 +03:00
parent cd1d96863e
commit 1222962767
6 changed files with 307 additions and 0 deletions

View File

@ -0,0 +1,135 @@
# Copyright 2021 Huawei, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import sys
from neutron_lib.agent import l2_extension
from neutron_lib.callbacks import events as lib_events
from neutron_lib.callbacks import registry as lib_registry
from neutron_lib import context as lib_ctx
from oslo_log import log as logging
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
constants as ovs_constants)
LOG = logging.getLogger(__name__)
class LocalIPAgentExtension(l2_extension.L2AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.LOCAL_IP_ASSOCIATION]
def initialize(self, connection, driver_type):
if driver_type != ovs_constants.EXTENSION_DRIVER_TYPE:
LOG.error('Local IP extension is only supported for OVS, '
'currently uses %(driver_type)s',
{'driver_type': driver_type})
sys.exit(1)
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self._register_rpc_consumers(connection)
self.local_ip_updates = {
'added': collections.defaultdict(dict),
'deleted': collections.defaultdict(dict)
}
self._pull_all_local_ip_associations()
def _pull_all_local_ip_associations(self):
context = lib_ctx.get_admin_context_without_session()
assoc_list = self.resource_rpc.bulk_pull(
context, resources.LOCAL_IP_ASSOCIATION)
for assoc in assoc_list:
port_id = assoc.fixed_port_id
lip_id = assoc.local_ip_id
self.local_ip_updates['added'][port_id][lip_id] = assoc
# No need to notify "port updated" here as on restart agent
# handles all ports anyway
def consume_api(self, agent_api):
"""Allows an extension to gain access to resources internal to the
neutron agent and otherwise unavailable to the extension.
"""
self.agent_api = agent_api
def _register_rpc_consumers(self, connection):
"""Allows an extension to receive notifications of updates made to
items of interest.
"""
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
for resource_type in self.SUPPORTED_RESOURCE_TYPES:
# We assume that the neutron server always broadcasts the latest
# version known to the agent
registry.register(self._handle_notification, resource_type)
topic = resources_rpc.resource_type_versioned_topic(resource_type)
connection.create_consumer(topic, endpoints, fanout=True)
def _handle_notification(self, context, resource_type,
local_ip_associations, event_type):
if resource_type != resources.LOCAL_IP_ASSOCIATION:
LOG.warning("Only Local IP Association notifications are "
"supported, got: %s", resource_type)
return
LOG.info("Local IP Association notification received: %s, %s",
local_ip_associations, event_type)
for assoc in local_ip_associations:
port_id = assoc.fixed_port_id
lip_id = assoc.local_ip_id
if event_type in [events.CREATED, events.UPDATED]:
self.local_ip_updates['added'][port_id][lip_id] = assoc
elif event_type == events.DELETED:
self.local_ip_updates['deleted'][port_id][lip_id] = assoc
self.local_ip_updates['added'][port_id].pop(lip_id, None)
# Notify agent about port update to handle Local IP flows
self._notify_port_updated(context, port_id)
def _notify_port_updated(self, context, port_id):
payload = lib_events.DBEventPayload(
context, metadata={'changed_fields': {'local_ip'}},
resource_id=port_id, states=(None,))
lib_registry.publish(resources.PORT, lib_events.AFTER_UPDATE,
self, payload=payload)
def handle_port(self, context, port):
"""Handle Local IP associations for a port.
"""
port_id = port['port_id']
local_ip_updates = self._pop_local_ip_updates_for_port(port_id)
for assoc in local_ip_updates['added'].values():
LOG.info("Local IP added for port %s: %s",
port_id, assoc.local_ip)
# TBD
for assoc in local_ip_updates['deleted'].values():
LOG.info("Local IP deleted from port %s: %s",
port_id, assoc.local_ip)
# TBD
def _pop_local_ip_updates_for_port(self, port_id):
return {
'added': self.local_ip_updates['added'].pop(port_id, {}),
'deleted': self.local_ip_updates['deleted'].pop(port_id, {})
}
def delete_port(self, context, port):
self.local_ip_updates['added'].pop(port['port_id'], None)
self.local_ip_updates['deleted'].pop(port['port_id'], None)

View File

@ -13,6 +13,7 @@
from neutron._i18n import _
from neutron.objects import address_group
from neutron.objects import conntrack_helper
from neutron.objects import local_ip
from neutron.objects.logapi import logging_resource as log_object
from neutron.objects import network
from neutron.objects import port_forwarding
@ -36,6 +37,7 @@ SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name()
PORTFORWARDING = port_forwarding.PortForwarding.obj_name()
CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name()
ADDRESSGROUP = address_group.AddressGroup.obj_name()
LOCAL_IP_ASSOCIATION = local_ip.LocalIPAssociation.obj_name()
_VALID_CLS = (
@ -51,6 +53,7 @@ _VALID_CLS = (
port_forwarding.PortForwarding,
conntrack_helper.ConntrackHelper,
address_group.AddressGroup,
local_ip.LocalIPAssociation,
)
_TYPE_TO_CLS_MAP = {cls.obj_name(): cls for cls in _VALID_CLS}

View File

@ -15,6 +15,8 @@
from neutron_lib.api.definitions import local_ip as local_ip_apidef
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.db import local_ip_db
@ -26,3 +28,21 @@ class LocalIPPlugin(local_ip_db.LocalIPDbMixin):
__native_pagination_support = True
__native_sorting_support = True
__filter_validation_support = True
def __init__(self):
super(LocalIPPlugin, self).__init__()
self._resource_rpc = resources_rpc.ResourcesPushRpcApi()
def create_local_ip_port_association(self, context, local_ip_id,
port_association):
lip_assoc = self._create_local_ip_port_association(
context, local_ip_id, port_association)
self._resource_rpc.push(context, [lip_assoc], rpc_events.CREATED)
return self._make_local_ip_assoc_dict(lip_assoc)
def delete_local_ip_port_association(self, context, fixed_port_id,
local_ip_id):
lip_assoc = super(
LocalIPPlugin, self).delete_local_ip_port_association(
context, fixed_port_id, local_ip_id)
self._resource_rpc.push(context, [lip_assoc], rpc_events.DELETED)

View File

@ -0,0 +1,145 @@
# Copyright 2021 Huawei, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from neutron_lib.callbacks import events as lib_events
from neutron_lib.callbacks import registry as lib_registry
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.agent.l2.extensions import local_ip as local_ip_ext
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.objects import local_ip as lip_obj
from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_agent_extension_api as ovs_ext_api
from neutron.tests import base
class LocalIPAgentExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(LocalIPAgentExtensionTestCase, self).setUp()
self.context = context.get_admin_context_without_session()
self.local_ip_ext = local_ip_ext.LocalIPAgentExtension()
self.int_br = mock.Mock()
self.tun_br = mock.Mock()
self.plugin_rpc = mock.Mock()
self.agent_api = ovs_ext_api.OVSAgentExtensionAPI(
self.int_br,
self.tun_br,
phys_brs=None,
plugin_rpc=self.plugin_rpc)
self.local_ip_ext.consume_api(self.agent_api)
with mock.patch.object(
self.local_ip_ext, '_pull_all_local_ip_associations'):
self.local_ip_ext.initialize(mock.Mock(), 'ovs')
def _generate_test_lip_associations(self, count=2):
return [lip_obj.LocalIPAssociation(
fixed_port_id=uuidutils.generate_uuid(),
local_ip_id=uuidutils.generate_uuid(),
local_ip=lip_obj.LocalIP()) for _ in range(count)
]
def test_pulling_lip_associations_on_init(self):
res_rpc = mock.Mock()
lip_assocs = self._generate_test_lip_associations()
with mock.patch('neutron.api.rpc.handlers.'
'resources_rpc.ResourcesPullRpcApi') as res_rpc_cls:
res_rpc_cls.return_value = res_rpc
res_rpc.bulk_pull.return_value = lip_assocs
self.local_ip_ext.initialize(mock.Mock(), 'ovs')
res_rpc.bulk_pull.assert_called_once_with(
mock.ANY, resources.LOCAL_IP_ASSOCIATION)
for assoc in lip_assocs:
self.assertEqual(
assoc, self.local_ip_ext.local_ip_updates[
'added'][assoc.fixed_port_id][assoc.local_ip_id])
def test_notify_port_updated(self):
with mock.patch.object(lib_registry, "publish") as publish_mock:
port_id = 'test'
self.local_ip_ext._notify_port_updated(
self.context, port_id=port_id)
publish_mock.assert_called_once_with(
resources.PORT, lib_events.AFTER_UPDATE,
self.local_ip_ext, payload=mock.ANY)
actual_payload = publish_mock.call_args[1]['payload']
self.assertEqual(port_id, actual_payload.resource_id)
self.assertEqual({'changed_fields': {'local_ip'}},
actual_payload.metadata)
def test_handle_updated_notification(self):
lip_assocs = self._generate_test_lip_associations()
with mock.patch.object(
self.local_ip_ext,
"_notify_port_updated") as port_update_notify:
self.local_ip_ext._handle_notification(
self.context, resources.LOCAL_IP_ASSOCIATION,
lip_assocs, events.UPDATED)
for assoc in lip_assocs:
self.assertEqual(
assoc, self.local_ip_ext.local_ip_updates[
'added'][assoc.fixed_port_id][assoc.local_ip_id])
port_update_notify.assert_any_call(
self.context, assoc.fixed_port_id)
return lip_assocs
def test_handle_deleted_notification(self, lip_assocs=None):
lip_assocs = lip_assocs or self.test_handle_updated_notification()
with mock.patch.object(
self.local_ip_ext,
"_notify_port_updated") as port_update_notify:
self.local_ip_ext._handle_notification(
self.context, resources.LOCAL_IP_ASSOCIATION,
lip_assocs, events.DELETED)
for assoc in lip_assocs:
self.assertEqual({}, self.local_ip_ext.local_ip_updates[
'added'][assoc.fixed_port_id])
self.assertEqual(
assoc, self.local_ip_ext.local_ip_updates[
'deleted'][assoc.fixed_port_id][assoc.local_ip_id])
port_update_notify.assert_any_call(
self.context, assoc.fixed_port_id)
def test_handle_port(self):
lip_assocs = self.test_handle_updated_notification()
for assoc in lip_assocs:
port = {'port_id': assoc.fixed_port_id}
self.local_ip_ext.handle_port(self.context, port)
self.assertEqual({}, self.local_ip_ext.local_ip_updates[
'added'][assoc.fixed_port_id])
self.test_handle_deleted_notification(lip_assocs)
for assoc in lip_assocs:
port = {'port_id': assoc.fixed_port_id}
self.local_ip_ext.handle_port(self.context, port)
self.assertEqual({}, self.local_ip_ext.local_ip_updates[
'deleted'][assoc.fixed_port_id])
def test_delete_port(self):
lip_assocs = self.test_handle_updated_notification()
for assoc in lip_assocs:
port = {'port_id': assoc.fixed_port_id}
self.local_ip_ext.delete_port(self.context, port)
self.assertEqual({}, self.local_ip_ext.local_ip_updates['added'])
self.assertEqual({}, self.local_ip_ext.local_ip_updates['added'])

View File

@ -14,6 +14,7 @@
# under the License.
import contextlib
from unittest import mock
import netaddr
from neutron_lib.api.definitions import local_ip as apidef
@ -97,6 +98,8 @@ class TestLocalIP(LocalIPTestBase):
ext_mgr = LocalIPTestExtensionManager()
svc_plugins = (
'neutron.services.local_ip.local_ip_plugin.LocalIPPlugin',)
mock.patch("neutron.api.rpc.handlers.resources_rpc."
"ResourcesPushRpcApi.push").start()
super(TestLocalIP, self).setUp(ext_mgr=ext_mgr,
service_plugins=svc_plugins)

View File

@ -130,6 +130,7 @@ neutron.agent.l2.extensions =
fdb = neutron.agent.l2.extensions.fdb_population:FdbPopulationAgentExtension
log = neutron.services.logapi.agent.log_extension:LoggingExtension
dhcp = neutron.agent.l2.extensions.dhcp.extension:DHCPAgentExtension
local_ip = neutron.agent.l2.extensions.local_ip:LocalIPAgentExtension
neutron.agent.l3.extensions =
fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension
gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension