Merge "Fix l2 pop doesn't propagate ip address updates"
This commit is contained in:
commit
67f592dcb1
@ -37,6 +37,11 @@ class L2populationRpcCallBackMixin(object):
|
||||
if not host or host == cfg.CONF.host:
|
||||
self.fdb_remove(context, fdb_entries)
|
||||
|
||||
@log.log
|
||||
def update_fdb_entries(self, context, fdb_entries, host=None):
|
||||
if not host or host == cfg.CONF.host:
|
||||
self.fdb_update(context, fdb_entries)
|
||||
|
||||
@abc.abstractmethod
|
||||
def fdb_add(self, context, fdb_entries):
|
||||
pass
|
||||
@ -44,3 +49,7 @@ class L2populationRpcCallBackMixin(object):
|
||||
@abc.abstractmethod
|
||||
def fdb_remove(self, context, fdb_entries):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def fdb_update(self, context, fdb_entries):
|
||||
pass
|
||||
|
@ -717,6 +717,40 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
ports,
|
||||
interface)
|
||||
|
||||
def _fdb_chg_ip(self, context, fdb_entries):
|
||||
LOG.debug(_("update chg_ip received"))
|
||||
for network_id, agent_ports in fdb_entries.items():
|
||||
segment = self.agent.br_mgr.network_map.get(network_id)
|
||||
if not segment:
|
||||
return
|
||||
|
||||
if segment.network_type != lconst.TYPE_VXLAN:
|
||||
return
|
||||
|
||||
interface = self.agent.br_mgr.get_vxlan_device_name(
|
||||
segment.segmentation_id)
|
||||
|
||||
for agent_ip, state in agent_ports.items():
|
||||
if agent_ip == self.agent.br_mgr.local_ip:
|
||||
continue
|
||||
|
||||
after = state.get('after')
|
||||
for mac, ip in after:
|
||||
self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)
|
||||
|
||||
before = state.get('before')
|
||||
for mac, ip in before:
|
||||
self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)
|
||||
|
||||
def fdb_update(self, context, fdb_entries):
|
||||
LOG.debug(_("fdb_update received"))
|
||||
for action, values in fdb_entries.items():
|
||||
method = '_fdb_' + action
|
||||
if not hasattr(self, method):
|
||||
raise NotImplementedError()
|
||||
|
||||
getattr(self, method)(context, values)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
|
@ -36,6 +36,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
||||
|
||||
def initialize(self):
|
||||
LOG.debug(_("Experimental L2 population driver"))
|
||||
self.rpc_ctx = n_context.get_admin_context_without_session()
|
||||
|
||||
def _get_port_fdb_entries(self, port):
|
||||
return [[port['mac_address'],
|
||||
@ -45,31 +46,64 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
||||
self.remove_fdb_entries = self._update_port_down(context)
|
||||
|
||||
def delete_port_postcommit(self, context):
|
||||
self._notify_remove_fdb_entries(context,
|
||||
self.remove_fdb_entries)
|
||||
|
||||
def _notify_remove_fdb_entries(self, context, fdb_entries):
|
||||
rpc_ctx = n_context.get_admin_context_without_session()
|
||||
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
|
||||
rpc_ctx, fdb_entries)
|
||||
self.rpc_ctx, self.remove_fdb_entries)
|
||||
|
||||
def _get_diff_ips(self, orig, port):
|
||||
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
|
||||
port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])
|
||||
|
||||
# check if an ip has been added or removed
|
||||
orig_chg_ips = orig_ips.difference(port_ips)
|
||||
port_chg_ips = port_ips.difference(orig_ips)
|
||||
|
||||
if orig_chg_ips or port_chg_ips:
|
||||
return orig_chg_ips, port_chg_ips
|
||||
|
||||
def _fixed_ips_changed(self, context, orig, port):
|
||||
diff_ips = self._get_diff_ips(orig, port)
|
||||
if not diff_ips:
|
||||
return
|
||||
orig_ips, port_ips = diff_ips
|
||||
|
||||
port_infos = self._get_port_infos(context, orig)
|
||||
if not port_infos:
|
||||
return
|
||||
agent, agent_ip, segment, port_fdb_entries = port_infos
|
||||
|
||||
orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips]
|
||||
port_mac_ip = [[port['mac_address'], ip] for ip in port_ips]
|
||||
|
||||
upd_fdb_entries = {port['network_id']: {agent_ip: {}}}
|
||||
|
||||
ports = upd_fdb_entries[port['network_id']][agent_ip]
|
||||
if orig_mac_ip:
|
||||
ports['before'] = orig_mac_ip
|
||||
|
||||
if port_mac_ip:
|
||||
ports['after'] = port_mac_ip
|
||||
|
||||
l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
|
||||
self.rpc_ctx, {'chg_ip': upd_fdb_entries})
|
||||
|
||||
return True
|
||||
|
||||
def update_port_postcommit(self, context):
|
||||
port = context.current
|
||||
orig = context.original
|
||||
|
||||
if port['status'] == orig['status']:
|
||||
return
|
||||
self._fixed_ips_changed(context, orig, port)
|
||||
|
||||
if port['status'] == const.PORT_STATUS_ACTIVE:
|
||||
self._update_port_up(context)
|
||||
elif port['status'] == const.PORT_STATUS_DOWN:
|
||||
fdb_entries = self._update_port_down(context)
|
||||
self._notify_remove_fdb_entries(context, fdb_entries)
|
||||
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
|
||||
self.rpc_ctx, fdb_entries)
|
||||
|
||||
def _update_port_up(self, context):
|
||||
port_context = context.current
|
||||
network_id = port_context['network_id']
|
||||
agent_host = port_context['binding:host_id']
|
||||
def _get_port_infos(self, context, port):
|
||||
agent_host = port['binding:host_id']
|
||||
if not agent_host:
|
||||
return
|
||||
|
||||
@ -80,26 +114,39 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
||||
|
||||
agent_ip = self.get_agent_ip(agent)
|
||||
if not agent_ip:
|
||||
LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
|
||||
agent_host)
|
||||
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
|
||||
"configuration."))
|
||||
return
|
||||
|
||||
segment = context.bound_segment
|
||||
if not segment:
|
||||
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
|
||||
"isn't bound to any segment"),
|
||||
{'port': port_context['id'], 'agent': agent.host})
|
||||
{'port': port['id'], 'agent': agent})
|
||||
return
|
||||
|
||||
tunnel_types = self.get_agent_tunnel_types(agent)
|
||||
if segment['network_type'] not in tunnel_types:
|
||||
return
|
||||
|
||||
fdb_entries = self._get_port_fdb_entries(port)
|
||||
|
||||
return agent, agent_ip, segment, fdb_entries
|
||||
|
||||
def _update_port_up(self, context):
|
||||
port_context = context.current
|
||||
port_infos = self._get_port_infos(context, port_context)
|
||||
if not port_infos:
|
||||
return
|
||||
agent, agent_ip, segment, port_fdb_entries = port_infos
|
||||
|
||||
agent_host = port_context['binding:host_id']
|
||||
network_id = port_context['network_id']
|
||||
|
||||
session = db_api.get_session()
|
||||
agent_ports = self.get_agent_network_port_count(session, agent_host,
|
||||
network_id)
|
||||
|
||||
rpc_ctx = n_context.get_admin_context_without_session()
|
||||
|
||||
other_fdb_entries = {network_id:
|
||||
{'segment_id': segment['segmentation_id'],
|
||||
'network_type': segment['network_type'],
|
||||
@ -138,45 +185,25 @@ class L2populationMechanismDriver(api.MechanismDriver,
|
||||
|
||||
if ports.keys():
|
||||
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
|
||||
rpc_ctx, agent_fdb_entries, agent_host)
|
||||
self.rpc_ctx, agent_fdb_entries, agent_host)
|
||||
|
||||
# Notify other agents to add fdb rule for current port
|
||||
fdb_entries = self._get_port_fdb_entries(port_context)
|
||||
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
|
||||
other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
|
||||
|
||||
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
|
||||
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
|
||||
other_fdb_entries)
|
||||
|
||||
def _update_port_down(self, context):
|
||||
port_context = context.current
|
||||
network_id = port_context['network_id']
|
||||
port_infos = self._get_port_infos(context, port_context)
|
||||
if not port_infos:
|
||||
return
|
||||
agent, agent_ip, segment, port_fdb_entries = port_infos
|
||||
|
||||
agent_host = port_context['binding:host_id']
|
||||
if not agent_host:
|
||||
return
|
||||
network_id = port_context['network_id']
|
||||
|
||||
session = db_api.get_session()
|
||||
agent = self.get_agent_by_host(session, agent_host)
|
||||
if not agent:
|
||||
return
|
||||
|
||||
agent_ip = self.get_agent_ip(agent)
|
||||
if not agent_ip:
|
||||
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
|
||||
"configuration."))
|
||||
return
|
||||
|
||||
segment = context.bound_segment
|
||||
if not segment:
|
||||
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
|
||||
"isn't bound to any segment"),
|
||||
{'port': port_context['id'], 'agent': agent})
|
||||
return
|
||||
|
||||
tunnel_types = self.get_agent_tunnel_types(agent)
|
||||
if segment['network_type'] not in tunnel_types:
|
||||
return
|
||||
|
||||
agent_ports = self.get_agent_network_port_count(session, agent_host,
|
||||
network_id)
|
||||
|
||||
|
@ -76,4 +76,13 @@ class L2populationAgentNotifyAPI(proxy.RpcProxy):
|
||||
self._notification_fanout(context, 'remove_fdb_entries',
|
||||
fdb_entries)
|
||||
|
||||
def update_fdb_entries(self, context, fdb_entries, host=None):
|
||||
if fdb_entries:
|
||||
if host:
|
||||
self._notification_host(context, 'update_fdb_entries',
|
||||
fdb_entries, host)
|
||||
else:
|
||||
self._notification_fanout(context, 'update_fdb_entries',
|
||||
fdb_entries)
|
||||
|
||||
L2populationAgentNotify = L2populationAgentNotifyAPI()
|
||||
|
@ -415,6 +415,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
dl_vlan=lvm.vlan,
|
||||
dl_dst=port_info[0])
|
||||
|
||||
def fdb_update(self, context, fdb_entries):
|
||||
LOG.debug(_("fdb_update received"))
|
||||
for action, values in fdb_entries.items():
|
||||
method = '_fdb_' + action
|
||||
if not hasattr(self, method):
|
||||
raise NotImplementedError()
|
||||
|
||||
getattr(self, method)(context, values)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
|
@ -879,3 +879,26 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
|
||||
check_exit_code=False),
|
||||
]
|
||||
execute_fn.assert_has_calls(expected)
|
||||
|
||||
def test_fdb_update_chg_ip(self):
|
||||
fdb_entries = {'chg_ip':
|
||||
{'net_id':
|
||||
{'agent_ip':
|
||||
{'before': [['port_mac', 'port_ip_1']],
|
||||
'after': [['port_mac', 'port_ip_2']]}}}}
|
||||
|
||||
with mock.patch.object(utils, 'execute',
|
||||
return_value='') as execute_fn:
|
||||
self.lb_rpc.fdb_update(None, fdb_entries)
|
||||
|
||||
expected = [
|
||||
mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr',
|
||||
'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'],
|
||||
root_helper=self.root_helper,
|
||||
check_exit_code=False),
|
||||
mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr',
|
||||
'port_mac', 'dev', 'vxlan-1'],
|
||||
root_helper=self.root_helper,
|
||||
check_exit_code=False)
|
||||
]
|
||||
execute_fn.assert_has_calls(expected)
|
||||
|
@ -406,3 +406,80 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
||||
|
||||
self.mock_fanout.assert_any_call(
|
||||
mock.ANY, expected, topic=self.fanout_topic)
|
||||
|
||||
def test_fixed_ips_changed(self):
|
||||
self._register_ml2_agents()
|
||||
|
||||
with self.subnet(network=self._network) as subnet:
|
||||
host_arg = {portbindings.HOST_ID: HOST}
|
||||
with self.port(subnet=subnet, cidr='10.0.0.0/24',
|
||||
arg_list=(portbindings.HOST_ID,),
|
||||
**host_arg) as port1:
|
||||
p1 = port1['port']
|
||||
|
||||
self.mock_fanout.reset_mock()
|
||||
|
||||
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
|
||||
{'ip_address': '10.0.0.10'}]}}
|
||||
req = self.new_update_request('ports', data, p1['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
ips = res['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 2)
|
||||
|
||||
add_expected = {'args':
|
||||
{'fdb_entries':
|
||||
{'chg_ip':
|
||||
{p1['network_id']:
|
||||
{'20.0.0.1':
|
||||
{'after': [[p1['mac_address'],
|
||||
'10.0.0.10']]}}}}},
|
||||
'namespace': None,
|
||||
'method': 'update_fdb_entries'}
|
||||
|
||||
self.mock_fanout.assert_any_call(
|
||||
mock.ANY, add_expected, topic=self.fanout_topic)
|
||||
|
||||
self.mock_fanout.reset_mock()
|
||||
|
||||
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
|
||||
{'ip_address': '10.0.0.16'}]}}
|
||||
req = self.new_update_request('ports', data, p1['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
ips = res['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 2)
|
||||
|
||||
upd_expected = {'args':
|
||||
{'fdb_entries':
|
||||
{'chg_ip':
|
||||
{p1['network_id']:
|
||||
{'20.0.0.1':
|
||||
{'before': [[p1['mac_address'],
|
||||
'10.0.0.10']],
|
||||
'after': [[p1['mac_address'],
|
||||
'10.0.0.16']]}}}}},
|
||||
'namespace': None,
|
||||
'method': 'update_fdb_entries'}
|
||||
|
||||
self.mock_fanout.assert_any_call(
|
||||
mock.ANY, upd_expected, topic=self.fanout_topic)
|
||||
|
||||
self.mock_fanout.reset_mock()
|
||||
|
||||
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}}
|
||||
req = self.new_update_request('ports', data, p1['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
ips = res['port']['fixed_ips']
|
||||
self.assertEqual(len(ips), 1)
|
||||
|
||||
del_expected = {'args':
|
||||
{'fdb_entries':
|
||||
{'chg_ip':
|
||||
{p1['network_id']:
|
||||
{'20.0.0.1':
|
||||
{'before': [[p1['mac_address'],
|
||||
'10.0.0.2']]}}}}},
|
||||
'namespace': None,
|
||||
'method': 'update_fdb_entries'}
|
||||
|
||||
self.mock_fanout.assert_any_call(
|
||||
mock.ANY, del_expected, topic=self.fanout_topic)
|
||||
|
Loading…
Reference in New Issue
Block a user