diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index c76bf2f20ff..f84eedf3c59 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -35,6 +35,7 @@ from neutron.common import utils from neutron import objects LOG = logging.getLogger(__name__) +BINDING_DEACTIVATE = 'binding_deactivate' def create_consumers(endpoints, prefix, topic_details, start_listening=True): @@ -201,14 +202,23 @@ class CacheBackedPluginApi(PluginApi): the payloads the handlers are expecting (an ID). """ rtype = rtype.lower() # all legacy handlers don't camelcase - method, host = self._get_method_host(rtype, event, **kwargs) + method, host_with_activation, host_with_deactivation = ( + self._get_method_host(rtype, event, **kwargs)) if not hasattr(self._legacy_interface, method): # TODO(kevinbenton): once these notifications are stable, emit # a deprecation warning for legacy handlers return - payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id, - 'host': host} - getattr(self._legacy_interface, method)(context, **payload) + # If there is a binding deactivation, we must also notify the + # corresponding activation + if method == BINDING_DEACTIVATE: + self._legacy_interface.binding_deactivate( + context, port_id=resource_id, host=host_with_deactivation) + self._legacy_interface.binding_activate( + context, port_id=resource_id, host=host_with_activation) + else: + payload = {rtype: {'id': resource_id}, + '%s_id' % rtype: resource_id} + getattr(self._legacy_interface, method)(context, **payload) def _get_method_host(self, rtype, event, **kwargs): """Constructs the name of method to be called in the legacy interface. @@ -222,9 +232,10 @@ class CacheBackedPluginApi(PluginApi): is_delete = event == callback_events.AFTER_DELETE suffix = 'delete' if is_delete else 'update' method = "%s_%s" % (rtype, suffix) - host = None + host_with_activation = None + host_with_deactivation = None if is_delete or rtype != callback_resources.PORT: - return method, host + return method, host_with_activation, host_with_deactivation # A port update was received. Find out if it is a binding activation # where a previous binding was deactivated @@ -245,9 +256,10 @@ class CacheBackedPluginApi(PluginApi): getattr(kwargs['updated'], 'bindings', []), constants.INACTIVE, host=existing_active_binding.host)): - method = 'binding_deactivate' - host = existing_active_binding.host - return method, host + method = BINDING_DEACTIVATE + host_with_activation = updated_active_binding.host + host_with_deactivation = existing_active_binding.host + return method, host_with_activation, host_with_deactivation def get_devices_details_list_and_failed_devices(self, context, devices, agent_id, host=None): @@ -274,15 +286,22 @@ class CacheBackedPluginApi(PluginApi): if not segment: LOG.debug("Device %s is not bound to any segment.", port_obj) return {'device': device} + binding = utils.get_port_binding_by_status_and_host( + port_obj.bindings, constants.ACTIVE, raise_if_not_found=True, + port_id=port_obj.id) + if (port_obj.device_owner.startswith( + constants.DEVICE_OWNER_COMPUTE_PREFIX) and + binding[pb_ext.HOST] != host): + LOG.debug("Device %s has no active binding in this host", + port_obj) + return {'device': device, + n_const.NO_ACTIVE_BINDING: True} net = self.remote_resource_cache.get_resource_by_id( resources.NETWORK, port_obj.network_id) net_qos_policy_id = net.qos_policy_id # match format of old RPC interface mac_addr = str(netaddr.EUI(str(port_obj.mac_address), dialect=netaddr.mac_unix_expanded)) - binding = utils.get_port_binding_by_status_and_host( - port_obj.bindings, constants.ACTIVE, raise_if_not_found=True, - port_id=port_obj.id) entry = { 'device': device, 'network_id': port_obj.network_id, diff --git a/neutron/common/constants.py b/neutron/common/constants.py index cbce6e86ee8..1fb25a4c5df 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -227,3 +227,6 @@ VALUES_TYPE_RANGE = "range" # Units base SI_BASE = 1000 IEC_BASE = 1024 + +# Port bindings handling +NO_ACTIVE_BINDING = 'no_active_binding' diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index aa306a0d1ba..560238d89ad 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -55,6 +55,7 @@ from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc from neutron.common import config +from neutron.common import constants as c_const from neutron.common import utils as n_utils from neutron.conf.agent import xenapi_conf from neutron.plugins.ml2.drivers.agent import capabilities @@ -124,7 +125,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, # 1.2 Support DVR (Distributed Virtual Router) RPC # 1.3 Added param devices_to_update to security_groups_provider_updated # 1.4 Added support for network_update - # 1.5 Added binding_deactivate + # 1.5 Added binding_activate and binding_deactivate target = oslo_messaging.Target(version='1.5') def __init__(self, bridge_classes, ext_manager, conf=None): @@ -177,6 +178,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, self.deleted_ports = set() # Stores the port IDs whose binding has been deactivated self.deactivated_bindings = set() + # Stores the port IDs whose binding has been activated + self.activated_bindings = set() self.network_ports = collections.defaultdict(set) # keeps association between ports and ofports to detect ofport change @@ -423,6 +426,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, port_id = kwargs.get('port_id') self.deactivated_bindings.add(port_id) + def binding_activate(self, context, **kwargs): + if kwargs.get('host') != self.conf.host: + return + port_id = kwargs.get('port_id') + self.activated_bindings.add(port_id) + def _clean_network_ports(self, port_id): for port_set in self.network_ports.values(): if port_id in port_set: @@ -467,6 +476,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, LOG.debug(("Port id %s unplugged from integration bridge because " "its binding was de-activated"), port_id) + def process_activated_bindings(self, port_info, activated_bindings_copy): + # Compute which ports for activated bindings are still present... + activated_bindings_copy &= port_info['current'] + # ...and treat them as just added + port_info['added'] |= activated_bindings_copy + def tunnel_update(self, context, **kwargs): LOG.debug("tunnel_update received") if not self.enable_tunneling: @@ -1534,6 +1549,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, def treat_devices_added_or_updated(self, devices, provisioning_needed): skipped_devices = [] need_binding_devices = [] + binding_no_activated_devices = set() devices_details_list = ( self.plugin_rpc.get_devices_details_list_and_failed_devices( self.context, @@ -1576,13 +1592,21 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, details['network_id']) self.ext_manager.handle_port(self.context, details) else: - LOG.warning( - "Device %s not defined on plugin or binding failed", - device) + if c_const.NO_ACTIVE_BINDING in details: + # Port was added to the bridge, but its binding in this + # agent hasn't been activated yet. It will be treated as + # added when binding is activated + binding_no_activated_devices.add(device) + LOG.debug("Device %s has no active binding in host", + device) + else: + LOG.warning( + "Device %s not defined on plugin or binding failed", + device) if (port and port.ofport != -1): self.port_dead(port) - return (skipped_devices, need_binding_devices, - failed_devices) + return (skipped_devices, binding_no_activated_devices, + need_binding_devices, failed_devices) def _update_port_network(self, port_id, network_id): self._clean_network_ports(port_id) @@ -1673,19 +1697,23 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, port_info.get('updated', set())) need_binding_devices = [] skipped_devices = set() + binding_no_activated_devices = set() if devices_added_updated: start = time.time() - (skipped_devices, need_binding_devices, - failed_devices['added']) = ( + (skipped_devices, binding_no_activated_devices, + need_binding_devices, failed_devices['added']) = ( self.treat_devices_added_or_updated( devices_added_updated, provisioning_needed)) LOG.debug("process_network_ports - iteration:%(iter_num)d - " "treat_devices_added_or_updated completed. " - "Skipped %(num_skipped)d devices of " - "%(num_current)d devices currently available. " + "Skipped %(num_skipped)d and no activated binding " + "devices %(num_no_active_binding)d of %(num_current)d " + "devices currently available. " "Time elapsed: %(elapsed).3f", {'iter_num': self.iter_num, 'num_skipped': len(skipped_devices), + 'num_no_active_binding': + len(binding_no_activated_devices), 'num_current': len(port_info['current']), 'elapsed': time.time() - start}) # Update the list of current ports storing only those which @@ -1695,7 +1723,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, # TODO(salv-orlando): Optimize avoiding applying filters # unnecessarily, (eg: when there are no IP address changes) - added_ports = port_info.get('added', set()) - skipped_devices + added_ports = (port_info.get('added', set()) - skipped_devices - + binding_no_activated_devices) self._add_port_tag_info(need_binding_devices) self.sg_agent.setup_port_filters(added_ports, port_info.get('updated', set())) @@ -1810,6 +1839,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, self.updated_ports or self.deleted_ports or self.deactivated_bindings or + self.activated_bindings or self.sg_agent.firewall_refresh_needed()) def _port_info_has_changes(self, port_info): @@ -2031,6 +2061,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, sync = False ports = set() updated_ports_copy = set() + activated_bindings_copy = set() ancillary_ports = set() tunnel_sync = True ovs_restarted = False @@ -2091,6 +2122,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, # between these two statements, this will be thread-safe updated_ports_copy = self.updated_ports self.updated_ports = set() + activated_bindings_copy = self.activated_bindings + self.activated_bindings = set() (port_info, ancillary_port_info, consecutive_resyncs, ports_not_ready_yet) = (self.process_port_info( start, polling_manager, sync, ovs_restarted, @@ -2100,6 +2133,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, sync = False self.process_deleted_ports(port_info) self.process_deactivated_bindings(port_info) + self.process_activated_bindings(port_info, + activated_bindings_copy) ofport_changed_ports = self.update_stale_ofport_rules() if ofport_changed_ports: port_info.setdefault('updated', set()).update( @@ -2154,6 +2189,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, LOG.exception("Error while processing VIF ports") # Put the ports back in self.updated_port self.updated_ports |= updated_ports_copy + self.activated_bindings |= activated_bindings_copy sync = True port_stats = self.get_port_stats(port_info, ancillary_port_info) self.loop_count_and_wait(start, port_stats) diff --git a/neutron/tests/unit/agent/test_rpc.py b/neutron/tests/unit/agent/test_rpc.py index f687f935b25..4dcd31c8848 100644 --- a/neutron/tests/unit/agent/test_rpc.py +++ b/neutron/tests/unit/agent/test_rpc.py @@ -16,6 +16,7 @@ import datetime import mock +import netaddr from neutron_lib.agent import topics as lib_topics from neutron_lib.callbacks import events from neutron_lib.callbacks import resources @@ -24,6 +25,8 @@ from oslo_context import context as oslo_context from oslo_utils import uuidutils from neutron.agent import rpc +from neutron.common import constants as n_const +from neutron.objects import network from neutron.objects import ports from neutron.tests import base @@ -176,12 +179,28 @@ class TestCacheBackedPluginApi(base.BaseTestCase): super(TestCacheBackedPluginApi, self).setUp() self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN) self._api._legacy_interface = mock.Mock() + self._api.remote_resource_cache = mock.Mock() + self._network_id = uuidutils.generate_uuid() + self._segment_id = uuidutils.generate_uuid() + self._segment = network.NetworkSegment( + id=self._segment_id, network_id=self._network_id, + network_type=constants.TYPE_FLAT) self._port_id = uuidutils.generate_uuid() + self._network = network.Network(id=self._network_id, + segments=[self._segment]) self._port = ports.Port( - id=self._port_id, + id=self._port_id, network_id=self._network_id, + mac_address=netaddr.EUI('fa:16:3e:ec:c7:d9'), admin_state_up=True, + security_group_ids=set([uuidutils.generate_uuid()]), + fixed_ips=[], allowed_address_pairs=[], + device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX, bindings=[ports.PortBinding(port_id=self._port_id, host='host1', - status=constants.ACTIVE)]) + status=constants.ACTIVE, + profile={})], + binding_levels=[ports.PortBindingLevel(port_id=self._port_id, + host='host1', + segment=self._segment)]) def test__legacy_notifier_resource_delete(self): self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self, @@ -189,8 +208,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase): existing=self._port) self._api._legacy_interface.port_update.assert_not_called() self._api._legacy_interface.port_delete.assert_called_once_with( - mock.ANY, port={'id': self._port_id}, port_id=self._port_id, - host=None) + mock.ANY, port={'id': self._port_id}, port_id=self._port_id) self._api._legacy_interface.binding_deactivate.assert_not_called() def test__legacy_notifier_resource_update(self): @@ -201,8 +219,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase): existing=self._port, updated=updated_port) self._api._legacy_interface.port_delete.assert_not_called() self._api._legacy_interface.port_update.assert_called_once_with( - mock.ANY, port={'id': self._port_id}, port_id=self._port_id, - host=None) + mock.ANY, port={'id': self._port_id}, port_id=self._port_id) self._api._legacy_interface.binding_deactivate.assert_not_called() def _test__legacy_notifier_binding_activated(self): @@ -225,8 +242,9 @@ class TestCacheBackedPluginApi(base.BaseTestCase): def test__legacy_notifier_new_binding_activated(self): self._test__legacy_notifier_binding_activated() self._api._legacy_interface.binding_deactivate.assert_called_once_with( - mock.ANY, port={'id': self._port_id}, port_id=self._port_id, - host='host1') + mock.ANY, host='host1', port_id=self._port_id) + self._api._legacy_interface.binding_activate.assert_called_once_with( + mock.ANY, host='host2', port_id=self._port_id) def test__legacy_notifier_no_new_binding_activated(self): updated_port = ports.Port( @@ -240,8 +258,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase): resource_id=self._port_id, existing=self._port, updated=updated_port) self._api._legacy_interface.port_update.assert_called_once_with( - mock.ANY, port={'id': self._port_id}, port_id=self._port_id, - host=None) + mock.ANY, port={'id': self._port_id}, port_id=self._port_id) self._api._legacy_interface.port_delete.assert_not_called() self._api._legacy_interface.binding_deactivate.assert_not_called() @@ -257,7 +274,7 @@ class TestCacheBackedPluginApi(base.BaseTestCase): resource_id=self._port_id, existing=self._port, updated=None) call = mock.call(mock.ANY, port={'id': self._port_id}, - port_id=self._port_id, host=None) + port_id=self._port_id) self._api._legacy_interface.port_update.assert_has_calls([call, call]) self._api._legacy_interface.port_delete.assert_not_called() self._api._legacy_interface.binding_deactivate.assert_not_called() @@ -265,3 +282,23 @@ class TestCacheBackedPluginApi(base.BaseTestCase): def test__legacy_notifier_binding_activated_not_supported(self): del self._api._legacy_interface.binding_deactivate self._test__legacy_notifier_binding_activated() + + def test_get_device_details_binding_in_host(self): + self._api.remote_resource_cache.get_resource_by_id.side_effect = [ + self._port, self._network] + entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY, + 'host1') + self.assertEqual(self._port_id, entry['device']) + self.assertEqual(self._port_id, entry['port_id']) + self.assertEqual(self._network_id, entry['network_id']) + self.assertNotIn(n_const.NO_ACTIVE_BINDING, entry) + + def test_get_device_details_binding_not_in_host(self): + self._api.remote_resource_cache.get_resource_by_id.side_effect = [ + self._port, self._network] + entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY, + 'host2') + self.assertEqual(self._port_id, entry['device']) + self.assertNotIn('port_id', entry) + self.assertNotIn('network_id', entry) + self.assertIn(n_const.NO_ACTIVE_BINDING, entry) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 2be8c15cca4..1ed0083eba8 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -28,6 +28,7 @@ from neutron.agent.common import ovs_lib from neutron.agent.common import utils from neutron.agent.linux import async_process from neutron.agent.linux import ip_lib +from neutron.common import constants as c_const from neutron.common import rpc as n_rpc from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants @@ -797,12 +798,30 @@ class TestOvsNeutronAgent(object): 'get_port_tag_dict', return_value={}),\ mock.patch.object(self.agent, func_name) as func: - skip_devs, need_bound_devices, _ = ( + skip_devs, _, need_bound_devices, _ = ( self.agent.treat_devices_added_or_updated([], False)) # The function should not raise self.assertFalse(skip_devs) return func.called + def test_treat_devices_added_updated_no_active_binding(self): + details = {'device': 'id', + c_const.NO_ACTIVE_BINDING: True} + port = mock.Mock() + with mock.patch.object(self.agent.plugin_rpc, + 'get_devices_details_list_and_failed_devices', + return_value={'devices': [details], + 'failed_devices': []}),\ + mock.patch.object(self.agent.int_br, + 'get_vifs_by_ids', + return_value={details['device']: port}),\ + mock.patch.object(self.agent, 'port_dead') as func: + skip_devs, binding_no_activated_devices, _, _ = ( + self.agent.treat_devices_added_or_updated([], False)) + self.assertFalse(skip_devs) + self.assertTrue(func.called) + self.assertIn('id', binding_no_activated_devices) + def test_treat_devices_added_updated_ignores_invalid_ofport(self): port = mock.Mock() port.ofport = -1 @@ -873,7 +892,8 @@ class TestOvsNeutronAgent(object): skip_devs = self.agent.treat_devices_added_or_updated([], False) # The function should return False for resync and no device # processed - self.assertEqual((['the_skipped_one'], [], set()), skip_devs) + self.assertEqual((['the_skipped_one'], set(), [], set()), + skip_devs) ext_mgr_delete_port.assert_called_once_with( self.agent.context, {'port_id': 'the_skipped_one'}) self.assertFalse(treat_vif_port.called) @@ -890,7 +910,7 @@ class TestOvsNeutronAgent(object): mock.patch.object(self.agent, 'treat_vif_port') as treat_vif_port: failed_devices = {'added': set(), 'removed': set()} - (_, _, failed_devices['added']) = ( + (_, _, _, failed_devices['added']) = ( self.agent.treat_devices_added_or_updated([], False)) # The function should return False for resync and no device # processed @@ -921,7 +941,7 @@ class TestOvsNeutronAgent(object): return_value={}),\ mock.patch.object(self.agent, 'treat_vif_port') as treat_vif_port: - skip_devs, need_bound_devices, _ = ( + skip_devs, _, need_bound_devices, _ = ( self.agent.treat_devices_added_or_updated([], False)) # The function should return False for resync self.assertFalse(skip_devs) @@ -1008,16 +1028,18 @@ class TestOvsNeutronAgent(object): self.agent._bind_devices([{'network_id': 'non-existent', 'vif_port': vif_port}]) - def _test_process_network_ports(self, port_info, skipped_devices=None): + def _test_process_network_ports(self, port_info, skipped_devices=None, + binding_no_activated_devices=None): failed_devices = {'added': set(), 'removed': set()} skipped_devices = skipped_devices or [] + binding_no_activated_devices = binding_no_activated_devices or set() added_devices = port_info.get('added', set()) with mock.patch.object(self.agent.sg_agent, "setup_port_filters") as setup_port_filters,\ mock.patch.object( self.agent, "treat_devices_added_or_updated", return_value=( - skipped_devices, [], + skipped_devices, binding_no_activated_devices, [], failed_devices['added'])) as device_added_updated,\ mock.patch.object(self.agent.int_br, "get_ports_attributes", return_value=[]),\ @@ -1034,7 +1056,8 @@ class TestOvsNeutronAgent(object): failed_devices, self.agent.process_network_ports(port_info, False)) setup_port_filters.assert_called_once_with( - added_devices - set(skipped_devices), + (added_devices - set(skipped_devices) - + binding_no_activated_devices), port_info.get('updated', set())) devices_added_updated = (added_devices | port_info.get('updated', set())) @@ -1065,6 +1088,14 @@ class TestOvsNeutronAgent(object): 'added': set(['eth1', 'eth2'])} self._test_process_network_ports(port_info, skipped_devices=['eth1']) + def test_process_network_port_with_binding_no_activated_devices(self): + port_info = {'current': set(['tap0', 'tap1']), + 'removed': set(['eth0']), + 'added': set(['eth1', 'eth2', 'eth3'])} + self._test_process_network_ports( + port_info, skipped_devices=['eth1'], + binding_no_activated_devices=set(['eth3'])) + def test_process_network_port_with_empty_port(self): self._test_process_network_ports({}) @@ -1247,6 +1278,32 @@ class TestOvsNeutronAgent(object): int_br.delete_port.assert_not_called() self.assertEqual(set(), self.agent.deactivated_bindings) + def test_binding_activate(self): + self.agent.binding_activate('context', port_id='id', host='host') + self.assertIn('id', self.agent.activated_bindings) + + def test_binding_activate_not_for_host(self): + self.agent.binding_activate('context', port_id='id', host='other-host') + self.assertEqual(set(), self.agent.activated_bindings) + + def test_process_activated_bindings(self): + port_info = {} + port_info['added'] = set(['added_port_id']) + port_info['current'] = set(['activated_port_id']) + self.agent.process_activated_bindings(port_info, + set(['activated_port_id'])) + self.assertIn('added_port_id', port_info['added']) + self.assertIn('activated_port_id', port_info['added']) + + def test_process_activated_bindings_activated_port_not_present(self): + port_info = {} + port_info['added'] = set(['added_port_id']) + port_info['current'] = set() + self.agent.process_activated_bindings(port_info, + set(['activated_port_id'])) + self.assertIn('added_port_id', port_info['added']) + self.assertNotIn('activated_port_id', port_info['added']) + def _test_setup_physical_bridges(self, port_exists=False): with mock.patch.object(ip_lib.IPDevice, "exists") as devex_fn,\ mock.patch.object(sys, "exit"),\