192 lines
8.0 KiB
Python
192 lines
8.0 KiB
Python
# Copyright 2019 Red Hat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import re
|
|
|
|
import netaddr
|
|
from neutron_tempest_plugin.scenario import base
|
|
from neutron_tempest_plugin.common import utils as common_utils
|
|
from oslo_log import log
|
|
from tempest import config
|
|
|
|
from whitebox_neutron_tempest_plugin.common import utils as local_utils
|
|
|
|
# Global tempest configuration object.
CONF = config.CONF
# Logger named after this module.
LOG = log.getLogger(__name__)
# Option group holding the whitebox neutron plugin settings.
WB_CONF = CONF.whitebox_neutron_plugin_options
|
|
|
|
|
|
class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
    """Base class for whitebox neutron scenario tests.

    Stores a few configuration values on the class during resource setup
    and offers helpers to run a command on the controller and to look up
    the host a server is running on.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def resource_setup(cls):
        """Populate class attributes from the tempest configuration."""
        super(BaseTempestWhiteboxTestCase, cls).resource_setup()
        uri = CONF.identity.uri
        # Validate whatever sits between '[' and ']' in the identity URI.
        # When the brackets are absent both find() calls return -1, so the
        # slice is the URI without its last character, which does not
        # validate as an IPv6 address.
        cls.is_ipv6 = bool(netaddr.valid_ipv6(
            uri[uri.find("[") + 1:uri.find("]")]))
        cls.image_ref = CONF.compute.image_ref
        cls.flavor_ref = CONF.compute.flavor_ref
        cls.username = CONF.validation.image_ssh_user

    @classmethod
    def run_on_master_controller(cls, cmd):
        """Run a command on the controller and return its output.

        Only 'devstack' deployments have an execution path here: the
        command is run through ``local_utils.run_local_cmd``.

        :param cmd: command line to execute.
        :returns: decoded standard output with leading and trailing
                  whitespace stripped.
        :raises NotImplementedError: if
                ``WB_CONF.openstack_type`` is not 'devstack'.
        """
        if WB_CONF.openstack_type != 'devstack':
            # Fail loudly: without an execution path there is no output to
            # return, and falling through would hit an unbound local.
            raise NotImplementedError(
                "Running commands on the controller is not supported for "
                "openstack_type '{}'".format(WB_CONF.openstack_type))
        output, errors = local_utils.run_local_cmd(cmd)
        LOG.debug("Stderr: %s", errors.decode())
        output = output.decode()
        LOG.debug("Output: %s", output)
        return output.strip()

    def get_host_for_server(self, server_id):
        """Return the 'OS-EXT-SRV-ATTR:host' value of a server.

        The server is looked up with the admin servers client.

        :param server_id: ID of the server to look up.
        """
        server_details = self.os_admin.servers_client.show_server(server_id)
        return server_details['server']['OS-EXT-SRV-ATTR:host']
|
|
|
|
|
|
class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase):
    """Base class for whitebox tests that query the OVN databases.

    During resource setup the class builds the ``ovn-nbctl``/``ovn-sbctl``
    command prefixes (``nbctl``/``sbctl``) and the matching ``ovsdb-client
    monitor`` commands (``nbmonitorcmd``/``sbmonitorcmd``). The instance
    helpers run those commands through ``run_on_master_controller``.
    """

    @classmethod
    def resource_setup(cls):
        """Skip when no OVN agent is listed, else prepare OVN commands."""
        super(BaseTempestTestCaseOvn, cls).resource_setup()
        agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
        ovn_agents = [agent for agent in agents if 'ovn' in agent['binary']]
        if not ovn_agents:
            raise cls.skipException(
                "OVN agents not found. This test is supported only on "
                "openstack environments with OVN support.")

        cls.nbctl, cls.sbctl = cls._get_ovn_dbs()
        cls.nbmonitorcmd, cls.sbmonitorcmd = cls._get_ovn_db_monitor_cmds()

    @classmethod
    def _get_ovn_db_monitor_cmds(cls):
        """Build the ovsdb-client monitor commands for the NB and SB DBs.

        :returns: tuple ``(nb_monitor_cmd, sb_monitor_cmd)``.
        """
        regex = r'--db=(.*)$'
        # this regex search will return the connection string (tcp:IP:port or
        # ssl:IP:port) and in case of TLS, will also include the TLS options
        nb_monitor_connection_opts = re.search(regex, cls.nbctl).group(1)
        sb_monitor_connection_opts = re.search(regex, cls.sbctl).group(1)
        monitorcmdprefix = 'sudo timeout 300 ovsdb-client monitor -f json '
        return (monitorcmdprefix + nb_monitor_connection_opts,
                monitorcmdprefix + sb_monitor_connection_opts)

    @classmethod
    def _get_ovn_dbs(cls):
        """Build the ovn-nbctl and ovn-sbctl command prefixes.

        :returns: list ``[nbctl_cmd, sbctl_cmd]``.
        :raises NotImplementedError: if ``WB_CONF.openstack_type`` is
                neither 'tripleo' nor 'devstack'.
        """
        openstack_type = WB_CONF.openstack_type
        if openstack_type == 'tripleo':
            # The southbound connection string is read from the local
            # Open vSwitch record, with the double quotes removed.
            cmd = ("sudo ovs-vsctl get open . external_ids:ovn-remote | "
                   "sed -e 's/\"//g'")
            sbdb = cls.run_on_master_controller(cmd)
            ssl_params = ''
            if 'ssl' in sbdb:
                ssl_params = '-p {} -c {} -C {} '.format(
                    WB_CONF.pki_private_key,
                    WB_CONF.pki_certificate,
                    WB_CONF.pki_ca_cert)
            # The northbound string is derived from the southbound one by
            # swapping 6642 for 6641.
            nbdb = sbdb.replace('6642', '6641')
            # TLS options go after the --db value so that the monitor
            # command regex ('--db=(.*)$') captures them as well.
            cmd = 'ovn-{}ctl --db={} %s' % (ssl_params,)
            cmd = 'sudo %s exec ovn_controller %s' % (cls.container_app, cmd)
        elif openstack_type == 'devstack':
            sbdb = "unix:/usr/local/var/run/ovn/ovnsb_db.sock"
            nbdb = sbdb.replace('sb', 'nb')
            cmd = "sudo ovn-{}ctl --db={}"
        else:
            # Without this branch the return below would fail on unbound
            # locals with a message that hides the real cause.
            raise NotImplementedError(
                "OVN database access is not supported for "
                "openstack_type '{}'".format(openstack_type))
        return [cmd.format('nb', nbdb), cmd.format('sb', sbdb)]

    def get_router_gateway_chassis(self, router_port_id):
        """Return the short hostname of the chassis hosting a router port.

        Polls the 'cr-lrp-<router_port_id>' port binding (every 5 seconds,
        up to 30 seconds) until its chassis is not '[]', then reads the
        hostname of that chassis. The chassis value is also kept in
        ``self.chassis_id``.

        :param router_port_id: ID of the router gateway port.
        :returns: chassis hostname without quotes, cut at the first dot.
        """
        cmd = "{} get port_binding cr-lrp-{} chassis".format(
            self.sbctl, router_port_id)
        LOG.debug("Waiting until port is bound to chassis")
        self.chassis_id = None

        def _port_binding_exist():
            self.chassis_id = self.run_on_master_controller(cmd)
            LOG.debug("chassis_id = '%s'", self.chassis_id)
            # '[]' is what the command prints while no chassis is set.
            return self.chassis_id != '[]'

        try:
            common_utils.wait_until_true(_port_binding_exist,
                                         timeout=30, sleep=5)
        except common_utils.WaitTimeout:
            self.fail("Port is not bound to chassis")
        cmd = "{} get chassis {} hostname".format(self.sbctl, self.chassis_id)
        LOG.debug("Running '%s' on the master node", cmd)
        res = self.run_on_master_controller(cmd)
        return res.replace('"', '').split('.')[0]

    def get_router_gateway_chassis_list(self, router_port_id):
        """Return the gateway chassis entries of a logical router port.

        Runs ``lrp-get-gateway-chassis lrp-<router_port_id>`` and reduces
        each output line to the text captured after an underscore and
        before the following whitespace. Lines that do not match the
        pattern are returned unchanged.

        :param router_port_id: ID of the router port.
        :returns: list with one entry per output line.
        """
        cmd = (self.nbctl + " lrp-get-gateway-chassis lrp-" + router_port_id)
        data = self.run_on_master_controller(cmd)
        return [re.sub(r'.*_(.*?)\s.*', r'\1', s) for s in data.splitlines()]

    def get_router_gateway_chassis_by_id(self, chassis_id):
        """Return the short hostname of the given chassis.

        :param chassis_id: chassis identifier passed to
                           ``get chassis <id> hostname``.
        :returns: hostname without quotes, cut at the first dot.
        """
        res = self.run_on_master_controller(
            self.sbctl + " get chassis " + chassis_id + " hostname").rstrip()
        return res.replace('"', '').split('.')[0]

    def get_router_port_gateway_mtu(self, router_port_id):
        """Return the 'gateway_mtu' option of a logical router port as int.

        :param router_port_id: ID of the router port.
        :raises ValueError: if the command output is not an integer.
        """
        cmd = (self.nbctl + " get logical_router_port lrp-" + router_port_id +
               " options:gateway_mtu")
        # The value is printed wrapped in double quotes.
        return int(
            self.run_on_master_controller(cmd).rstrip().strip('"'))

    def get_item_uuid(self, db, item, search_string):
        """Return the '_uuid' of records found in an OVN database table.

        :param db: 'sb' selects the southbound database, any other value
                   selects the northbound one.
        :param item: table name passed to ``find``.
        :param search_string: condition passed to ``find``.
        :returns: third column of the '_uuid' output lines.
        """
        ovn_db = self.sbctl if db == 'sb' else self.nbctl
        cmd = (ovn_db + " find " + item + " " + search_string +
               " | grep _uuid | awk '{print $3}'")
        return self.run_on_master_controller(cmd)

    def get_datapath_tunnel_key(self, search_string):
        """Return the 'tunnel_key' of matching datapath_binding records.

        :param search_string: condition passed to ``find``.
        :returns: third column of the 'tunnel_key' output lines.
        """
        cmd = (self.sbctl + " find datapath_binding " + search_string +
               " | grep tunnel_key | awk '{print $3}'")
        return self.run_on_master_controller(cmd)

    def get_logical_switch(self, port):
        """Returns logical switch name that port is connected to

        Function gets the logical switch name without its ID from the
        `ovn-nbctl lsp-get-ls <PORT_NAME>` command. An empty string is
        returned when the output contains no 'neutron-' name.
        """
        cmd = '{cmd} lsp-get-ls {port}'.format(cmd=self.nbctl, port=port)
        output = self.run_on_master_controller(cmd)
        # Match from 'neutron-' up to (not including) the next ')'.
        ls_name = re.search(r'neutron-[^)]*', output)
        return ls_name.group() if ls_name else ''

    def get_physical_net(self, port):
        """Returns physical network name that the port is configured with

        Physical network name is saved as option in the logical switch port
        record in OVN north database. It can be queried with
        `ovn-nbctl lsp-get-options <PORT_NAME>` command but this output may
        contain more than one option so it is better to get the value with
        `ovn-nbctl get Logical_Switch_Port <PORT_NAME> options:network_name`
        command
        """
        cmd = '{cmd} get Logical_Switch_Port {port} '\
              'options:network_name'.format(cmd=self.nbctl, port=port)
        return self.run_on_master_controller(cmd)

    def verify_that_segment_deleted(self, segment_id):
        """Checks that the segment id is not in the OVN database

        There shouldn't be 'provnet-<SEGMENT_ID>' port in the OVN database
        after the segment has been deleted
        """
        cmd = '{cmd} find Logical_Switch_Port '\
              'name=provnet-{sid}'.format(cmd=self.nbctl, sid=segment_id)
        output = self.run_on_master_controller(cmd)
        self.assertEqual(output, '')
|