Use StatefulConnection class to verify connectivity in SG tests

There are two security group tests that are running for more than
20 minutes with RHEL image:
 - test_multiple_ports_portrange_remote
 - test_overlapping_sec_grp_rules
It happens because of the lack of privileges. I think it makes
sense to utilize the existing StatefulConnection class as it
has better service handling (start/stop) with all necessary
permissions

Change-Id: Iaca6fd3e6ed3c64ab3ca22817ad461479ecfa189
This commit is contained in:
Alex Katz 2022-08-15 23:50:24 +03:00
parent f6d2d887be
commit 5684d5e96d
2 changed files with 27 additions and 21 deletions

View File

@ -27,6 +27,7 @@ except ImportError:
import eventlet import eventlet
from oslo_log import log
from tempest.lib import exceptions from tempest.lib import exceptions
from neutron_tempest_plugin import config from neutron_tempest_plugin import config
@ -37,6 +38,7 @@ SCHEMA_PORT_MAPPING = {
"https": 443, "https": 443,
} }
CONF = config.CONF CONF = config.CONF
LOG = log.getLogger(__name__)
class classproperty(object): class classproperty(object):
@ -202,20 +204,30 @@ class StatefulConnection:
timeout=self.test_timeout, timeout=self.test_timeout,
sleep=self.test_sleep) sleep=self.test_sleep)
try: try:
LOG.info("Checking connectivity between server and client -"
" attempt {}".format(self.test_attempt))
self.server_ssh.exec_command( self.server_ssh.exec_command(
'grep {} output.txt'.format(self.test_str)) 'grep {} output.txt'.format(self.test_str))
self.client_ssh.exec_command( self.client_ssh.exec_command(
'grep {} output.txt'.format(self.test_str)) 'grep {} output.txt'.format(self.test_str))
if not self.should_pass: if not self.should_pass:
LOG.warning("attempt {} succeed while it should fail".format(
self.test_attempt))
return False return False
else: else:
if not self.connection_started: if not self.connection_started:
self.connection_started = True self.connection_started = True
LOG.info("attempt {} succeed as it expected".format(
self.test_attempt))
return True return True
except exceptions.SSHExecCommandFailed: except exceptions.SSHExecCommandFailed:
if self.should_pass: if self.should_pass:
LOG.warning("attempt {} failed while it should pass".format(
self.test_attempt))
return False return False
else: else:
LOG.info("attempt {} failed as it expected".format(
self.test_attempt))
return True return True
finally: finally:
self.test_attempt += 1 self.test_attempt += 1

View File

@ -520,12 +520,9 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# verify that conections are not working # verify that conections are not working
for port in range(80, 84): for port in range(80, 84):
self._verify_http_connection( with utils.StatefulConnection(
ssh_clients[0], ssh_clients[0], ssh_clients[2], test_ip, port) as con:
ssh_clients[2], con.test_connection(should_pass=False)
test_ip, port,
servers,
should_pass=False)
# add two remote-group rules with port-ranges # add two remote-group rules with port-ranges
rule_list = [{'protocol': constants.PROTO_NUM_TCP, rule_list = [{'protocol': constants.PROTO_NUM_TCP,
@ -543,11 +540,9 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# verify that conections are working # verify that conections are working
for port in range(80, 84): for port in range(80, 84):
self._verify_http_connection( with utils.StatefulConnection(
ssh_clients[0], ssh_clients[0], ssh_clients[2], test_ip, port) as con:
ssh_clients[2], con.test_connection()
test_ip, port,
servers)
# list the tcp rule id by SG id and port-range # list the tcp rule id by SG id and port-range
sg_rule_id = self.os_primary.network_client.list_security_group_rules( sg_rule_id = self.os_primary.network_client.list_security_group_rules(
@ -559,12 +554,9 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# verify that conections are not working # verify that conections are not working
for port in range(80, 82): for port in range(80, 82):
self._verify_http_connection( with utils.StatefulConnection(
ssh_clients[0], ssh_clients[0], ssh_clients[2], test_ip, port) as con:
ssh_clients[2], con.test_connection(should_pass=False)
test_ip, port,
servers,
should_pass=False)
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490') @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490')
def test_intra_sg_isolation(self): def test_intra_sg_isolation(self):
@ -675,11 +667,13 @@ class NetworkSecGroupTest(base.BaseTempestTestCase):
# status can change the datapath. Let's check the rules in two # status can change the datapath. Let's check the rules in two
# attempts # attempts
for _ in range(2): for _ in range(2):
self._verify_http_connection(client_ssh[0], srv_ssh, srv_ip, with utils.StatefulConnection(
tcp_port, []) client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
con.test_connection()
for port in range(tcp_port, tcp_port + 3): for port in range(tcp_port, tcp_port + 3):
self._verify_http_connection(client_ssh[1], srv_ssh, srv_ip, with utils.StatefulConnection(
port, []) client_ssh[1], srv_ssh, srv_ip, port) as con:
con.test_connection()
@decorators.idempotent_id('96dcd5ff-9d45-4e0d-bea0-0b438cbd388f') @decorators.idempotent_id('96dcd5ff-9d45-4e0d-bea0-0b438cbd388f')
def test_remove_sec_grp_from_active_vm(self): def test_remove_sec_grp_from_active_vm(self):