Merge "Add native OVSDB implementation of OVSDB API"
This commit is contained in:
commit
62b1cac8f2
@ -40,6 +40,16 @@
|
||||
# so long as it is set to True.
|
||||
# use_veth_interconnection = False
|
||||
|
||||
# (StrOpt) Which OVSDB backend to use, defaults to 'vsctl'
|
||||
# vsctl - The backend based on executing ovs-vsctl
|
||||
# native - The backend based on using native OVSDB
|
||||
# ovsdb_interface = vsctl
|
||||
|
||||
# (StrOpt) The connection string for the native OVSDB backend
|
||||
# To enable ovsdb-server to listen on port 6640:
|
||||
# ovs-vsctl set-manager ptcp:6640:127.0.0.1
|
||||
# ovsdb_connection = tcp:127.0.0.1:6640
|
||||
|
||||
[agent]
|
||||
# Agent's polling interval in seconds
|
||||
# polling_interval = 2
|
||||
|
@ -102,7 +102,10 @@ class BaseOVS(object):
|
||||
|
||||
def add_bridge(self, bridge_name):
|
||||
self.ovsdb.add_br(bridge_name).execute()
|
||||
return OVSBridge(bridge_name)
|
||||
br = OVSBridge(bridge_name)
|
||||
# Don't return until vswitchd sets up the internal port
|
||||
br.get_port_ofport(bridge_name)
|
||||
return br
|
||||
|
||||
def delete_bridge(self, bridge_name):
|
||||
self.ovsdb.del_br(bridge_name).execute()
|
||||
@ -162,6 +165,8 @@ class OVSBridge(BaseOVS):
|
||||
|
||||
def create(self):
|
||||
self.ovsdb.add_br(self.br_name).execute()
|
||||
# Don't return until vswitchd sets up the internal port
|
||||
self.get_port_ofport(self.br_name)
|
||||
|
||||
def destroy(self):
|
||||
self.delete_bridge(self.br_name)
|
||||
@ -191,6 +196,8 @@ class OVSBridge(BaseOVS):
|
||||
if interface_attr_tuples:
|
||||
txn.add(self.ovsdb.db_set('Interface', port_name,
|
||||
*interface_attr_tuples))
|
||||
# Don't return until the port has been assigned by vswitchd
|
||||
self.get_port_ofport(port_name)
|
||||
|
||||
def delete_port(self, port_name):
|
||||
self.ovsdb.del_port(port_name, self.br_name).execute()
|
||||
|
@ -20,6 +20,7 @@ import six
|
||||
|
||||
interface_map = {
|
||||
'vsctl': 'neutron.agent.ovsdb.impl_vsctl.OvsdbVsctl',
|
||||
'native': 'neutron.agent.ovsdb.impl_idl.OvsdbIdl',
|
||||
}
|
||||
|
||||
OPTS = [
|
||||
|
195
neutron/agent/ovsdb/impl_idl.py
Normal file
195
neutron/agent/ovsdb/impl_idl.py
Normal file
@ -0,0 +1,195 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import Queue
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from ovs.db import idl
|
||||
|
||||
from neutron.agent.ovsdb import api
|
||||
from neutron.agent.ovsdb.native import commands as cmd
|
||||
from neutron.agent.ovsdb.native import connection
|
||||
from neutron.agent.ovsdb.native import idlutils
|
||||
from neutron.i18n import _LE
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('ovsdb_connection',
|
||||
default='tcp:127.0.0.1:6640',
|
||||
help=_('The connection string for the native OVSDB backend')),
|
||||
]
|
||||
cfg.CONF.register_opts(OPTS, 'OVS')
|
||||
# TODO(twilson) DEFAULT.ovs_vsctl_timeout should be OVS.vsctl_timeout
|
||||
cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.linux.ovs_lib')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ovsdb_connection = connection.Connection(cfg.CONF.OVS.ovsdb_connection,
|
||||
cfg.CONF.ovs_vsctl_timeout)
|
||||
|
||||
|
||||
class Transaction(api.Transaction):
|
||||
def __init__(self, context, api, check_error=False, log_errors=False):
|
||||
self.context = context
|
||||
self.api = api
|
||||
self.check_error = check_error
|
||||
self.log_errors = log_errors
|
||||
self.commands = []
|
||||
self.results = Queue.Queue(1)
|
||||
|
||||
def add(self, command):
|
||||
"""Add a command to the transaction
|
||||
|
||||
returns The command passed as a convenience
|
||||
"""
|
||||
|
||||
self.commands.append(command)
|
||||
return command
|
||||
|
||||
def commit(self):
|
||||
ovsdb_connection.queue_txn(self)
|
||||
result = self.results.get()
|
||||
if isinstance(result, Exception) and self.check_error:
|
||||
raise result
|
||||
return result
|
||||
|
||||
def do_commit(self):
|
||||
start_time = time.time()
|
||||
attempts = 0
|
||||
while True:
|
||||
elapsed_time = time.time() - start_time
|
||||
if attempts > 0 and elapsed_time > self.context.vsctl_timeout:
|
||||
raise RuntimeError("OVS transaction timed out")
|
||||
attempts += 1
|
||||
# TODO(twilson) Make sure we don't loop longer than vsctl_timeout
|
||||
txn = idl.Transaction(self.api.idl)
|
||||
for i, command in enumerate(self.commands):
|
||||
LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s",
|
||||
{'idx': i, 'cmd': command})
|
||||
try:
|
||||
command.run_idl(txn)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception() as ctx:
|
||||
txn.abort()
|
||||
if not self.check_error:
|
||||
ctx.reraise = False
|
||||
seqno = self.api.idl.change_seqno
|
||||
status = txn.commit_block()
|
||||
if status == txn.TRY_AGAIN:
|
||||
LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying")
|
||||
if self.api.idl._session.rpc.status != 0:
|
||||
LOG.debug("Lost connection to OVSDB, reconnecting!")
|
||||
self.api.idl.force_reconnect()
|
||||
idlutils.wait_for_change(
|
||||
self.api.idl, self.context.vsctl_timeout - elapsed_time,
|
||||
seqno)
|
||||
continue
|
||||
elif status == txn.ERROR:
|
||||
msg = _LE("OVSDB Error: %s") % txn.get_error()
|
||||
if self.log_errors:
|
||||
LOG.error(msg)
|
||||
if self.check_error:
|
||||
# For now, raise similar error to vsctl/utils.execute()
|
||||
raise RuntimeError(msg)
|
||||
return
|
||||
elif status == txn.ABORTED:
|
||||
LOG.debug("Transaction aborted")
|
||||
return
|
||||
elif status == txn.UNCHANGED:
|
||||
LOG.debug("Transaction caused no change")
|
||||
|
||||
return [cmd.result for cmd in self.commands]
|
||||
|
||||
|
||||
class OvsdbIdl(api.API):
|
||||
def __init__(self, context):
|
||||
super(OvsdbIdl, self).__init__(context)
|
||||
ovsdb_connection.start()
|
||||
self.idl = ovsdb_connection.idl
|
||||
|
||||
@property
|
||||
def _tables(self):
|
||||
return self.idl.tables
|
||||
|
||||
@property
|
||||
def _ovs(self):
|
||||
return self._tables['Open_vSwitch'].rows.values()[0]
|
||||
|
||||
def transaction(self, check_error=False, log_errors=True, **kwargs):
|
||||
return Transaction(self.context, self, check_error, log_errors)
|
||||
|
||||
def add_br(self, name, may_exist=True):
|
||||
return cmd.AddBridgeCommand(self, name, may_exist)
|
||||
|
||||
def del_br(self, name, if_exists=True):
|
||||
return cmd.DelBridgeCommand(self, name, if_exists)
|
||||
|
||||
def br_exists(self, name):
|
||||
return cmd.BridgeExistsCommand(self, name)
|
||||
|
||||
def port_to_br(self, name):
|
||||
return cmd.PortToBridgeCommand(self, name)
|
||||
|
||||
def iface_to_br(self, name):
|
||||
# For our purposes, ports and interfaces always have the same name
|
||||
return cmd.PortToBridgeCommand(self, name)
|
||||
|
||||
def list_br(self):
|
||||
return cmd.ListBridgesCommand(self)
|
||||
|
||||
def br_get_external_id(self, name, field):
|
||||
return cmd.BrGetExternalIdCommand(self, name, field)
|
||||
|
||||
def br_set_external_id(self, name, field, value):
|
||||
return cmd.BrSetExternalIdCommand(self, name, field, value)
|
||||
|
||||
def db_set(self, table, record, *col_values):
|
||||
return cmd.DbSetCommand(self, table, record, *col_values)
|
||||
|
||||
def db_clear(self, table, record, column):
|
||||
return cmd.DbClearCommand(self, table, record, column)
|
||||
|
||||
def db_get(self, table, record, column):
|
||||
return cmd.DbGetCommand(self, table, record, column)
|
||||
|
||||
def db_list(self, table, records=None, columns=None, if_exists=False):
|
||||
return cmd.DbListCommand(self, table, records, columns, if_exists)
|
||||
|
||||
def db_find(self, table, *conditions, **kwargs):
|
||||
return cmd.DbFindCommand(self, table, *conditions, **kwargs)
|
||||
|
||||
def set_controller(self, bridge, controllers):
|
||||
return cmd.SetControllerCommand(self, bridge, controllers)
|
||||
|
||||
def del_controller(self, bridge):
|
||||
return cmd.DelControllerCommand(self, bridge)
|
||||
|
||||
def get_controller(self, bridge):
|
||||
return cmd.GetControllerCommand(self, bridge)
|
||||
|
||||
def set_fail_mode(self, bridge, mode):
|
||||
return cmd.SetFailModeCommand(self, bridge, mode)
|
||||
|
||||
def add_port(self, bridge, port, may_exist=True):
|
||||
return cmd.AddPortCommand(self, bridge, port, may_exist)
|
||||
|
||||
def del_port(self, port, bridge=None, if_exists=True):
|
||||
return cmd.DelPortCommand(self, port, bridge, if_exists)
|
||||
|
||||
def list_ports(self, bridge):
|
||||
return cmd.ListPortsCommand(self, bridge)
|
0
neutron/agent/ovsdb/native/__init__.py
Normal file
0
neutron/agent/ovsdb/native/__init__.py
Normal file
406
neutron/agent/ovsdb/native/commands.py
Normal file
406
neutron/agent/ovsdb/native/commands.py
Normal file
@ -0,0 +1,406 @@
|
||||
# Copyright (c) 2015 Openstack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from neutron.agent.ovsdb import api
|
||||
from neutron.agent.ovsdb.native import idlutils
|
||||
from neutron.common import exceptions
|
||||
from neutron.i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RowNotFound(exceptions.NeutronException):
|
||||
message = _("Table %(table)s has no row with %(col)s=%(match)s")
|
||||
|
||||
|
||||
class BaseCommand(api.Command):
|
||||
def __init__(self, api):
|
||||
self.api = api
|
||||
self.result = None
|
||||
|
||||
def execute(self, check_error=False, log_errors=True):
|
||||
try:
|
||||
with self.api.transaction(check_error, log_errors) as txn:
|
||||
txn.add(self)
|
||||
return self.result
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception() as ctx:
|
||||
if log_errors:
|
||||
LOG.exception(_LE("Error executing command"))
|
||||
if not check_error:
|
||||
ctx.reraise = False
|
||||
|
||||
def row_by_index(self, table, match, *default):
|
||||
tab = self.api._tables[table]
|
||||
idx = idlutils.get_index_column(tab)
|
||||
return self.row_by_value(table, idx, match, *default)
|
||||
|
||||
def row_by_value(self, table, column, match, *default):
|
||||
tab = self.api._tables[table]
|
||||
try:
|
||||
return next(r for r in tab.rows.values()
|
||||
if getattr(r, column) == match)
|
||||
except StopIteration:
|
||||
if len(default) == 1:
|
||||
return default[0]
|
||||
else:
|
||||
raise RowNotFound(table=table, col=column, match=match)
|
||||
|
||||
def __str__(self):
|
||||
command_info = self.__dict__
|
||||
return "%s(%s)" % (
|
||||
self.__class__.__name__,
|
||||
", ".join("%s=%s" % (k, v) for k, v in command_info.items()
|
||||
if k not in ['api', 'result']))
|
||||
|
||||
|
||||
class AddBridgeCommand(BaseCommand):
|
||||
def __init__(self, api, name, may_exist):
|
||||
super(AddBridgeCommand, self).__init__(api)
|
||||
self.name = name
|
||||
self.may_exist = may_exist
|
||||
|
||||
def run_idl(self, txn):
|
||||
if self.may_exist:
|
||||
br = self.row_by_value('Bridge', 'name', self.name, None)
|
||||
if br:
|
||||
return
|
||||
row = txn.insert(self.api._tables['Bridge'])
|
||||
row.name = self.name
|
||||
self.api._ovs.verify('bridges')
|
||||
self.api._ovs.bridges = self.api._ovs.bridges + [row]
|
||||
|
||||
# Add the internal bridge port
|
||||
cmd = AddPortCommand(self.api, self.name, self.name, self.may_exist)
|
||||
cmd.run_idl(txn)
|
||||
|
||||
cmd = DbSetCommand(self.api, 'Interface', self.name,
|
||||
('type', 'internal'))
|
||||
cmd.run_idl(txn)
|
||||
|
||||
|
||||
class DelBridgeCommand(BaseCommand):
|
||||
def __init__(self, api, name, if_exists):
|
||||
super(DelBridgeCommand, self).__init__(api)
|
||||
self.name = name
|
||||
self.if_exists = if_exists
|
||||
|
||||
def run_idl(self, txn):
|
||||
try:
|
||||
br = self.row_by_value('Bridge', 'name', self.name)
|
||||
except RowNotFound:
|
||||
if self.if_exists:
|
||||
return
|
||||
else:
|
||||
msg = _LE("Bridge %s does not exist") % self.name
|
||||
LOG.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.api._ovs.verify('bridges')
|
||||
for port in br.ports:
|
||||
cmd = DelPortCommand(self.api, port.name, self.name,
|
||||
if_exists=True)
|
||||
cmd.run_idl(txn)
|
||||
bridges = self.api._ovs.bridges
|
||||
bridges.remove(br)
|
||||
self.api._ovs.bridges = bridges
|
||||
del self.api._tables['Bridge'].rows[br.uuid]
|
||||
|
||||
|
||||
class BridgeExistsCommand(BaseCommand):
|
||||
def __init__(self, api, name):
|
||||
super(BridgeExistsCommand, self).__init__(api)
|
||||
self.name = name
|
||||
|
||||
def run_idl(self, txn):
|
||||
self.result = bool(self.row_by_value('Bridge',
|
||||
'name', self.name, None))
|
||||
|
||||
|
||||
class ListBridgesCommand(BaseCommand):
|
||||
def __init__(self, api):
|
||||
super(ListBridgesCommand, self).__init__(api)
|
||||
|
||||
def run_idl(self, txn):
|
||||
# NOTE (twilson) [x.name for x in rows.values()] if no index
|
||||
self.result = [x.name for x in
|
||||
self.api._tables['Bridge'].rows.values()]
|
||||
|
||||
|
||||
class BrGetExternalIdCommand(BaseCommand):
|
||||
def __init__(self, api, name, field):
|
||||
super(BrGetExternalIdCommand, self).__init__(api)
|
||||
self.name = name
|
||||
self.field = field
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.name)
|
||||
self.result = br.external_ids[self.field]
|
||||
|
||||
|
||||
class BrSetExternalIdCommand(BaseCommand):
|
||||
def __init__(self, api, name, field, value):
|
||||
super(BrSetExternalIdCommand, self).__init__(api)
|
||||
self.name = name
|
||||
self.field = field
|
||||
self.value = value
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.name)
|
||||
external_ids = getattr(br, 'external_ids', {})
|
||||
external_ids[self.field] = self.value
|
||||
br.external_ids = external_ids
|
||||
|
||||
|
||||
class DbSetCommand(BaseCommand):
|
||||
def __init__(self, api, table, record, *col_values):
|
||||
super(DbSetCommand, self).__init__(api)
|
||||
self.table = table
|
||||
self.record = record
|
||||
self.col_values = col_values
|
||||
|
||||
def run_idl(self, txn):
|
||||
record = self.row_by_index(self.table, self.record)
|
||||
for col, val in self.col_values:
|
||||
# TODO(twilson) Ugh, the OVS library doesn't like OrderedDict
|
||||
# We're only using it to make a unit test work, so we should fix
|
||||
# this soon.
|
||||
if isinstance(val, collections.OrderedDict):
|
||||
val = dict(val)
|
||||
setattr(record, col, val)
|
||||
|
||||
|
||||
class DbClearCommand(BaseCommand):
|
||||
def __init__(self, api, table, record, column):
|
||||
super(DbClearCommand, self).__init__(api)
|
||||
self.table = table
|
||||
self.record = record
|
||||
self.column = column
|
||||
|
||||
def run_idl(self, txn):
|
||||
record = self.row_by_index(self.table, self.record)
|
||||
# Create an empty value of the column type
|
||||
value = type(getattr(record, self.column))()
|
||||
setattr(record, self.column, value)
|
||||
|
||||
|
||||
class DbGetCommand(BaseCommand):
|
||||
def __init__(self, api, table, record, column):
|
||||
super(DbGetCommand, self).__init__(api)
|
||||
self.table = table
|
||||
self.record = record
|
||||
self.column = column
|
||||
|
||||
def run_idl(self, txn):
|
||||
record = self.row_by_index(self.table, self.record)
|
||||
# TODO(twilson) This feels wrong, but ovs-vsctl returns single results
|
||||
# on set types without the list. The IDL is returning them as lists,
|
||||
# even if the set has the maximum number of items set to 1. Might be
|
||||
# able to inspect the Schema and just do this conversion for that case.
|
||||
result = getattr(record, self.column)
|
||||
if isinstance(result, list) and len(result) == 1:
|
||||
self.result = result[0]
|
||||
else:
|
||||
self.result = result
|
||||
|
||||
|
||||
class SetControllerCommand(BaseCommand):
|
||||
def __init__(self, api, bridge, targets):
|
||||
super(SetControllerCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
self.targets = targets
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
controllers = []
|
||||
for target in self.targets:
|
||||
controller = txn.insert(self.api._tables['Controller'])
|
||||
controller.target = target
|
||||
controllers.append(controller)
|
||||
br.verify('controller')
|
||||
br.controller = controllers
|
||||
|
||||
|
||||
class DelControllerCommand(BaseCommand):
|
||||
def __init__(self, api, bridge):
|
||||
super(DelControllerCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
br.controller = []
|
||||
|
||||
|
||||
class GetControllerCommand(BaseCommand):
|
||||
def __init__(self, api, bridge):
|
||||
super(GetControllerCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
br.verify('controller')
|
||||
self.result = [c.target for c in br.controller]
|
||||
|
||||
|
||||
class SetFailModeCommand(BaseCommand):
|
||||
def __init__(self, api, bridge, mode):
|
||||
super(SetFailModeCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
self.mode = mode
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
br.verify('fail_mode')
|
||||
br.fail_mode = self.mode
|
||||
|
||||
|
||||
class AddPortCommand(BaseCommand):
|
||||
def __init__(self, api, bridge, port, may_exist):
|
||||
super(AddPortCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
self.port = port
|
||||
self.may_exist = may_exist
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
if self.may_exist:
|
||||
port = self.row_by_value('Port', 'name', self.port, None)
|
||||
if port:
|
||||
return
|
||||
port = txn.insert(self.api._tables['Port'])
|
||||
port.name = self.port
|
||||
br.verify('ports')
|
||||
ports = getattr(br, 'ports', [])
|
||||
ports.append(port)
|
||||
br.ports = ports
|
||||
|
||||
iface = txn.insert(self.api._tables['Interface'])
|
||||
iface.name = self.port
|
||||
port.verify('interfaces')
|
||||
ifaces = getattr(port, 'interfaces', [])
|
||||
ifaces.append(iface)
|
||||
port.interfaces = ifaces
|
||||
|
||||
|
||||
class DelPortCommand(BaseCommand):
|
||||
def __init__(self, api, port, bridge, if_exists):
|
||||
super(DelPortCommand, self).__init__(api)
|
||||
self.port = port
|
||||
self.bridge = bridge
|
||||
self.if_exists = if_exists
|
||||
|
||||
def run_idl(self, txn):
|
||||
try:
|
||||
port = self.row_by_value('Port', 'name', self.port)
|
||||
except RowNotFound:
|
||||
if self.if_exists:
|
||||
return
|
||||
msg = _LE("Port %s does not exist") % self.port
|
||||
raise RuntimeError(msg)
|
||||
if self.bridge:
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
else:
|
||||
br = next(b for b in self.api._tables['Bridge'].rows.values()
|
||||
if port in b.ports)
|
||||
|
||||
if port.uuid not in br.ports and not self.if_exists:
|
||||
# TODO(twilson) Make real errors across both implementations
|
||||
msg = _LE("Port %(port)s does not exist on %(bridge)s!") % {
|
||||
'port': self.name, 'bridge': self.bridge
|
||||
}
|
||||
LOG.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
br.verify('ports')
|
||||
ports = br.ports
|
||||
ports.remove(port)
|
||||
br.ports = ports
|
||||
|
||||
# Also remove port/interface directly for indexing?
|
||||
port.verify('interfaces')
|
||||
for iface in port.interfaces:
|
||||
del self.api._tables['Interface'].rows[iface.uuid]
|
||||
del self.api._tables['Port'].rows[port.uuid]
|
||||
|
||||
|
||||
class ListPortsCommand(BaseCommand):
|
||||
def __init__(self, api, bridge):
|
||||
super(ListPortsCommand, self).__init__(api)
|
||||
self.bridge = bridge
|
||||
|
||||
def run_idl(self, txn):
|
||||
br = self.row_by_value('Bridge', 'name', self.bridge)
|
||||
self.result = [p.name for p in br.ports if p.name != self.bridge]
|
||||
|
||||
|
||||
class PortToBridgeCommand(BaseCommand):
|
||||
def __init__(self, api, name):
|
||||
super(PortToBridgeCommand, self).__init__(api)
|
||||
self.name = name
|
||||
|
||||
def run_idl(self, txn):
|
||||
# TODO(twilson) This is expensive!
|
||||
# This traversal of all ports could be eliminated by caching the bridge
|
||||
# name on the Port's (or Interface's for iface_to_br) external_id field
|
||||
# In fact, if we did that, the only place that uses to_br functions
|
||||
# could just add the external_id field to the conditions passed to find
|
||||
port = self.row_by_value('Port', 'name', self.name)
|
||||
bridges = self.api._tables['Bridge'].rows.values()
|
||||
self.result = next(br.name for br in bridges if port in br.ports)
|
||||
|
||||
|
||||
class DbListCommand(BaseCommand):
|
||||
def __init__(self, api, table, records, columns, if_exists):
|
||||
super(DbListCommand, self).__init__(api)
|
||||
self.table = self.api._tables[table]
|
||||
self.columns = columns or self.table.columns.keys() + ['_uuid']
|
||||
self.if_exists = if_exists
|
||||
idx = idlutils.get_index_column(self.table)
|
||||
if records:
|
||||
self.records = [uuid for uuid, row in self.table.rows.items()
|
||||
if getattr(row, idx) in records]
|
||||
else:
|
||||
self.records = self.table.rows.keys()
|
||||
|
||||
def run_idl(self, txn):
|
||||
self.result = [
|
||||
{
|
||||
c: idlutils.get_column_value(self.table.rows[uuid], c)
|
||||
for c in self.columns
|
||||
}
|
||||
for uuid in self.records
|
||||
]
|
||||
|
||||
|
||||
class DbFindCommand(BaseCommand):
|
||||
def __init__(self, api, table, *conditions, **kwargs):
|
||||
super(DbFindCommand, self).__init__(api)
|
||||
self.table = self.api._tables[table]
|
||||
self.conditions = conditions
|
||||
self.columns = (kwargs.get('columns') or
|
||||
self.table.columns.keys() + ['_uuid'])
|
||||
|
||||
def run_idl(self, txn):
|
||||
self.result = [
|
||||
{
|
||||
c: idlutils.get_column_value(r, c)
|
||||
for c in self.columns
|
||||
}
|
||||
for r in self.table.rows.values()
|
||||
if idlutils.row_match(r, self.conditions)
|
||||
]
|
87
neutron/agent/ovsdb/native/connection.py
Normal file
87
neutron/agent/ovsdb/native/connection.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import Queue
|
||||
import threading
|
||||
|
||||
from ovs.db import idl
|
||||
from ovs import poller
|
||||
|
||||
from neutron.agent.ovsdb.native import idlutils
|
||||
|
||||
|
||||
class TransactionQueue(Queue.Queue, object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TransactionQueue, self).__init__(*args, **kwargs)
|
||||
alertpipe = os.pipe()
|
||||
self.alertin = os.fdopen(alertpipe[0], 'r', 0)
|
||||
self.alertout = os.fdopen(alertpipe[1], 'w', 0)
|
||||
|
||||
def get_nowait(self, *args, **kwargs):
|
||||
try:
|
||||
result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
|
||||
except Queue.Empty:
|
||||
return None
|
||||
self.alertin.read(1)
|
||||
return result
|
||||
|
||||
def put(self, *args, **kwargs):
|
||||
super(TransactionQueue, self).put(*args, **kwargs)
|
||||
self.alertout.write('X')
|
||||
self.alertout.flush()
|
||||
|
||||
@property
|
||||
def alert_fileno(self):
|
||||
return self.alertin.fileno()
|
||||
|
||||
|
||||
class Connection(object):
|
||||
def __init__(self, connection, timeout):
|
||||
self.idl = None
|
||||
self.connection = connection
|
||||
self.timeout = timeout
|
||||
self.txns = TransactionQueue(1)
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def start(self):
|
||||
with self.lock:
|
||||
if self.idl is not None:
|
||||
return
|
||||
|
||||
helper = idlutils.get_schema_helper(self.connection)
|
||||
helper.register_all()
|
||||
self.idl = idl.Idl(self.connection, helper)
|
||||
idlutils.wait_for_change(self.idl, self.timeout)
|
||||
self.poller = poller.Poller()
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.setDaemon(True)
|
||||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.idl.wait(self.poller)
|
||||
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
||||
self.poller.block()
|
||||
self.idl.run()
|
||||
txn = self.txns.get_nowait()
|
||||
if txn is not None:
|
||||
try:
|
||||
txn.results.put(txn.do_commit())
|
||||
except Exception as ex:
|
||||
txn.results.put(ex)
|
||||
self.txns.task_done()
|
||||
|
||||
def queue_txn(self, txn):
|
||||
self.txns.put(txn)
|
118
neutron/agent/ovsdb/native/idlutils.py
Normal file
118
neutron/agent/ovsdb/native/idlutils.py
Normal file
@ -0,0 +1,118 @@
|
||||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from ovs.db import idl
|
||||
from ovs import jsonrpc
|
||||
from ovs import poller
|
||||
from ovs import stream
|
||||
|
||||
|
||||
def get_schema_helper(connection):
|
||||
err, strm = stream.Stream.open_block(
|
||||
stream.Stream.open(connection))
|
||||
if err:
|
||||
raise Exception("Could not connect to %s" % (
|
||||
connection,))
|
||||
rpc = jsonrpc.Connection(strm)
|
||||
req = jsonrpc.Message.create_request('get_schema', ['Open_vSwitch'])
|
||||
err, resp = rpc.transact_block(req)
|
||||
rpc.close()
|
||||
if err:
|
||||
raise Exception("Could not retrieve schema from %s: %s" % (
|
||||
connection, os.strerror(err)))
|
||||
elif resp.error:
|
||||
raise Exception(resp.error)
|
||||
return idl.SchemaHelper(None, resp.result)
|
||||
|
||||
|
||||
def wait_for_change(_idl, timeout, seqno=None):
|
||||
if seqno is None:
|
||||
seqno = _idl.change_seqno
|
||||
stop = time.time() + timeout
|
||||
while _idl.change_seqno == seqno:
|
||||
_idl.run()
|
||||
ovs_poller = poller.Poller()
|
||||
_idl.wait(ovs_poller)
|
||||
ovs_poller.timer_wait(timeout * 1000)
|
||||
ovs_poller.block()
|
||||
if time.time() > stop:
|
||||
raise Exception("Timeout")
|
||||
|
||||
|
||||
def get_column_value(row, col):
|
||||
if col == '_uuid':
|
||||
val = row.uuid
|
||||
else:
|
||||
val = getattr(row, col)
|
||||
|
||||
# Idl returns lists of Rows where ovs-vsctl returns lists of UUIDs
|
||||
if isinstance(val, list) and len(val):
|
||||
if isinstance(val[0], idl.Row):
|
||||
val = [v.uuid for v in val]
|
||||
# ovs-vsctl treats lists of 1 as single results
|
||||
if len(val) == 1:
|
||||
val = val[0]
|
||||
return val
|
||||
|
||||
|
||||
def condition_match(row, condition):
|
||||
"""Return whether a condition matches a row
|
||||
|
||||
:param row An OVSDB Row
|
||||
:param condition A 3-tuple containing (column, operation, match)
|
||||
"""
|
||||
|
||||
col, op, match = condition
|
||||
val = get_column_value(row, col)
|
||||
matched = True
|
||||
|
||||
# TODO(twilson) Implement other operators and type comparisons
|
||||
# ovs_lib only uses dict '=' and '!=' searches for now
|
||||
if isinstance(match, dict):
|
||||
for key in match:
|
||||
if op == '=':
|
||||
if (key not in val or match[key] != val[key]):
|
||||
matched = False
|
||||
break
|
||||
elif op == '!=':
|
||||
if key not in val or match[key] == val[key]:
|
||||
matched = False
|
||||
break
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
elif isinstance(match, list):
|
||||
raise NotImplementedError()
|
||||
else:
|
||||
if op == '==' and val != match:
|
||||
matched = False
|
||||
elif op == '!=' and val == match:
|
||||
matched = False
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
return matched
|
||||
|
||||
|
||||
def row_match(row, conditions):
|
||||
"""Return whether the row matches the list of conditions"""
|
||||
return all(condition_match(row, cond) for cond in conditions)
|
||||
|
||||
|
||||
def get_index_column(table):
|
||||
if len(table.indexes) == 1:
|
||||
idx = table.indexes[0]
|
||||
if len(idx) == 1:
|
||||
return idx[0].name
|
@ -129,6 +129,7 @@ class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
|
||||
class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
|
||||
scenarios = [
|
||||
('vsctl', dict(ovsdb_interface='vsctl')),
|
||||
('native', dict(ovsdb_interface='native')),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
@ -54,6 +54,16 @@ class OVSBridgeTestCase(base.BaseOVSLinuxTestCase):
|
||||
self.br.delete_port(port_name)
|
||||
self.assertFalse(self.br.port_exists(port_name))
|
||||
|
||||
def test_duplicate_port_may_exist_false(self):
|
||||
port_name, ofport = self.create_ovs_port(('type', 'internal'))
|
||||
cmd = self.br.ovsdb.add_port(self.br.br_name,
|
||||
port_name, may_exist=False)
|
||||
self.assertRaises(RuntimeError, cmd.execute, check_error=True)
|
||||
|
||||
def test_delete_port_if_exists_false(self):
|
||||
cmd = self.br.ovsdb.del_port('nonexistantport', if_exists=False)
|
||||
self.assertRaises(RuntimeError, cmd.execute, check_error=True)
|
||||
|
||||
def test_replace_port(self):
|
||||
port_name = base.get_rand_port_name()
|
||||
self.br.replace_port(port_name, ('type', 'internal'))
|
||||
|
@ -146,22 +146,6 @@ class OVS_Lib_Test(base.BaseTestCase):
|
||||
def _build_timeout_opt(self, exp_timeout):
|
||||
return "--timeout=%d" % exp_timeout if exp_timeout else self.TO
|
||||
|
||||
def test_replace_port(self):
|
||||
pname = "tap5"
|
||||
self.br.replace_port(pname)
|
||||
self._verify_vsctl_mock("--if-exists", "del-port", pname,
|
||||
"--", "add-port", self.BR_NAME, pname)
|
||||
|
||||
def test_replace_port_with_attrs(self):
|
||||
pname = "tap5"
|
||||
self.br.replace_port(pname, ('type', 'internal'),
|
||||
('external_ids:iface-status', 'active'))
|
||||
self._verify_vsctl_mock("--if-exists", "del-port", pname,
|
||||
"--", "add-port", self.BR_NAME, pname,
|
||||
"--", "set", "Interface", pname,
|
||||
"type=internal",
|
||||
"external_ids:iface-status=active")
|
||||
|
||||
def _test_delete_port(self, exp_timeout=None):
|
||||
pname = "tap5"
|
||||
self.br.delete_port(pname)
|
||||
|
Loading…
Reference in New Issue
Block a user