Merge "Generalize agent extension mechanism"

This commit is contained in:
Jenkins 2016-07-19 17:14:33 +00:00 committed by Gerrit Code Review
commit 122a971656
20 changed files with 323 additions and 139 deletions

View File

@ -0,0 +1,75 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Convention for heading levels in Neutron devref:
======= Heading 0 (reserved for the title in a document)
------- Heading 1
~~~~~~~ Heading 2
+++++++ Heading 3
''''''' Heading 4
(Avoid deeper levels because they do not render well.)
Agent extensions
================
All reference agents utilize a common extension mechanism that allows for the
introduction and enabling of a core resource extension without needing to
change agent code. This mechanism allows multiple agent extensions to be run by
a single agent simultaneously. The mechanism may be especially interesting to
third parties whose extensions lie outside the neutron tree.
Under this framework, an agent may expose its API to each of its extensions
thereby allowing an extension to access resources internal to the agent. At
layer 2, for instance, upon each port event the agent is then able to trigger a
handle_port method in its extensions.
Each extension is referenced through a stevedore entry point defined within a
specific namespace. For example, L2 extensions are referenced through the
neutron.agent.l2.extensions namespace.
The relevant modules are:
* neutron.agent.agent_extension:
This module defines an abstract extension interface for all agent
extensions across L2 and L3.
* neutron.agent.l2.agent_extension:
* neutron.agent.l3.agent_extension:
These modules subclass
neutron.agent.agent_extension.AgentExtension and define a
layer-specific abstract extension interface.
* neutron.agent.agent_extensions_manager:
This module contains a manager that allows extensions to load themselves at
runtime.
* neutron.agent.l2.l2_extensions_manager:
* neutron.agent.l3.l3_extensions_manager:
Each of these modules passes core resource events to loaded extensions.
Agent API object
----------------
Every agent can pass an "agent API object" into its extensions in order to
expose its internals to them in a controlled way. To accommodate different
agents, each extension may define a consume_api() method that will receive
this object.
This agent API object is part of neutron's public interface for third parties.
All changes to the interface will be managed in a backwards-compatible way.
At the moment, only the L2 Open vSwitch agent provides an agent API object to
extensions. See :doc:`L2 agent extensions <agent_extensions>`.

View File

@ -64,6 +64,7 @@ Neutron Internals
rpc_callbacks
layer3
l2_agents
agent_extensions
ovs_vhostuser
quality_of_service
service_extensions

View File

@ -24,38 +24,8 @@
L2 agent extensions
===================
All reference agents support common extension mechanism that allows to easily
reuse code between agents and to avoid the need to patch an agent for each new
core resource extension. Those extensions can be especially interesting to
third parties that don't want to maintain their code in Neutron tree.
Extensions are referenced through stevedore entry points defined under
neutron.agent.l2.extensions namespace. On each port event, handle_port is
triggered by the agent.
* neutron.agent.l2.agent_extension:
This module defines an abstract extension interface.
* neutron.agent.l2.extensions.manager:
This module contains a manager that allows to register multiple extensions,
and passes handle_port events down to all enabled extensions.
Agent API object
----------------
Every agent can pass a so-called agent API object into extensions to expose
some of its internals to them in controlled way.
If an extension is interested in using the object, it should define
consume_api() method that will receive the object before extension's
initialize() method is called by the extension manager.
This agent API object is part of public Neutron interface for third parties.
All changes to the interface will be managed in backwards compatible way.
At the moment, only Open vSwitch agent provides an agent API object to
extensions.
L2 agent extensions are part of a generalized L2/L3 extension framework. See
:doc:`agent extensions <agent_extensions>`.
Open vSwitch agent API
~~~~~~~~~~~~~~~~~~~~~~

View File

@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AgentExtension(object):
"""Define stable abstract interface for agent extensions.
An agent extension extends the agent core functionality.
"""
@abc.abstractmethod
def initialize(self, connection, driver_type):
"""Perform agent core resource extension initialization.
:param connection: RPC connection that can be reused by the extension
to define its RPC endpoints
:param driver_type: a string that defines the agent type to the
extension. Can be used to choose the right backend
implementation.
Called after all extensions have been loaded.
No resource (port, policy, router, etc.) handling will be called before
this method.
"""
def consume_api(self, agent_api):
"""Consume the AgentAPI instance from the AgentExtensionsManager.
Allows an extension to gain access to resources internal to the
neutron agent and otherwise unavailable to the extension. Examples of
such resources include bridges, ports, and routers.
:param agent_api: An instance of an agent-specific API.
"""

View File

