Refactor ovsdb_vswitch_impl.py

The ovsdb_vswitch_impl.py contains everything about ovs, for example,
commands, connection, idl. This fact makes the file hard to maintain.

This patch separates ovsdb_vswitch_impl.py into several files

ovsdb/commands.py:The customized ovs db command for df
ovsdb/impl_idl.py: The classes to connect ovs db
ovsdb/objects.py: The customized ovs object model for df
ovsdb/vswitch_impl.py: The interface to manage ovs

Besides, some obselete code is removed.

Change-Id: Ie359268a2e72fe1f57b1a2d9dc7fad4f388e0a38
Related-Bug: #1622879
This commit is contained in:
Hong Hui Xiao 2016-09-25 22:48:07 +08:00
parent ba0ad9a121
commit d17ae9705a
8 changed files with 432 additions and 458 deletions

View File

@ -34,7 +34,7 @@ from dragonflow.controller import ryu_base_app
from dragonflow.controller import topology
from dragonflow.db import api_nb
from dragonflow.db import db_store
from dragonflow.db.drivers import ovsdb_vswitch_impl
from dragonflow.ovsdb import vswitch_impl
df_ryu_opts = [
@ -66,7 +66,7 @@ class DfLocalController(object):
self.nb_api = api_nb.NbApi(
nb_driver,
use_pubsub=cfg.CONF.df.enable_df_pub_sub)
self.vswitch_api = ovsdb_vswitch_impl.OvsdbSwitchApi(self.ip)
self.vswitch_api = vswitch_impl.OvsApi(self.ip)
kwargs = dict(
nb_api=self.nb_api,
vswitch_api=self.vswitch_api,
@ -89,7 +89,6 @@ class DfLocalController(object):
self.topology = topology.Topology(self,
self.enable_selective_topo_dist)
self.vswitch_api.sync()
# both set_controller and del_controller will delete flows.
# for reliability, here we should check if controller is set for OVS,
# if yes, don't set controller and don't delete controller.

View File

@ -1,404 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.ovsdb import impl_idl
from neutron.agent.ovsdb.native import commands
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import helpers
from neutron.agent.ovsdb.native import idlutils
from oslo_config import cfg
from oslo_log import log
from ovs.db import idl
from ovs import poller
from ovs import vlog
import retrying
import six
import threading
from dragonflow._i18n import _LW
from dragonflow.common import constants
from dragonflow.db import api_vswitch
LOG = log.getLogger(__name__)
OFPORT_RANGE_MIN = 1
OFPORT_RANGE_MAX = 65533
ovsdb_monitor_table_filter_default = {
'Interface': [
'ofport',
'name',
'admin_state',
'type',
'external_ids',
'options',
'mac_in_use',
],
'Bridge': [
'ports',
'name',
'controller',
'fail_mode',
'datapath_type',
],
'Port': [
'name',
'external_ids',
'interfaces',
],
'Controller': [
'target',
],
'Open_vSwitch': [
'bridges',
'cur_cfg',
'next_cfg'
]
}
def get_schema_helper(connection_string, db_name='Open_vSwitch', tables='all'):
try:
helper = idlutils.get_schema_helper(connection_string,
db_name)
except Exception:
# We may have failed do to set-manager not being called
helpers.enable_connection_uri(connection_string)
# There is a small window for a race, so retry up to a second
@retrying.retry(wait_exponential_multiplier=10,
stop_max_delay=1000)
def do_get_schema_helper():
return idlutils.get_schema_helper(connection_string,
db_name)
helper = do_get_schema_helper()
if tables == 'all':
helper.register_all()
elif isinstance(tables, dict):
for table_name, columns in six.iteritems(tables):
if columns == 'all':
helper.register_table(table_name)
else:
helper.register_columns(table_name, columns)
return helper
class DFConnection(connection.Connection):
"""
Extend the Neutron OVS Connection class to support being given the IDL
schema externally or manually.
Much of this code was taken directly from connection.Connection class.
"""
def __init__(
self, connection, timeout, schema_helper):
super(DFConnection, self).__init__(connection, timeout, None)
assert schema_helper is not None, "schema_helper parameter is None"
self._schema_helper = schema_helper
def start(self, nb_api):
with self.lock:
if self.idl is not None:
return
self.idl = DFIdl(nb_api, self.connection, self._schema_helper)
idlutils.wait_for_change(self.idl, self.timeout)
self.poller = poller.Poller()
self.thread = threading.Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start()
class DFOvsdbApi(impl_idl.OvsdbIdl):
"""The command generator of OVS DB operation
This is a sub-class of OvsdbIdl, which is defined in neutron. The super
class OvsdbIdl has defined lots of command. Dragonflow can use
them. And Dragonflow can extend its own commands in this class.
"""
ovsdb_connection = None
def __init__(self, context, db_connection, timeout):
self.context = context
if DFOvsdbApi.ovsdb_connection is None:
DFOvsdbApi.ovsdb_connection = DFConnection(
db_connection,
timeout,
get_schema_helper(
db_connection,
tables=ovsdb_monitor_table_filter_default))
# Override the super class's attribute
impl_idl.OvsdbIdl.ovsdb_connection = DFOvsdbApi.ovsdb_connection
def start(self, nb_api):
DFOvsdbApi.ovsdb_connection.start(nb_api)
self.idl = DFOvsdbApi.ovsdb_connection.idl
def add_tunnel_port(self, chassis):
return AddTunnelPort(self, chassis)
def get_bridge_ports(self, bridge):
return GetBridgePorts(self, bridge)
def add_patch_port(self, bridge, port, remote_name):
return AddPatchPort(self, bridge, port, remote_name)
class OvsdbSwitchApi(api_vswitch.SwitchApi):
"""The interface of openvswitch
Consumers use this class to set openvswitch or get results from
openvswitch.
"""
def __init__(self, ip, protocol='tcp', port='6640', timeout=10):
super(OvsdbSwitchApi, self).__init__()
self.ip = ip
self.protocol = protocol
self.port = port
# NOTE: This has to be this name vsctl_timeout, as neutron will use
# this attribute to set the timeout of ovs db.
self.vsctl_timeout = timeout
self.ovsdb = None
self.integration_bridge = cfg.CONF.df.integration_bridge
vlog.Vlog.init('dragonflow')
def initialize(self, nb_api):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
self.ovsdb = DFOvsdbApi(self, db_connection, self.vsctl_timeout)
table = constants.OVS_INTERFACE
nb_api.db_change_callback(table, None, 'sync_started', None)
self.ovsdb.start(nb_api)
nb_api.db_change_callback(table, None, 'sync_finished', None)
def _db_get_val(self, table, record, column, check_error=False,
log_errors=True):
return self.ovsdb.db_get(table, record, column).execute(
check_error=check_error, log_errors=log_errors)
def _get_bridge_for_iface(self, iface_name):
return self.ovsdb.iface_to_br(iface_name).execute()
def set_controller(self, bridge, targets):
self.ovsdb.set_controller(bridge, targets).execute()
def set_controller_fail_mode(self, bridge, fail_mode):
self.ovsdb.set_fail_mode(bridge, fail_mode).execute()
def check_controller(self, target):
controllers = self.ovsdb.get_controller(
self.integration_bridge).execute()
return target in controllers
def check_controller_fail_mode(self, fail_mode):
return fail_mode == self._db_get_val('Bridge',
self.integration_bridge,
'fail_mode')
def get_tunnel_ports(self):
res = []
ports = self.ovsdb.get_bridge_ports(self.integration_bridge).execute()
for port in ports:
if 'df-chassis-id' in port.external_ids:
chassis_id = port.external_ids['df-chassis-id']
res.append(OvsdbTunnelPort(port, chassis_id))
return res
def add_tunnel_port(self, chassis):
self.ovsdb.add_tunnel_port(chassis).execute()
def delete_port(self, switch_port):
self.ovsdb.del_port(switch_port.get_name(),
self.integration_bridge).execute()
@staticmethod
def _check_ofport(port_name, ofport):
if ofport is None:
LOG.warning(_LW("Can't find ofport for port %s."), port_name)
return False
if ofport < OFPORT_RANGE_MIN or ofport > OFPORT_RANGE_MAX:
LOG.warning(_LW("ofport %(ofport)s for port %(port)s is invalid."),
{'ofport': ofport, 'port': port_name})
return False
return True
def get_chassis_ofport(self, chassis_id):
# TODO(xiaohhui): Can we just call get_port_ofport('df-'+chassis_id)?
ports = self.ovsdb.db_find(
'Port', ('external_ids', '=', {'df-chassis-id': chassis_id}),
columns=['external_ids', 'name']).execute()
for port in ports:
ofport = self.get_port_ofport(port['name'])
if self._check_ofport(port['name'], ofport):
return ofport
def get_port_ofport_by_id(self, port_id):
ifaces = self.ovsdb.db_find(
'Interface', ('external_ids', '=', {'iface-id': port_id}),
columns=['external_ids', 'name', 'ofport']).execute()
for iface in ifaces:
if (self.integration_bridge !=
self._get_bridge_for_iface(iface['name'])):
# iface-id is the port id in neutron, the same neutron port
# might create multiple interfaces in different bridges
continue
if self._check_ofport(iface['name'], iface['ofport']):
return iface['ofport']
def create_patch_port(self, bridge, port, remote_name):
self.ovsdb.add_br(bridge, datapath_type='system').execute()
if not self.patch_port_exist(port):
self.ovsdb.add_patch_port(bridge, port, remote_name).execute()
return self.get_port_ofport(port)
def patch_port_exist(self, port):
return 'patch' == self._db_get_val('Interface', port, 'type',
check_error=False,
log_errors=False)
def get_port_ofport(self, port):
return self._db_get_val('Interface', port, 'ofport',
check_error=False, log_errors=False)
class OvsdbSwitchPort(api_vswitch.SwitchPort):
def __init__(self, row):
self.port_row = row
def get_name(self):
return self.port_row.name
def get_id(self):
pass
class OvsdbTunnelPort(OvsdbSwitchPort):
def __init__(self, row, chassis_id):
super(OvsdbTunnelPort, self).__init__(row)
self.chassis_id = chassis_id
def get_chassis_id(self):
return self.chassis_id
class AddTunnelPort(commands.BaseCommand):
def __init__(self, api, chassis):
super(AddTunnelPort, self).__init__(api)
self.chassis = chassis
self.integration_bridge = cfg.CONF.df.integration_bridge
def run_idl(self, txn):
bridge = idlutils.row_by_value(self.api.idl, 'Bridge',
'name', self.integration_bridge)
port_name = "df-" + self.chassis.get_id()
interface = txn.insert(self.api.idl.tables['Interface'])
interface.name = port_name
interface.type = self.chassis.get_encap_type()
options_dict = getattr(interface, 'options', {})
options_dict['remote_ip'] = self.chassis.get_ip()
options_dict['key'] = 'flow'
interface.options = options_dict
port = txn.insert(self.api.idl.tables['Port'])
port.name = port_name
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
ifaces.append(interface)
port.interfaces = ifaces
external_ids_dict = getattr(interface, 'external_ids', {})
external_ids_dict['df-chassis-id'] = self.chassis.get_id()
port.external_ids = external_ids_dict
bridge.verify('ports')
ports = getattr(bridge, 'ports', [])
ports.append(port)
bridge.ports = ports
class GetBridgePorts(commands.BaseCommand):
def __init__(self, api, bridge):
super(GetBridgePorts, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [p for p in br.ports if p.name != self.bridge]
class DFIdl(idl.Idl):
def __init__(self, nb_api, remote, schema):
super(DFIdl, self).__init__(remote, schema)
self.nb_api = nb_api
self.interface_type = (constants.OVS_VM_INTERFACE,
constants.OVS_BRIDGE_INTERFACE)
def _is_handle_interface_update(self, interface):
if interface.name == cfg.CONF.df.metadata_interface:
return True
if interface.type not in self.interface_type:
return False
if interface.name.startswith('qg'):
return False
return True
def _notify_update_local_interface(self, local_interface, action):
if self._is_handle_interface_update(local_interface):
table = constants.OVS_INTERFACE
key = local_interface.uuid
self.nb_api.db_change_callback(table, key, action, local_interface)
def notify(self, event, row, updates=None):
if not row or not hasattr(row, '_table'):
return
if row._table.name == 'Interface':
_interface = api_vswitch.LocalInterface.from_idl_row(row)
action = event if event != 'update' else 'set'
self._notify_update_local_interface(_interface, action)
class AddPatchPort(commands.BaseCommand):
def __init__(self, api, bridge, port, remote_name):
super(AddPatchPort, self).__init__(api)
self.bridge = bridge
self.port = port
self.remote_name = remote_name
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
port = txn.insert(self.api.idl.tables['Port'])
port.name = self.port
br.verify('ports')
ports = getattr(br, 'ports', [])
ports.append(port)
br.ports = ports
iface = txn.insert(self.api.idl.tables['Interface'])
iface.name = self.port
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
options_dict = getattr(iface, 'options', {})
options_dict['peer'] = self.remote_name
iface.options = options_dict
iface.type = 'patch'
ifaces.append(iface)
port.interfaces = ifaces

View File

View File

@ -0,0 +1,88 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.ovsdb.native import commands
from neutron.agent.ovsdb.native import idlutils
from oslo_config import cfg
class AddPatchPort(commands.BaseCommand):
def __init__(self, api, bridge, port, remote_name):
super(AddPatchPort, self).__init__(api)
self.bridge = bridge
self.port = port
self.remote_name = remote_name
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
port = txn.insert(self.api.idl.tables['Port'])
port.name = self.port
br.verify('ports')
ports = getattr(br, 'ports', [])
ports.append(port)
br.ports = ports
iface = txn.insert(self.api.idl.tables['Interface'])
iface.name = self.port
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
options_dict = getattr(iface, 'options', {})
options_dict['peer'] = self.remote_name
iface.options = options_dict
iface.type = 'patch'
ifaces.append(iface)
port.interfaces = ifaces
class GetBridgePorts(commands.BaseCommand):
def __init__(self, api, bridge):
super(GetBridgePorts, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [p for p in br.ports if p.name != self.bridge]
class AddTunnelPort(commands.BaseCommand):
def __init__(self, api, chassis):
super(AddTunnelPort, self).__init__(api)
self.chassis = chassis
self.integration_bridge = cfg.CONF.df.integration_bridge
def run_idl(self, txn):
bridge = idlutils.row_by_value(self.api.idl, 'Bridge',
'name', self.integration_bridge)
port_name = "df-" + self.chassis.get_id()
interface = txn.insert(self.api.idl.tables['Interface'])
interface.name = port_name
interface.type = self.chassis.get_encap_type()
options_dict = getattr(interface, 'options', {})
options_dict['remote_ip'] = self.chassis.get_ip()
options_dict['key'] = 'flow'
interface.options = options_dict
port = txn.insert(self.api.idl.tables['Port'])
port.name = port_name
port.verify('interfaces')
ifaces = getattr(port, 'interfaces', [])
ifaces.append(interface)
port.interfaces = ifaces
external_ids_dict = getattr(interface, 'external_ids', {})
external_ids_dict['df-chassis-id'] = self.chassis.get_id()
port.external_ids = external_ids_dict
bridge.verify('ports')
ports = getattr(bridge, 'ports', [])
ports.append(port)
bridge.ports = ports

View File

@ -0,0 +1,175 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.ovsdb import impl_idl
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import helpers
from neutron.agent.ovsdb.native import idlutils
from oslo_config import cfg
from ovs.db import idl
from ovs import poller
import retrying
import six
import threading
from dragonflow.common import constants
from dragonflow.ovsdb import commands
from dragonflow.ovsdb import objects
ovsdb_monitor_table_filter_default = {
'Interface': [
'ofport',
'name',
'admin_state',
'type',
'external_ids',
'options',
'mac_in_use',
],
'Bridge': [
'ports',
'name',
'controller',
'fail_mode',
'datapath_type',
],
'Port': [
'name',
'external_ids',
'interfaces',
],
'Controller': [
'target',
],
'Open_vSwitch': [
'bridges',
'cur_cfg',
'next_cfg'
]
}
def get_schema_helper(connection_string, db_name='Open_vSwitch', tables='all'):
try:
helper = idlutils.get_schema_helper(connection_string,
db_name)
except Exception:
# We may have failed do to set-manager not being called
helpers.enable_connection_uri(connection_string)
# There is a small window for a race, so retry up to a second
@retrying.retry(wait_exponential_multiplier=10,
stop_max_delay=1000)
def do_get_schema_helper():
return idlutils.get_schema_helper(connection_string,
db_name)
helper = do_get_schema_helper()
if tables == 'all':
helper.register_all()
elif isinstance(tables, dict):
for table_name, columns in six.iteritems(tables):
if columns == 'all':
helper.register_table(table_name)
else:
helper.register_columns(table_name, columns)
return helper
class DFIdl(idl.Idl):
def __init__(self, nb_api, remote, schema):
super(DFIdl, self).__init__(remote, schema)
self.nb_api = nb_api
self.interface_type = (constants.OVS_VM_INTERFACE,
constants.OVS_BRIDGE_INTERFACE)
def _is_handle_interface_update(self, interface):
if interface.name == cfg.CONF.df.metadata_interface:
return True
if interface.type not in self.interface_type:
return False
if interface.name.startswith('qg'):
return False
return True
def _notify_update_local_interface(self, local_interface, action):
if self._is_handle_interface_update(local_interface):
table = constants.OVS_INTERFACE
key = local_interface.uuid
self.nb_api.db_change_callback(table, key, action, local_interface)
def notify(self, event, row, updates=None):
if not row or not hasattr(row, '_table'):
return
if row._table.name == 'Interface':
_interface = objects.LocalInterface.from_idl_row(row)
action = event if event != 'update' else 'set'
self._notify_update_local_interface(_interface, action)
class DFConnection(connection.Connection):
"""
Extend the Neutron OVS Connection class to support being given the IDL
schema externally or manually.
Much of this code was taken directly from connection.Connection class.
"""
def __init__(
self, connection, timeout, schema_helper):
super(DFConnection, self).__init__(connection, timeout, None)
assert schema_helper is not None, "schema_helper parameter is None"
self._schema_helper = schema_helper
def start(self, nb_api):
with self.lock:
if self.idl is not None:
return
self.idl = DFIdl(nb_api, self.connection, self._schema_helper)
idlutils.wait_for_change(self.idl, self.timeout)
self.poller = poller.Poller()
self.thread = threading.Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start()
class DFOvsdbApi(impl_idl.OvsdbIdl):
"""The command generator of OVS DB operation
This is a sub-class of OvsdbIdl, which is defined in neutron. The super
class OvsdbIdl has defined lots of command. Dragonflow can use
them. And Dragonflow can extend its own commands in this class.
"""
ovsdb_connection = None
def __init__(self, context, db_connection, timeout):
self.context = context
if DFOvsdbApi.ovsdb_connection is None:
DFOvsdbApi.ovsdb_connection = DFConnection(
db_connection,
timeout,
get_schema_helper(
db_connection,
tables=ovsdb_monitor_table_filter_default))
# Override the super class's attribute
impl_idl.OvsdbIdl.ovsdb_connection = DFOvsdbApi.ovsdb_connection
def start(self, nb_api):
DFOvsdbApi.ovsdb_connection.start(nb_api)
self.idl = DFOvsdbApi.ovsdb_connection.idl
def add_tunnel_port(self, chassis):
return commands.AddTunnelPort(self, chassis)
def get_bridge_ports(self, bridge):
return commands.GetBridgePorts(self, bridge)
def add_patch_port(self, bridge, port, remote_name):
return commands.AddPatchPort(self, bridge, port, remote_name)

View File

@ -1,7 +1,3 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -17,51 +13,6 @@
from dragonflow.common import constants
class SwitchApi(object):
def sync(self):
pass
def set_controllers(self, bridge, targets):
pass
def del_controller(self, bridge):
pass
def get_tunnel_ports(self):
pass
def add_tunnel_port(self, chassis):
pass
def delete_port(self, switch_port):
pass
def get_logical_ports_to_ofport(self):
pass
def get_chassis_ids_to_ofport(self):
pass
def get_local_port_ids(self):
pass
class SwitchPort(object):
def get_name(self):
pass
def get_id(self):
pass
class TunnelPort(SwitchPort):
def get_chassis_id(self):
pass
class LocalInterface(object):
def __init__(self):
@ -171,3 +122,17 @@ class LocalInterface(object):
self.mac_in_use,
self.remote_ip,
self.tunnel_type))
class OvsdbTunnelPort(object):
def __init__(self, name, chassis_id):
super(OvsdbTunnelPort, self).__init__()
self.name = name
self.chassis_id = chassis_id
def get_chassis_id(self):
return self.chassis_id
def get_name(self):
return self.name

View File

@ -0,0 +1,151 @@
# Copyright (c) 2015 OpenStack Foundation.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log
from ovs import vlog
from dragonflow._i18n import _LW
from dragonflow.common import constants
from dragonflow.ovsdb import impl_idl
from dragonflow.ovsdb import objects
LOG = log.getLogger(__name__)
OFPORT_RANGE_MIN = 1
OFPORT_RANGE_MAX = 65533
class OvsApi(object):
"""The interface of openvswitch
Consumers use this class to set openvswitch or get results from
openvswitch.
"""
def __init__(self, ip, protocol='tcp', port='6640', timeout=10):
super(OvsApi, self).__init__()
self.ip = ip
self.protocol = protocol
self.port = port
# NOTE: This has to be this name vsctl_timeout, as neutron will use
# this attribute to set the timeout of ovs db.
self.vsctl_timeout = timeout
self.ovsdb = None
self.integration_bridge = cfg.CONF.df.integration_bridge
vlog.Vlog.init('dragonflow')
def initialize(self, nb_api):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
self.ovsdb = impl_idl.DFOvsdbApi(
self, db_connection, self.vsctl_timeout)
table = constants.OVS_INTERFACE
nb_api.db_change_callback(table, None, 'sync_started', None)
self.ovsdb.start(nb_api)
nb_api.db_change_callback(table, None, 'sync_finished', None)
def _db_get_val(self, table, record, column, check_error=False,
log_errors=True):
return self.ovsdb.db_get(table, record, column).execute(
check_error=check_error, log_errors=log_errors)
def _get_bridge_for_iface(self, iface_name):
return self.ovsdb.iface_to_br(iface_name).execute()
def set_controller(self, bridge, targets):
self.ovsdb.set_controller(bridge, targets).execute()
def set_controller_fail_mode(self, bridge, fail_mode):
self.ovsdb.set_fail_mode(bridge, fail_mode).execute()
def check_controller(self, target):
controllers = self.ovsdb.get_controller(
self.integration_bridge).execute()
return target in controllers
def check_controller_fail_mode(self, fail_mode):
return fail_mode == self._db_get_val('Bridge',
self.integration_bridge,
'fail_mode')
def get_tunnel_ports(self):
res = []
ports = self.ovsdb.get_bridge_ports(self.integration_bridge).execute()
for port in ports:
if 'df-chassis-id' in port.external_ids:
chassis_id = port.external_ids['df-chassis-id']
res.append(objects.OvsdbTunnelPort(port.name, chassis_id))
return res
def add_tunnel_port(self, chassis):
self.ovsdb.add_tunnel_port(chassis).execute()
def delete_port(self, switch_port):
self.ovsdb.del_port(switch_port.get_name(),
self.integration_bridge).execute()
@staticmethod
def _check_ofport(port_name, ofport):
if ofport is None:
LOG.warning(_LW("Can't find ofport for port %s."), port_name)
return False
if ofport < OFPORT_RANGE_MIN or ofport > OFPORT_RANGE_MAX:
LOG.warning(_LW("ofport %(ofport)s for port %(port)s is invalid."),
{'ofport': ofport, 'port': port_name})
return False
return True
def get_chassis_ofport(self, chassis_id):
# TODO(xiaohhui): Can we just call get_port_ofport('df-'+chassis_id)?
ports = self.ovsdb.db_find(
'Port', ('external_ids', '=', {'df-chassis-id': chassis_id}),
columns=['external_ids', 'name']).execute()
for port in ports:
ofport = self.get_port_ofport(port['name'])
if self._check_ofport(port['name'], ofport):
return ofport
def get_port_ofport_by_id(self, port_id):
ifaces = self.ovsdb.db_find(
'Interface', ('external_ids', '=', {'iface-id': port_id}),
columns=['external_ids', 'name', 'ofport']).execute()
for iface in ifaces:
if (self.integration_bridge !=
self._get_bridge_for_iface(iface['name'])):
# iface-id is the port id in neutron, the same neutron port
# might create multiple interfaces in different bridges
continue
if self._check_ofport(iface['name'], iface['ofport']):
return iface['ofport']
def create_patch_port(self, bridge, port, remote_name):
self.ovsdb.add_br(bridge, datapath_type='system').execute()
if not self.patch_port_exist(port):
self.ovsdb.add_patch_port(bridge, port, remote_name).execute()
return self.get_port_ofport(port)
def patch_port_exist(self, port):
return 'patch' == self._db_get_val('Interface', port, 'type',
check_error=False,
log_errors=False)
def get_port_ofport(self, port):
return self._db_get_val('Interface', port, 'ofport',
check_error=False, log_errors=False)

View File

@ -12,7 +12,7 @@
from neutron.agent.linux.utils import wait_until_true
from dragonflow.db.drivers import ovsdb_vswitch_impl
from dragonflow.ovsdb import vswitch_impl
from dragonflow.tests.common import utils
from dragonflow.tests.fullstack import test_base
from dragonflow.tests.fullstack import test_objects as objects
@ -22,7 +22,7 @@ class TestOvsdbMonitor(test_base.DFTestBase):
def setUp(self):
super(TestOvsdbMonitor, self).setUp()
self.set_wanted_vms = set()
self.vswitch_api = ovsdb_vswitch_impl.OvsdbSwitchApi(self.local_ip)
self.vswitch_api = vswitch_impl.OvsApi(self.local_ip)
self.vswitch_api.initialize(self.nb_api)
def tearDown(self):