Use the ovsdbapp library

This patch uses the ovsdbapp Python library, which is the new
project based on the Neutron OVSDB API.

The CLI implementation of the OVSDB API remains in the Neutron
tree.

Neutron continues providing the (deprecated) ability to allow
the OVSDB API to be imported from Neutron.

The deleted tests exist in the ovsdbapp project. More will be
moved later, but many of the tests in the Neutron tree use
ovs_lib, which doesn't exist in ovsdbapp so those tests will
probably stay in the Neutron tree.

Closes-Bug: #1684277
Depends-On: I3d3535b1d6fe37c78a9399903b65bbd688b1c4b9
Change-Id: Ic8c7db0e80d0ad104242322d3f1f70cab8caab92
changes/14/453014/20
Terry Wilson 6 years ago
parent 6ecdfbb82b
commit e6333593ae

@ -30,7 +30,7 @@ import tenacity
from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent.common import ip_lib
from neutron.agent.common import utils
from neutron.agent.ovsdb import api as ovsdb
from neutron.agent.ovsdb import api as ovsdb_api
from neutron.conf.agent import ovs_conf
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
@ -106,7 +106,7 @@ class BaseOVS(object):
def __init__(self):
self.vsctl_timeout = cfg.CONF.ovs_vsctl_timeout
self.ovsdb = ovsdb.API.get(self)
self.ovsdb = ovsdb_api.from_config(self)
def add_manager(self, connection_uri, timeout=_SENTINEL):
"""Have ovsdb-server listen for manager connections

@ -12,20 +12,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import contextlib
import uuid
from debtcollector import moves
from oslo_config import cfg
from oslo_utils import importutils
import six
from ovsdbapp import api
from ovsdbapp import exceptions
from neutron._i18n import _
API = moves.moved_class(api.API, 'API', __name__)
Command = moves.moved_class(api.Command, 'Command', __name__)
Transaction = moves.moved_class(api.Transaction, 'Transaction', __name__)
TimeoutException = moves.moved_class(exceptions.TimeoutException,
'TimeoutException', __name__)
interface_map = {
'vsctl': 'neutron.agent.ovsdb.impl_vsctl.OvsdbVsctl',
'native': 'neutron.agent.ovsdb.impl_idl.NeutronOvsdbIdl',
'vsctl': 'neutron.agent.ovsdb.impl_vsctl',
'native': 'neutron.agent.ovsdb.impl_idl',
}
OPTS = [
@ -44,400 +50,11 @@ OPTS = [
cfg.CONF.register_opts(OPTS, 'OVS')
@six.add_metaclass(abc.ABCMeta)
class Command(object):
"""An OVSDB command that can be executed in a transaction
:attr result: The result of executing the command in a transaction
"""
@abc.abstractmethod
def execute(self, **transaction_options):
"""Immediately execute an OVSDB command
This implicitly creates a transaction with the passed options and then
executes it, returning the value of the executed transaction
:param transaction_options: Options to pass to the transaction
"""
@six.add_metaclass(abc.ABCMeta)
class Transaction(object):
@abc.abstractmethod
def commit(self):
"""Commit the transaction to OVSDB"""
@abc.abstractmethod
def add(self, command):
"""Append an OVSDB operation to the transaction"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, tb):
if exc_type is None:
self.result = self.commit()
@six.add_metaclass(abc.ABCMeta)
class API(object):
def __init__(self, context):
self.context = context
self._nested_txn = None
@staticmethod
def get(context, iface_name=None):
"""Return the configured OVSDB API implementation"""
iface = importutils.import_class(
interface_map[iface_name or cfg.CONF.OVS.ovsdb_interface])
return iface(context)
@abc.abstractmethod
def create_transaction(self, check_error=False, log_errors=True, **kwargs):
"""Create a transaction
:param check_error: Allow the transaction to raise an exception?
:type check_error: bool
:param log_errors: Log an error if the transaction fails?
:type log_errors: bool
:returns: A new transaction
:rtype: :class:`Transaction`
"""
@contextlib.contextmanager
def transaction(self, check_error=False, log_errors=True, **kwargs):
"""Create a transaction context.
:param check_error: Allow the transaction to raise an exception?
:type check_error: bool
:param log_errors: Log an error if the transaction fails?
:type log_errors: bool
:returns: Either a new transaction or an existing one.
:rtype: :class:`Transaction`
"""
if self._nested_txn:
yield self._nested_txn
else:
with self.create_transaction(
check_error, log_errors, **kwargs) as txn:
self._nested_txn = txn
try:
yield txn
finally:
self._nested_txn = None
@abc.abstractmethod
def add_manager(self, connection_uri):
"""Create a command to add a Manager to the OVS switch
This API will add a new manager without overriding the existing ones.
:param connection_uri: target to which manager needs to be set
:type connection_uri: string, see ovs-vsctl manpage for format
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def get_manager(self):
"""Create a command to get Manager list from the OVS switch
:returns: :class:`Command` with list of Manager names result
"""
@abc.abstractmethod
def remove_manager(self, connection_uri):
"""Create a command to remove a Manager from the OVS switch
This API will remove the manager configured on the OVS switch.
:param connection_uri: target identifying the manager uri that
needs to be removed.
:type connection_uri: string, see ovs-vsctl manpage for format
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def add_br(self, name, may_exist=True, datapath_type=None):
"""Create a command to add an OVS bridge
:param name: The name of the bridge
:type name: string
:param may_exist: Do not fail if bridge already exists
:type may_exist: bool
:param datapath_type: The datapath_type of the bridge
:type datapath_type: string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def del_br(self, name, if_exists=True):
"""Create a command to delete an OVS bridge
:param name: The name of the bridge
:type name: string
:param if_exists: Do not fail if the bridge does not exist
:type if_exists: bool
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def br_exists(self, name):
"""Create a command to check if an OVS bridge exists
:param name: The name of the bridge
:type name: string
:returns: :class:`Command` with bool result
"""
@abc.abstractmethod
def port_to_br(self, name):
"""Create a command to return the name of the bridge with the port
:param name: The name of the OVS port
:type name: string
:returns: :class:`Command` with bridge name result
"""
@abc.abstractmethod
def iface_to_br(self, name):
"""Create a command to return the name of the bridge with the interface
:param name: The name of the OVS interface
:type name: string
:returns: :class:`Command` with bridge name result
"""
@abc.abstractmethod
def list_br(self):
"""Create a command to return the current list of OVS bridge names
:returns: :class:`Command` with list of bridge names result
"""
@abc.abstractmethod
def br_get_external_id(self, name, field):
"""Create a command to return a field from the Bridge's external_ids
:param name: The name of the OVS Bridge
:type name: string
:param field: The external_ids field to return
:type field: string
:returns: :class:`Command` with field value result
"""
@abc.abstractmethod
def db_create(self, table, **col_values):
"""Create a command to create new record
:param table: The OVS table containing the record to be created
:type table: string
:param col_values: The columns and their associated values
to be set after create
:type col_values: Dictionary of columns id's and values
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_destroy(self, table, record):
"""Create a command to destroy a record
:param table: The OVS table containing the record to be destroyed
:type table: string
:param record: The record id (name/uuid) to be destroyed
:type record: uuid/string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_set(self, table, record, *col_values):
"""Create a command to set fields in a record
:param table: The OVS table containing the record to be modified
:type table: string
:param record: The record id (name/uuid) to be modified
:type table: string
:param col_values: The columns and their associated values
:type col_values: Tuples of (column, value). Values may be atomic
values or unnested sequences/mappings
:returns: :class:`Command` with no result
"""
# TODO(twilson) Consider handling kwargs for arguments where order
# doesn't matter. Though that would break the assert_called_once_with
# unit tests
@abc.abstractmethod
def db_add(self, table, record, column, *values):
"""Create a command to add a value to a record
Adds each value or key-value pair to column in record in table. If
column is a map, then each value will be a dict, otherwise a base type.
If key already exists in a map column, then the current value is not
replaced (use the set command to replace an existing value).
:param table: The OVS table containing the record to be modified
:type table: string
:param record: The record id (name/uuid) to modified
:type record: string
:param column: The column name to be modified
:type column: string
:param values: The values to be added to the column
:type values: The base type of the column. If column is a map, then
a dict containing the key name and the map's value type
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_clear(self, table, record, column):
"""Create a command to clear a field's value in a record
:param table: The OVS table containing the record to be modified
:type table: string
:param record: The record id (name/uuid) to be modified
:type record: string
:param column: The column whose value should be cleared
:type column: string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_get(self, table, record, column):
"""Create a command to return a field's value in a record
:param table: The OVS table containing the record to be queried
:type table: string
:param record: The record id (name/uuid) to be queried
:type record: string
:param column: The column whose value should be returned
:type column: string
:returns: :class:`Command` with the field's value result
"""
@abc.abstractmethod
def db_list(self, table, records=None, columns=None, if_exists=False):
"""Create a command to return a list of OVSDB records
:param table: The OVS table to query
:type table: string
:param records: The records to return values from
:type records: list of record ids (names/uuids)
:param columns: Limit results to only columns, None means all columns
:type columns: list of column names or None
:param if_exists: Do not fail if the record does not exist
:type if_exists: bool
:returns: :class:`Command` with [{'column', value}, ...] result
"""
@abc.abstractmethod
def db_find(self, table, *conditions, **kwargs):
"""Create a command to return find OVSDB records matching conditions
:param table: The OVS table to query
:type table: string
:param conditions:The conditions to satisfy the query
:type conditions: 3-tuples containing (column, operation, match)
Type of 'match' parameter MUST be identical to column
type
Examples:
atomic: ('tag', '=', 7)
map: ('external_ids' '=', {'iface-id': 'xxx'})
field exists?
('external_ids', '!=', {'iface-id', ''})
set contains?:
('protocols', '{>=}', 'OpenFlow13')
See the ovs-vsctl man page for more operations
:param columns: Limit results to only columns, None means all columns
:type columns: list of column names or None
:returns: :class:`Command` with [{'column', value}, ...] result
"""
@abc.abstractmethod
def set_controller(self, bridge, controllers):
"""Create a command to set an OVS bridge's OpenFlow controllers
:param bridge: The name of the bridge
:type bridge: string
:param controllers: The controller strings
:type controllers: list of strings, see ovs-vsctl manpage for format
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def del_controller(self, bridge):
"""Create a command to clear an OVS bridge's OpenFlow controllers
:param bridge: The name of the bridge
:type bridge: string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def get_controller(self, bridge):
"""Create a command to return an OVS bridge's OpenFlow controllers
:param bridge: The name of the bridge
:type bridge: string
:returns: :class:`Command` with list of controller strings result
"""
@abc.abstractmethod
def set_fail_mode(self, bridge, mode):
"""Create a command to set an OVS bridge's failure mode
:param bridge: The name of the bridge
:type bridge: string
:param mode: The failure mode
:type mode: "secure" or "standalone"
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def add_port(self, bridge, port, may_exist=True):
"""Create a command to add a port to an OVS bridge
:param bridge: The name of the bridge
:type bridge: string
:param port: The name of the port
:type port: string
:param may_exist: Do not fail if the port already exists
:type may_exist: bool
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def del_port(self, port, bridge=None, if_exists=True):
"""Create a command to delete a port an OVS port
:param port: The name of the port
:type port: string
:param bridge: Only delete port if it is attached to this bridge
:type bridge: string
:param if_exists: Do not fail if the port does not exist
:type if_exists: bool
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def list_ports(self, bridge):
"""Create a command to list the names of ports on a bridge
:param bridge: The name of the bridge
:type bridge: string
:returns: :class:`Command` with list of port names result
"""
@abc.abstractmethod
def list_ifaces(self, bridge):
"""Create a command to list the names of interfaces on a bridge
:param bridge: The name of the bridge
:type bridge: string
:returns: :class:`Command` with list of interfaces names result
"""
class TimeoutException(Exception):
pass
def from_config(context, iface_name=None):
"""Return the configured OVSDB API implementation"""
iface = importutils.import_module(
interface_map[iface_name or cfg.CONF.OVS.ovsdb_interface])
return iface.api_factory(context)
def val_to_py(val):

@ -12,289 +12,32 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from neutron_lib import exceptions
from debtcollector import moves
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from ovs.db import idl
from six.moves import queue as Queue
from ovsdbapp.schema.open_vswitch import impl_idl
from neutron._i18n import _, _LE
from neutron.agent.ovsdb import api
from neutron.agent.ovsdb.native import commands as cmd
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from neutron.agent.ovsdb.native import vlog
NeutronOVSDBTransaction = moves.moved_class(
impl_idl.OvsVsctlTransaction,
'NeutronOVSDBTransaction',
__name__)
cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.common.ovs_lib')
LOG = logging.getLogger(__name__)
class VswitchdInterfaceAddException(exceptions.NeutronException):
message = _("Failed to add interfaces: %(ifaces)s")
class Transaction(api.Transaction):
def __init__(self, api, ovsdb_connection, timeout,
check_error=False, log_errors=True):
self.api = api
self.check_error = check_error
self.log_errors = log_errors
self.commands = []
self.results = Queue.Queue(1)
self.ovsdb_connection = ovsdb_connection
self.timeout = timeout
self.expected_ifaces = set()
def __str__(self):
return ", ".join(str(cmd) for cmd in self.commands)
def add(self, command):
"""Add a command to the transaction
returns The command passed as a convenience
"""
self.commands.append(command)
return command
def commit(self):
self.ovsdb_connection.queue_txn(self)
try:
result = self.results.get(timeout=self.timeout)
except Queue.Empty:
raise api.TimeoutException(
_("Commands %(commands)s exceeded timeout %(timeout)d "
"seconds") % {'commands': self.commands,
'timeout': self.timeout})
if isinstance(result, idlutils.ExceptionResult):
if self.log_errors:
LOG.error(result.tb)
if self.check_error:
raise result.ex
return result
def pre_commit(self, txn):
pass
def post_commit(self, txn):
for command in self.commands:
command.post_commit(txn)
def do_commit(self):
self.start_time = time.time()
attempts = 0
while True:
if attempts > 0 and self.timeout_exceeded():
raise RuntimeError(_("OVS transaction timed out"))
attempts += 1
# TODO(twilson) Make sure we don't loop longer than vsctl_timeout
txn = idl.Transaction(self.api.idl)
self.pre_commit(txn)
for i, command in enumerate(self.commands):
LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s",
{'idx': i, 'cmd': command})
try:
command.run_idl(txn)
except Exception:
with excutils.save_and_reraise_exception() as ctx:
txn.abort()
if not self.check_error:
ctx.reraise = False
seqno = self.api.idl.change_seqno
status = txn.commit_block()
if status == txn.TRY_AGAIN:
LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying")
idlutils.wait_for_change(self.api.idl, self.time_remaining(),
seqno)
continue
elif status == txn.ERROR:
msg = _("OVSDB Error: %s") % txn.get_error()
if self.log_errors:
LOG.error(msg)
if self.check_error:
# For now, raise similar error to vsctl/utils.execute()
raise RuntimeError(msg)
return
elif status == txn.ABORTED:
LOG.debug("Transaction aborted")
return
elif status == txn.UNCHANGED:
LOG.debug("Transaction caused no change")
elif status == txn.SUCCESS:
self.post_commit(txn)
return [cmd.result for cmd in self.commands]
def elapsed_time(self):
return time.time() - self.start_time
def time_remaining(self):
return self.timeout - self.elapsed_time()
def timeout_exceeded(self):
return self.elapsed_time() > self.timeout
class NeutronOVSDBTransaction(Transaction):
def pre_commit(self, txn):
self.api._ovs.increment('next_cfg')
txn.expected_ifaces = set()
def post_commit(self, txn):
super(NeutronOVSDBTransaction, self).post_commit(txn)
# ovs-vsctl only logs these failures and does not return nonzero
try:
self.do_post_commit(txn)
except Exception:
LOG.exception(_LE("Post-commit checks failed"))
def do_post_commit(self, txn):
next_cfg = txn.get_increment_new_value()
while not self.timeout_exceeded():
self.api.idl.run()
if self.vswitchd_has_completed(next_cfg):
failed = self.post_commit_failed_interfaces(txn)
if failed:
raise VswitchdInterfaceAddException(
ifaces=", ".join(failed))
break
self.ovsdb_connection.poller.timer_wait(
self.time_remaining() * 1000)
self.api.idl.wait(self.ovsdb_connection.poller)
self.ovsdb_connection.poller.block()
else:
raise api.TimeoutException(
_("Commands %(commands)s exceeded timeout %(timeout)d "
"seconds post-commit") % {'commands': self.commands,
'timeout': self.timeout})
def post_commit_failed_interfaces(self, txn):
failed = []
for iface_uuid in txn.expected_ifaces:
uuid = txn.get_insert_uuid(iface_uuid)
if uuid:
ifaces = self.api.idl.tables['Interface']
iface = ifaces.rows.get(uuid)
if iface and (not iface.ofport or iface.ofport == -1):
failed.append(iface.name)
return failed
def vswitchd_has_completed(self, next_cfg):
return self.api._ovs.cur_cfg >= next_cfg
class OvsdbIdl(api.API):
ovsdb_connection = connection.Connection(cfg.CONF.OVS.ovsdb_connection,
cfg.CONF.ovs_vsctl_timeout,
'Open_vSwitch')
def __init__(self, context):
super(OvsdbIdl, self).__init__(context)
OvsdbIdl.ovsdb_connection.start()
self.idl = OvsdbIdl.ovsdb_connection.idl
@property
def _tables(self):
return self.idl.tables
@property
def _ovs(self):
return list(self._tables['Open_vSwitch'].rows.values())[0]
def create_transaction(self, check_error=False, log_errors=True, **kwargs):
return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection,
self.context.vsctl_timeout,
check_error, log_errors)
def add_manager(self, connection_uri):
return cmd.AddManagerCommand(self, connection_uri)
def get_manager(self):
return cmd.GetManagerCommand(self)
def remove_manager(self, connection_uri):
return cmd.RemoveManagerCommand(self, connection_uri)
def add_br(self, name, may_exist=True, datapath_type=None):
return cmd.AddBridgeCommand(self, name, may_exist, datapath_type)
def del_br(self, name, if_exists=True):
return cmd.DelBridgeCommand(self, name, if_exists)
def br_exists(self, name):
return cmd.BridgeExistsCommand(self, name)
def port_to_br(self, name):
return cmd.PortToBridgeCommand(self, name)
def iface_to_br(self, name):
return cmd.InterfaceToBridgeCommand(self, name)
def list_br(self):
return cmd.ListBridgesCommand(self)
def br_get_external_id(self, name, field):
return cmd.BrGetExternalIdCommand(self, name, field)
def br_set_external_id(self, name, field, value):
return cmd.BrSetExternalIdCommand(self, name, field, value)
def db_create(self, table, **col_values):
return cmd.DbCreateCommand(self, table, **col_values)
def db_destroy(self, table, record):
return cmd.DbDestroyCommand(self, table, record)
def db_set(self, table, record, *col_values):
return cmd.DbSetCommand(self, table, record, *col_values)
def db_add(self, table, record, column, *values):
return cmd.DbAddCommand(self, table, record, column, *values)
def db_clear(self, table, record, column):
return cmd.DbClearCommand(self, table, record, column)
def db_get(self, table, record, column):
return cmd.DbGetCommand(self, table, record, column)
def db_list(self, table, records=None, columns=None, if_exists=False):
return cmd.DbListCommand(self, table, records, columns, if_exists)
def db_find(self, table, *conditions, **kwargs):
return cmd.DbFindCommand(self, table, *conditions, **kwargs)
def set_controller(self, bridge, controllers):
return cmd.SetControllerCommand(self, bridge, controllers)
def del_controller(self, bridge):
return cmd.DelControllerCommand(self, bridge)
def get_controller(self, bridge):
return cmd.GetControllerCommand(self, bridge)
def set_fail_mode(self, bridge, mode):
return cmd.SetFailModeCommand(self, bridge, mode)
def add_port(self, bridge, port, may_exist=True):
return cmd.AddPortCommand(self, bridge, port, may_exist)
VswitchdInterfaceAddException = moves.moved_class(
impl_idl.VswitchdInterfaceAddException,
'VswitchdInterfaceAddException',
__name__)
def del_port(self, port, bridge=None, if_exists=True):
return cmd.DelPortCommand(self, port, bridge, if_exists)
_connection = connection.Connection(idl_factory=connection.idl_factory,
timeout=cfg.CONF.ovs_vsctl_timeout)
def list_ports(self, bridge):
return cmd.ListPortsCommand(self, bridge)
def list_ifaces(self, bridge):
return cmd.ListIfacesCommand(self, bridge)
def api_factory(context):
return NeutronOvsdbIdl(_connection)
class NeutronOvsdbIdl(OvsdbIdl):
def __init__(self, context):
vlog.use_oslo_logger()
super(NeutronOvsdbIdl, self).__init__(context)
class NeutronOvsdbIdl(impl_idl.OvsdbIdl):
def __init__(self, connection):
vlog.use_python_logger()
super(NeutronOvsdbIdl, self).__init__(connection)

@ -29,6 +29,10 @@ from neutron.agent.ovsdb import api as ovsdb
LOG = logging.getLogger(__name__)
def api_factory(context):
return OvsdbVsctl(context)
class Transaction(ovsdb.Transaction):
def __init__(self, context, check_error=False, log_errors=True, opts=None):
self.context = context
@ -178,6 +182,10 @@ class BrExistsCommand(DbCommand):
class OvsdbVsctl(ovsdb.API):
def __init__(self, context):
super(OvsdbVsctl, self).__init__()
self.context = context
def create_transaction(self, check_error=False, log_errors=True, **kwargs):
return Transaction(self.context, check_error, log_errors, **kwargs)

@ -12,561 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from ovsdbapp.schema.open_vswitch import commands
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.common import _deprecate
from neutron._i18n import _, _LE
from neutron.agent.ovsdb import api
from neutron.agent.ovsdb.native import idlutils
LOG = logging.getLogger(__name__)
class BaseCommand(api.Command):
def __init__(self, api):
self.api = api
self.result = None
def execute(self, check_error=False, log_errors=True):
try:
with self.api.transaction(check_error, log_errors) as txn:
txn.add(self)
return self.result
except Exception:
with excutils.save_and_reraise_exception() as ctx:
if log_errors:
LOG.exception(_LE("Error executing command"))
if not check_error:
ctx.reraise = False
def post_commit(self, txn):
pass
def __str__(self):
command_info = self.__dict__
return "%s(%s)" % (
self.__class__.__name__,
", ".join("%s=%s" % (k, v) for k, v in command_info.items()
if k not in ['api', 'result']))
__repr__ = __str__
class AddManagerCommand(BaseCommand):
def __init__(self, api, target):
super(AddManagerCommand, self).__init__(api)
self.target = target
def run_idl(self, txn):
row = txn.insert(self.api._tables['Manager'])
row.target = self.target
try:
self.api._ovs.addvalue('manager_options', row)
except AttributeError: # OVS < 2.6
self.api._ovs.verify('manager_options')
self.api._ovs.manager_options = (
self.api._ovs.manager_options + [row])
class GetManagerCommand(BaseCommand):
def __init__(self, api):
super(GetManagerCommand, self).__init__(api)
def run_idl(self, txn):
self.result = [m.target for m in
self.api._tables['Manager'].rows.values()]
class RemoveManagerCommand(BaseCommand):
def __init__(self, api, target):
super(RemoveManagerCommand, self).__init__(api)
self.target = target
def run_idl(self, txn):
try:
manager = idlutils.row_by_value(self.api.idl, 'Manager', 'target',
self.target)
except idlutils.RowNotFound:
msg = _("Manager with target %s does not exist") % self.target
LOG.error(msg)
raise RuntimeError(msg)
try:
self.api._ovs.delvalue('manager_options', manager)
except AttributeError: # OVS < 2.6
self.api._ovs.verify('manager_options')
manager_list = self.api._ovs.manager_options
manager_list.remove(manager)
self.api._ovs.manager_options = manager_list
manager.delete()
class AddBridgeCommand(BaseCommand):
def __init__(self, api, name, may_exist, datapath_type):
super(AddBridgeCommand, self).__init__(api)
self.name = name
self.may_exist = may_exist
self.datapath_type = datapath_type
def run_idl(self, txn):
if self.may_exist:
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
self.name, None)
if br:
if self.datapath_type:
br.datapath_type = self.datapath_type
return
row = txn.insert(self.api._tables['Bridge'])
row.name = self.name
if self.datapath_type:
row.datapath_type = self.datapath_type
try:
self.api._ovs.addvalue('bridges', row)
except AttributeError: # OVS < 2.6
self.api._ovs.verify('bridges')
self.api._ovs.bridges = self.api._ovs.bridges + [row]
# Add the internal bridge port
cmd = AddPortCommand(self.api, self.name, self.name, self.may_exist)
cmd.run_idl(txn)
cmd = DbSetCommand(self.api, 'Interface', self.name,
('type', 'internal'))
cmd.run_idl(txn)
class DelBridgeCommand(BaseCommand):
def __init__(self, api, name, if_exists):
super(DelBridgeCommand, self).__init__(api)
self.name = name
self.if_exists = if_exists
def run_idl(self, txn):
try:
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
self.name)
except idlutils.RowNotFound:
if self.if_exists:
return
else:
msg = _("Bridge %s does not exist") % self.name
LOG.error(msg)
raise RuntimeError(msg)
# Clean up cached ports/interfaces
for port in br.ports:
for interface in port.interfaces:
interface.delete()
port.delete()
try:
self.api._ovs.delvalue('bridges', br)
except AttributeError: # OVS < 2.6
self.api._ovs.verify('bridges')
bridges = self.api._ovs.bridges
bridges.remove(br)
self.api._ovs.bridges = bridges
br.delete()
class BridgeExistsCommand(BaseCommand):
def __init__(self, api, name):
super(BridgeExistsCommand, self).__init__(api)
self.name = name
def run_idl(self, txn):
self.result = bool(idlutils.row_by_value(self.api.idl, 'Bridge',
'name', self.name, None))
class ListBridgesCommand(BaseCommand):
def __init__(self, api):
super(ListBridgesCommand, self).__init__(api)
def run_idl(self, txn):
# NOTE (twilson) [x.name for x in rows.values()] if no index
self.result = [x.name for x in
self.api._tables['Bridge'].rows.values()]
class BrGetExternalIdCommand(BaseCommand):
def __init__(self, api, name, field):
super(BrGetExternalIdCommand, self).__init__(api)
self.name = name
self.field = field
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.name)
self.result = br.external_ids[self.field]
class BrSetExternalIdCommand(BaseCommand):
def __init__(self, api, name, field, value):
super(BrSetExternalIdCommand, self).__init__(api)
self.name = name
self.field = field
self.value = value
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.name)
external_ids = getattr(br, 'external_ids', {})
external_ids[self.field] = self.value
br.external_ids = external_ids
class DbCreateCommand(BaseCommand):
def __init__(self, api, table, **columns):
super(DbCreateCommand, self).__init__(api)
self.table = table
self.columns = columns
def run_idl(self, txn):
row = txn.insert(self.api._tables[self.table])
for col, val in self.columns.items():
setattr(row, col, idlutils.db_replace_record(val))
# This is a temporary row to be used within the transaction
self.result = row
def post_commit(self, txn):
# Replace the temporary row with the post-commit UUID to match vsctl
self.result = txn.get_insert_uuid(self.result.uuid)
class DbDestroyCommand(BaseCommand):
def __init__(self, api, table, record):
super(DbDestroyCommand, self).__init__(api)
self.table = table
self.record = record
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
record.delete()
class DbSetCommand(BaseCommand):
def __init__(self, api, table, record, *col_values):
super(DbSetCommand, self).__init__(api)
self.table = table
self.record = record
self.col_values = col_values
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
for col, val in self.col_values:
# TODO(twilson) Ugh, the OVS library doesn't like OrderedDict
# We're only using it to make a unit test work, so we should fix
# this soon.
if isinstance(val, collections.OrderedDict):
val = dict(val)
if isinstance(val, dict):
# NOTE(twilson) OVS 2.6's Python IDL has mutate methods that
# would make this cleaner, but it's too early to rely on them.
existing = getattr(record, col, {})
existing.update(val)
val = existing
setattr(record, col, idlutils.db_replace_record(val))
class DbAddCommand(BaseCommand):
def __init__(self, api, table, record, column, *values):
super(DbAddCommand, self).__init__(api)
self.table = table
self.record = record
self.column = column
self.values = values
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
for value in self.values:
if isinstance(value, collections.Mapping):
# We should be doing an add on a 'map' column. If the key is
# already set, do nothing, otherwise set the key to the value
# Since this operation depends on the previous value, verify()
# must be called.
field = getattr(record, self.column, {})
for k, v in value.items():
if k in field:
continue
field[k] = v
else:
# We should be appending to a 'set' column.
try:
record.addvalue(self.column,
idlutils.db_replace_record(value))
continue
except AttributeError: # OVS < 2.6
field = getattr(record, self.column, [])
field.append(value)
record.verify(self.column)
setattr(record, self.column, idlutils.db_replace_record(field))
class DbClearCommand(BaseCommand):
def __init__(self, api, table, record, column):
super(DbClearCommand, self).__init__(api)
self.table = table
self.record = record
self.column = column
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
# Create an empty value of the column type
value = type(getattr(record, self.column))()
setattr(record, self.column, value)
class DbGetCommand(BaseCommand):
def __init__(self, api, table, record, column):
super(DbGetCommand, self).__init__(api)
self.table = table
self.record = record
self.column = column
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
# TODO(twilson) This feels wrong, but ovs-vsctl returns single results
# on set types without the list. The IDL is returning them as lists,
# even if the set has the maximum number of items set to 1. Might be
# able to inspect the Schema and just do this conversion for that case.
result = idlutils.get_column_value(record, self.column)
if isinstance(result, list) and len(result) == 1:
self.result = result[0]
else:
self.result = result
class SetControllerCommand(BaseCommand):
def __init__(self, api, bridge, targets):
super(SetControllerCommand, self).__init__(api)
self.bridge = bridge
self.targets = targets
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
controllers = []
for target in self.targets:
controller = txn.insert(self.api._tables['Controller'])
controller.target = target
controllers.append(controller)
# Don't need to verify because we unconditionally overwrite
br.controller = controllers
class DelControllerCommand(BaseCommand):
def __init__(self, api, bridge):
super(DelControllerCommand, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
br.controller = []
class GetControllerCommand(BaseCommand):
def __init__(self, api, bridge):
super(GetControllerCommand, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [c.target for c in br.controller]
class SetFailModeCommand(BaseCommand):
def __init__(self, api, bridge, mode):
super(SetFailModeCommand, self).__init__(api)
self.bridge = bridge
self.mode = mode
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
br.fail_mode = self.mode
class AddPortCommand(BaseCommand):
def __init__(self, api, bridge, port, may_exist):
super(AddPortCommand, self).__init__(api)
self.bridge = bridge
self.port = port
self.may_exist = may_exist
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
if self.may_exist:
port = idlutils.row_by_value(self.api.idl, 'Port', 'name',
self.port, None)
if port:
return
port = txn.insert(self.api._tables['Port'])
port.name = self.port
try:
br.addvalue('ports', port)
except AttributeError: # OVS < 2.6
br.verify('ports')
ports = getattr(br, 'ports', [])
ports.append(port)
br.ports = ports
iface = txn.insert(self.api._tables['Interface'])
txn.expected_ifaces.add(iface.uuid)
iface.name = self.port
# This is a new port, so it won't have any existing interfaces
port.interfaces = [iface]
class DelPortCommand(BaseCommand):
def __init__(self, api, port, bridge, if_exists):
super(DelPortCommand, self).__init__(api)
self.port = port
self.bridge = bridge
self.if_exists = if_exists
def run_idl(self, txn):
try:
port = idlutils.row_by_value(self.api.idl, 'Port', 'name',
self.port)
except idlutils.RowNotFound:
if self.if_exists:
return
msg = _("Port %s does not exist") % self.port
raise RuntimeError(msg)
if self.bridge:
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name',
self.bridge)
else:
br = next(b for b in self.api._tables['Bridge'].rows.values()
if port in b.ports)
if port not in br.ports and not self.if_exists:
# TODO(twilson) Make real errors across both implementations
msg = _("Port %(port)s does not exist on %(bridge)s!") % {
'port': self.port, 'bridge': self.bridge
}
LOG.error(msg)
raise RuntimeError(msg)
try:
br.delvalue('ports', port)
except AttributeError: # OVS < 2.6
br.verify('ports')
ports = br.ports
ports.remove(port)
br.ports = ports
# The interface on the port will be cleaned up by ovsdb-server
for interface in port.interfaces:
interface.delete()
port.delete()
class ListPortsCommand(BaseCommand):
def __init__(self, api, bridge):
super(ListPortsCommand, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [p.name for p in br.ports if p.name != self.bridge]
class ListIfacesCommand(BaseCommand):
def __init__(self, api, bridge):
super(ListIfacesCommand, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [i.name for p in br.ports if p.name != self.bridge
for i in p.interfaces]
class PortToBridgeCommand(BaseCommand):
def __init__(self, api, name):
super(PortToBridgeCommand, self).__init__(api)
self.name = name
def run_idl(self, txn):
# TODO(twilson) This is expensive!
# This traversal of all ports could be eliminated by caching the bridge
# name on the Port's external_id field
# In fact, if we did that, the only place that uses to_br functions
# could just add the external_id field to the conditions passed to find
port = idlutils.row_by_value(self.api.idl, 'Port', 'name', self.name)
bridges = self.api._tables['Bridge'].rows.values()
self.result = next(br.name for br in bridges if port in br.ports)
class InterfaceToBridgeCommand(BaseCommand):
def __init__(self, api, name):
super(InterfaceToBridgeCommand, self).__init__(api)
self.name = name
def run_idl(self, txn):
interface = idlutils.row_by_value(self.api.idl, 'Interface', 'name',
self.name)
ports = self.api._tables['Port'].rows.values()
pname = next(
port for port in ports if interface in port.interfaces)
bridges = self.api._tables['Bridge'].rows.values()
self.result = next(br.name for br in bridges if pname in br.ports)
class DbListCommand(BaseCommand):
def __init__(self, api, table, records, columns, if_exists):
super(DbListCommand, self).__init__(api)
self.table = table
self.columns = columns
self.if_exists = if_exists
self.records = records
def run_idl(self, txn):
table_schema = self.api._tables[self.table]
columns = self.columns or list(table_schema.columns.keys()) + ['_uuid']
if self.records:
row_uuids = []
for record in self.records:
try:
row_uuids.append(idlutils.row_by_record(
self.api.idl, self.table, record).uuid)
except idlutils.RowNotFound:
if self.if_exists:
continue
# NOTE(kevinbenton): this is converted to a RuntimeError
# for compat with the vsctl version. It might make more
# sense to change this to a RowNotFoundError in the future.
raise RuntimeError(_(
"Row doesn't exist in the DB. Request info: "
"Table=%(table)s. Columns=%(columns)s. "
"Records=%(records)s.") % {
"table": self.table,
"columns": self.columns,
"records": self.records,
})
else:
row_uuids = table_schema.rows.keys()
self.result = [
{
c: idlutils.get_column_value(table_schema.rows[uuid], c)
for c in columns
}
for uuid in row_uuids
]
class DbFindCommand(BaseCommand):
def __init__(self, api, table, *conditions, **kwargs):
super(DbFindCommand, self).__init__(api)
self.table = self.api._tables[table]
self.conditions = conditions
self.columns = (kwargs.get('columns') or
list(self.table.columns.keys()) + ['_uuid'])
def run_idl(self, txn):
self.result = [
{
c: idlutils.get_column_value(r, c)
for c in self.columns
}
for r in self.table.rows.values()
if idlutils.row_match(r, self.conditions)
]
_deprecate._MovedGlobals(commands)

@ -12,150 +12,36 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import threading
import traceback
from debtcollector import removals
from debtcollector import moves
from oslo_config import cfg
from ovs.db import idl
from ovs import poller
import six
from six.moves import queue as Queue
from neutron._i18n import _
from neutron.agent.ovsdb.native import idlutils
class TransactionQueue(Queue.Queue, object):
def __init__(self, *args, **kwargs):
super(TransactionQueue, self).__init__(*args, **kwargs)
alertpipe = os.pipe()
# NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. Will get
# around this constraint by using binary mode.
self.alertin = os.fdopen(alertpipe[0], 'rb', 0)
self.alertout = os.fdopen(alertpipe[1], 'wb', 0)
def get_nowait(self, *args, **kwargs):
try:
result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
except Queue.Empty:
return None
self.alertin.read(1)
return result
def put(self, *args, **kwargs):
super(TransactionQueue, self).put(*args, **kwargs)
self.alertout.write(six.b('X'))
self.alertout.flush()
@property
def alert_fileno(self):
return self.alertin.fileno()