Merge "Add binding activation to OVS agent"

This commit is contained in:
Zuul 2018-07-24 05:34:02 +00:00 committed by Gerrit Code Review
commit f51aa27025
5 changed files with 193 additions and 41 deletions

View File

@ -35,6 +35,7 @@ from neutron.common import utils
from neutron import objects
LOG = logging.getLogger(__name__)
BINDING_DEACTIVATE = 'binding_deactivate'
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
@ -201,14 +202,23 @@ class CacheBackedPluginApi(PluginApi):
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
method, host = self._get_method_host(rtype, event, **kwargs)
method, host_with_activation, host_with_deactivation = (
self._get_method_host(rtype, event, **kwargs))
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id,
'host': host}
getattr(self._legacy_interface, method)(context, **payload)
# If there is a binding deactivation, we must also notify the
# corresponding activation
if method == BINDING_DEACTIVATE:
self._legacy_interface.binding_deactivate(
context, port_id=resource_id, host=host_with_deactivation)
self._legacy_interface.binding_activate(
context, port_id=resource_id, host=host_with_activation)
else:
payload = {rtype: {'id': resource_id},
'%s_id' % rtype: resource_id}
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
"""Constructs the name of method to be called in the legacy interface.
@ -222,9 +232,10 @@ class CacheBackedPluginApi(PluginApi):
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
host = None
host_with_activation = None
host_with_deactivation = None
if is_delete or rtype != callback_resources.PORT:
return method, host
return method, host_with_activation, host_with_deactivation
# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
@ -245,9 +256,10 @@ class CacheBackedPluginApi(PluginApi):
getattr(kwargs['updated'], 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = 'binding_deactivate'
host = existing_active_binding.host
return method, host
method = BINDING_DEACTIVATE
host_with_activation = updated_active_binding.host
host_with_deactivation = existing_active_binding.host
return method, host_with_activation, host_with_deactivation
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
@ -274,15 +286,22 @@ class CacheBackedPluginApi(PluginApi):
if not segment:
LOG.debug("Device %s is not bound to any segment.", port_obj)
return {'device': device}
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
if (port_obj.device_owner.startswith(
constants.DEVICE_OWNER_COMPUTE_PREFIX) and
binding[pb_ext.HOST] != host):
LOG.debug("Device %s has no active binding in this host",
port_obj)
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
entry = {
'device': device,
'network_id': port_obj.network_id,

View File

@ -227,3 +227,6 @@ VALUES_TYPE_RANGE = "range"
# Units base
SI_BASE = 1000
IEC_BASE = 1024
# Port bindings handling
NO_ACTIVE_BINDING = 'no_active_binding'

View File

@ -55,6 +55,7 @@ from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import constants as c_const
from neutron.common import utils as n_utils
from neutron.conf.agent import xenapi_conf
from neutron.plugins.ml2.drivers.agent import capabilities
@ -124,7 +125,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# 1.2 Support DVR (Distributed Virtual Router) RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
# 1.4 Added support for network_update
# 1.5 Added binding_deactivate
# 1.5 Added binding_activate and binding_deactivate
target = oslo_messaging.Target(version='1.5')
def __init__(self, bridge_classes, ext_manager, conf=None):
@ -177,6 +178,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.deleted_ports = set()
# Stores the port IDs whose binding has been deactivated
self.deactivated_bindings = set()
# Stores the port IDs whose binding has been activated
self.activated_bindings = set()
self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
@ -423,6 +426,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_id = kwargs.get('port_id')
self.deactivated_bindings.add(port_id)
def binding_activate(self, context, **kwargs):
if kwargs.get('host') != self.conf.host:
return
port_id = kwargs.get('port_id')
self.activated_bindings.add(port_id)
def _clean_network_ports(self, port_id):
for port_set in self.network_ports.values():
if port_id in port_set:
@ -467,6 +476,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
LOG.debug(("Port id %s unplugged from integration bridge because "
"its binding was de-activated"), port_id)
def process_activated_bindings(self, port_info, activated_bindings_copy):
# Compute which ports for activated bindings are still present...
activated_bindings_copy &= port_info['current']
# ...and treat them as just added
port_info['added'] |= activated_bindings_copy
def tunnel_update(self, context, **kwargs):
LOG.debug("tunnel_update received")
if not self.enable_tunneling:
@ -1534,6 +1549,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def treat_devices_added_or_updated(self, devices, provisioning_needed):
skipped_devices = []
need_binding_devices = []
binding_no_activated_devices = set()
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
@ -1576,13 +1592,21 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if c_const.NO_ACTIVE_BINDING in details:
# Port was added to the bridge, but its binding in this
# agent hasn't been activated yet. It will be treated as
# added when binding is activated
binding_no_activated_devices.add(device)
LOG.debug("Device %s has no active binding in host",
device)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if (port and port.ofport != -1):
self.port_dead(port)
return (skipped_devices, need_binding_devices,
failed_devices)
return (skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices)
def _update_port_network(self, port_id, network_id):
self._clean_network_ports(port_id)
@ -1673,19 +1697,23 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_info.get('updated', set()))
need_binding_devices = []
skipped_devices = set()
binding_no_activated_devices = set()
if devices_added_updated:
start = time.time()
(skipped_devices, need_binding_devices,
failed_devices['added']) = (
(skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices['added']) = (
self.treat_devices_added_or_updated(
devices_added_updated, provisioning_needed))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Skipped %(num_skipped)d and no activated binding "
"devices %(num_no_active_binding)d of %(num_current)d "
"devices currently available. "
"Time elapsed: %(elapsed).3f",
{'iter_num': self.iter_num,
'num_skipped': len(skipped_devices),
'num_no_active_binding':
len(binding_no_activated_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
@ -1695,7 +1723,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# TODO(salv-orlando): Optimize avoiding applying filters
# unnecessarily, (eg: when there are no IP address changes)
added_ports = port_info.get('added', set()) - skipped_devices
added_ports = (port_info.get('added', set()) - skipped_devices -
binding_no_activated_devices)
self._add_port_tag_info(need_binding_devices)
self.sg_agent.setup_port_filters(added_ports,
port_info.get('updated', set()))
@ -1810,6 +1839,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.updated_ports or
self.deleted_ports or
self.deactivated_bindings or
self.activated_bindings or
self.sg_agent.firewall_refresh_needed())
def _port_info_has_changes(self, port_info):
@ -2031,6 +2061,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
sync = False
ports = set()
updated_ports_copy = set()
activated_bindings_copy = set()
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
@ -2091,6 +2122,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
activated_bindings_copy = self.activated_bindings
self.activated_bindings = set()
(port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
@ -2100,6 +2133,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
sync = False
self.process_deleted_ports(port_info)
self.process_deactivated_bindings(port_info)
self.process_activated_bindings(port_info,
activated_bindings_copy)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
port_info.setdefault('updated', set()).update(
@ -2154,6 +2189,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
LOG.exception("Error while processing VIF ports")
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
self.activated_bindings |= activated_bindings_copy
sync = True
port_stats = self.get_port_stats(port_info, ancillary_port_info)
self.loop_count_and_wait(start, port_stats)

View File

@ -16,6 +16,7 @@
import datetime
import mock
import netaddr
from neutron_lib.agent import topics as lib_topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
@ -24,6 +25,8 @@ from oslo_context import context as oslo_context
from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.common import constants as n_const
from neutron.objects import network
from neutron.objects import ports
from neutron.tests import base
@ -176,12 +179,28 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
super(TestCacheBackedPluginApi, self).setUp()
self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN)
self._api._legacy_interface = mock.Mock()
self._api.remote_resource_cache = mock.Mock()
self._network_id = uuidutils.generate_uuid()
self._segment_id = uuidutils.generate_uuid()
self._segment = network.NetworkSegment(
id=self._segment_id, network_id=self._network_id,
network_type=constants.TYPE_FLAT)
self._port_id = uuidutils.generate_uuid()
self._network = network.Network(id=self._network_id,
segments=[self._segment])
self._port = ports.Port(
id=self._port_id,
id=self._port_id, network_id=self._network_id,
mac_address=netaddr.EUI('fa:16:3e:ec:c7:d9'), admin_state_up=True,
security_group_ids=set([uuidutils.generate_uuid()]),
fixed_ips=[], allowed_address_pairs=[],
device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
bindings=[ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.ACTIVE)])
status=constants.ACTIVE,
profile={})],
binding_levels=[ports.PortBindingLevel(port_id=self._port_id,
host='host1',
segment=self._segment)])
def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
@ -189,8 +208,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
existing=self._port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_resource_update(self):
@ -201,8 +219,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def _test__legacy_notifier_binding_activated(self):
@ -225,8 +242,9 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_new_binding_activated(self):
self._test__legacy_notifier_binding_activated()
self._api._legacy_interface.binding_deactivate.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host='host1')
mock.ANY, host='host1', port_id=self._port_id)
self._api._legacy_interface.binding_activate.assert_called_once_with(
mock.ANY, host='host2', port_id=self._port_id)
def test__legacy_notifier_no_new_binding_activated(self):
updated_port = ports.Port(
@ -240,8 +258,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
@ -257,7 +274,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
resource_id=self._port_id,
existing=self._port, updated=None)
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id, host=None)
port_id=self._port_id)
self._api._legacy_interface.port_update.assert_has_calls([call, call])
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
@ -265,3 +282,23 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_binding_activated_not_supported(self):
del self._api._legacy_interface.binding_deactivate
self._test__legacy_notifier_binding_activated()
def test_get_device_details_binding_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host1')
self.assertEqual(self._port_id, entry['device'])
self.assertEqual(self._port_id, entry['port_id'])
self.assertEqual(self._network_id, entry['network_id'])
self.assertNotIn(n_const.NO_ACTIVE_BINDING, entry)
def test_get_device_details_binding_not_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host2')
self.assertEqual(self._port_id, entry['device'])
self.assertNotIn('port_id', entry)
self.assertNotIn('network_id', entry)
self.assertIn(n_const.NO_ACTIVE_BINDING, entry)

View File

@ -28,6 +28,7 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.common import constants as c_const
from neutron.common import rpc as n_rpc
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
@ -797,12 +798,30 @@ class TestOvsNeutronAgent(object):
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices, _ = (
skip_devs, _, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should not raise
self.assertFalse(skip_devs)
return func.called
def test_treat_devices_added_updated_no_active_binding(self):
details = {'device': 'id',
c_const.NO_ACTIVE_BINDING: True}
port = mock.Mock()
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': []}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
mock.patch.object(self.agent, 'port_dead') as func:
skip_devs, binding_no_activated_devices, _, _ = (
self.agent.treat_devices_added_or_updated([], False))
self.assertFalse(skip_devs)
self.assertTrue(func.called)
self.assertIn('id', binding_no_activated_devices)
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
port = mock.Mock()
port.ofport = -1
@ -873,7 +892,8 @@ class TestOvsNeutronAgent(object):
skip_devs = self.agent.treat_devices_added_or_updated([], False)
# The function should return False for resync and no device
# processed
self.assertEqual((['the_skipped_one'], [], set()), skip_devs)
self.assertEqual((['the_skipped_one'], set(), [], set()),
skip_devs)
ext_mgr_delete_port.assert_called_once_with(
self.agent.context, {'port_id': 'the_skipped_one'})
self.assertFalse(treat_vif_port.called)
@ -890,7 +910,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
failed_devices = {'added': set(), 'removed': set()}
(_, _, failed_devices['added']) = (
(_, _, _, failed_devices['added']) = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync and no device
# processed
@ -921,7 +941,7 @@ class TestOvsNeutronAgent(object):
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices, _ = (
skip_devs, _, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync
self.assertFalse(skip_devs)
@ -1008,16 +1028,18 @@ class TestOvsNeutronAgent(object):
self.agent._bind_devices([{'network_id': 'non-existent',
'vif_port': vif_port}])
def _test_process_network_ports(self, port_info, skipped_devices=None):
def _test_process_network_ports(self, port_info, skipped_devices=None,
binding_no_activated_devices=None):
failed_devices = {'added': set(), 'removed': set()}
skipped_devices = skipped_devices or []
binding_no_activated_devices = binding_no_activated_devices or set()
added_devices = port_info.get('added', set())
with mock.patch.object(self.agent.sg_agent,
"setup_port_filters") as setup_port_filters,\
mock.patch.object(
self.agent, "treat_devices_added_or_updated",
return_value=(
skipped_devices, [],
skipped_devices, binding_no_activated_devices, [],
failed_devices['added'])) as device_added_updated,\
mock.patch.object(self.agent.int_br, "get_ports_attributes",
return_value=[]),\
@ -1034,7 +1056,8 @@ class TestOvsNeutronAgent(object):
failed_devices,
self.agent.process_network_ports(port_info, False))
setup_port_filters.assert_called_once_with(
added_devices - set(skipped_devices),
(added_devices - set(skipped_devices) -
binding_no_activated_devices),
port_info.get('updated', set()))
devices_added_updated = (added_devices |
port_info.get('updated', set()))
@ -1065,6 +1088,14 @@ class TestOvsNeutronAgent(object):
'added': set(['eth1', 'eth2'])}
self._test_process_network_ports(port_info, skipped_devices=['eth1'])
def test_process_network_port_with_binding_no_activated_devices(self):
port_info = {'current': set(['tap0', 'tap1']),
'removed': set(['eth0']),
'added': set(['eth1', 'eth2', 'eth3'])}
self._test_process_network_ports(
port_info, skipped_devices=['eth1'],
binding_no_activated_devices=set(['eth3']))
def test_process_network_port_with_empty_port(self):
self._test_process_network_ports({})
@ -1247,6 +1278,32 @@ class TestOvsNeutronAgent(object):
int_br.delete_port.assert_not_called()
self.assertEqual(set(), self.agent.deactivated_bindings)
def test_binding_activate(self):
self.agent.binding_activate('context', port_id='id', host='host')
self.assertIn('id', self.agent.activated_bindings)
def test_binding_activate_not_for_host(self):
self.agent.binding_activate('context', port_id='id', host='other-host')
self.assertEqual(set(), self.agent.activated_bindings)
def test_process_activated_bindings(self):
port_info = {}
port_info['added'] = set(['added_port_id'])
port_info['current'] = set(['activated_port_id'])
self.agent.process_activated_bindings(port_info,
set(['activated_port_id']))
self.assertIn('added_port_id', port_info['added'])
self.assertIn('activated_port_id', port_info['added'])
def test_process_activated_bindings_activated_port_not_present(self):
port_info = {}
port_info['added'] = set(['added_port_id'])
port_info['current'] = set()
self.agent.process_activated_bindings(port_info,
set(['activated_port_id']))
self.assertIn('added_port_id', port_info['added'])
self.assertNotIn('activated_port_id', port_info['added'])
def _test_setup_physical_bridges(self, port_exists=False):
with mock.patch.object(ip_lib.IPDevice, "exists") as devex_fn,\
mock.patch.object(sys, "exit"),\