Add Logical DB Abstraction layer API and OVSDB Implementation

The current abstract the NB API (distributed DB shared among
all compute nodes) and the SWITCH API (local switch API).
It also adds implementation of OVSDB to these API's

Change-Id: I57dff6d27ef620ce183b27012bafd11afdd56fcb
This commit is contained in:
Gal Sagie 2015-07-02 18:00:12 +03:00
parent 014f71b5c0
commit 267718ad16
7 changed files with 523 additions and 232 deletions

View File

@ -14,22 +14,19 @@
# under the License.
import eventlet
import sys
import time
import eventlet
from oslo_log import log
from ryu.base.app_manager import AppManager
from ryu.controller.ofp_handler import OFPHandler
from ovs.db import idl
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from dragonflow.controller.l2_app import L2App
from dragonflow.db.drivers import ovsdb_nb_impl, ovsdb_vswitch_impl
from oslo_log import log
#from dragonflow.db.drivers import etcd_nb_impl
LOG = log.getLogger(__name__)
@ -38,34 +35,26 @@ eventlet.monkey_patch()
class DfLocalController(object):
def __init__(self, chassis_name, ip, sb_db_ip):
def __init__(self, chassis_name, ip, remote_db_ip):
self.l3_app = None
self.l2_app = None
self.open_flow_app = None
self.next_network_id = 0
self.networks = {}
self.ports = {}
self.ovsdb_sb = None
self.ovsdb_local = None
self.idl = None
self.idl_sb = None
self.nb_api = None
self.vswitch_api = None
self.chassis_name = chassis_name
self.ip = ip
self.sb_db_ip = sb_db_ip
self.remote_db_ip = remote_db_ip
def run(self):
sb_db_connection = ('tcp:%s:6640' % self.sb_db_ip)
self.ovsdb_sb = connection.Connection(sb_db_connection,
10,
'OVN_Southbound')
local_connection = ('tcp:%s:6640' % self.ip)
self.ovsdb_local = connection.Connection(local_connection,
10,
'Open_vSwitch')
self.ovsdb_sb.start()
self.ovsdb_local.start()
self.idl_sb = self.ovsdb_sb.idl
self.idl = self.ovsdb_local.idl
self.nb_api = ovsdb_nb_impl.OvsdbNbApi(self.remote_db_ip)
#self.nb_api = etcd_nb_impl.EtcdNbApi()
self.nb_api.initialize()
self.vswitch_api = ovsdb_vswitch_impl.OvsdbSwitchApi(self.ip)
self.vswitch_api.initialize()
app_mgr = AppManager.get_instance()
self.open_flow_app = app_mgr.instantiate(OFPHandler, None, None)
self.open_flow_app.start()
@ -80,8 +69,8 @@ class DfLocalController(object):
def run_db_poll(self):
try:
self.idl.run()
self.idl_sb.run()
self.nb_api.sync()
self.vswitch_api.sync()
self.register_chassis()
@ -93,215 +82,93 @@ class DfLocalController(object):
except Exception:
pass
def clean_tables(self):
txn = idl.Transaction(self.idl_sb)
for chassis in self.idl_sb.tables['Chassis'].rows.values():
chassis.delete()
for encap in self.idl_sb.tables['Encap'].rows.values():
encap.delete()
for binding in self.idl_sb.tables['Binding'].rows.values():
binding.delete()
status = txn.commit_block()
return status
def register_chassis(self):
chassis = self.nb_api.get_chassis(self.chassis_name)
# TODO(gsagie) Support tunnel type change here ?
try:
chassis = idlutils.row_by_value(self.idl_sb,
'Chassis',
'name', self.chassis_name)
if chassis is not None:
# TODO(gsagie) Support tunnel type change here ?
return
except idlutils.RowNotFound:
txn = idl.Transaction(self.idl_sb)
encap_row = txn.insert(self.idl_sb.tables['Encap'])
encap_row.ip = self.ip
encap_row.type = 'geneve'
chassis_row = txn.insert(self.idl_sb.tables['Chassis'])
chassis_row.encaps = encap_row
chassis_row.name = self.chassis_name
status = txn.commit_block()
return status
if chassis is None:
self.nb_api.add_chassis(self.chassis_name,
self.ip,
'geneve')
def create_tunnels(self):
tunnel_ports = {}
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
t_ports = self.vswitch_api.get_tunnel_ports()
for t_port in t_ports:
tunnel_ports[t_port.get_chassis_id()] = t_port
for port in br_int.ports:
if 'df-chassis-id' in port.external_ids:
chassis_id = port.external_ids['df-chassis-id']
tunnel_ports[chassis_id] = port
for chassis in self.idl_sb.tables['Chassis'].rows.values():
if chassis.name in tunnel_ports:
# Chassis already set
del tunnel_ports[chassis.name]
elif chassis.name == self.chassis_name:
for chassis in self.nb_api.get_all_chassis():
if chassis.get_name() in tunnel_ports:
del tunnel_ports[chassis.get_name()]
elif chassis.get_name() == self.chassis_name:
pass
else:
encap = chassis.encaps[0]
self.tunnel_add(br_int, chassis, encap)
self.vswitch_api.add_tunnel_port(chassis)
# Iterate all tunnel ports that needs to be deleted
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
for port in tunnel_ports.values():
self.delete_bridge_port(br_int, port)
def tunnel_add(self, bridge, chassis, encap):
txn = idl.Transaction(self.idl)
port_name = "df-" + chassis.name
interface = txn.insert(self.idl.tables['Interface'])
interface.name = port_name
interface.type = encap.type
options_dict = getattr(interface, 'options', {})
options_dict['remote_ip'] = encap.ip
options_dict['key'] = 'flow'
interface.options = options_dict
port = txn.insert(self.idl.tables['Port'])
port.name = port_name
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
ifaces.append(interface)
port.interfaces = ifaces
external_ids_dict = getattr(interface, 'external_ids', {})
external_ids_dict['df-chassis-id'] = chassis.name
port.external_ids = external_ids_dict
bridge.verify('ports')
ports = getattr(bridge, 'ports', [])
ports.append(port)
bridge.ports = ports
status = txn.commit_block()
return status
def delete_bridge_port(self, bridge, port):
txn = idl.Transaction(self.idl)
bridge.verify('ports')
ports = bridge.ports
ports.remove(port)
bridge.ports = ports
# Remote Port Interfaces
port.verify('interfaces')
for iface in port.interfaces:
self.idl.tables['Interface'].rows[iface.uuid].delete()
self.idl.tables['Port'].rows[port.uuid].delete()
status = txn.commit_block()
return status
self.vswitch_api.delete_port(port)
def set_binding(self):
local_ports = self.get_local_ports()
txn = idl.Transaction(self.idl_sb)
local_ports = self.vswitch_api.get_local_port_ids()
self.nb_api.register_local_ports(self.chassis_name, local_ports)
chassis = idlutils.row_by_value(self.idl_sb,
'Chassis',
'name', self.chassis_name)
for binding in self.idl_sb.tables['Binding'].rows.values():
if binding.logical_port in local_ports:
if binding.chassis == self.chassis_name:
continue
# Bind this port to this chassis
binding.chassis = chassis
elif binding.chassis == self.chassis_name:
binding.chassis = []
status = txn.commit_block()
return status
# TODO(gsagie) refactor this method for smaller methods
def port_mappings(self):
lport_to_ofport = {}
chassis_to_ofport = {}
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
for port in br_int.ports:
if port.name == 'br-int':
continue
chassis_id = port.external_ids.get('df-chassis-id')
if chassis_id is not None and chassis_id == self.chassis_name:
continue
for interface in port.interfaces:
if interface.ofport is None:
continue
ofport = interface.ofport[0]
if ofport < 1 or ofport > 65533:
continue
if chassis_id is not None:
chassis_to_ofport[chassis_id] = ofport
else:
ifaceid = interface.external_ids.get('iface-id')
if ifaceid is not None:
lport_to_ofport[ifaceid] = ofport
chassis_to_ofport, lport_to_ofport = (
self.vswitch_api.get_local_ports_to_ofport_mapping())
ports_to_remove = set(self.ports.keys())
for binding in self.idl_sb.tables['Binding'].rows.values():
if not binding.chassis:
continue
logical_port = binding.logical_port
mac_address = binding.mac[0]
chassis = binding.chassis[0]
ldp = str(binding.logical_datapath)
network = self.get_network_id(ldp)
tunnel_key = binding.tunnel_key
if chassis.name == self.chassis_name:
ofport = lport_to_ofport.get(logical_port, 0)
for lport in self.nb_api.get_all_logical_ports():
network = self.get_network_id(lport.get_network_id())
lport.set_external_value('local_network_id', network)
if lport.get_chassis() == self.chassis_name:
ofport = lport_to_ofport.get(lport.get_id(), 0)
if ofport != 0:
port = self._create_port_dict(logical_port,
mac_address,
network,
ofport,
tunnel_key, True)
self.ports[logical_port] = port
if logical_port in ports_to_remove:
ports_to_remove.remove(logical_port)
self.l2_app.add_local_port(logical_port,
mac_address,
lport.set_external_value('ofport', ofport)
lport.set_external_value('is_local', True)
self.ports[lport.get_id()] = lport
if lport.get_id() in ports_to_remove:
ports_to_remove.remove(lport.get_id())
self.l2_app.add_local_port(lport.get_id(),
lport.get_mac(),
network,
ofport,
tunnel_key)
lport.get_tunnel_key())
else:
ofport = chassis_to_ofport.get(chassis.name, 0)
ofport = chassis_to_ofport.get(lport.get_chassis(), 0)
if ofport != 0:
port = self._create_port_dict(logical_port,
mac_address,
network,
ofport,
tunnel_key, False)
self.ports[logical_port] = port
if logical_port in ports_to_remove:
ports_to_remove.remove(logical_port)
self.l2_app.add_remote_port(logical_port,
mac_address,
lport.set_external_value('ofport', ofport)
lport.set_external_value('is_local', False)
self.ports[lport.get_id()] = lport
if lport.get_id() in ports_to_remove:
ports_to_remove.remove(lport.get_id())
self.l2_app.add_remote_port(lport.get_id(),
lport.get_mac(),
network,
ofport,
tunnel_key)
lport.get_tunnel_key())
# TODO(gsagie) use port dictionary in all methods in l2 app
# and here instead of always moving all arguments
for port_to_remove in ports_to_remove:
p = self.ports[port_to_remove]
if p['is_local']:
self.l2_app.remove_local_port(p['lport_id'],
p['mac'],
p['network_id'],
p['ofport'],
p['tunnel_key'])
if p.get_external_value('is_local'):
self.l2_app.remove_local_port(p.get_id(),
p.get_mac(),
p.get_external_value(
'local_network_id'),
p.get_external_value(
'ofport'),
p.get_tunnel_key())
del self.ports[port_to_remove]
else:
self.l2_app.remove_remote_port(p['lport_id'],
p['mac'],
p['network_id'],
p['tunnel_key'])
self.l2_app.remove_remote_port(p.get_id(),
p.get_mac(),
p.get_external_value(
'local_network_id'),
p.get_tunnel_key())
del self.ports[port_to_remove]
def get_network_id(self, logical_dp_id):
@ -313,31 +180,6 @@ class DfLocalController(object):
# TODO(gsagie) verify self.next_network_id didnt wrap
self.networks[logical_dp_id] = self.next_network_id
def get_local_ports(self):
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
port_ids = set()
for port in br_int.ports:
if port.name == 'br-int':
continue
if 'df-chassis-id' in port.external_ids:
continue
for interface in port.interfaces:
if 'iface-id' in interface.external_ids:
port_ids.add(interface.external_ids['iface-id'])
return port_ids
def _create_port_dict(self, lport_id, mac, network_id, ofport,
tunnel_key, is_local):
port = {'lport_id': lport_id,
'mac': mac,
'network_id': network_id,
'ofport': ofport,
'tunnel_key': tunnel_key,
'is_local': is_local}
return port
# Run this application like this:
# python df_local_controller.py <chassis_unique_name>
@ -345,8 +187,8 @@ class DfLocalController(object):
def main():
chassis_name = sys.argv[1] # unique name 'df_chassis'
ip = sys.argv[2] # local ip '10.100.100.4'
sb_db_ip = sys.argv[3] # remote SB DB IP '10.100.100.4'
controller = DfLocalController(chassis_name, ip, sb_db_ip)
remote_db_ip = sys.argv[3] # remote SB DB IP '10.100.100.4'
controller = DfLocalController(chassis_name, ip, remote_db_ip)
controller.run()
if __name__ == "__main__":