@ -1,6 +1,3 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -17,13 +14,12 @@ from oslo_config import cfg
from oslo_log import log
import stevedore
from neutron._i18n import _, _LE, _LI
from neutron._i18n import _, _LI
LOG = log.getLogger(__name__)
L2_AGENT_EXT_MANAGER_NAMESPACE = 'neutron.agent.l2.extensions'
L2_AGENT_EXT_MANAGER_OPTS = [
AGENT_EXT_MANAGER_OPTS = [
cfg.ListOpt('extensions',
default=[],
help=_('Extensions list to use')),
@ -31,20 +27,20 @@ L2_AGENT_EXT_MANAGER_OPTS = [
def register_opts(conf):
conf.register_opts(L2_AGENT_EXT_MANAGER_OPTS, 'agent')
conf.register_opts(AGENT_EXT_MANAGER_OPTS, 'agent')
class AgentExtensionsManager(stevedore.named.NamedExtensionManager):
"""Manage agent extensions."""
def __init__(self, conf):
def __init__(self, conf, namespace):
super(AgentExtensionsManager, self).__init__(
L2_AGENT_EXT_MANAGER_NAMESPACE, conf.agent.extensions,
namespace, conf.agent.extensions,
invoke_on_load=True, name_order=True)
LOG.info(_LI("Loaded agent extensions: %s"), self.names())
def initialize(self, connection, driver_type, agent_api=None):
"""Initialize enabled L2 agent extensions.
"""Initialize enabled agent extensions.
:param connection: RPC connection that can be reused by extensions to
define their RPC endpoints
@ -58,29 +54,11 @@ class AgentExtensionsManager(stevedore.named.NamedExtensionManager):
# Initialize each agent extension in the list.
for extension in self:
LOG.info(_LI("Initializing agent extension '%s'"), extension.name)
# If the agent has provided an agent_api object, this object will
# be passed to all interested extensions. This object must be
# consumed by each such extension before the extension's
# intialize() method is called, as the initilization step
# relies on the agent_api already being available.
extension.obj.consume_api(agent_api)
extension.obj.initialize(connection, driver_type)
def handle_port(self, context, data):
"""Notify all agent extensions to handle port."""
for extension in self:
try:
extension.obj.handle_port(context, data)
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed "
"while handling port update"),
{'name': extension.name}
)
def delete_port(self, context, data):
"""Notify all agent extensions to delete port."""
for extension in self:
try:
extension.obj.delete_port(context, data)
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed "
"while handling port deletion"),
{'name': extension.name}
)

View File

@ -17,52 +17,11 @@ import abc
import six
from neutron.agent.l2 import l2_agent_extension
@six.add_metaclass(abc.ABCMeta)
class AgentCoreResourceExtension(object):
"""Define stable abstract interface for agent extensions.
An agent extension extends the agent core functionality.
class AgentCoreResourceExtension(l2_agent_extension.L2AgentExtension):
"""This is a shim around L2AgentExtension class. It is intended for use by
out of tree extensions that were inheriting AgentCoreResourceExtension.
"""
def initialize(self, connection, driver_type):
"""Perform agent core resource extension initialization.
:param connection: RPC connection that can be reused by the extension
to define its RPC endpoints
:param driver_type: a string that defines the agent type to the
extension. Can be used to choose the right backend
implementation.
Called after all extensions have been loaded.
No port handling will be called before this method.
"""
@abc.abstractmethod
def handle_port(self, context, data):
"""Handle agent extension for port.
This can be called on either create or update, depending on the
code flow. Thus, it's this function's responsibility to check what
actually changed.
:param context: rpc context
:param data: port data
"""
@abc.abstractmethod
def delete_port(self, context, data):
"""Delete port from agent extension.
:param context: rpc context
:param data: port data
"""
def consume_api(self, agent_api):
"""Consume the AgentAPI instance from the AgentExtensionsManager
This allows extensions to gain access to resources limited to the
NeutronAgent.
:param agent_api: An instance of an agent specific API
"""

View File

@ -20,7 +20,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from neutron._i18n import _, _LE, _LW
from neutron.agent.l2 import agent_extension
from neutron.agent.l2 import l2_agent_extension
from neutron.agent.linux import bridge_lib
from neutron.common import utils as n_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent.common import (
@ -42,7 +42,8 @@ cfg.CONF.register_opts(fdb_population_opt, 'FDB')
LOG = logging.getLogger(__name__)
class FdbPopulationAgentExtension(agent_extension.AgentCoreResourceExtension):
class FdbPopulationAgentExtension(
l2_agent_extension.L2AgentExtension):
"""The FDB population is an agent extension to OVS or linux bridge
who's objective is to update the FDB table for existing instance
using normal port, thus enabling communication between SR-IOV instances

