diff --git a/neutron/agent/l2population_rpc.py b/neutron/agent/l2population_rpc.py index c9d131aaf3d..f108c4ebdcd 100644 --- a/neutron/agent/l2population_rpc.py +++ b/neutron/agent/l2population_rpc.py @@ -37,6 +37,11 @@ class L2populationRpcCallBackMixin(object): if not host or host == cfg.CONF.host: self.fdb_remove(context, fdb_entries) + @log.log + def update_fdb_entries(self, context, fdb_entries, host=None): + if not host or host == cfg.CONF.host: + self.fdb_update(context, fdb_entries) + @abc.abstractmethod def fdb_add(self, context, fdb_entries): pass @@ -44,3 +49,7 @@ class L2populationRpcCallBackMixin(object): @abc.abstractmethod def fdb_remove(self, context, fdb_entries): pass + + @abc.abstractmethod + def fdb_update(self, context, fdb_entries): + pass diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index b25f56fb5fd..d7159a37b51 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -717,6 +717,40 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, ports, interface) + def _fdb_chg_ip(self, context, fdb_entries): + LOG.debug(_("update chg_ip received")) + for network_id, agent_ports in fdb_entries.items(): + segment = self.agent.br_mgr.network_map.get(network_id) + if not segment: + return + + if segment.network_type != lconst.TYPE_VXLAN: + return + + interface = self.agent.br_mgr.get_vxlan_device_name( + segment.segmentation_id) + + for agent_ip, state in agent_ports.items(): + if agent_ip == self.agent.br_mgr.local_ip: + continue + + after = state.get('after') + for mac, ip in after: + self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface) + + before = state.get('before') + for mac, ip in before: + self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface) + + def fdb_update(self, context, fdb_entries): + LOG.debug(_("fdb_update received")) + for action, values in fdb_entries.items(): + method = '_fdb_' + action + if not hasattr(self, method): + raise NotImplementedError() + + getattr(self, method)(context, values) + def create_rpc_dispatcher(self): '''Get the rpc dispatcher for this manager. diff --git a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py index 8ab92da25f4..a3f54849da0 100644 --- a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py +++ b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py @@ -36,6 +36,7 @@ class L2populationMechanismDriver(api.MechanismDriver, def initialize(self): LOG.debug(_("Experimental L2 population driver")) + self.rpc_ctx = n_context.get_admin_context_without_session() def _get_port_fdb_entries(self, port): return [[port['mac_address'], @@ -45,31 +46,64 @@ class L2populationMechanismDriver(api.MechanismDriver, self.remove_fdb_entries = self._update_port_down(context) def delete_port_postcommit(self, context): - self._notify_remove_fdb_entries(context, - self.remove_fdb_entries) - - def _notify_remove_fdb_entries(self, context, fdb_entries): - rpc_ctx = n_context.get_admin_context_without_session() l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( - rpc_ctx, fdb_entries) + self.rpc_ctx, self.remove_fdb_entries) + + def _get_diff_ips(self, orig, port): + orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']]) + port_ips = set([ip['ip_address'] for ip in port['fixed_ips']]) + + # check if an ip has been added or removed + orig_chg_ips = orig_ips.difference(port_ips) + port_chg_ips = port_ips.difference(orig_ips) + + if orig_chg_ips or port_chg_ips: + return orig_chg_ips, port_chg_ips + + def _fixed_ips_changed(self, context, orig, port): + diff_ips = self._get_diff_ips(orig, port) + if not diff_ips: + return + orig_ips, port_ips = diff_ips + + port_infos = self._get_port_infos(context, orig) + if not port_infos: + return + agent, agent_ip, segment, port_fdb_entries = port_infos + + orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips] + port_mac_ip = [[port['mac_address'], ip] for ip in port_ips] + + upd_fdb_entries = {port['network_id']: {agent_ip: {}}} + + ports = upd_fdb_entries[port['network_id']][agent_ip] + if orig_mac_ip: + ports['before'] = orig_mac_ip + + if port_mac_ip: + ports['after'] = port_mac_ip + + l2pop_rpc.L2populationAgentNotify.update_fdb_entries( + self.rpc_ctx, {'chg_ip': upd_fdb_entries}) + + return True def update_port_postcommit(self, context): port = context.current orig = context.original if port['status'] == orig['status']: - return + self._fixed_ips_changed(context, orig, port) if port['status'] == const.PORT_STATUS_ACTIVE: self._update_port_up(context) elif port['status'] == const.PORT_STATUS_DOWN: fdb_entries = self._update_port_down(context) - self._notify_remove_fdb_entries(context, fdb_entries) + l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( + self.rpc_ctx, fdb_entries) - def _update_port_up(self, context): - port_context = context.current - network_id = port_context['network_id'] - agent_host = port_context['binding:host_id'] + def _get_port_infos(self, context, port): + agent_host = port['binding:host_id'] if not agent_host: return @@ -80,26 +114,39 @@ class L2populationMechanismDriver(api.MechanismDriver, agent_ip = self.get_agent_ip(agent) if not agent_ip: - LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"), - agent_host) + LOG.warning(_("Unable to retrieve the agent ip, check the agent " + "configuration.")) return segment = context.bound_segment if not segment: LOG.warning(_("Port %(port)s updated by agent %(agent)s " "isn't bound to any segment"), - {'port': port_context['id'], 'agent': agent.host}) + {'port': port['id'], 'agent': agent}) return tunnel_types = self.get_agent_tunnel_types(agent) if segment['network_type'] not in tunnel_types: return + fdb_entries = self._get_port_fdb_entries(port) + + return agent, agent_ip, segment, fdb_entries + + def _update_port_up(self, context): + port_context = context.current + port_infos = self._get_port_infos(context, port_context) + if not port_infos: + return + agent, agent_ip, segment, port_fdb_entries = port_infos + + agent_host = port_context['binding:host_id'] + network_id = port_context['network_id'] + + session = db_api.get_session() agent_ports = self.get_agent_network_port_count(session, agent_host, network_id) - rpc_ctx = n_context.get_admin_context_without_session() - other_fdb_entries = {network_id: {'segment_id': segment['segmentation_id'], 'network_type': segment['network_type'], @@ -138,45 +185,25 @@ class L2populationMechanismDriver(api.MechanismDriver, if ports.keys(): l2pop_rpc.L2populationAgentNotify.add_fdb_entries( - rpc_ctx, agent_fdb_entries, agent_host) + self.rpc_ctx, agent_fdb_entries, agent_host) # Notify other agents to add fdb rule for current port - fdb_entries = self._get_port_fdb_entries(port_context) - other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries + other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries - l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx, + l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx, other_fdb_entries) def _update_port_down(self, context): port_context = context.current - network_id = port_context['network_id'] + port_infos = self._get_port_infos(context, port_context) + if not port_infos: + return + agent, agent_ip, segment, port_fdb_entries = port_infos agent_host = port_context['binding:host_id'] - if not agent_host: - return + network_id = port_context['network_id'] session = db_api.get_session() - agent = self.get_agent_by_host(session, agent_host) - if not agent: - return - - agent_ip = self.get_agent_ip(agent) - if not agent_ip: - LOG.warning(_("Unable to retrieve the agent ip, check the agent " - "configuration.")) - return - - segment = context.bound_segment - if not segment: - LOG.warning(_("Port %(port)s updated by agent %(agent)s " - "isn't bound to any segment"), - {'port': port_context['id'], 'agent': agent}) - return - - tunnel_types = self.get_agent_tunnel_types(agent) - if segment['network_type'] not in tunnel_types: - return - agent_ports = self.get_agent_network_port_count(session, agent_host, network_id) diff --git a/neutron/plugins/ml2/drivers/l2pop/rpc.py b/neutron/plugins/ml2/drivers/l2pop/rpc.py index 176f1182059..b29717165e2 100644 --- a/neutron/plugins/ml2/drivers/l2pop/rpc.py +++ b/neutron/plugins/ml2/drivers/l2pop/rpc.py @@ -76,4 +76,13 @@ class L2populationAgentNotifyAPI(proxy.RpcProxy): self._notification_fanout(context, 'remove_fdb_entries', fdb_entries) + def update_fdb_entries(self, context, fdb_entries, host=None): + if fdb_entries: + if host: + self._notification_host(context, 'update_fdb_entries', + fdb_entries, host) + else: + self._notification_fanout(context, 'update_fdb_entries', + fdb_entries) + L2populationAgentNotify = L2populationAgentNotifyAPI() diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 443f00411da..174d682d1e3 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -415,6 +415,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, dl_vlan=lvm.vlan, dl_dst=port_info[0]) + def fdb_update(self, context, fdb_entries): + LOG.debug(_("fdb_update received")) + for action, values in fdb_entries.items(): + method = '_fdb_' + action + if not hasattr(self, method): + raise NotImplementedError() + + getattr(self, method)(context, values) + def create_rpc_dispatcher(self): '''Get the rpc dispatcher for this manager. diff --git a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py index b0846439cb1..f115733695d 100644 --- a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py +++ b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py @@ -879,3 +879,26 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): check_exit_code=False), ] execute_fn.assert_has_calls(expected) + + def test_fdb_update_chg_ip(self): + fdb_entries = {'chg_ip': + {'net_id': + {'agent_ip': + {'before': [['port_mac', 'port_ip_1']], + 'after': [['port_mac', 'port_ip_2']]}}}} + + with mock.patch.object(utils, 'execute', + return_value='') as execute_fn: + self.lb_rpc.fdb_update(None, fdb_entries) + + expected = [ + mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr', + 'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'], + root_helper=self.root_helper, + check_exit_code=False), + mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr', + 'port_mac', 'dev', 'vxlan-1'], + root_helper=self.root_helper, + check_exit_code=False) + ] + execute_fn.assert_has_calls(expected) diff --git a/neutron/tests/unit/ml2/drivers/test_l2population.py b/neutron/tests/unit/ml2/drivers/test_l2population.py index 718926a6e58..57a0a2bbe20 100644 --- a/neutron/tests/unit/ml2/drivers/test_l2population.py +++ b/neutron/tests/unit/ml2/drivers/test_l2population.py @@ -406,3 +406,80 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase): self.mock_fanout.assert_any_call( mock.ANY, expected, topic=self.fanout_topic) + + def test_fixed_ips_changed(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + with self.port(subnet=subnet, cidr='10.0.0.0/24', + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + p1 = port1['port'] + + self.mock_fanout.reset_mock() + + data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'}, + {'ip_address': '10.0.0.10'}]}} + req = self.new_update_request('ports', data, p1['id']) + res = self.deserialize(self.fmt, req.get_response(self.api)) + ips = res['port']['fixed_ips'] + self.assertEqual(len(ips), 2) + + add_expected = {'args': + {'fdb_entries': + {'chg_ip': + {p1['network_id']: + {'20.0.0.1': + {'after': [[p1['mac_address'], + '10.0.0.10']]}}}}}, + 'namespace': None, + 'method': 'update_fdb_entries'} + + self.mock_fanout.assert_any_call( + mock.ANY, add_expected, topic=self.fanout_topic) + + self.mock_fanout.reset_mock() + + data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'}, + {'ip_address': '10.0.0.16'}]}} + req = self.new_update_request('ports', data, p1['id']) + res = self.deserialize(self.fmt, req.get_response(self.api)) + ips = res['port']['fixed_ips'] + self.assertEqual(len(ips), 2) + + upd_expected = {'args': + {'fdb_entries': + {'chg_ip': + {p1['network_id']: + {'20.0.0.1': + {'before': [[p1['mac_address'], + '10.0.0.10']], + 'after': [[p1['mac_address'], + '10.0.0.16']]}}}}}, + 'namespace': None, + 'method': 'update_fdb_entries'} + + self.mock_fanout.assert_any_call( + mock.ANY, upd_expected, topic=self.fanout_topic) + + self.mock_fanout.reset_mock() + + data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}} + req = self.new_update_request('ports', data, p1['id']) + res = self.deserialize(self.fmt, req.get_response(self.api)) + ips = res['port']['fixed_ips'] + self.assertEqual(len(ips), 1) + + del_expected = {'args': + {'fdb_entries': + {'chg_ip': + {p1['network_id']: + {'20.0.0.1': + {'before': [[p1['mac_address'], + '10.0.0.2']]}}}}}, + 'namespace': None, + 'method': 'update_fdb_entries'} + + self.mock_fanout.assert_any_call( + mock.ANY, del_expected, topic=self.fanout_topic)