Merge "Add test: Editing and deleting port forwarding TCP rule"

This commit is contained in:
Zuul 2020-07-01 15:47:22 +00:00 committed by Gerrit Code Review
commit 722055abdc
2 changed files with 68 additions and 3 deletions

View File

@ -446,11 +446,13 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
self.wait_for_server_status(
server, constants.SERVER_STATUS_ACTIVE, client)
def check_servers_hostnames(self, servers, log_errors=True):
def check_servers_hostnames(self, servers, timeout=None, log_errors=True):
"""Compare hostnames of given servers with their names."""
try:
for server in servers:
kwargs = {}
if timeout:
kwargs['timeout'] = timeout
try:
kwargs['port'] = (
server['port_forwarding_tcp']['external_port'])

View File

@ -17,6 +17,7 @@ from neutron_lib import constants
from oslo_log import log
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
@ -45,9 +46,9 @@ class PortForwardingTestJSON(base.BaseTempestTestCase):
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.keypair = cls.create_keypair()
def _prepare_resources(self, num_servers, internal_tcp_port, protocol):
def _prepare_resources(self, num_servers, internal_tcp_port, protocol,
external_port_base=1025):
servers = []
external_port_base = 1025
for i in range(1, num_servers + 1):
internal_udp_port = internal_tcp_port + 10
external_tcp_port = external_port_base + i
@ -121,3 +122,65 @@ class PortForwardingTestJSON(base.BaseTempestTestCase):
self.check_servers_hostnames(servers)
# And now test UDP port forwarding using nc
self._test_udp_port_forwarding(servers)
@decorators.idempotent_id('aa19d46c-a4a6-11ea-bb37-0242ac130002')
def test_port_forwarding_editing_and_deleting_tcp_rule(self):
server = self._prepare_resources(
num_servers=1, internal_tcp_port=22,
protocol=constants.PROTO_NAME_TCP,
external_port_base=1035)
fip_id = server[0]['port_forwarding_tcp']['floatingip_id']
pf_id = server[0]['port_forwarding_tcp']['id']
# Check connectivity with the original parameters
self.check_servers_hostnames(server)
# Use a reasonable timeout to verify that connections will not
# happen. Default would be 196 seconds, which is an overkill.
test_ssh_connect_timeout = 6
# Update external port and check connectivity with original parameters
# Port under server[0]['port_forwarding_tcp']['external_port'] should
# not answer at this point.
def fip_pf_connectivity():
try:
self.check_servers_hostnames(
server, timeout=test_ssh_connect_timeout)
return True
except (AssertionError, lib_exc.SSHTimeout):
return False
def no_fip_pf_connectivity():
return not fip_pf_connectivity()
self.client.update_port_forwarding(fip_id, pf_id, external_port=3333)
utils.wait_until_true(
no_fip_pf_connectivity,
exception=RuntimeError(
"Connection to the server {!r} through "
"port {!r} is still possible.".format(
server[0]['id'],
server[0]['port_forwarding_tcp']['external_port'])))
# Check connectivity with the new parameters
server[0]['port_forwarding_tcp']['external_port'] = 3333
utils.wait_until_true(
fip_pf_connectivity,
exception=RuntimeError(
"Connection to the server {!r} through "
"port {!r} is not possible.".format(
server[0]['id'],
server[0]['port_forwarding_tcp']['external_port'])))
# Remove port forwarding and ensure connection stops working.
self.client.delete_port_forwarding(fip_id, pf_id)
self.assertRaises(lib_exc.NotFound, self.client.get_port_forwarding,
fip_id, pf_id)
utils.wait_until_true(
no_fip_pf_connectivity,
exception=RuntimeError(
"Connection to the server {!r} through "
"port {!r} is still possible.".format(
server[0]['id'],
server[0]['port_forwarding_tcp']['external_port'])))