View File

@ -22,7 +22,7 @@ from oslo_log import log as logging
import six
from neutron._i18n import _LW, _LI
from neutron.agent.l2 import agent_extension
from neutron.agent.l2 import l2_agent_extension
from neutron.agent.linux import tc_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
@ -185,13 +185,12 @@ class PortPolicyMap(object):
del self.known_policies[qos_policy_id]
class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
SUPPORTED_RESOURCES = [resources.QOS_POLICY]
class QosAgentExtension(l2_agent_extension.L2AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.QOS_POLICY]
def initialize(self, connection, driver_type):
"""Perform Agent Extension initialization.
"""Initialize agent extension."""
"""
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self.qos_driver = manager.NeutronManager.load_class_for_provider(
'neutron.qos.agent_drivers', driver_type)()
@ -200,17 +199,23 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
self.policy_map = PortPolicyMap()
registry.subscribe(self._handle_notification, resources.QOS_POLICY)
self._register_rpc_consumers(connection)
def consume_api(self, agent_api):
"""Allows an extension to gain access to resources internal to the
neutron agent and otherwise unavailable to the extension.
"""
self.agent_api = agent_api
def _register_rpc_consumers(self, connection):
"""Allows an extension to receive notifications of updates made to
items of interest.
"""
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
for resource_type in self.SUPPORTED_RESOURCES:
# we assume that neutron-server always broadcasts the latest
for resource_type in self.SUPPORTED_RESOURCE_TYPES:
# We assume that the neutron server always broadcasts the latest
# version known to the agent
registry.subscribe(self._handle_notification, resource_type)
topic = resources_rpc.resource_type_versioned_topic(resource_type)
connection.create_consumer(topic, endpoints, fanout=True)

View File

@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from neutron.agent import agent_extension
@six.add_metaclass(abc.ABCMeta)
class L2AgentExtension(agent_extension.AgentExtension):
"""Define stable abstract interface for l2 agent extensions.
An agent extension extends the agent core functionality.
"""
def initialize(self, connection, driver_type):
"""Initialize agent extension."""
@abc.abstractmethod
def handle_port(self, context, data):
"""Handle agent extension for port.
This can be called on either create or update, depending on the
code flow. Thus, it's this function's responsibility to check what
actually changed.
:param context: rpc context
:param data: port data
"""
@abc.abstractmethod
def delete_port(self, context, data):
"""Delete port from agent extension.
:param context: rpc context
:param data: port data
"""

View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from neutron._i18n import _LE
from neutron.agent import agent_extensions_manager as agent_ext_manager
LOG = log.getLogger(__name__)
L2_AGENT_EXT_MANAGER_NAMESPACE = 'neutron.agent.l2.extensions'
def register_opts(conf):
agent_ext_manager.register_opts(conf)
class L2AgentExtensionsManager(agent_ext_manager.AgentExtensionsManager):
"""Manage l2 agent extensions. The handle_port and delete_port methods are
guaranteed to be attributes of each extension because they have been
marked as abc.abstractmethod in the extensions' abstract class.
"""
def __init__(self, conf):
super(L2AgentExtensionsManager, self).__init__(conf,
L2_AGENT_EXT_MANAGER_NAMESPACE)
def handle_port(self, context, data):
"""Notify all agent extensions to handle port."""
for extension in self:
try:
extension.obj.handle_port(context, data)
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed "
"while handling port update"),
{'name': extension.name}
)
def delete_port(self, context, data):
"""Notify all agent extensions to delete port."""
for extension in self:
try:
extension.obj.delete_port(context, data)
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed "
"while handling port deletion"),
{'name': extension.name}
)

View File

@ -17,9 +17,9 @@ import operator
from keystoneauth1 import loading as ks_loading
from oslo_config import cfg
import neutron.agent.agent_extensions_manager
import neutron.agent.common.config
import neutron.agent.common.ovs_lib
import neutron.agent.l2.extensions.manager
import neutron.agent.l3.config
import neutron.agent.l3.ha
import neutron.agent.linux.interface
@ -288,7 +288,7 @@ def list_sriov_agent_opts():
neutron.plugins.ml2.drivers.mech_sriov.agent.common.config.
sriov_nic_opts),
('agent',
neutron.agent.l2.extensions.manager.L2_AGENT_EXT_MANAGER_OPTS)
neutron.agent.agent_extensions_manager.AGENT_EXT_MANAGER_OPTS)
]