View File

75
dragonflow/db/api_nb.py Normal file
View File

@ -0,0 +1,75 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class NbApi(object):
def initialize(self):
pass
def sync(self):
pass
def get_chassis(self, name):
pass
def get_all_chassis(self):
pass
def add_chassis(self, name, ip, tunnel_type):
pass
def register_local_ports(self, chassis, local_ports_id):
pass
def get_all_logical_ports(self):
pass
class Chassis(object):
def get_name(self):
pass
def get_ip(self):
pass
def get_encap_type(self):
pass
class LogicalPort(object):
def get_id(self):
pass
def get_mac(self):
pass
def get_chassis(self):
pass
def get_network_id(self):
pass
def get_tunnel_key(self):
pass
def set_external_value(self, key, value):
pass
def get_external_value(self, key):
pass

View File

@ -0,0 +1,54 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class SwitchApi(object):
def sync(self):
pass
def get_tunnel_ports(self):
pass
def add_tunnel_port(self, chassis):
pass
def delete_port(self, switch_port):
pass
def get_logical_ports_to_ofport(self):
pass
def get_chassis_ids_to_ofport(self):
pass
def get_local_port_ids(self):
pass
class SwitchPort(object):
def get_name(self):
pass
def get_id(self):
pass
class TunnelPort(SwitchPort):
def get_chassis_id(self):
pass

