modify l2-proxy-- query ports based on timestamp
Change-Id: Id51d8c3cfb2f52dc5bbf6ebc085bc4cf963061b8
This commit is contained in:
@@ -53,6 +53,7 @@ from neutron.plugins.l2_proxy.common import config # noqa
|
|||||||
from neutron.plugins.l2_proxy.common import constants
|
from neutron.plugins.l2_proxy.common import constants
|
||||||
from neutron.plugins.l2_proxy.agent import neutron_proxy_context
|
from neutron.plugins.l2_proxy.agent import neutron_proxy_context
|
||||||
from neutron.plugins.l2_proxy.agent import clients
|
from neutron.plugins.l2_proxy.agent import clients
|
||||||
|
from neutron.openstack.common import timeutils
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@@ -71,7 +72,7 @@ class QueryPortsInfoInterface:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.context = n_context.get_admin_context_without_session()
|
self.context = n_context.get_admin_context_without_session()
|
||||||
|
|
||||||
def _list_ports(self):
|
def _list_ports(self, since_time=None):
|
||||||
keystone_auth_url = cfg.CONF.AGENT.keystone_auth_url
|
keystone_auth_url = cfg.CONF.AGENT.keystone_auth_url
|
||||||
kwargs = {'auth_token': None,
|
kwargs = {'auth_token': None,
|
||||||
'username': cfg.CONF.AGENT.neutron_user_name,
|
'username': cfg.CONF.AGENT.neutron_user_name,
|
||||||
@@ -87,14 +88,23 @@ class QueryPortsInfoInterface:
|
|||||||
neutronClient = openStackClients.neutron()
|
neutronClient = openStackClients.neutron()
|
||||||
#filters = {'status': 'ACTIVE'}
|
#filters = {'status': 'ACTIVE'}
|
||||||
#bodyResponse = neutronClient.list_ports(filters = filters)
|
#bodyResponse = neutronClient.list_ports(filters = filters)
|
||||||
|
if(since_time == None):
|
||||||
bodyResponse = neutronClient.list_ports(status='ACTIVE')
|
bodyResponse = neutronClient.list_ports(status='ACTIVE')
|
||||||
LOG.debug(_('list ports, Response:%s'), str(bodyResponse))
|
LOG.debug(_('First list ports, Response:%s'), str(bodyResponse))
|
||||||
|
else:
|
||||||
|
bodyResponse = neutronClient.list_ports(status='ACTIVE',
|
||||||
|
changes_since=since_time)
|
||||||
|
LOG.debug(_('list ports,since_time:%s, Response:%s'),
|
||||||
|
str(since_time), str(bodyResponse))
|
||||||
return bodyResponse
|
return bodyResponse
|
||||||
|
|
||||||
def get_update_net_port_info(self):
|
def get_update_net_port_info(self):
|
||||||
ports = self._list_ports()
|
ports = self._list_ports()
|
||||||
return ports.get("ports", [])
|
return ports.get("ports", [])
|
||||||
|
|
||||||
|
def get_update_port_info_since(self, since_time):
|
||||||
|
ports = self._list_ports(since_time)
|
||||||
|
return ports.get("ports", [])
|
||||||
|
|
||||||
class RemotePort:
|
class RemotePort:
|
||||||
|
|
||||||
@@ -258,6 +268,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
self.query_ports_info_inter = QueryPortsInfoInterface()
|
self.query_ports_info_inter = QueryPortsInfoInterface()
|
||||||
self.cascaded_port_info = {}
|
self.cascaded_port_info = {}
|
||||||
self.cascaded_host_map = {}
|
self.cascaded_host_map = {}
|
||||||
|
self.first_scan_flag = True
|
||||||
|
|
||||||
# Keep track of int_br's device count for use by _report_state()
|
# Keep track of int_br's device count for use by _report_state()
|
||||||
self.int_br_device_count = 0
|
self.int_br_device_count = 0
|
||||||
@@ -456,6 +467,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
bodyResponse = neutronClient.delete_port(port_id)
|
bodyResponse = neutronClient.delete_port(port_id)
|
||||||
LOG.debug(_('destroy port, Response:%s'), str(bodyResponse))
|
LOG.debug(_('destroy port, Response:%s'), str(bodyResponse))
|
||||||
return bodyResponse
|
return bodyResponse
|
||||||
|
|
||||||
def fdb_add(self, context, fdb_entries):
|
def fdb_add(self, context, fdb_entries):
|
||||||
LOG.debug("fdb_add received")
|
LOG.debug("fdb_add received")
|
||||||
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
|
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
|
||||||
@@ -463,7 +475,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
cascaded_net_id = lvm.cascaded_net_id
|
cascaded_net_id = lvm.cascaded_net_id
|
||||||
if not cascaded_net_id:
|
if not cascaded_net_id:
|
||||||
continue
|
continue
|
||||||
#agent_ports = values.get('ports')
|
|
||||||
agent_ports.pop(self.local_ip, None)
|
agent_ports.pop(self.local_ip, None)
|
||||||
if len(agent_ports):
|
if len(agent_ports):
|
||||||
for agent_ip, ports in agent_ports.items():
|
for agent_ip, ports in agent_ports.items():
|
||||||
@@ -515,15 +527,18 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
LOG.debug("fdb_remove received")
|
LOG.debug("fdb_remove received")
|
||||||
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
|
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
|
||||||
self.local_vlan_map):
|
self.local_vlan_map):
|
||||||
#agent_ports = values.get('ports')
|
|
||||||
agent_ports.pop(self.local_ip, None)
|
agent_ports.pop(self.local_ip, None)
|
||||||
if len(agent_ports):
|
if len(agent_ports):
|
||||||
for agent_ip, ports in agent_ports.items():
|
for agent_ip, ports in agent_ports.items():
|
||||||
for port in ports:
|
for port in ports:
|
||||||
rp = lvm.remote_ports.pop(port[0], None)
|
local_p = lvm.vif_ports.pop(port[0], None)
|
||||||
if not rp:
|
if(local_p and local_p.port_id):
|
||||||
|
self.cascaded_port_info.pop(local_p.port_id, None)
|
||||||
continue
|
continue
|
||||||
self._destroy_port(context, rp.port_id)
|
remote_p = lvm.remote_ports.pop(port[0], None)
|
||||||
|
if not remote_p:
|
||||||
|
continue
|
||||||
|
self._destroy_port(context, remote_p.port_id)
|
||||||
|
|
||||||
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
|
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
|
||||||
if port_info == q_const.FLOODING_ENTRY:
|
if port_info == q_const.FLOODING_ENTRY:
|
||||||
@@ -1023,8 +1038,17 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
return cur_ports
|
return cur_ports
|
||||||
|
|
||||||
def scan_ports(self, registered_ports, updated_ports=None):
|
def scan_ports(self, registered_ports, updated_ports=None):
|
||||||
|
if(self.first_scan_flag == True):
|
||||||
ports_info = self.query_ports_info_inter.get_update_net_port_info()
|
ports_info = self.query_ports_info_inter.get_update_net_port_info()
|
||||||
cur_ports = self.analysis_ports_info(ports_info)
|
self.first_scan_flag = False
|
||||||
|
else:
|
||||||
|
pre_time = time.time() - self.polling_interval - 1
|
||||||
|
since_time = time.strftime("%Y-%m-%d %H:%M:%S",
|
||||||
|
time.gmtime(pre_time))
|
||||||
|
ports_info = self.query_ports_info_inter.get_update_port_info_since(
|
||||||
|
since_time)
|
||||||
|
added_or_updated_ports = self.analysis_ports_info(ports_info)
|
||||||
|
cur_ports = set(self.cascaded_port_info.keys()) | added_or_updated_ports
|
||||||
self.int_br_device_count = len(cur_ports)
|
self.int_br_device_count = len(cur_ports)
|
||||||
port_info = {'current': cur_ports}
|
port_info = {'current': cur_ports}
|
||||||
if updated_ports is None:
|
if updated_ports is None:
|
||||||
@@ -1276,7 +1300,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
"Because port info in cascading and cascaded layer"
|
"Because port info in cascading and cascaded layer"
|
||||||
"are different, Details: %(details)s"),
|
"are different, Details: %(details)s"),
|
||||||
{'device': device, 'details': details})
|
{'device': device, 'details': details})
|
||||||
return
|
skipped_devices.add(device)
|
||||||
|
return skipped_devices
|
||||||
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
|
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
|
||||||
{'device': device, 'details': details})
|
{'device': device, 'details': details})
|
||||||
self.treat_vif_port(device, details['port_id'],
|
self.treat_vif_port(device, details['port_id'],
|
||||||
@@ -1541,7 +1566,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
ancillary_ports.clear()
|
ancillary_ports.clear()
|
||||||
sync = False
|
sync = False
|
||||||
polling_manager.force_polling()
|
polling_manager.force_polling()
|
||||||
ovs_restarted = self.check_ovs_restart()
|
#ovs_restarted = self.check_ovs_restart()
|
||||||
if ovs_restarted:
|
if ovs_restarted:
|
||||||
self.setup_integration_br()
|
self.setup_integration_br()
|
||||||
self.setup_physical_bridges(self.bridge_mappings)
|
self.setup_physical_bridges(self.bridge_mappings)
|
||||||
@@ -1575,6 +1600,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
updated_ports_copy = self.updated_ports
|
updated_ports_copy = self.updated_ports
|
||||||
self.updated_ports = set()
|
self.updated_ports = set()
|
||||||
reg_ports = (set() if ovs_restarted else ports)
|
reg_ports = (set() if ovs_restarted else ports)
|
||||||
|
#import pdb;pdb.set_trace()
|
||||||
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
||||||
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||||
"port information retrieved. "
|
"port information retrieved. "
|
||||||
|
|||||||
@@ -1,123 +0,0 @@
|
|||||||
# Copyright (c) 2013 OpenStack Foundation.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
from sqlalchemy import sql
|
|
||||||
|
|
||||||
from neutron.common import constants as const
|
|
||||||
from neutron.db import agents_db
|
|
||||||
from neutron.db import common_db_mixin as base_db
|
|
||||||
from neutron.db import models_v2
|
|
||||||
from neutron.openstack.common import jsonutils
|
|
||||||
from neutron.openstack.common import timeutils
|
|
||||||
from neutron.plugins.ml2.drivers.l2pop import constants as l2_const
|
|
||||||
from neutron.plugins.ml2 import models as ml2_models
|
|
||||||
|
|
||||||
|
|
||||||
class L2populationDbMixin(base_db.CommonDbMixin):
|
|
||||||
|
|
||||||
def get_agent_ip_by_host(self, session, agent_host):
|
|
||||||
agent = self.get_agent_by_host(session, agent_host)
|
|
||||||
if agent:
|
|
||||||
return self.get_agent_ip(agent)
|
|
||||||
|
|
||||||
def get_agent_ip(self, agent):
|
|
||||||
configuration = jsonutils.loads(agent.configurations)
|
|
||||||
return configuration.get('tunneling_ip')
|
|
||||||
|
|
||||||
def get_host_ip_from_binding_profile(self, port):
|
|
||||||
ip = port['binding:profile'].get('host_ip')
|
|
||||||
return ip
|
|
||||||
|
|
||||||
def get_host_ip_from_binding_profile_str(self, profile):
|
|
||||||
if(not profile):
|
|
||||||
return
|
|
||||||
profile = jsonutils.loads(profile)
|
|
||||||
return profile.get('host_ip')
|
|
||||||
|
|
||||||
def get_agent_uptime(self, agent):
|
|
||||||
return timeutils.delta_seconds(agent.started_at,
|
|
||||||
agent.heartbeat_timestamp)
|
|
||||||
|
|
||||||
def get_agent_tunnel_types(self, agent):
|
|
||||||
configuration = jsonutils.loads(agent.configurations)
|
|
||||||
return configuration.get('tunnel_types')
|
|
||||||
|
|
||||||
def get_agent_l2pop_network_types(self, agent):
|
|
||||||
configuration = jsonutils.loads(agent.configurations)
|
|
||||||
return configuration.get('l2pop_network_types')
|
|
||||||
|
|
||||||
def get_agent_by_host(self, session, agent_host):
|
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
query = session.query(agents_db.Agent)
|
|
||||||
query = query.filter(agents_db.Agent.host == agent_host,
|
|
||||||
agents_db.Agent.agent_type.in_(
|
|
||||||
l2_const.SUPPORTED_AGENT_TYPES))
|
|
||||||
return query.first()
|
|
||||||
|
|
||||||
def get_network_ports(self, session, network_id):
|
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
query = session.query(ml2_models.PortBinding,
|
|
||||||
agents_db.Agent)
|
|
||||||
query = query.join(agents_db.Agent,
|
|
||||||
agents_db.Agent.host ==
|
|
||||||
ml2_models.PortBinding.host)
|
|
||||||
query = query.join(models_v2.Port)
|
|
||||||
query = query.filter(models_v2.Port.network_id == network_id,
|
|
||||||
models_v2.Port.admin_state_up == sql.true(),
|
|
||||||
agents_db.Agent.agent_type.in_(
|
|
||||||
l2_const.SUPPORTED_AGENT_TYPES))
|
|
||||||
return query
|
|
||||||
|
|
||||||
def get_nondvr_network_ports(self, session, network_id):
|
|
||||||
query = self.get_network_ports(session, network_id)
|
|
||||||
return query.filter(models_v2.Port.device_owner !=
|
|
||||||
const.DEVICE_OWNER_DVR_INTERFACE)
|
|
||||||
|
|
||||||
def get_dvr_network_ports(self, session, network_id):
|
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
query = session.query(ml2_models.DVRPortBinding,
|
|
||||||
agents_db.Agent)
|
|
||||||
query = query.join(agents_db.Agent,
|
|
||||||
agents_db.Agent.host ==
|
|
||||||
ml2_models.DVRPortBinding.host)
|
|
||||||
query = query.join(models_v2.Port)
|
|
||||||
query = query.filter(models_v2.Port.network_id == network_id,
|
|
||||||
models_v2.Port.admin_state_up == sql.true(),
|
|
||||||
models_v2.Port.device_owner ==
|
|
||||||
const.DEVICE_OWNER_DVR_INTERFACE,
|
|
||||||
agents_db.Agent.agent_type.in_(
|
|
||||||
l2_const.SUPPORTED_AGENT_TYPES))
|
|
||||||
return query
|
|
||||||
|
|
||||||
def get_agent_network_active_port_count(self, session, agent_host,
|
|
||||||
network_id):
|
|
||||||
with session.begin(subtransactions=True):
|
|
||||||
query = session.query(models_v2.Port)
|
|
||||||
query1 = query.join(ml2_models.PortBinding)
|
|
||||||
query1 = query1.filter(models_v2.Port.network_id == network_id,
|
|
||||||
models_v2.Port.status ==
|
|
||||||
const.PORT_STATUS_ACTIVE,
|
|
||||||
models_v2.Port.device_owner !=
|
|
||||||
const.DEVICE_OWNER_DVR_INTERFACE,
|
|
||||||
ml2_models.PortBinding.host == agent_host)
|
|
||||||
query2 = query.join(ml2_models.DVRPortBinding)
|
|
||||||
query2 = query2.filter(models_v2.Port.network_id == network_id,
|
|
||||||
ml2_models.DVRPortBinding.status ==
|
|
||||||
const.PORT_STATUS_ACTIVE,
|
|
||||||
models_v2.Port.device_owner ==
|
|
||||||
const.DEVICE_OWNER_DVR_INTERFACE,
|
|
||||||
ml2_models.DVRPortBinding.host ==
|
|
||||||
agent_host)
|
|
||||||
return (query1.count() + query2.count())
|
|
||||||
@@ -1,304 +0,0 @@
|
|||||||
# Copyright (c) 2013 OpenStack Foundation.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from neutron.common import constants as const
|
|
||||||
from neutron import context as n_context
|
|
||||||
from neutron.db import api as db_api
|
|
||||||
from neutron.openstack.common import log as logging
|
|
||||||
from neutron.plugins.ml2 import driver_api as api
|
|
||||||
from neutron.plugins.ml2.drivers.l2pop import config # noqa
|
|
||||||
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
|
|
||||||
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class L2populationMechanismDriver(api.MechanismDriver,
|
|
||||||
l2pop_db.L2populationDbMixin):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super(L2populationMechanismDriver, self).__init__()
|
|
||||||
self.L2populationAgentNotify = l2pop_rpc.L2populationAgentNotifyAPI()
|
|
||||||
|
|
||||||
def initialize(self):
|
|
||||||
LOG.debug(_("Experimental L2 population driver"))
|
|
||||||
self.rpc_ctx = n_context.get_admin_context_without_session()
|
|
||||||
self.migrated_ports = {}
|
|
||||||
self.remove_fdb_entries = {}
|
|
||||||
|
|
||||||
def _get_port_fdb_entries(self, port):
|
|
||||||
return [[port['mac_address'], port['device_owner'],
|
|
||||||
ip['ip_address']] for ip in port['fixed_ips']]
|
|
||||||
|
|
||||||
def _get_agent_host(self, context, port):
|
|
||||||
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
|
|
||||||
agent_host = context.binding.host
|
|
||||||
else:
|
|
||||||
agent_host = port['binding:host_id']
|
|
||||||
return agent_host
|
|
||||||
|
|
||||||
def delete_port_precommit(self, context):
|
|
||||||
# TODO(matrohon): revisit once the original bound segment will be
|
|
||||||
# available in delete_port_postcommit. in delete_port_postcommit
|
|
||||||
# agent_active_ports will be equal to 0, and the _update_port_down
|
|
||||||
# won't need agent_active_ports_count_for_flooding anymore
|
|
||||||
port = context.current
|
|
||||||
agent_host = self._get_agent_host(context, port)
|
|
||||||
|
|
||||||
if port['id'] not in self.remove_fdb_entries:
|
|
||||||
self.remove_fdb_entries[port['id']] = {}
|
|
||||||
|
|
||||||
self.remove_fdb_entries[port['id']][agent_host] = (
|
|
||||||
self._update_port_down(context, port, 1))
|
|
||||||
|
|
||||||
def delete_port_postcommit(self, context):
|
|
||||||
port = context.current
|
|
||||||
agent_host = context.host
|
|
||||||
|
|
||||||
fdb_entries = self._update_port_down(context, port, agent_host)
|
|
||||||
self.L2populationAgentNotify.remove_fdb_entries(self.rpc_ctx,
|
|
||||||
fdb_entries)
|
|
||||||
|
|
||||||
def _get_diff_ips(self, orig, port):
|
|
||||||
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
|
|
||||||
port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])
|
|
||||||
|
|
||||||
# check if an ip has been added or removed
|
|
||||||
orig_chg_ips = orig_ips.difference(port_ips)
|
|
||||||
port_chg_ips = port_ips.difference(orig_ips)
|
|
||||||
|
|
||||||
if orig_chg_ips or port_chg_ips:
|
|
||||||
return orig_chg_ips, port_chg_ips
|
|
||||||
|
|
||||||
def _fixed_ips_changed(self, context, orig, port, diff_ips):
|
|
||||||
orig_ips, port_ips = diff_ips
|
|
||||||
|
|
||||||
if (port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
|
|
||||||
agent_host = context.host
|
|
||||||
else:
|
|
||||||
agent_host = context.original_host
|
|
||||||
port_infos = self._get_port_infos(
|
|
||||||
context, orig, agent_host)
|
|
||||||
if not port_infos:
|
|
||||||
return
|
|
||||||
agent, agent_host, agent_ip, segment, port_fdb_entries = port_infos
|
|
||||||
|
|
||||||
orig_mac_ip = [[port['mac_address'], port['device_owner'], ip]
|
|
||||||
for ip in orig_ips]
|
|
||||||
port_mac_ip = [[port['mac_address'], port['device_owner'], ip]
|
|
||||||
for ip in port_ips]
|
|
||||||
|
|
||||||
upd_fdb_entries = {port['network_id']: {agent_ip: {}}}
|
|
||||||
|
|
||||||
ports = upd_fdb_entries[port['network_id']][agent_ip]
|
|
||||||
if orig_mac_ip:
|
|
||||||
ports['before'] = orig_mac_ip
|
|
||||||
|
|
||||||
if port_mac_ip:
|
|
||||||
ports['after'] = port_mac_ip
|
|
||||||
|
|
||||||
self.L2populationAgentNotify.update_fdb_entries(
|
|
||||||
self.rpc_ctx, {'chg_ip': upd_fdb_entries})
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def update_port_postcommit(self, context):
|
|
||||||
port = context.current
|
|
||||||
orig = context.original
|
|
||||||
|
|
||||||
diff_ips = self._get_diff_ips(orig, port)
|
|
||||||
if diff_ips:
|
|
||||||
self._fixed_ips_changed(context, orig, port, diff_ips)
|
|
||||||
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
|
|
||||||
if context.status == const.PORT_STATUS_ACTIVE:
|
|
||||||
self._update_port_up(context)
|
|
||||||
if context.status == const.PORT_STATUS_DOWN:
|
|
||||||
agent_host = context.host
|
|
||||||
fdb_entries = self._update_port_down(
|
|
||||||
context, port, agent_host)
|
|
||||||
self.L2populationAgentNotify.remove_fdb_entries(
|
|
||||||
self.rpc_ctx, fdb_entries)
|
|
||||||
elif (context.host != context.original_host
|
|
||||||
and context.status == const.PORT_STATUS_ACTIVE
|
|
||||||
and not self.migrated_ports.get(orig['id'])):
|
|
||||||
# The port has been migrated. We have to store the original
|
|
||||||
# binding to send appropriate fdb once the port will be set
|
|
||||||
# on the destination host
|
|
||||||
self.migrated_ports[orig['id']] = (
|
|
||||||
(orig, context.original_host))
|
|
||||||
elif context.status != context.original_status:
|
|
||||||
if context.status == const.PORT_STATUS_ACTIVE:
|
|
||||||
self._update_port_up(context)
|
|
||||||
elif context.status == const.PORT_STATUS_DOWN:
|
|
||||||
fdb_entries = self._update_port_down(
|
|
||||||
context, port, context.host)
|
|
||||||
self.L2populationAgentNotify.remove_fdb_entries(
|
|
||||||
self.rpc_ctx, fdb_entries)
|
|
||||||
elif context.status == const.PORT_STATUS_BUILD:
|
|
||||||
orig = self.migrated_ports.pop(port['id'], None)
|
|
||||||
if orig:
|
|
||||||
original_port = orig[0]
|
|
||||||
original_host = orig[1]
|
|
||||||
# this port has been migrated: remove its entries from fdb
|
|
||||||
fdb_entries = self._update_port_down(
|
|
||||||
context, original_port, original_host)
|
|
||||||
self.L2populationAgentNotify.remove_fdb_entries(
|
|
||||||
self.rpc_ctx, fdb_entries)
|
|
||||||
|
|
||||||
def _get_port_infos(self, context, port, agent_host):
|
|
||||||
if not agent_host:
|
|
||||||
return
|
|
||||||
|
|
||||||
session = db_api.get_session()
|
|
||||||
agent = self.get_agent_by_host(session, agent_host)
|
|
||||||
if not agent:
|
|
||||||
return
|
|
||||||
|
|
||||||
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
|
|
||||||
agent_ip = self.get_agent_ip(agent)
|
|
||||||
else:
|
|
||||||
agent_ip = self.get_host_ip_from_binding_profile(port)
|
|
||||||
if not agent_ip:
|
|
||||||
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
|
|
||||||
"configuration."))
|
|
||||||
return
|
|
||||||
|
|
||||||
segment = context.bound_segment
|
|
||||||
if not segment:
|
|
||||||
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
|
|
||||||
"isn't bound to any segment"),
|
|
||||||
{'port': port['id'], 'agent': agent})
|
|
||||||
return
|
|
||||||
|
|
||||||
network_types = self.get_agent_l2pop_network_types(agent)
|
|
||||||
if network_types is None:
|
|
||||||
network_types = self.get_agent_tunnel_types(agent)
|
|
||||||
if segment['network_type'] not in network_types:
|
|
||||||
return
|
|
||||||
|
|
||||||
fdb_entries = self._get_port_fdb_entries(port)
|
|
||||||
|
|
||||||
return agent, agent_host, agent_ip, segment, fdb_entries
|
|
||||||
|
|
||||||
def _update_port_up(self, context):
|
|
||||||
port = context.current
|
|
||||||
agent_host = context.host
|
|
||||||
port_infos = self._get_port_infos(context, port, agent_host)
|
|
||||||
if not port_infos:
|
|
||||||
return
|
|
||||||
agent, agent_host, agent_ip, segment, port_fdb_entries = port_infos
|
|
||||||
|
|
||||||
network_id = port['network_id']
|
|
||||||
|
|
||||||
session = db_api.get_session()
|
|
||||||
agent_active_ports = self.get_agent_network_active_port_count(
|
|
||||||
session, agent_host, network_id)
|
|
||||||
|
|
||||||
other_fdb_entries = {network_id:
|
|
||||||
{'segment_id': segment['segmentation_id'],
|
|
||||||
'network_type': segment['network_type'],
|
|
||||||
'ports': {agent_ip: []}}}
|
|
||||||
|
|
||||||
if agent_active_ports == 1 or (
|
|
||||||
self.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time):
|
|
||||||
# First port activated on current agent in this network,
|
|
||||||
# we have to provide it with the whole list of fdb entries
|
|
||||||
agent_fdb_entries = {network_id:
|
|
||||||
{'segment_id': segment['segmentation_id'],
|
|
||||||
'network_type': segment['network_type'],
|
|
||||||
'ports': {}}}
|
|
||||||
ports = agent_fdb_entries[network_id]['ports']
|
|
||||||
|
|
||||||
nondvr_network_ports = self.get_nondvr_network_ports(session,
|
|
||||||
network_id)
|
|
||||||
for network_port in nondvr_network_ports:
|
|
||||||
binding, agent = network_port
|
|
||||||
if agent.host == agent_host:
|
|
||||||
continue
|
|
||||||
|
|
||||||
#ip = self.get_agent_ip(agent)
|
|
||||||
profile = binding['profile']
|
|
||||||
ip = self.get_host_ip_from_binding_profile_str(profile)
|
|
||||||
if not ip:
|
|
||||||
LOG.debug(_("Unable to retrieve the agent ip, check "
|
|
||||||
"the agent %(agent_host)s configuration."),
|
|
||||||
{'agent_host': agent.host})
|
|
||||||
continue
|
|
||||||
|
|
||||||
agent_ports = ports.get(ip, [const.FLOODING_ENTRY])
|
|
||||||
agent_ports += self._get_port_fdb_entries(binding.port)
|
|
||||||
ports[ip] = agent_ports
|
|
||||||
# comment by j00209498
|
|
||||||
# dvr_network_ports = self.get_dvr_network_ports(session, network_id)
|
|
||||||
# for network_port in dvr_network_ports:
|
|
||||||
# binding, agent = network_port
|
|
||||||
# if agent.host == agent_host:
|
|
||||||
# continue
|
|
||||||
#
|
|
||||||
# ip = self.get_agent_ip(agent)
|
|
||||||
# if not ip:
|
|
||||||
# LOG.debug(_("Unable to retrieve the agent ip, check "
|
|
||||||
# "the agent %(agent_host)s configuration."),
|
|
||||||
# {'agent_host': agent.host})
|
|
||||||
# continue
|
|
||||||
#
|
|
||||||
# agent_ports = ports.get(ip, [const.FLOODING_ENTRY])
|
|
||||||
# ports[ip] = agent_ports
|
|
||||||
|
|
||||||
# And notify other agents to add flooding entry
|
|
||||||
other_fdb_entries[network_id]['ports'][agent_ip].append(
|
|
||||||
const.FLOODING_ENTRY)
|
|
||||||
|
|
||||||
if ports.keys():
|
|
||||||
self.L2populationAgentNotify.add_fdb_entries(
|
|
||||||
self.rpc_ctx, agent_fdb_entries, agent_host)
|
|
||||||
|
|
||||||
# Notify other agents to add fdb rule for current port
|
|
||||||
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
|
|
||||||
other_fdb_entries[network_id]['ports'][agent_ip] += (
|
|
||||||
port_fdb_entries)
|
|
||||||
|
|
||||||
self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
|
|
||||||
other_fdb_entries)
|
|
||||||
|
|
||||||
def _update_port_down(self, context, port, agent_host):
|
|
||||||
port_infos = self._get_port_infos(context, port, agent_host)
|
|
||||||
if not port_infos:
|
|
||||||
return
|
|
||||||
agent, agent_host, agent_ip, segment, port_fdb_entries = port_infos
|
|
||||||
|
|
||||||
network_id = port['network_id']
|
|
||||||
|
|
||||||
session = db_api.get_session()
|
|
||||||
agent_active_ports = self.get_agent_network_active_port_count(
|
|
||||||
session, agent_host, network_id)
|
|
||||||
|
|
||||||
other_fdb_entries = {network_id:
|
|
||||||
{'segment_id': segment['segmentation_id'],
|
|
||||||
'network_type': segment['network_type'],
|
|
||||||
'ports': {agent_ip: []}}}
|
|
||||||
if agent_active_ports == 0:
|
|
||||||
# Agent is removing its last activated port in this network,
|
|
||||||
# other agents needs to be notified to delete their flooding entry.
|
|
||||||
other_fdb_entries[network_id]['ports'][agent_ip].append(
|
|
||||||
const.FLOODING_ENTRY)
|
|
||||||
# Notify other agents to remove fdb rules for current port
|
|
||||||
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
|
|
||||||
fdb_entries = port_fdb_entries
|
|
||||||
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
|
|
||||||
|
|
||||||
return other_fdb_entries
|
|
||||||
Reference in New Issue
Block a user