View File

@ -26,7 +26,7 @@ from oslo_service import service
from osprofiler import profiler
from neutron._i18n import _LE, _LI
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
@ -167,7 +167,7 @@ class CommonAgentLoop(service.Service):
def init_extension_manager(self, connection):
ext_manager.register_opts(cfg.CONF)
self.ext_manager = (
ext_manager.AgentExtensionsManager(cfg.CONF))
ext_manager.L2AgentExtensionsManager(cfg.CONF))
self.ext_manager.initialize(
connection, self.mgr.get_extension_driver_type())

View File

@ -29,7 +29,7 @@ from osprofiler import profiler
import six
from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
@ -186,7 +186,7 @@ class SriovNicSwitchAgent(object):
def _create_agent_extension_manager(self, connection):
ext_manager.register_opts(self.conf)
mgr = ext_manager.AgentExtensionsManager(self.conf)
mgr = ext_manager.L2AgentExtensionsManager(self.conf)
mgr.initialize(connection, 'sriov')
return mgr

View File

@ -74,7 +74,7 @@ class OVSAgentExtensionAPI(object):
'''Implements the Agent API for Open vSwitch agent.
Extensions can gain access to this API by overriding the consume_api
method which has been added to the AgentCoreResourceExtension class.
method which has been added to the AgentExtension class.
'''
def __init__(self, int_br, tun_br):

View File

@ -37,7 +37,7 @@ from neutron.agent.common import ip_lib
from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.callbacks import resources
@ -396,7 +396,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def init_extension_manager(self, connection):
ext_manager.register_opts(self.conf)
self.ext_manager = (
ext_manager.AgentExtensionsManager(self.conf))
ext_manager.L2AgentExtensionsManager(self.conf))
self.agent_api = ovs_ext_api.OVSAgentExtensionAPI(self.int_br,
self.tun_br)
self.ext_manager.initialize(

View File

@ -24,7 +24,7 @@ from oslo_utils import uuidutils
from neutron.agent.common import config as agent_config
from neutron.agent.common import ovs_lib
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent.linux import interface
from neutron.agent.linux import polling
from neutron.common import config as common_config

View File

@ -308,7 +308,7 @@ class QosExtensionInitializeTestCase(QosExtensionBaseTestCase):
resources_rpc.resource_type_versioned_topic(resource_type),
[rpc_mock()],
fanout=True)
for resource_type in self.qos_ext.SUPPORTED_RESOURCES]
for resource_type in self.qos_ext.SUPPORTED_RESOURCE_TYPES]
)
subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)

View File

@ -13,20 +13,20 @@
import mock
from oslo_config import cfg
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as l2_ext_manager
from neutron.tests import base
class TestAgentExtensionsManager(base.BaseTestCase):
class TestL2AgentExtensionsManager(base.BaseTestCase):
def setUp(self):
super(TestAgentExtensionsManager, self).setUp()
super(TestL2AgentExtensionsManager, self).setUp()
mock.patch('neutron.agent.l2.extensions.qos.QosAgentExtension',
autospec=True).start()
conf = cfg.CONF
ext_manager.register_opts(conf)
l2_ext_manager.register_opts(conf)
cfg.CONF.set_override('extensions', ['qos'], 'agent')
self.manager = ext_manager.AgentExtensionsManager(conf)
self.manager = l2_ext_manager.L2AgentExtensionsManager(conf)
def _get_extension(self):
return self.manager.extensions[0].obj

View File

@ -0,0 +1,39 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.agent import agent_extensions_manager as ext_manager
from neutron.tests import base
class TestAgentExtensionsManager(base.BaseTestCase):
def setUp(self):
super(TestAgentExtensionsManager, self).setUp()
mock.patch('neutron.agent.l2.extensions.qos.QosAgentExtension',
autospec=True).start()
conf = cfg.CONF
ext_manager.register_opts(conf)
cfg.CONF.set_override('extensions', ['qos'], 'agent')
namespace = 'neutron.agent.l2.extensions'
self.manager = ext_manager.AgentExtensionsManager(conf, namespace)
def _get_extension(self):
return self.manager.extensions[0].obj
def test_initialize(self):
connection = object()
self.manager.initialize(connection, 'fake_driver_type')
ext = self._get_extension()
ext.initialize.assert_called_once_with(connection, 'fake_driver_type')

View File

@ -18,7 +18,7 @@ import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.agent.l2.extensions import manager as l2_ext_manager
from neutron.agent.l2 import l2_agent_extensions_manager as l2_ext_manager
from neutron.agent import rpc as agent_rpc
from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa