From 1121807b8528590b20e1109dbdff400fccec3c38 Mon Sep 17 00:00:00 2001 From: Dan Wendlandt Date: Tue, 15 May 2012 12:41:52 -0700 Subject: [PATCH] OVS plugin: add tunnel ips to central database bp simplify-ovs-tunnel-mgmt Makes the OVS plugin agent report its own IP address to the centralized quantum database, and build its set of tunnels based on the contents of this centralized database. This removes the need for a removes need for a 'remote-ips' file on each compute node that needs to be updated when a new host is added. Also: - simplifies error handling within tunnel manager daemon_loop - fixes issues with operational status not working for tunnel-mode - fixes issue that not-stripping vlan of tunneled packet potentially crashes OVS, causing flows to get wiped. Change-Id: I4d285669e29beecf745fe620581fa6bc332a446c --- .../openvswitch/ovs_quantum_plugin.ini | 10 +- .../openvswitch/agent/ovs_quantum_agent.py | 328 +++++++++--------- quantum/plugins/openvswitch/ovs_models.py | 13 + .../openvswitch/tests/unit/remote-ip-file.txt | 0 .../openvswitch/tests/unit/test_tunnel.py | 41 --- 5 files changed, 186 insertions(+), 206 deletions(-) delete mode 100644 quantum/plugins/openvswitch/tests/unit/remote-ip-file.txt diff --git a/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini b/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini index 2cb39b69c..7ca451abd 100644 --- a/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini +++ b/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini @@ -20,15 +20,9 @@ enable-tunneling = False # connectivity. integration-bridge = br-int -# Uncomment this line if enable-tunneling is True above. +# Only used if enable-tunneling (above) is True. # In most cases, the default value should be fine. -# tunnel-bridge = br-tun - -# Uncomment this line if enable-tunneling is True above. -# This file contains a list of IP addresses (one per line) that point to -# hypervisors to which tunnels should be connected. It is best to use -# an absolute path to this file. -# remote-ip-file = /opt/stack/remote-ips.txt +tunnel-bridge = br-tun # Uncomment this line if enable-tunneling is True above. # Set local-ip to be the local IP address of this hypervisor. diff --git a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py index 8d62758aa..d7ef94a0a 100755 --- a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py +++ b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py @@ -28,7 +28,10 @@ import subprocess import sys import time -from sqlalchemy.ext.sqlsoup import SqlSoup +import sqlalchemy +from sqlalchemy.ext import sqlsoup + +from quantum.plugins.openvswitch import ovs_models logging.basicConfig() LOG = logging.getLogger(__name__) @@ -100,6 +103,10 @@ class OVSBridge: full_args = ["ovs-ofctl", cmd, self.br_name] + args return self.run_cmd(full_args) + def count_flows(self): + flow_list = self.run_ofctl("dump-flows", []).split("\n")[1:] + return len(flow_list) - 1 + def remove_all_flows(self): self.run_ofctl("del-flows", []) @@ -214,6 +221,36 @@ class LocalVLANMapping: return "lv-id = %s ls-id = %s" % (self.vlan, self.lsw_id) +class Port(object): + '''class stores port data in an ORM-free way, + so attributes are still available even if a + row has been deleted. + ''' + + def __init__(self, p): + self.uuid = p.uuid + self.network_id = p.network_id + self.interface_id = p.interface_id + self.state = p.state + self.op_status = p.op_status + + def __eq__(self, other): + '''compare only fields that will cause us to re-wire + ''' + try: + return (self and other + and self.interface_id == other.interface_id + and self.state == other.state) + except: + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.uuid) + + class OVSQuantumAgent(object): def __init__(self, integ_br, root_helper, @@ -239,6 +276,10 @@ class OVSQuantumAgent(object): self.int_br.add_flow(priority=1, actions="normal") def daemon_loop(self, db_connection_url): + '''Main processing loop for Non-Tunneling Agent. + + :param options: database information - in the event need to reconnect + ''' self.local_vlan_map = {} old_local_bindings = {} old_vif_ports = {} @@ -247,7 +288,7 @@ class OVSQuantumAgent(object): while True: if not db_connected: time.sleep(self.reconnect_interval) - db = SqlSoup(db_connection_url) + db = sqlsoup.SqlSoup(db_connection_url) db_connected = True LOG.info("Connecting to database \"%s\" on %s" % (db.engine.url.database, db.engine.url.host)) @@ -358,25 +399,30 @@ class OVSQuantumTunnelAgent(object): # Upper bound on available vlans. MAX_VLAN_TAG = 4094 - def __init__(self, integ_br, tun_br, remote_ip_file, local_ip, - root_helper, polling_interval, reconnect_interval): + def __init__(self, integ_br, tun_br, local_ip, root_helper, + polling_interval, reconnect_interval): '''Constructor. :param integ_br: name of the integration bridge. :param tun_br: name of the tunnel bridge. - :param remote_ip_file: name of file containing list of hypervisor IPs. - :param local_ip: local IP address of this hypervisor.''' + :param local_ip: local IP address of this hypervisor. + :param root_helper: utility to use when running shell cmds. + :param polling_interval: interval (secs) to poll DB. + :param reconnect_internal: retry interval (secs) on DB error.''' self.root_helper = root_helper self.available_local_vlans = set( xrange(OVSQuantumTunnelAgent.MIN_VLAN_TAG, OVSQuantumTunnelAgent.MAX_VLAN_TAG)) self.setup_integration_br(integ_br) self.local_vlan_map = {} - self.setup_tunnel_br(tun_br, remote_ip_file, local_ip) - self.db_connected = False + self.polling_interval = polling_interval self.reconnect_interval = reconnect_interval + self.local_ip = local_ip + self.tunnel_count = 0 + self.setup_tunnel_br(tun_br) + def provision_local_vlan(self, net_uuid, lsw_id): '''Provisions a local VLAN. @@ -391,8 +437,8 @@ class OVSQuantumTunnelAgent(object): # outbound self.tun_br.add_flow(priority=4, match="in_port=%s,dl_vlan=%s" % (self.patch_int_ofport, lvid), - actions="set_tunnel:%s,normal" % (lsw_id)) - + actions="strip_vlan,set_tunnel:%s,normal" % + (lsw_id)) # inbound self.tun_br.add_flow(priority=3, match="tun_id=%s" % lsw_id, actions="mod_vlan_vid:%s,output:%s" % @@ -471,184 +517,159 @@ class OVSQuantumTunnelAgent(object): # switch all traffic using L2 learning self.int_br.add_flow(priority=1, actions="normal") - def setup_tunnel_br(self, tun_br, remote_ip_file, local_ip): + def setup_tunnel_br(self, tun_br): '''Setup the tunnel bridge. - Reads in list of IP addresses. Creates GRE tunnels to each of these - addresses and then clears out existing flows. local_ip is the address - of the local node. A tunnel is not created to this IP address. + Creates tunnel bridge, and links it to the integration bridge + using a patch port. - :param tun_br: the name of the tunnel bridge. - :param remote_ip_file: path to file that contains list of destination - IP addresses. - :param local_ip: the ip address of this node.''' + :param tun_br: the name of the tunnel bridge.''' self.tun_br = OVSBridge(tun_br, self.root_helper) self.tun_br.reset_bridge() self.patch_int_ofport = self.tun_br.add_patch_port("patch-int", "patch-tun") - try: - with open(remote_ip_file, 'r') as f: - remote_ip_list = f.readlines() - clean_ips = (x.rstrip() for x in remote_ip_list) - tunnel_ips = (x for x in clean_ips if x != local_ip and x) - for i, remote_ip in enumerate(tunnel_ips): - self.tun_br.add_tunnel_port("gre-" + str(i), remote_ip) - except Exception as e: - LOG.error("Error configuring tunnels: '%s' %s" % - (remote_ip_file, str(e))) - raise - self.tun_br.remove_all_flows() - # default drop self.tun_br.add_flow(priority=1, actions="drop") - def get_db_port_bindings(self, db): - '''Get database port bindings from central Quantum database. + def manage_tunnels(self, tunnel_ips, old_tunnel_ips, db): + if self.local_ip in tunnel_ips: + tunnel_ips.remove(self.local_ip) + else: + db.tunnel_ips.insert(ip_address=self.local_ip) - The central quantum database 'ovs_quantum' resides on the openstack - mysql server. + new_tunnel_ips = tunnel_ips - old_tunnel_ips + if new_tunnel_ips: + LOG.info("adding tunnels to: %s" % new_tunnel_ips) + for ip in new_tunnel_ips: + tun_name = "gre-" + str(self.tunnel_count) + self.tun_br.add_tunnel_port(tun_name, ip) + self.tunnel_count += 1 - :returns: a dictionary containing port bindings.''' - ports = [] - try: - ports = db.ports.all() - except Exceptioni as e: - LOG.info("Unable to get port bindings! Exception: %s" % e) - self.db_connected = False - return {} + # adding new ports can void flows, so reset flows + self.tun_br.remove_all_flows() + self.tun_br.add_flow(priority=1, actions="drop") + for lv_ojb in self.local_vlan_map.values(): + self.add_tun_br_flows_for_local_vlan(lv_obj) - return dict([(port.interface_id, port) for port in ports]) - - def get_db_vlan_bindings(self, db): - '''Get database vlan bindings from central Quantum database. - - The central quantum database 'ovs_quantum' resides on the openstack - mysql server. - - :returns: a dictionary containing vlan bindings.''' - lsw_id_binds = [] - try: - lsw_id_binds.extend(db.vlan_bindings.all()) - except Exception as e: - LOG.info("Unable to get vlan bindings! Exception: %s" % e) - self.db_connected = False - return {} - - return dict([(bind.network_id, bind.vlan_id) - for bind in lsw_id_binds]) + def rollback_until_success(self, db): + while True: + time.sleep(self.reconnect_interval) + try: + db.rollback() + break + except: + LOG.exception("Problem connecting to database") def daemon_loop(self, db_connection_url): - '''Main processing loop (not currently used). + '''Main processing loop for Tunneling Agent. :param options: database information - in the event need to reconnect ''' old_local_bindings = {} old_vif_ports = {} - self.db_connected = False + old_tunnel_ips = set() + + db = sqlsoup.SqlSoup(db_connection_url) + LOG.info("Connecting to database \"%s\" on %s" % + (db.engine.url.database, db.engine.url.host)) while True: - if not self.db_connected: - time.sleep(self.reconnect_interval) - db = SqlSoup(db_connection_url) - self.db_connected = True - LOG.info("Connecting to database \"%s\" on %s" % - (db.engine.url.database, db.engine.url.host)) + try: + all_bindings = dict((p.interface_id, Port(p)) + for p in db.ports.all()) + all_bindings_vif_port_ids = set(all_bindings) + lsw_id_bindings = dict((bind.network_id, bind.vlan_id) + for bind in db.vlan_bindings.all()) - # Get bindings from db. - all_bindings = self.get_db_port_bindings(db) - if not self.db_connected: - continue - all_bindings_vif_port_ids = set(all_bindings.keys()) - lsw_id_bindings = self.get_db_vlan_bindings(db) - if not self.db_connected: - continue + tunnel_ips = set(x.ip_address for x in db.tunnel_ips.all()) + self.manage_tunnels(tunnel_ips, old_tunnel_ips, db) - # Get bindings from OVS bridge. - vif_ports = self.int_br.get_vif_ports() - new_vif_ports = dict([(p.vif_id, p) for p in vif_ports]) - new_vif_ports_ids = set(new_vif_ports.keys()) + # Get bindings from OVS bridge. + vif_ports = self.int_br.get_vif_ports() + new_vif_ports = dict([(p.vif_id, p) for p in vif_ports]) + new_vif_ports_ids = set(new_vif_ports.keys()) - old_vif_ports_ids = set(old_vif_ports.keys()) - dead_vif_ports_ids = new_vif_ports_ids - all_bindings_vif_port_ids - dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids] - disappeared_vif_ports_ids = old_vif_ports_ids - new_vif_ports_ids - new_local_bindings_ids = all_bindings_vif_port_ids.intersection( - new_vif_ports_ids) - new_local_bindings = dict([(p, all_bindings.get(p)) - for p in new_vif_ports_ids]) - new_bindings = set( - (p, old_local_bindings.get(p), - new_local_bindings.get(p)) for p in new_vif_ports_ids) - changed_bindings = set([b for b in new_bindings if b[2] != b[1]]) + old_vif_ports_ids = set(old_vif_ports.keys()) + dead_vif_ports_ids = (new_vif_ports_ids - + all_bindings_vif_port_ids) + dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids] + disappeared_vif_ports_ids = (old_vif_ports_ids - + new_vif_ports_ids) + new_local_bindings_ids = (all_bindings_vif_port_ids. + intersection(new_vif_ports_ids)) + new_local_bindings = dict([(p, all_bindings.get(p)) + for p in new_vif_ports_ids]) + new_bindings = set( + (p, old_local_bindings.get(p), + new_local_bindings.get(p)) for p in new_vif_ports_ids) + changed_bindings = set([b for b in new_bindings + if b[2] != b[1]]) - LOG.debug('all_bindings: %s' % all_bindings) - LOG.debug('lsw_id_bindings: %s' % lsw_id_bindings) - LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids) - LOG.debug('dead_vif_ports_ids: %s' % dead_vif_ports_ids) - LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids) - LOG.debug('new_local_bindings_ids: %s' % new_local_bindings_ids) - LOG.debug('new_local_bindings: %s' % new_local_bindings) - LOG.debug('new_bindings: %s' % new_bindings) - LOG.debug('changed_bindings: %s' % changed_bindings) + LOG.debug('all_bindings: %s', all_bindings) + LOG.debug('lsw_id_bindings: %s', lsw_id_bindings) + LOG.debug('new_vif_ports_ids: %s', new_vif_ports_ids) + LOG.debug('dead_vif_ports_ids: %s', dead_vif_ports_ids) + LOG.debug('old_vif_ports_ids: %s', old_vif_ports_ids) + LOG.debug('new_local_bindings_ids: %s', + new_local_bindings_ids) + LOG.debug('new_local_bindings: %s', new_local_bindings) + LOG.debug('new_bindings: %s', new_bindings) + LOG.debug('changed_bindings: %s', changed_bindings) - # Take action. - for p in dead_vif_ports: - LOG.info("No quantum binding for port " + str(p) - + "putting on dead vlan") - self.port_dead(p) + # Take action. + for p in dead_vif_ports: + LOG.info("No quantum binding for port " + str(p) + + "putting on dead vlan") + self.port_dead(p) - for b in changed_bindings: - port_id, old_port, new_port = b - p = new_vif_ports[port_id] - if old_port: - old_net_uuid = old_port.network_id - LOG.info("Removing binding to net-id = " + - old_net_uuid + " for " + str(p) - + " added to dead vlan") - self.port_unbound(p, old_net_uuid) - if not new_port: - self.port_dead(p) + for b in changed_bindings: + port_id, old_port, new_port = b + p = new_vif_ports[port_id] + if old_port: + old_net_uuid = old_port.network_id + LOG.info("Removing binding to net-id = " + + old_net_uuid + " for " + str(p) + + " added to dead vlan") + self.port_unbound(p, old_net_uuid) + all_bindings[p.vif_id].op_status = OP_STATUS_DOWN + if not new_port: + self.port_dead(p) - if new_port: - new_net_uuid = new_port.network_id - if new_net_uuid not in lsw_id_bindings: - LOG.warn("No ls-id binding found for net-id '%s'" % - new_net_uuid) - continue + if new_port: + new_net_uuid = new_port.network_id + if new_net_uuid not in lsw_id_bindings: + LOG.warn("No ls-id binding found for net-id '%s'" % + new_net_uuid) + continue - lsw_id = lsw_id_bindings[new_net_uuid] - try: + lsw_id = lsw_id_bindings[new_net_uuid] self.port_bound(p, new_net_uuid, lsw_id) + all_bindings[p.vif_id].op_status = OP_STATUS_UP LOG.info("Port " + str(p) + " on net-id = " + new_net_uuid + " bound to " + str(self.local_vlan_map[new_net_uuid])) - except Exception as e: - LOG.info("Unable to bind Port " + str(p) + - " on netid = " + new_net_uuid + " to " - + str(self.local_vlan_map[new_net_uuid])) - for vif_id in disappeared_vif_ports_ids: - LOG.info("Port Disappeared: " + vif_id) - old_port = old_local_bindings.get(vif_id) - if old_port: - try: + for vif_id in disappeared_vif_ports_ids: + LOG.info("Port Disappeared: " + vif_id) + if vif_id in all_bindings: + all_bindings[vif_id].op_status = OP_STATUS_DOWN + old_port = old_local_bindings.get(vif_id) + if old_port: self.port_unbound(old_vif_ports[vif_id], old_port.network_id) - except Exception: - LOG.info("Unable to unbind Port " + str(p) + - " on net-id = " + old_port.network_uuid) - - old_vif_ports = new_vif_ports - old_local_bindings = new_local_bindings - try: + # commit any DB changes and expire + # data loaded from the database db.commit() - except Exception as e: - LOG.info("Unable to commit to database! Exception: %s" % e) - db.rollback() - old_local_bindings = {} - old_vif_ports = {} - time.sleep(self.polling_interval) + # sleep and re-initialize state for next pass + time.sleep(self.polling_interval) + old_tunnel_ips = tunnel_ips + old_vif_ports = new_vif_ports + old_local_bindings = new_local_bindings + + except: + LOG.exception("Main-loop Exception:") + self.rollback_until_success(db) def main(): @@ -721,12 +742,6 @@ def main(): raise Exception('Empty tunnel-bridge in configuration file.') # Mandatory parameter. - remote_ip_file = config.get("OVS", "remote-ip-file") - if not len(remote_ip_file): - raise Exception('Empty remote-ip-file in configuration file.') - - # Mandatory parameter. - remote_ip_file = config.get("OVS", "remote-ip-file") local_ip = config.get("OVS", "local-ip") if not len(local_ip): raise Exception('Empty local-ip in configuration file.') @@ -736,8 +751,7 @@ def main(): (config_file, str(e))) sys.exit(1) - plugin = OVSQuantumTunnelAgent(integ_br, tun_br, remote_ip_file, - local_ip, root_helper, + plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper, polling_interval, reconnect_interval) else: # Get parameters for OVSQuantumAgent. diff --git a/quantum/plugins/openvswitch/ovs_models.py b/quantum/plugins/openvswitch/ovs_models.py index 91edafd51..580402436 100644 --- a/quantum/plugins/openvswitch/ovs_models.py +++ b/quantum/plugins/openvswitch/ovs_models.py @@ -40,3 +40,16 @@ class VlanBinding(BASE): def __repr__(self): return "" % \ (self.vlan_id, self.network_id) + + +class TunnelIP(BASE): + """Represents a remote IP in tunnel mode""" + __tablename__ = 'tunnel_ips' + + ip_address = Column(String(255), primary_key=True) + + def __init__(self, ip_address): + self.ip_address = ip_address + + def __repr__(self): + return "" % (self.ip_address) diff --git a/quantum/plugins/openvswitch/tests/unit/remote-ip-file.txt b/quantum/plugins/openvswitch/tests/unit/remote-ip-file.txt deleted file mode 100644 index e69de29bb..000000000 diff --git a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py index 061f7c02d..e1d70c47f 100644 --- a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py +++ b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py @@ -24,9 +24,6 @@ import mox from quantum.plugins.openvswitch.agent import ovs_quantum_agent -LOCAL_DIR = os.path.dirname(__file__) -REMOTE_IP_FILE = LOCAL_DIR + '/remote-ip-file.txt' - # Useful global dummy variables. NET_UUID = '3faeebfe-5d37-11e1-a64b-000c29d5f0a7' LS_ID = '42' @@ -53,7 +50,6 @@ class DummyVlanBinding: class TunnelTest(unittest.TestCase): def setUp(self): - print LOCAL_DIR self.mox = mox.Mox() self.INT_BRIDGE = 'integration_bridge' @@ -86,7 +82,6 @@ class TunnelTest(unittest.TestCase): b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) self.mox.VerifyAll() @@ -106,7 +101,6 @@ class TunnelTest(unittest.TestCase): a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) @@ -123,7 +117,6 @@ class TunnelTest(unittest.TestCase): self.mox.ReplayAll() a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) a.available_local_vlans = set() @@ -140,7 +133,6 @@ class TunnelTest(unittest.TestCase): self.mox.ReplayAll() a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) a.local_vlan_map[NET_UUID] = LVM @@ -151,7 +143,6 @@ class TunnelTest(unittest.TestCase): self.mox.ReplayAll() a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) @@ -170,41 +161,9 @@ class TunnelTest(unittest.TestCase): self.mox.ReplayAll() a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, self.TUN_BRIDGE, - REMOTE_IP_FILE, '10.0.0.1', 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) a.local_vlan_map[NET_UUID] = LVM a.port_dead(VIF_PORT) self.mox.VerifyAll() - - def testDbBindings(self): - db = self.mox.CreateMockAnything() - db.ports = self.mox.CreateMockAnything() - interface_ids = ['interface-id-%d' % x for x in range(3)] - db.ports.all().AndReturn([DummyPort(x) for x in interface_ids]) - - db.vlan_bindings = self.mox.CreateMockAnything() - vlan_bindings = [ - ['network-id-%d' % x, 'vlan-id-%d' % x] for x in range(3)] - db.vlan_bindings.all().AndReturn( - [DummyVlanBinding(*x) for x in vlan_bindings]) - - self.mox.ReplayAll() - a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE, - self.TUN_BRIDGE, - REMOTE_IP_FILE, - '10.0.0.1', - 'sudo', 2, 2) - - all_bindings = a.get_db_port_bindings(db) - lsw_id_bindings = a.get_db_vlan_bindings(db) - - for interface_id, port in all_bindings.iteritems(): - self.assertTrue(interface_id in interface_ids) - - for network_id, vlan_id in lsw_id_bindings.iteritems(): - self.assertTrue(network_id in [x[0] for x in vlan_bindings]) - self.assertTrue(vlan_id in [x[1] for x in vlan_bindings]) - - self.mox.VerifyAll()