diff --git a/neutronproxy/l2proxy/neutron/plugins/l2_proxy/agent/l2_proxy.py b/neutronproxy/l2proxy/neutron/plugins/l2_proxy/agent/l2_proxy.py index 853f8c9..6d025b4 100644 --- a/neutronproxy/l2proxy/neutron/plugins/l2_proxy/agent/l2_proxy.py +++ b/neutronproxy/l2proxy/neutron/plugins/l2_proxy/agent/l2_proxy.py @@ -19,6 +19,9 @@ import hashlib import signal import sys import time +import os +import socket +import select from neutron import context as n_context from neutron.common import constants as const @@ -27,7 +30,6 @@ import eventlet eventlet.monkey_patch() import netaddr -from neutron.plugins.openvswitch.agent import ovs_dvr_neutron_agent from oslo.config import cfg from six import moves @@ -48,6 +50,7 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall +from neutron.openstack.common import jsonutils from neutron.plugins.common import constants as p_const from neutron.plugins.l2_proxy.common import config # noqa from neutron.plugins.l2_proxy.common import constants @@ -69,7 +72,7 @@ class DeviceListRetrievalError(exceptions.NeutronException): "because of error: %(error)s") -class QueryPortsInfoInterface: +class QueryPortsInterface: cascaded_neutron_client = None @@ -93,6 +96,34 @@ class QueryPortsInfoInterface: neutronClient = openStackClients.neutron() return neutronClient + def _show_port(self, port_id): + portResponse = None + if(not QueryPortsFromCascadedNeutron.cascaded_neutron_client): + QueryPortsFromCascadedNeutron.cascaded_neutron_client = \ + self._get_cascaded_neutron_client() + retry = 0 + while(True): + try: + portResponse = QueryPortsFromCascadedNeutron.\ + cascaded_neutron_client.show_port(port_id) + LOG.debug(_('show port, port_id=%s, Response:%s'), str(port_id), + str(portResponse)) + return portResponse + except exceptions.Unauthorized: + retry = retry + 1 + if(retry <= 3): + QueryPortsFromCascadedNeutron.cascaded_neutron_client = \ + self._get_cascaded_neutron_client() + continue + else: + with excutils.save_and_reraise_exception(): + LOG.error(_('ERR: Try 3 times,Unauthorized to list ports!')) + return None + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error(_('ERR: show port %s failed!'), port_id) + return None + def _list_ports(self, since_time=None, pagination_limit=None, pagination_marker=None): @@ -105,25 +136,33 @@ class QueryPortsInfoInterface: if(pagination_marker): filters['marker'] = pagination_marker portResponse = None - if(not QueryPortsInfoInterface.cascaded_neutron_client): - QueryPortsInfoInterface.cascaded_neutron_client = \ - self._get_cascaded_neutron_client() - try: - portResponse = QueryPortsInfoInterface.\ - cascaded_neutron_client.get('/ports', params=filters) - LOG.debug(_('list ports, filters:%s, since_time:%s, limit=%s, ' - 'marker=%s, Response:%s'), str(filters), - str(since_time), str(pagination_limit), - str(pagination_marker), str(portResponse)) - except exceptions.Unauthorized: - with excutils.save_and_reraise_exception(): - LOG.error(_('ERR: Unauthorized to list ports!')) - return None - except Exception: - with excutils.save_and_reraise_exception(): - LOG.error(_('ERR: list ports failed!')) - return None - return portResponse + if(not QueryPortsFromCascadedNeutron.cascaded_neutron_client): + QueryPortsFromCascadedNeutron.cascaded_neutron_client = \ + self._get_cascaded_neutron_client() + retry = 0 + while(True): + try: + portResponse = QueryPortsFromCascadedNeutron.\ + cascaded_neutron_client.get('/ports', params=filters) + LOG.debug(_('list ports, filters:%s, since_time:%s, limit=%s, ' + 'marker=%s, Response:%s'), str(filters), + str(since_time), str(pagination_limit), + str(pagination_marker), str(portResponse)) + return portResponse + except exceptions.Unauthorized: + retry = retry + 1 + if(retry <= 3): + QueryPortsFromCascadedNeutron.cascaded_neutron_client = \ + self._get_cascaded_neutron_client() + continue + else: + with excutils.save_and_reraise_exception(): + LOG.error(_('ERR: Try 3 times,Unauthorized to list ports!')) + return None + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error(_('ERR: list ports failed!')) + return None def _get_ports_pagination(self, since_time=None): ports_info = {'ports': []} @@ -155,13 +194,74 @@ class QueryPortsInfoInterface: ports_info['ports'].extend(current_page.get('ports', [])) ports_links_list = current_page.get('ports_links', []) - def get_update_net_port_info(self): - ports = self._get_ports_pagination() + +class QueryPortsFromNovaproxy(QueryPortsInterface): + + ports_info = {'ports': {'add': [], 'del': []}} + + def __init__(self): + self.context = n_context.get_admin_context_without_session() + self.sock_path = None + self.sock = None + + def listen_and_recv_port_info(self, sock_path): + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + path = sock_path + if os.path.exists(path): + os.unlink(path) + sock.bind(path) + sock.listen(5) + while(True): + infds, outfds, errfds = select.select([sock,], [], [], 5) + if len(infds) != 0: + con, addr = sock.accept() + recv_data = con.recv(1024) + self.process_recv_data(recv_data) + except socket.error as e: + LOG.warn(_('Error while connecting to socket: %s'), e) + return {} +# con.close() +# sock.close() + + def process_recv_data(self, data): + LOG.debug(_('process_recv_data begin! data:%s'), data) + data_dict = jsonutils.loads(data) + ports = data_dict.get('ports', None) + if(ports): + added_ports = ports.get('add', []) + for port_id in added_ports: + port_ret = self._show_port(port_id) + if port_ret and port_ret.get('port', None): + QueryPortsFromNovaproxy.ports_info['ports']['add']. \ + append(port_ret.get('port')) +# removed_ports = ports.get('delete', []) + + def get_update_net_port_info(self, since_time=None): + if(since_time): + ports_info = QueryPortsFromNovaproxy.ports_info['ports'].get('add', []) + QueryPortsFromNovaproxy.ports_info['ports']['add'] = [] + else: + all_ports = self._get_ports_pagination() + ports_info = all_ports.get('ports', []) + return ports_info + + +class QueryPortsFromCascadedNeutron(QueryPortsInterface): + + def __init__(self): + self.context = n_context.get_admin_context_without_session() + + def get_update_net_port_info(self, since_time=None): + if since_time: + ports = self._get_ports_pagination(since_time) + else: + ports = self._get_ports_pagination() return ports.get("ports", []) - def get_update_port_info_since(self, since_time): - ports = self._get_ports_pagination(since_time) - return ports.get("ports", []) +# def get_update_port_info_since(self, since_time): +# ports = self._get_ports_pagination(since_time) +# return ports.get("ports", []) class RemotePort: @@ -322,8 +422,13 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.enable_distributed_routing}, 'agent_type': q_const.AGENT_TYPE_OVS, 'start_flag': True} - - self.query_ports_info_inter = QueryPortsInfoInterface() + if(cfg.CONF.AGENT.query_ports_mode == 'cascaded_neutron'): + self.query_ports_info_inter = QueryPortsFromCascadedNeutron() + elif(cfg.CONF.AGENT.query_ports_mode == 'nova_proxy'): + self.sock_path = cfg.CONF.AGENT.proxy_sock_path + self.query_ports_info_inter = QueryPortsFromNovaproxy() + eventlet.spawn_n(self.query_ports_info_inter.listen_and_recv_port_info, + self.sock_path) self.cascaded_port_info = {} self.cascaded_host_map = {} self.first_scan_flag = True @@ -332,12 +437,12 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.int_br_device_count = 0 self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper) - self.setup_integration_br() +# self.setup_integration_br() # Stores port update notifications for processing in main rpc loop self.updated_ports = set() self.setup_rpc() self.bridge_mappings = bridge_mappings - self.setup_physical_bridges(self.bridge_mappings) +# self.setup_physical_bridges(self.bridge_mappings) self.local_vlan_map = {} self.tun_br_ofports = {p_const.TYPE_GRE: {}, p_const.TYPE_VXLAN: {}} @@ -357,27 +462,9 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.tun_br = None self.patch_int_ofport = constants.OFPORT_INVALID self.patch_tun_ofport = constants.OFPORT_INVALID - if self.enable_tunneling: - # The patch_int_ofport and patch_tun_ofport are updated - # here inside the call to setup_tunnel_br - self.setup_tunnel_br(tun_br) - - self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent( - self.context, - self.plugin_rpc, - self.int_br, - self.tun_br, - self.patch_int_ofport, - self.patch_tun_ofport, - cfg.CONF.host, - self.enable_tunneling, - self.enable_distributed_routing) # self.dvr_agent.setup_dvr_flows_on_integ_tun_br() - # Collect additional bridges to monitor - self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br) - # Security group agent support self.sg_agent = OVSSecurityGroupAgent(self.context, self.plugin_rpc, @@ -454,45 +541,13 @@ class OVSNeutronAgent(n_rpc.RpcCallback, def tunnel_update(self, context, **kwargs): LOG.debug(_("tunnel_update received")) - if not self.enable_tunneling: - return - tunnel_ip = kwargs.get('tunnel_ip') - tunnel_id = kwargs.get('tunnel_id', self.get_ip_in_hex(tunnel_ip)) - if not tunnel_id: - return - tunnel_type = kwargs.get('tunnel_type') - if not tunnel_type: - LOG.error(_("No tunnel_type specified, cannot create tunnels")) - return - if tunnel_type not in self.tunnel_types: - LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type) - return - if tunnel_ip == self.local_ip: - return - tun_name = '%s-%s' % (tunnel_type, tunnel_id) - if not self.l2_pop: - self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip, - tunnel_type) def _create_port(self, context, network_id, binding_profile, port_name, mac_address, ips): if(not network_id): LOG.error(_("No network id is specified, cannot create port")) return - keystone_auth_url = cfg.CONF.AGENT.keystone_auth_url - kwargs = {'auth_token': None, - 'username': cfg.CONF.AGENT.neutron_user_name, - 'password': cfg.CONF.AGENT.neutron_password, - 'aws_creds': None, - 'tenant': cfg.CONF.AGENT.neutron_tenant_name, - # 'tenant_id':'e8f280855dbe42a189eebb0f3ecb94bb', #context.values['tenant'], - 'auth_url': keystone_auth_url, - 'roles': context.roles, - 'is_admin': context.is_admin, - 'region_name': cfg.CONF.AGENT.os_region_name} - reqCon = neutron_proxy_context.RequestContext(**kwargs) - openStackClients = clients.OpenStackClients(reqCon) - neutronClient = openStackClients.neutron() + neutronClient = self.get_cascaded_neutron_client() req_props = {'network_id': network_id, 'name': port_name, 'admin_state_up': True, @@ -509,19 +564,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback, if(not port_id): LOG.error(_("No port id is specified, cannot destroy port")) return - keystone_auth_url = cfg.CONF.AGENT.keystone_auth_url - kwargs = {'auth_token': None, - 'username': cfg.CONF.AGENT.neutron_user_name, - 'password': cfg.CONF.AGENT.neutron_password, - 'aws_creds': None, - 'tenant': cfg.CONF.AGENT.neutron_tenant_name, - # 'tenant_id':'e8f280855dbe42a189eebb0f3ecb94bb', #context.values['tenant'], - 'auth_url': keystone_auth_url, - 'roles': context.roles, - 'is_admin': context.is_admin, - 'region_name': cfg.CONF.AGENT.os_region_name} - reqCon = neutron_proxy_context.RequestContext(**kwargs) - openStackClients = clients.OpenStackClients(reqCon) + + openStackClients = self.get_cascaded_neutron_client() neutronClient = openStackClients.neutron() bodyResponse = neutronClient.delete_port(port_id) LOG.debug(_('destroy port, Response:%s'), str(bodyResponse)) @@ -600,77 +644,29 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self._destroy_port(context, remote_p.port_id) def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport): - if port_info == q_const.FLOODING_ENTRY: - lvm.tun_ofports.add(ofport) - ofports = ','.join(lvm.tun_ofports) - br.mod_flow(table=constants.FLOOD_TO_TUN, - dl_vlan=lvm.vlan, - actions="strip_vlan,set_tunnel:%s,output:%s" % - (lvm.segmentation_id, ofports)) - else: - self.setup_entry_for_arp_reply(br, 'add', lvm.vlan, port_info[0], - port_info[1]) - br.add_flow(table=constants.UCAST_TO_TUN, - priority=2, - dl_vlan=lvm.vlan, - dl_dst=port_info[0], - actions="strip_vlan,set_tunnel:%s,output:%s" % - (lvm.segmentation_id, ofport)) + '''TODO can not delete, by jiahaojie + if delete,it will raise TypeError: + Can't instantiate abstract class OVSNeutronAgent with abstract + methods add_fdb_flow, cleanup_tunnel_port, del_fdb_flow, + setup_entry_for_arp_reply, setup_tunnel_port ''' + LOG.debug("add_fdb_flow received") def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport): - if port_info == q_const.FLOODING_ENTRY: - lvm.tun_ofports.remove(ofport) - if len(lvm.tun_ofports) > 0: - ofports = ','.join(lvm.tun_ofports) - br.mod_flow(table=constants.FLOOD_TO_TUN, - dl_vlan=lvm.vlan, - actions="strip_vlan,set_tunnel:%s,output:%s" % - (lvm.segmentation_id, ofports)) - else: - # This local vlan doesn't require any more tunnelling - br.delete_flows(table=constants.FLOOD_TO_TUN, dl_vlan=lvm.vlan) - else: - self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan, - port_info[0], port_info[1]) - br.delete_flows(table=constants.UCAST_TO_TUN, - dl_vlan=lvm.vlan, - dl_dst=port_info[0]) - - def _fdb_chg_ip(self, context, fdb_entries): - LOG.debug("update chg_ip received") - with self.tun_br.deferred() as deferred_br: - self.fdb_chg_ip_tun(context, deferred_br, fdb_entries, - self.local_ip, self.local_vlan_map) + '''TODO can not delete, by jiahaojie + if delete,it will raise TypeError: + Can't instantiate abstract class OVSNeutronAgent with abstract + methods add_fdb_flow, cleanup_tunnel_port, del_fdb_flow, + setup_entry_for_arp_reply, setup_tunnel_port ''' + LOG.debug("del_fdb_flow received") def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address, ip_address): - '''Set the ARP respond entry. - - When the l2 population mechanism driver and OVS supports to edit ARP - fields, a table (ARP_RESPONDER) to resolve ARP locally is added to the - tunnel bridge. - ''' - if not self.arp_responder_enabled: - return - - mac = netaddr.EUI(mac_address, dialect=netaddr.mac_unix) - ip = netaddr.IPAddress(ip_address) - - if action == 'add': - actions = constants.ARP_RESPONDER_ACTIONS % {'mac': mac, 'ip': ip} - br.add_flow(table=constants.ARP_RESPONDER, - priority=1, - proto='arp', - dl_vlan=local_vid, - nw_dst='%s' % ip, - actions=actions) - elif action == 'remove': - br.delete_flows(table=constants.ARP_RESPONDER, - proto='arp', - dl_vlan=local_vid, - nw_dst='%s' % ip) - else: - LOG.warning(_('Action %s not supported'), action) + '''TODO can not delete, by jiahaojie + if delete,it will raise TypeError: + Can't instantiate abstract class OVSNeutronAgent with abstract + methods add_fdb_flow, cleanup_tunnel_port, del_fdb_flow, + setup_entry_for_arp_reply, setup_tunnel_port ''' + LOG.debug("setup_entry_for_arp_reply is called!") def provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id, cascaded_net_id): @@ -694,7 +690,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback, LOG.error(_("No local VLAN available for net-id=%s"), net_uuid) return lvid = self.available_local_vlans.pop() - self.local_vlan_map[net_uuid] = LocalVLANMapping(network_type, + self.local_vlan_map[net_uuid] = LocalVLANMapping( + network_type, physical_network, segmentation_id, cascaded_net_id) @@ -719,41 +716,9 @@ class OVSNeutronAgent(n_rpc.RpcCallback, {'vlan_id': lvm.vlan, 'net_uuid': net_uuid}) - if lvm.network_type in constants.TUNNEL_NETWORK_TYPES: - if self.enable_tunneling: - self.tun_br.delete_flows( - table=constants.TUN_TABLE[lvm.network_type], - tun_id=lvm.segmentation_id) - self.tun_br.delete_flows(dl_vlan=lvm.vlan) - if self.l2_pop: - # Try to remove tunnel ports if not used by other networks - for ofport in lvm.tun_ofports: - self.cleanup_tunnel_port(self.tun_br, ofport, - lvm.network_type) - elif lvm.network_type == p_const.TYPE_FLAT: - if lvm.physical_network in self.phys_brs: - # outbound - br = self.phys_brs[lvm.physical_network] - br.delete_flows(in_port=self.phys_ofports[lvm. - physical_network], - dl_vlan=lvm.vlan) - # inbound - br = self.int_br - br.delete_flows(in_port=self.int_ofports[lvm.physical_network], - dl_vlan=0xffff) - elif lvm.network_type == p_const.TYPE_VLAN: - if lvm.physical_network in self.phys_brs: - # outbound - br = self.phys_brs[lvm.physical_network] - br.delete_flows(in_port=self.phys_ofports[lvm. - physical_network], - dl_vlan=lvm.vlan) - # inbound - br = self.int_br - br.delete_flows(in_port=self.int_ofports[lvm.physical_network], - dl_vlan=lvm.segmentation_id) - elif lvm.network_type == p_const.TYPE_LOCAL: - # no flows needed for local networks + if len(lvm.vif_ports) > 0 or len(lvm.remote_ports) > 0: + # should clear ports and delete network of cascaded layer + # by jiahaojie 00209498 pass else: LOG.error(_("Cannot reclaim unknown network type " @@ -790,301 +755,14 @@ class OVSNeutronAgent(n_rpc.RpcCallback, cascaded_port_info['id'], cascaded_port_info['mac_address']) - def port_unbound(self, vif_id, net_uuid=None): - '''Unbind port. - - Removes corresponding local vlan mapping object if this is its last - VIF. - - :param vif_id: the id of the vif - :param net_uuid: the net_uuid this port is associated with. - ''' - if net_uuid is None: - net_uuid = self.get_net_uuid(vif_id) - - if not self.local_vlan_map.get(net_uuid): - LOG.info(_('port_unbound(): net_uuid %s not in local_vlan_map'), - net_uuid) - return - - lvm = self.local_vlan_map[net_uuid] - -# if vif_id in lvm.vif_ports: -# vif_port = lvm.vif_ports[vif_id] -# self.dvr_agent.unbind_port_from_dvr(vif_port, -# local_vlan_id=lvm.vlan) - lvm.vif_ports.pop(vif_id, None) - - if not lvm.vif_ports: - self.reclaim_local_vlan(net_uuid) - - def port_dead(self, port): - '''Once a port has no binding, put it on the "dead vlan". - - :param port: a ovs_lib.VifPort object. - ''' - # Don't kill a port if it's already dead - cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag") - if cur_tag != DEAD_VLAN_TAG: - self.int_br.set_db_attribute("Port", port.port_name, "tag", - DEAD_VLAN_TAG) - self.int_br.add_flow(priority=2, in_port=port.ofport, - actions="drop") - - def setup_integration_br(self): - '''Setup the integration bridge. - - Create patch ports and remove all existing flows. - - :param bridge_name: the name of the integration bridge. - :returns: the integration bridge - ''' - # Ensure the integration bridge is created. - # ovs_lib.OVSBridge.create() will run - # ovs-vsctl -- --may-exist add-br BRIDGE_NAME - # which does nothing if bridge already exists. - self.int_br.create() - self.int_br.set_secure_mode() - - self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port) - self.int_br.remove_all_flows() - # switch all traffic using L2 learning - self.int_br.add_flow(priority=1, actions="normal") - # Add a canary flow to int_br to track OVS restarts - self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0, - actions="drop") - - def setup_ancillary_bridges(self, integ_br, tun_br): - '''Setup ancillary bridges - for example br-ex.''' - ovs_bridges = set(ovs_lib.get_bridges(self.root_helper)) - # Remove all known bridges - ovs_bridges.remove(integ_br) - if self.enable_tunneling: - ovs_bridges.remove(tun_br) - br_names = [self.phys_brs[physical_network].br_name for - physical_network in self.phys_brs] - ovs_bridges.difference_update(br_names) - # Filter list of bridges to those that have external - # bridge-id's configured - br_names = [] - for bridge in ovs_bridges: - id = ovs_lib.get_bridge_external_bridge_id(self.root_helper, - bridge) - if id != bridge: - br_names.append(bridge) - ovs_bridges.difference_update(br_names) - ancillary_bridges = [] - for bridge in ovs_bridges: - br = ovs_lib.OVSBridge(bridge, self.root_helper) - LOG.info(_('Adding %s to list of bridges.'), bridge) - ancillary_bridges.append(br) - return ancillary_bridges - - def setup_tunnel_br(self, tun_br_name=None): - '''Setup the tunnel bridge. - - Creates tunnel bridge, and links it to the integration bridge - using a patch port. - - :param tun_br_name: the name of the tunnel bridge. - ''' - if not self.tun_br: - self.tun_br = ovs_lib.OVSBridge(tun_br_name, self.root_helper) - - self.tun_br.reset_bridge() - self.patch_tun_ofport = self.int_br.add_patch_port( - cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port) - self.patch_int_ofport = self.tun_br.add_patch_port( - cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port) - if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0: - LOG.error(_("Failed to create OVS patch port. Cannot have " - "tunneling enabled on this agent, since this version " - "of OVS does not support tunnels or patch ports. " - "Agent terminated!")) - exit(1) - self.tun_br.remove_all_flows() - - # Table 0 (default) will sort incoming traffic depending on in_port - self.tun_br.add_flow(priority=1, - in_port=self.patch_int_ofport, - actions="resubmit(,%s)" % - constants.PATCH_LV_TO_TUN) - self.tun_br.add_flow(priority=0, actions="drop") - if self.arp_responder_enabled: - # ARP broadcast-ed request go to the local ARP_RESPONDER table to - # be locally resolved - self.tun_br.add_flow(table=constants.PATCH_LV_TO_TUN, - priority=1, - proto='arp', - dl_dst="ff:ff:ff:ff:ff:ff", - actions=("resubmit(,%s)" % - constants.ARP_RESPONDER)) - # PATCH_LV_TO_TUN table will handle packets coming from patch_int - # unicasts go to table UCAST_TO_TUN where remote addresses are learnt - self.tun_br.add_flow(table=constants.PATCH_LV_TO_TUN, - priority=0, - dl_dst="00:00:00:00:00:00/01:00:00:00:00:00", - actions="resubmit(,%s)" % constants.UCAST_TO_TUN) - # Broadcasts/multicasts go to table FLOOD_TO_TUN that handles flooding - self.tun_br.add_flow(table=constants.PATCH_LV_TO_TUN, - priority=0, - dl_dst="01:00:00:00:00:00/01:00:00:00:00:00", - actions="resubmit(,%s)" % constants.FLOOD_TO_TUN) - # Tables [tunnel_type]_TUN_TO_LV will set lvid depending on tun_id - # for each tunnel type, and resubmit to table LEARN_FROM_TUN where - # remote mac addresses will be learnt - for tunnel_type in constants.TUNNEL_NETWORK_TYPES: - self.tun_br.add_flow(table=constants.TUN_TABLE[tunnel_type], - priority=0, - actions="drop") - # LEARN_FROM_TUN table will have a single flow using a learn action to - # dynamically set-up flows in UCAST_TO_TUN corresponding to remote mac - # addresses (assumes that lvid has already been set by a previous flow) - learned_flow = ("table=%s," - "priority=1," - "hard_timeout=300," - "NXM_OF_VLAN_TCI[0..11]," - "NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[]," - "load:0->NXM_OF_VLAN_TCI[]," - "load:NXM_NX_TUN_ID[]->NXM_NX_TUN_ID[]," - "output:NXM_OF_IN_PORT[]" % - constants.UCAST_TO_TUN) - # Once remote mac addresses are learnt, output packet to patch_int - self.tun_br.add_flow(table=constants.LEARN_FROM_TUN, - priority=1, - actions="learn(%s),output:%s" % - (learned_flow, self.patch_int_ofport)) - # Egress unicast will be handled in table UCAST_TO_TUN, where remote - # mac addresses will be learned. For now, just add a default flow that - # will resubmit unknown unicasts to table FLOOD_TO_TUN to treat them - # as broadcasts/multicasts - self.tun_br.add_flow(table=constants.UCAST_TO_TUN, - priority=0, - actions="resubmit(,%s)" % - constants.FLOOD_TO_TUN) - if self.arp_responder_enabled: - # If none of the ARP entries correspond to the requested IP, the - # broadcast-ed packet is resubmitted to the flooding table - self.tun_br.add_flow(table=constants.ARP_RESPONDER, - priority=0, - actions="resubmit(,%s)" % - constants.FLOOD_TO_TUN) - # FLOOD_TO_TUN will handle flooding in tunnels based on lvid, - # for now, add a default drop action - self.tun_br.add_flow(table=constants.FLOOD_TO_TUN, - priority=0, - actions="drop") - - def get_peer_name(self, prefix, name): - """Construct a peer name based on the prefix and name. - - The peer name can not exceed the maximum length allowed for a linux - device. Longer names are hashed to help ensure uniqueness. - """ - if len(prefix + name) <= q_const.DEVICE_NAME_MAX_LEN: - return prefix + name - # We can't just truncate because bridges may be distinguished - # by an ident at the end. A hash over the name should be unique. - # Leave part of the bridge name on for easier identification - hashlen = 6 - namelen = q_const.DEVICE_NAME_MAX_LEN - len(prefix) - hashlen - new_name = ('%(prefix)s%(truncated)s%(hash)s' % - {'prefix': prefix, 'truncated': name[0:namelen], - 'hash': hashlib.sha1(name).hexdigest()[0:hashlen]}) - LOG.warning(_("Creating an interface named %(name)s exceeds the " - "%(limit)d character limitation. It was shortened to " - "%(new_name)s to fit."), - {'name': name, 'limit': q_const.DEVICE_NAME_MAX_LEN, - 'new_name': new_name}) - return new_name - - def setup_physical_bridges(self, bridge_mappings): - '''Setup the physical network bridges. - - Creates physical network bridges and links them to the - integration bridge using veths. - - :param bridge_mappings: map physical network names to bridge names. - ''' - self.phys_brs = {} - self.int_ofports = {} - self.phys_ofports = {} - ip_wrapper = ip_lib.IPWrapper(self.root_helper) - ovs_bridges = ovs_lib.get_bridges(self.root_helper) - for physical_network, bridge in bridge_mappings.iteritems(): - LOG.info(_("Mapping physical network %(physical_network)s to " - "bridge %(bridge)s"), - {'physical_network': physical_network, - 'bridge': bridge}) - # setup physical bridge - if bridge not in ovs_bridges: - LOG.error(_("Bridge %(bridge)s for physical network " - "%(physical_network)s does not exist. Agent " - "terminated!"), - {'physical_network': physical_network, - 'bridge': bridge}) - sys.exit(1) - br = ovs_lib.OVSBridge(bridge, self.root_helper) - br.remove_all_flows() - br.add_flow(priority=1, actions="normal") - self.phys_brs[physical_network] = br - - # interconnect physical and integration bridges using veth/patchs - int_if_name = self.get_peer_name(constants.PEER_INTEGRATION_PREFIX, - bridge) - phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX, - bridge) - self.int_br.delete_port(int_if_name) - br.delete_port(phys_if_name) - if self.use_veth_interconnection: - if ip_lib.device_exists(int_if_name, self.root_helper): - ip_lib.IPDevice(int_if_name, - self.root_helper).link.delete() - # Give udev a chance to process its rules here, to avoid - # race conditions between commands launched by udev rules - # and the subsequent call to ip_wrapper.add_veth - utils.execute(['/sbin/udevadm', 'settle', '--timeout=10']) - int_veth, phys_veth = ip_wrapper.add_veth(int_if_name, - phys_if_name) - int_ofport = self.int_br.add_port(int_veth) - phys_ofport = br.add_port(phys_veth) - else: - # Create patch ports without associating them in order to block - # untranslated traffic before association - int_ofport = self.int_br.add_patch_port( - int_if_name, constants.NONEXISTENT_PEER) - phys_ofport = br.add_patch_port( - phys_if_name, constants.NONEXISTENT_PEER) - - self.int_ofports[physical_network] = int_ofport - self.phys_ofports[physical_network] = phys_ofport - - # block all untranslated traffic between bridges - self.int_br.add_flow(priority=2, in_port=int_ofport, - actions="drop") - br.add_flow(priority=2, in_port=phys_ofport, actions="drop") - - if self.use_veth_interconnection: - # enable veth to pass traffic - int_veth.link.set_up() - phys_veth.link.set_up() - if self.veth_mtu: - # set up mtu size for veth interfaces - int_veth.link.set_mtu(self.veth_mtu) - phys_veth.link.set_mtu(self.veth_mtu) - else: - # associate patch ports to pass traffic - self.int_br.set_db_attribute('Interface', int_if_name, - 'options:peer', phys_if_name) - br.set_db_attribute('Interface', phys_if_name, - 'options:peer', int_if_name) - def get_port_id_from_profile(self, profile): - return profile.get('cascading_port_id') + return profile.get('cascading_port_id', None) def analysis_ports_info(self, ports_info): cur_ports = set() + LOG.debug(_('jiahaojie---ports_info: %s'), str(ports_info)) for port in ports_info: + LOG.debug(_('jiahaojie---port: %s'), str(port)) profile = port['binding:profile'] cascading_port_id = self.get_port_id_from_profile(profile) if(not cascading_port_id): @@ -1101,8 +779,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback, pre_time = time.time() - self.polling_interval - 1 since_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(pre_time)) - ports_info = self.query_ports_info_inter.get_update_port_info_since( - since_time) + ports_info = self.query_ports_info_inter.get_update_net_port_info( + since_time) added_or_updated_ports = self.analysis_ports_info(ports_info) cur_ports = set(self.cascaded_port_info.keys()) | added_or_updated_ports self.int_br_device_count = len(cur_ports) @@ -1130,43 +808,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback, port_info['removed'] = registered_ports - cur_ports return port_info - def check_changed_vlans(self, registered_ports): - """Return ports which have lost their vlan tag. - - The returned value is a set of port ids of the ports concerned by a - vlan tag loss. - """ - port_tags = self.int_br.get_port_tag_dict() - changed_ports = set() - for lvm in self.local_vlan_map.values(): - for port in registered_ports: - if ( - port in lvm.vif_ports - and lvm.vif_ports[port].port_name in port_tags - and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan - ): - LOG.info( - _("Port '%(port_name)s' has lost " - "its vlan tag '%(vlan_tag)d'!"), - {'port_name': lvm.vif_ports[port].port_name, - 'vlan_tag': lvm.vlan} - ) - changed_ports.add(port) - return changed_ports - - def update_ancillary_ports(self, registered_ports): - ports = set() - for bridge in self.ancillary_brs: - ports |= bridge.get_vif_port_set() - - if ports == registered_ports: - return - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} - def treat_vif_port(self, vif_port, port_id, network_id, network_type, physical_network, segmentation_id, admin_state_up, fixed_ips, device_owner, cascaded_port_info, @@ -1181,71 +822,22 @@ class OVSNeutronAgent(n_rpc.RpcCallback, physical_network, segmentation_id, fixed_ips, device_owner, cascaded_port_info, ovs_restarted) - else: - self.port_dead(vif_port) - - def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type): - ofport = br.add_tunnel_port(port_name, - remote_ip, - self.local_ip, - tunnel_type, - self.vxlan_udp_port, - self.dont_fragment) - ofport_int = -1 - try: - ofport_int = int(ofport) - except (TypeError, ValueError): - LOG.exception(_("ofport should have a value that can be " - "interpreted as an integer")) - if ofport_int < 0: - LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"), - {'type': tunnel_type, 'ip': remote_ip}) - return 0 - - self.tun_br_ofports[tunnel_type][remote_ip] = ofport - # Add flow in default table to resubmit to the right - # tunnelling table (lvid will be set in the latter) - br.add_flow(priority=1, - in_port=ofport, - actions="resubmit(,%s)" % - constants.TUN_TABLE[tunnel_type]) - - ofports = ','.join(self.tun_br_ofports[tunnel_type].values()) - if ofports and not self.l2_pop: - # Update flooding flows to include the new tunnel - for network_id, vlan_mapping in self.local_vlan_map.iteritems(): - if vlan_mapping.network_type == tunnel_type: - br.mod_flow(table=constants.FLOOD_TO_TUN, - dl_vlan=vlan_mapping.vlan, - actions="strip_vlan,set_tunnel:%s,output:%s" % - (vlan_mapping.segmentation_id, ofports)) - return ofport def setup_tunnel_port(self, br, remote_ip, network_type): - remote_ip_hex = self.get_ip_in_hex(remote_ip) - if not remote_ip_hex: - return 0 - port_name = '%s-%s' % (network_type, remote_ip_hex) - ofport = self._setup_tunnel_port(br, - port_name, - remote_ip, - network_type) - return ofport + '''TODO can not delete, by jiahaojie + if delete,it will raise TypeError: + Can't instantiate abstract class OVSNeutronAgent with abstract + methods add_fdb_flow, cleanup_tunnel_port, del_fdb_flow, + setup_entry_for_arp_reply, setup_tunnel_port ''' + LOG.debug("cleanup_tunnel_port is called!") def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type): - # Check if this tunnel port is still used - for lvm in self.local_vlan_map.values(): - if tun_ofport in lvm.tun_ofports: - break - # If not, remove it - else: - for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items(): - if ofport == tun_ofport: - port_name = '%s-%s' % (tunnel_type, - self.get_ip_in_hex(remote_ip)) - br.delete_port(port_name) - br.delete_flows(in_port=ofport) - self.tun_br_ofports[tunnel_type].pop(remote_ip, None) + '''TODO can not delete, by jiahaojie + if delete,it will raise TypeError: + Can't instantiate abstract class OVSNeutronAgent with abstract + methods add_fdb_flow, cleanup_tunnel_port, del_fdb_flow, + setup_entry_for_arp_reply, setup_tunnel_port ''' + LOG.debug("cleanup_tunnel_port is called!") def compare_port_info(self, details, cascaded_port_info): if details is None or cascaded_port_info is None: @@ -1398,65 +990,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback, # self.port_dead(port) return skipped_devices - def treat_ancillary_devices_added(self, devices): - try: - devices_details_list = self.plugin_rpc.get_devices_details_list( - self.context, - devices, - self.agent_id, - cfg.CONF.host) - except Exception as e: - raise DeviceListRetrievalError(devices=devices, error=e) - - for details in devices_details_list: - device = details['device'] - LOG.info(_("Ancillary Port %s added"), device) - - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id, - cfg.CONF.host) - - def treat_devices_removed(self, devices): - resync = False - self.sg_agent.remove_devices_filter(devices) - for device in devices: - LOG.info(_("Attachment %s removed"), device) - try: - self.plugin_rpc.update_device_down(self.context, - device, - self.agent_id, - cfg.CONF.host) - except Exception as e: - LOG.debug(_("port_removed failed for %(device)s: %(e)s"), - {'device': device, 'e': e}) - resync = True - continue - self.port_unbound(device) - return resync - - def treat_ancillary_devices_removed(self, devices): - resync = False - for device in devices: - LOG.info(_("Attachment %s removed"), device) - try: - details = self.plugin_rpc.update_device_down(self.context, - device, - self.agent_id, - cfg.CONF.host) - except Exception as e: - LOG.debug(_("port_removed failed for %(device)s: %(e)s"), - {'device': device, 'e': e}) - resync = True - continue - if details['exists']: - LOG.info(_("Port %s updated."), device) - # Nothing to do regarding local networking - else: - LOG.debug(_("Device %s not defined on plugin"), device) - return resync - def process_network_ports(self, port_info, ovs_restarted): resync_a = False resync_b = False @@ -1504,7 +1037,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback, resync_a = True if 'removed' in port_info: start = time.time() - resync_b = self.treat_devices_removed(port_info['removed']) + #resync_b = self.treat_devices_removed(port_info['removed']) LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" "treat_devices_removed completed in %(elapsed).3f"), {'iter_num': self.iter_num, @@ -1512,38 +1045,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback, # If one of the above operations fails => resync with plugin return (resync_a | resync_b) - def process_ancillary_network_ports(self, port_info): - resync_a = False - resync_b = False - if 'added' in port_info: - start = time.time() - try: - self.treat_ancillary_devices_added(port_info['added']) - LOG.debug(_("process_ancillary_network_ports - iteration: " - "%(iter_num)d - treat_ancillary_devices_added " - "completed in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - except DeviceListRetrievalError: - # Need to resync as there was an error with server - # communication. - LOG.exception(_("process_ancillary_network_ports - " - "iteration:%d - failure while retrieving " - "port details from server"), self.iter_num) - resync_a = True - if 'removed' in port_info: - start = time.time() - resync_b = self.treat_ancillary_devices_removed( - port_info['removed']) - LOG.debug(_("process_ancillary_network_ports - iteration: " - "%(iter_num)d - treat_ancillary_devices_removed " - "completed in %(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - - # If one of the above operations fails => resync with plugin - return (resync_a | resync_b) - def get_ip_in_hex(self, ip_address): try: return '%08x' % netaddr.IPAddress(ip_address, version=4) @@ -1552,60 +1053,19 @@ class OVSNeutronAgent(n_rpc.RpcCallback, ip_address) return - def tunnel_sync(self): - try: - for tunnel_type in self.tunnel_types: - details = self.plugin_rpc.tunnel_sync(self.context, - self.local_ip, - tunnel_type) - if not self.l2_pop: - tunnels = details['tunnels'] - for tunnel in tunnels: - if self.local_ip != tunnel['ip_address']: - tunnel_id = tunnel.get('id') - # Unlike the OVS plugin, ML2 doesn't return an id - # key. So use ip_address to form port name instead. - # Port name must be <=15 chars, so use shorter hex. - remote_ip = tunnel['ip_address'] - remote_ip_hex = self.get_ip_in_hex(remote_ip) - if not tunnel_id and not remote_ip_hex: - continue - tun_name = '%s-%s' % (tunnel_type, - tunnel_id or remote_ip_hex) - self._setup_tunnel_port(self.tun_br, - tun_name, - tunnel['ip_address'], - tunnel_type) - except Exception as e: - LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"), - {'local_ip': self.local_ip, 'e': e}) - return True - return False - - def _agent_has_updates(self, polling_manager): - return (polling_manager.is_polling_required or - self.updated_ports or - self.sg_agent.firewall_refresh_needed()) - def _port_info_has_changes(self, port_info): return (port_info.get('added') or port_info.get('removed') or port_info.get('updated')) - def check_ovs_restart(self): - # Check for the canary flow - canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE) - return not canary_flow - def rpc_loop(self, polling_manager=None): - if not polling_manager: - polling_manager = polling.AlwaysPoll() +# if not polling_manager: +# polling_manager = polling.AlwaysPoll() sync = True ports = set() updated_ports_copy = set() ancillary_ports = set() - tunnel_sync = True ovs_restarted = False while self.run_daemon_loop: start = time.time() @@ -1616,32 +1076,13 @@ class OVSNeutronAgent(n_rpc.RpcCallback, 'removed': 0}} LOG.debug(_("Agent rpc_loop - iteration:%d started"), self.iter_num) - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - ancillary_ports.clear() - sync = False - polling_manager.force_polling() - #ovs_restarted = self.check_ovs_restart() - if ovs_restarted: - self.setup_integration_br() - self.setup_physical_bridges(self.bridge_mappings) - if self.enable_tunneling: - self.setup_tunnel_br() - tunnel_sync = True - self.dvr_agent.reset_ovs_parameters(self.int_br, - self.tun_br, - self.patch_int_ofport, - self.patch_tun_ofport) - self.dvr_agent.setup_dvr_flows_on_integ_tun_br() - # Notify the plugin of tunnel IP - if self.enable_tunneling and tunnel_sync: - LOG.info(_("Agent tunnel out of sync with plugin!")) - try: - tunnel_sync = self.tunnel_sync() - except Exception: - LOG.exception(_("Error while synchronizing tunnels")) - tunnel_sync = True +# if sync: +# LOG.info(_("Agent out of sync with plugin!")) +# ports.clear() +# ancillary_ports.clear() +# sync = False +# polling_manager.force_polling() + # if self._agent_has_updates(polling_manager) or ovs_restarted: if True: try: @@ -1684,32 +1125,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback, port_stats['regular']['removed'] = ( len(port_info.get('removed', []))) ports = port_info['current'] - # Treat ancillary devices if they exist - if self.ancillary_brs: - port_info = self.update_ancillary_ports( - ancillary_ports) - LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -" - "ancillary port info retrieved. " - "Elapsed:%(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - if port_info: - rc = self.process_ancillary_network_ports( - port_info) - LOG.debug(_("Agent rpc_loop - iteration:" - "%(iter_num)d - ancillary ports " - "processed. Elapsed:%(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - ancillary_ports = port_info['current'] - port_stats['ancillary']['added'] = ( - len(port_info.get('added', []))) - port_stats['ancillary']['removed'] = ( - len(port_info.get('removed', []))) - sync = sync | rc - - polling_manager.polling_completed() +# polling_manager.polling_completed() except Exception: LOG.exception(_("Error while processing VIF ports")) # Put the ports back in self.updated_port @@ -1735,11 +1152,14 @@ class OVSNeutronAgent(n_rpc.RpcCallback, def daemon_loop(self): with polling.get_polling_manager( - self.minimize_polling, - self.root_helper, - self.ovsdb_monitor_respawn_interval) as pm: - - self.rpc_loop(polling_manager=pm) + self.minimize_polling) as pm: + self.rpc_loop() +# with polling.get_polling_manager( +# self.minimize_polling, +# self.root_helper, +# self.ovsdb_monitor_respawn_interval) as pm: +# +# self.rpc_loop(polling_manager=pm) def _handle_sigterm(self, signum, frame): LOG.debug("Agent caught SIGTERM, quitting daemon loop.") diff --git a/neutronproxy/l2proxy/neutron/plugins/l2_proxy/common/config.py b/neutronproxy/l2proxy/neutron/plugins/l2_proxy/common/config.py index 3b0a1d4..74b329e 100644 --- a/neutronproxy/l2proxy/neutron/plugins/l2_proxy/common/config.py +++ b/neutronproxy/l2proxy/neutron/plugins/l2_proxy/common/config.py @@ -121,6 +121,11 @@ agent_opts = [ cfg.IntOpt('pagination_limit', default=-1, help=_("list ports pagination limit, default value is -1," "means no pagination")), + cfg.StrOpt('query_ports_mode', default='nova_proxy', + help=_("query ports mode, default value is nova_proxy," + "means query ports from nova_proxy")), + cfg.StrOpt('proxy_sock_path', default='/var/l2proxysock', + help=_("socket path when query ports from nova_proxy")), ]