View File

View File

@ -0,0 +1,150 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from dragonflow.db import api_nb
from ovs.db import idl
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
class OvsdbNbApi(api_nb.NbApi):
def __init__(self, ip, protocol='tcp', port='6640', timeout=10):
super(OvsdbNbApi, self).__init__()
self.ip = ip
self.db_name = 'OVN_Southbound'
self.protocol = protocol
self.port = port
self.timeout = timeout
self.ovsdb = None
self.idl = None
def initialize(self):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
self.ovsdb = connection.Connection(db_connection,
self.timeout,
self.db_name)
self.ovsdb.start()
self.idl = self.ovsdb.idl
def sync(self):
self.idl.run()
def get_chassis(self, name):
try:
chassis = idlutils.row_by_value(self.idl,
'Chassis',
'name', name)
return OvsdbChassis(chassis)
except idlutils.RowNotFound:
return None
def get_all_chassis(self):
res = []
for chassis in self.idl.tables['Chassis'].rows.values():
res.append(OvsdbChassis(chassis))
return res
def add_chassis(self, name, ip, tunnel_type):
txn = idl.Transaction(self.idl)
encap_row = txn.insert(self.idl.tables['Encap'])
encap_row.ip = ip
encap_row.type = tunnel_type
chassis_row = txn.insert(self.idl.tables['Chassis'])
chassis_row.encaps = encap_row
chassis_row.name = name
status = txn.commit_block()
return status
def register_local_ports(self, chassis_name, local_ports_ids):
txn = idl.Transaction(self.idl)
chassis = idlutils.row_by_value(self.idl,
'Chassis',
'name', chassis_name)
for binding in self.idl.tables['Binding'].rows.values():
if binding.logical_port in local_ports_ids:
if binding.chassis == chassis_name:
continue
# Bind this port to this chassis
binding.chassis = chassis
elif binding.chassis == chassis_name:
binding.chassis = []
status = txn.commit_block()
return status
def get_all_logical_ports(self):
res = []
for binding in self.idl.tables['Binding'].rows.values():
if not binding.chassis:
continue
res.append(OvsdbLogicalPort(binding))
return res
class OvsdbChassis(api_nb.Chassis):
def __init__(self, row):
self.chassis_row = row
def get_name(self):
return self.chassis_row.name
def get_ip(self):
encap = self.chassis_row.encaps[0]
return encap.ip
def get_encap_type(self):
encap = self.chassis_row.encaps[0]
return encap.type
class OvsdbLogicalPort(api_nb.LogicalPort):
def __init__(self, row):
self.id = row.logical_port
self.mac = row.mac[0]
self.chassis = row.chassis[0].name
self.network_id = str(row.logical_datapath)
self.tunnel_key = row.tunnel_key
self.external_dict = {}
def get_id(self):
return self.id
def get_mac(self):
return self.mac
def get_chassis(self):
return self.chassis
def get_network_id(self):
return self.network_id
def get_tunnel_key(self):
return self.tunnel_key
def set_external_value(self, key, value):
self.external_dict[key] = value
def get_external_value(self, key):
return self.external_dict.get(key)

