Browse Source

Add binding activation to OVS agent

As part of the implementation of multiple port bindings [1], add binding
activation support to the OVS agent. This will enable the execution in
OVS agents of the complete sequence of steps outlined in [1] during an
instance migration:

1) Create inactive port bindings for destination host
2) Migrate the instance to the destination host and plug its VIFs
3) Activate the port bindings in the destination host
4) Delete the port bindings for the source host

[1] https://review.openstack.org/#/c/309416/

Change-Id: Iabca39364ec95633b2a8891fc295b3ada5f4f5e0
Partial-Bug: #1580880
changes/58/574058/15
Miguel Lavalle 4 years ago
parent
commit
5c3bf12496
  1. 43
      neutron/agent/rpc.py
  2. 3
      neutron/common/constants.py
  3. 58
      neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
  4. 59
      neutron/tests/unit/agent/test_rpc.py
  5. 71
      neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py

43
neutron/agent/rpc.py

@ -35,6 +35,7 @@ from neutron.common import utils
from neutron import objects
LOG = logging.getLogger(__name__)
BINDING_DEACTIVATE = 'binding_deactivate'
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
@ -201,14 +202,23 @@ class CacheBackedPluginApi(PluginApi):
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
method, host = self._get_method_host(rtype, event, **kwargs)
method, host_with_activation, host_with_deactivation = (
self._get_method_host(rtype, event, **kwargs))
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id,
'host': host}
getattr(self._legacy_interface, method)(context, **payload)
# If there is a binding deactivation, we must also notify the
# corresponding activation
if method == BINDING_DEACTIVATE:
self._legacy_interface.binding_deactivate(
context, port_id=resource_id, host=host_with_deactivation)
self._legacy_interface.binding_activate(
context, port_id=resource_id, host=host_with_activation)
else:
payload = {rtype: {'id': resource_id},
'%s_id' % rtype: resource_id}
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
"""Constructs the name of method to be called in the legacy interface.
@ -222,9 +232,10 @@ class CacheBackedPluginApi(PluginApi):
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
host = None
host_with_activation = None
host_with_deactivation = None
if is_delete or rtype != callback_resources.PORT:
return method, host
return method, host_with_activation, host_with_deactivation
# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
@ -245,9 +256,10 @@ class CacheBackedPluginApi(PluginApi):
getattr(kwargs['updated'], 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = 'binding_deactivate'
host = existing_active_binding.host
return method, host
method = BINDING_DEACTIVATE
host_with_activation = updated_active_binding.host
host_with_deactivation = existing_active_binding.host
return method, host_with_activation, host_with_deactivation
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
@ -274,15 +286,22 @@ class CacheBackedPluginApi(PluginApi):
if not segment:
LOG.debug("Device %s is not bound to any segment.", port_obj)
return {'device': device}
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
if (port_obj.device_owner.startswith(
constants.DEVICE_OWNER_COMPUTE_PREFIX) and
binding[pb_ext.HOST] != host):
LOG.debug("Device %s has no active binding in this host",
port_obj)
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
entry = {
'device': device,
'network_id': port_obj.network_id,

3
neutron/common/constants.py

@ -227,3 +227,6 @@ VALUES_TYPE_RANGE = "range"
# Units base
SI_BASE = 1000
IEC_BASE = 1024
# Port bindings handling
NO_ACTIVE_BINDING = 'no_active_binding'

58
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py

@ -55,6 +55,7 @@ from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import constants as c_const
from neutron.common import utils as n_utils
from neutron.conf.agent import xenapi_conf
from neutron.plugins.ml2.drivers.agent import capabilities
@ -124,7 +125,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# 1.2 Support DVR (Distributed Virtual Router) RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
# 1.4 Added support for network_update
# 1.5 Added binding_deactivate
# 1.5 Added binding_activate and binding_deactivate
target = oslo_messaging.Target(version='1.5')
def __init__(self, bridge_classes, ext_manager, conf=None):
@ -177,6 +178,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.deleted_ports = set()
# Stores the port IDs whose binding has been deactivated
self.deactivated_bindings = set()
# Stores the port IDs whose binding has been activated
self.activated_bindings = set()
self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
@ -423,6 +426,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_id = kwargs.get('port_id')
self.deactivated_bindings.add(port_id)
def binding_activate(self, context, **kwargs):
if kwargs.get('host') != self.conf.host:
return
port_id = kwargs.get('port_id')
self.activated_bindings.add(port_id)
def _clean_network_ports(self, port_id):
for port_set in self.network_ports.values():
if port_id in port_set:
@ -467,6 +476,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
LOG.debug(("Port id %s unplugged from integration bridge because "
"its binding was de-activated"), port_id)
def process_activated_bindings(self, port_info, activated_bindings_copy):
# Compute which ports for activated bindings are still present...
activated_bindings_copy &= port_info['current']
# ...and treat them as just added
port_info['added'] |= activated_bindings_copy
def tunnel_update(self, context, **kwargs):
LOG.debug("tunnel_update received")
if not self.enable_tunneling:
@ -1534,6 +1549,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def treat_devices_added_or_updated(self, devices, provisioning_needed):
skipped_devices = []
need_binding_devices = []
binding_no_activated_devices = set()
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
@ -1576,13 +1592,21 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if c_const.NO_ACTIVE_BINDING in details:
# Port was added to the bridge, but its binding in this
# agent hasn't been activated yet. It will be treated as
# added when binding is activated
binding_no_activated_devices.add(device)
LOG.debug("Device %s has no active binding in host",
device)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if (port and port.ofport != -1):
self.port_dead(port)
return (skipped_devices, need_binding_devices,
failed_devices)
return (skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices)
def _update_port_network(self, port_id, network_id):
self._clean_network_ports(port_id)
@ -1673,19 +1697,23 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_info.get('updated', set()))
need_binding_devices = []
skipped_devices = set()
binding_no_activated_devices = set()
if devices_added_updated:
start = time.time()
(skipped_devices, need_binding_devices,
failed_devices['added']) = (
(skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices['added']) = (
self.treat_devices_added_or_updated(
devices_added_updated, provisioning_needed))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Skipped %(num_skipped)d and no activated binding "
"devices %(num_no_active_binding)d of %(num_current)d "
"devices currently available. "
"Time elapsed: %(elapsed).3f",
{'iter_num': self.iter_num,
'num_skipped': len(skipped_devices),
'num_no_active_binding':
len(binding_no_activated_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
@ -1695,7 +1723,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# TODO(salv-orlando): Optimize avoiding applying filters
# unnecessarily, (eg: when there are no IP address changes)
added_ports = port_info.get('added', set()) - skipped_devices
added_ports = (port_info.get('added', set()) - skipped_devices -
binding_no_activated_devices)
self._add_port_tag_info(need_binding_devices)
self.sg_agent.setup_port_filters(added_ports,
port_info.get('updated', set()))
@ -1810,6 +1839,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.updated_ports or
self.deleted_ports or
self.deactivated_bindings or
self.activated_bindings or
self.sg_agent.firewall_refresh_needed())
def _port_info_has_changes(self, port_info):
@ -2031,6 +2061,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
sync = False
ports = set()
updated_ports_copy = set()
activated_bindings_copy = set()
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
@ -2091,6 +2122,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
activated_bindings_copy = self.activated_bindings
self.activated_bindings = set()
(port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
@ -2100,6 +2133,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
sync = False
self.process_deleted_ports(port_info)
self.process_deactivated_bindings(port_info)
self.process_activated_bindings(port_info,
activated_bindings_copy)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
port_info.setdefault('updated', set()).update(
@ -2154,6 +2189,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
LOG.exception("Error while processing VIF ports")
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
self.activated_bindings |= activated_bindings_copy
sync = True
port_stats = self.get_port_stats(port_info, ancillary_port_info)
self.loop_count_and_wait(start, port_stats)

59
neutron/tests/unit/agent/test_rpc.py

@ -16,6 +16,7 @@
import datetime
import mock
import netaddr
from neutron_lib.agent import topics as lib_topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
@ -24,6 +25,8 @@ from oslo_context import context as oslo_context
from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.common import constants as n_const
from neutron.objects import network
from neutron.objects import ports
from neutron.tests import base
@ -176,12 +179,28 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
super(TestCacheBackedPluginApi, self).setUp()
self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN)
self._api._legacy_interface = mock.Mock()
self._api.remote_resource_cache = mock.Mock()
self._network_id = uuidutils.generate_uuid()
self._segment_id = uuidutils.generate_uuid()
self._segment = network.NetworkSegment(
id=self._segment_id, network_id=self._network_id,
network_type=constants.TYPE_FLAT)
self._port_id = uuidutils.generate_uuid()
self._network = network.Network(id=self._network_id,
segments=[self._segment])
self._port = ports.Port(
id=self._port_id,
id=self._port_id, network_id=self._network_id,
mac_address=netaddr.EUI('fa:16:3e:ec:c7:d9'), admin_state_up=True,
security_group_ids=set([uuidutils.generate_uuid()]),
fixed_ips=[], allowed_address_pairs=[],
device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
bindings=[ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.ACTIVE)])
status=constants.ACTIVE,
profile={})],
binding_levels=[ports.PortBindingLevel(port_id=self._port_id,
host='host1',
segment=self._segment)])
def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
@ -189,8 +208,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
existing=self._port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_resource_update(self):
@ -201,8 +219,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def _test__legacy_notifier_binding_activated(self):
@ -225,8 +242,9 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_new_binding_activated(self):
self._test__legacy_notifier_binding_activated()
self._api._legacy_interface.binding_deactivate.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host='host1')
mock.ANY, host='host1', port_id=self._port_id)
self._api._legacy_interface.binding_activate.assert_called_once_with(
mock.ANY, host='host2', port_id=self._port_id)
def test__legacy_notifier_no_new_binding_activated(self):
updated_port = ports.Port(
@ -240,8 +258,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
@ -257,7 +274,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
resource_id=self._port_id,
existing=self._port, updated=None)
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id, host=None)
port_id=self._port_id)
self._api._legacy_interface.port_update.assert_has_calls([call, call])
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
@ -265,3 +282,23 @@ class TestCacheBackedPluginApi(base.BaseTestCase):
def test__legacy_notifier_binding_activated_not_supported(self):
del self._api._legacy_interface.binding_deactivate
self._test__legacy_notifier_binding_activated()
def test_get_device_details_binding_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host1')
self.assertEqual(self._port_id, entry['device'])
self.assertEqual(self._port_id, entry['port_id'])
self.assertEqual(self._network_id, entry['network_id'])
self.assertNotIn(n_const.NO_ACTIVE_BINDING, entry)
def test_get_device_details_binding_not_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host2')
self.assertEqual(self._port_id, entry['device'])
self.assertNotIn('port_id', entry)
self.assertNotIn('network_id', entry)
self.assertIn(n_const.NO_ACTIVE_BINDING, entry)

71
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py

@ -28,6 +28,7 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.common import constants as c_const
from neutron.common import rpc as n_rpc
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
@ -797,12 +798,30 @@ class TestOvsNeutronAgent(object):
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices, _ = (
skip_devs, _, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should not raise
self.assertFalse(skip_devs)
return func.called
def test_treat_devices_added_updated_no_active_binding(self):
details = {'device': 'id',
c_const.NO_ACTIVE_BINDING: True}
port = mock.Mock()
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': []}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
mock.patch.object(self.agent, 'port_dead') as func:
skip_devs, binding_no_activated_devices, _, _ = (
self.agent.treat_devices_added_or_updated([], False))
self.assertFalse(skip_devs)
self.assertTrue(func.called)
self.assertIn('id', binding_no_activated_devices)
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
port = mock.Mock()
port.ofport = -1
@ -873,7 +892,8 @@ class TestOvsNeutronAgent(object):
skip_devs = self.agent.treat_devices_added_or_updated([], False)
# The function should return False for resync and no device
# processed
self.assertEqual((['the_skipped_one'], [], set()), skip_devs)
self.assertEqual((['the_skipped_one'], set(), [], set()),
skip_devs)
ext_mgr_delete_port.assert_called_once_with(
self.agent.context, {'port_id': 'the_skipped_one'})
self.assertFalse(treat_vif_port.called)
@ -890,7 +910,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
failed_devices = {'added': set(), 'removed': set()}
(_, _, failed_devices['added']) = (
(_, _, _, failed_devices['added']) = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync and no device
# processed
@ -921,7 +941,7 @@ class TestOvsNeutronAgent(object):
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices, _ = (
skip_devs, _, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync
self.assertFalse(skip_devs)
@ -1008,16 +1028,18 @@ class TestOvsNeutronAgent(object):
self.agent._bind_devices([{'network_id': 'non-existent',
'vif_port': vif_port}])
def _test_process_network_ports(self, port_info, skipped_devices=None):
def _test_process_network_ports(self, port_info, skipped_devices=None,
binding_no_activated_devices=None):
failed_devices = {'added': set(), 'removed': set()}
skipped_devices = skipped_devices or []
binding_no_activated_devices = binding_no_activated_devices or set()
added_devices = port_info.get('added', set())
with mock.patch.object(self.agent.sg_agent,
"setup_port_filters") as setup_port_filters,\
mock.patch.object(
self.agent, "treat_devices_added_or_updated",
return_value=(
skipped_devices, [],
skipped_devices, binding_no_activated_devices, [],
failed_devices['added'])) as device_added_updated,\
mock.patch.object(self.agent.int_br, "get_ports_attributes",
return_value=[]),\
@ -1034,7 +1056,8 @@ class TestOvsNeutronAgent(object):
failed_devices,
self.agent.process_network_ports(port_info, False))
setup_port_filters.assert_called_once_with(
added_devices - set(skipped_devices),
(added_devices - set(skipped_devices) -
binding_no_activated_devices),
port_info.get('updated', set()))
devices_added_updated = (added_devices |
port_info.get('updated', set()))
@ -1065,6 +1088,14 @@ class TestOvsNeutronAgent(object):
'added': set(['eth1', 'eth2'])}
self._test_process_network_ports(port_info, skipped_devices=['eth1'])
def test_process_network_port_with_binding_no_activated_devices(self):
port_info = {'current': set(['tap0', 'tap1']),
'removed': set(['eth0']),
'added': set(['eth1', 'eth2', 'eth3'])}
self._test_process_network_ports(
port_info, skipped_devices=['eth1'],
binding_no_activated_devices=set(['eth3']))
def test_process_network_port_with_empty_port(self):
self._test_process_network_ports({})
@ -1247,6 +1278,32 @@ class TestOvsNeutronAgent(object):
int_br.delete_port.assert_not_called()
self.assertEqual(set(), self.agent.deactivated_bindings)
def test_binding_activate(self):
self.agent.binding_activate('context', port_id='id', host='host')
self.assertIn('id', self.agent.activated_bindings)
def test_binding_activate_not_for_host(self):
self.agent.binding_activate('context', port_id='id', host='other-host')
self.assertEqual(set(), self.agent.activated_bindings)
def test_process_activated_bindings(self):
port_info = {}
port_info['added'] = set(['added_port_id'])
port_info['current'] = set(['activated_port_id'])
self.agent.process_activated_bindings(port_info,
set(['activated_port_id']))
self.assertIn('added_port_id', port_info['added'])
self.assertIn('activated_port_id', port_info['added'])
def test_process_activated_bindings_activated_port_not_present(self):
port_info = {}
port_info['added'] = set(['added_port_id'])
port_info['current'] = set()
self.agent.process_activated_bindings(port_info,
set(['activated_port_id']))
self.assertIn('added_port_id', port_info['added'])
self.assertNotIn('activated_port_id', port_info['added'])
def _test_setup_physical_bridges(self, port_exists=False):
with mock.patch.object(ip_lib.IPDevice, "exists") as devex_fn,\
mock.patch.object(sys, "exit"),\

Loading…
Cancel
Save