View File

@ -0,0 +1,170 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from dragonflow.db import api_vswitch
from ovs.db import idl
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
class OvsdbSwitchApi(api_vswitch.SwitchApi):
def __init__(self, ip, protocol='tcp', port='6640', timeout=10):
super(OvsdbSwitchApi, self).__init__()
self.ip = ip
self.db_name = 'Open_vSwitch'
self.protocol = protocol
self.port = port
self.timeout = timeout
self.ovsdb = None
self.idl = None
def initialize(self):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
self.ovsdb = connection.Connection(db_connection,
self.timeout,
self.db_name)
self.ovsdb.start()
self.idl = self.ovsdb.idl
def sync(self):
self.idl.run()
def get_tunnel_ports(self):
res = []
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
for port in br_int.ports:
if 'df-chassis-id' in port.external_ids:
chassis_id = port.external_ids['df-chassis-id']
res.append(OvsdbTunnelPort(port, chassis_id))
return res
def add_tunnel_port(self, chassis):
bridge = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
txn = idl.Transaction(self.idl)
port_name = "df-" + chassis.get_name()
interface = txn.insert(self.idl.tables['Interface'])
interface.name = port_name
interface.type = chassis.get_encap_type()
options_dict = getattr(interface, 'options', {})
options_dict['remote_ip'] = chassis.get_ip()
options_dict['key'] = 'flow'
interface.options = options_dict
port = txn.insert(self.idl.tables['Port'])
port.name = port_name
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
ifaces.append(interface)
port.interfaces = ifaces
external_ids_dict = getattr(interface, 'external_ids', {})
external_ids_dict['df-chassis-id'] = chassis.get_name()
port.external_ids = external_ids_dict
bridge.verify('ports')
ports = getattr(bridge, 'ports', [])
ports.append(port)
bridge.ports = ports
status = txn.commit_block()
return status
def delete_port(self, switch_port):
port = switch_port.port_row
bridge = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
txn = idl.Transaction(self.idl)
bridge.verify('ports')
ports = bridge.ports
ports.remove(port)
bridge.ports = ports
# Remote Port Interfaces
port.verify('interfaces')
for iface in port.interfaces:
self.idl.tables['Interface'].rows[iface.uuid].delete()
self.idl.tables['Port'].rows[port.uuid].delete()
status = txn.commit_block()
return status
def get_local_port_ids(self):
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
port_ids = set()
for port in br_int.ports:
if port.name == 'br-int':
continue
if 'df-chassis-id' in port.external_ids:
continue
for interface in port.interfaces:
if 'iface-id' in interface.external_ids:
port_ids.add(interface.external_ids['iface-id'])
return port_ids
def get_local_ports_to_ofport_mapping(self):
lport_to_ofport = {}
chassis_to_ofport = {}
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
for port in br_int.ports:
if port.name == 'br-int':
continue
chassis_id = port.external_ids.get('df-chassis-id')
for interface in port.interfaces:
if interface.ofport is None:
# TODO(gsagie) log error
continue
ofport = interface.ofport[0]
if ofport < 1 or ofport > 65533:
# TODO(gsagie) log error
continue
if chassis_id is not None:
chassis_to_ofport[chassis_id] = ofport
else:
ifaceid = interface.external_ids.get('iface-id')
if ifaceid is not None:
lport_to_ofport[ifaceid] = ofport
return chassis_to_ofport, lport_to_ofport
class OvsdbSwitchPort(api_vswitch.SwitchPort):
def __init__(self, row):
self.port_row = row
def get_name(self):
return self.port_row.name
def get_id(self):
pass
class OvsdbTunnelPort(OvsdbSwitchPort):
def __init__(self, row, chassis_id):
super(OvsdbTunnelPort, self).__init__(row)
self.chassis_id = chassis_id
def get_chassis_id(self):
return self.chassis_id