Replace tobiko.common.utils.network with tobiko.shell.ping

Change-Id: I55504fd6387593297f671b76753410d6414ef6b1
This commit is contained in:
Federico Ressi 2019-04-08 12:54:44 +02:00
parent f26cb5c276
commit 081ff5240c
4 changed files with 71 additions and 179 deletions

View File

@ -2,6 +2,5 @@
ansible>=2.4.0 # GPLv3
os-faults>=0.1.18 # Apache-2.0
tempest>=17.1.0 # Apache-2.0
cryptography<=2.2.2 # Apache-2.0
Jinja2>=2.8.0 # BSD

View File

@ -1,141 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import glob
import inspect
import os
import re
import signal
import subprocess
from oslo_log import log
from tempest.common.utils import net_utils
from tempest.lib.common.utils import test_utils
import tobiko
from tobiko import config
LOG = log.getLogger(__name__)
SHELL = (config.get_any_option('tobiko.shell.command') or '/bin/sh -c').split()
SG_RULES = {'ALLOW_ICMP':
{'direction': 'ingress',
'protocol': 'icmp'
}
}
def run_background_ping(ip):
"""Starts background ping process."""
# The caller function name
caller_f = inspect.stack()[1][1].split('/')[-1].split(".py")[0]
PING_OUTPUT_F = "/tmp/ping_%s_%s_output" % (ip, caller_f)
PING_PID_F = "/tmp/ping_%s_%s_pid" % (ip, caller_f)
PING_CMD = 'ping -q %s' % ip
# Kill any existing running ping process to the same IP address
if os.path.exists(PING_OUTPUT_F):
with os.open(PING_OUTPUT_F) as pid_file:
os.kill(int(pid_file.read()), signal.SIGINT)
for f in glob.glob("/tmp/ping_%s_%s*" % (ip, caller_f)):
os.remove(f)
for f in glob.glob("/tmp/ping_%s_%s.*"):
os.remove(f)
ping_log = open(PING_OUTPUT_F, 'ab')
p = subprocess.Popen([PING_CMD], stdout=ping_log, shell=True)
with open(PING_PID_F, 'ab') as pf:
pf.write(str(p.pid))
def get_packet_loss(ip):
"""Returns packet loss."""
# The caller function name
caller_f = inspect.stack()[1][1].split('/')[-1].split(".py")[0]
PING_OUTPUT_F = "/tmp/ping_%s_%s_output" % (ip, caller_f)
PING_PID_F = "/tmp/ping_%s_%s_pid" % (ip, caller_f)
try:
# Kill Process
with open(PING_PID_F) as f:
pid = f.read()
os.kill(int(pid), signal.SIGINT)
# Packet loss pattern
p = re.compile("(\\d{1,3})% packet loss")
# Get ping package loss
with open(PING_OUTPUT_F) as f:
m = p.search(f.read())
packet_loss = m.group(1)
finally:
# Remove files created by pre test
from shutil import copyfile
copyfile(PING_OUTPUT_F, "/home/abregman/stam")
os.remove(PING_OUTPUT_F)
os.remove(PING_PID_F)
return packet_loss
def ping_ip_address(ip_address, should_succeed=True,
ping_timeout=None, mtu=None, fragmentation=True):
if not ip_address:
raise ValueError("Invalid IP address: {!r}".format(ip_address))
timeout = ping_timeout or 60.
cmd = ['ping', '-c1', '-w10']
if mtu:
if not fragmentation:
cmd += ['-M', 'do']
cmd += ['-s', str(net_utils.get_ping_payload_size(mtu, 4))]
cmd.append(ip_address)
cmd_line = SHELL + [str(subprocess.list2cmdline(cmd))]
def ping():
LOG.debug('Execute ping command: %r', cmd_line)
proc = subprocess.Popen(cmd_line,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = proc.communicate()
if proc.returncode == 0:
LOG.debug("Ping command succeeded:\n"
"stdout:\n%s\n"
"stderr:\n%s\n", stdout, stderr)
return should_succeed
else:
LOG.debug("Ping command failed (exit_status=%d):\n"
"stdout:\n%s\n"
"stderr:\n%s\n", proc.returncode, stdout, stderr)
return not should_succeed
return test_utils.call_until_true(ping, timeout, 1)
def assert_ping(ip, should_fail=False, fragmentation=True,
packet_size=None):
success = ping_ip_address(ip, mtu=packet_size,
fragmentation=fragmentation)
if success:
if should_fail:
tobiko.fail("Host {!r} is reachable", ip)
elif not should_fail:
tobiko.fail("Host {!r} is not reachable", ip)

View File

@ -37,7 +37,7 @@ RECEIVED = 'received'
UNRECEIVED = 'unreceived'
def ping(host, until=TRANSMITTED, **ping_params):
def ping(host, until=TRANSMITTED, check=True, **ping_params):
"""Send ICMP messages to host address until timeout
:param host: destination host address
@ -45,7 +45,7 @@ def ping(host, until=TRANSMITTED, **ping_params):
function
:returns: PingStatistics
"""
return get_statistics(host=host, until=until, **ping_params)
return get_statistics(host=host, until=until, check=check, **ping_params)
def ping_until_delivered(host, **ping_params):
@ -122,21 +122,21 @@ def ping_until_unreceived(host, **ping_params):
return ping(host=host, until=UNRECEIVED, **ping_params)
def get_statistics(parameters=None, ssh_client=None, until=None,
def get_statistics(parameters=None, ssh_client=None, until=None, check=True,
**ping_params):
parameters = _parameters.get_ping_parameters(default=parameters,
**ping_params)
statistics = _statistics.PingStatistics()
for partial_statistics in iter_statistics(parameters=parameters,
ssh_client=ssh_client,
until=until):
until=until, check=check):
statistics += partial_statistics
LOG.debug("%r", statistics)
return statistics
def iter_statistics(parameters=None, ssh_client=None, until=None,
def iter_statistics(parameters=None, ssh_client=None, until=None, check=True,
**ping_params):
parameters = _parameters.get_ping_parameters(default=parameters,
**ping_params)
@ -177,7 +177,8 @@ def iter_statistics(parameters=None, ssh_client=None, until=None,
try:
result = execute_ping(parameters=execute_parameters,
ssh_client=ssh_client,
timeout=end_of_time - now)
timeout=end_of_time - now,
check=check)
except sh.ShellTimeoutExpired:
pass
@ -208,7 +209,7 @@ def iter_statistics(parameters=None, ssh_client=None, until=None,
message_type=until)
def execute_ping(parameters, ssh_client=None, **params):
def execute_ping(parameters, ssh_client=None, check=True, **params):
if not ssh_client:
is_cirros_image = params.setdefault('is_cirros_image', False)
if is_cirros_image:
@ -218,7 +219,7 @@ def execute_ping(parameters, ssh_client=None, **params):
command = get_ping_command(parameters)
result = sh.execute(command=command, ssh_client=ssh_client,
timeout=parameters.timeout, check=False)
if result.exit_status:
if check and result.exit_status:
handle_ping_command_error(error=result.stderr)
return result

View File

@ -16,9 +16,9 @@ from __future__ import absolute_import
import tobiko
from tobiko import config
from tobiko.common.utils import network
from tobiko.openstack import heat
from tobiko.tests.scenario.neutron import base
from tobiko.shell import ping
CONF = config.CONF
@ -26,9 +26,12 @@ CONF = config.CONF
class FloatingIPFixture(heat.HeatStackFixture):
template = base.heat_template_file('floating_ip.yaml')
# template parameters
floating_network = CONF.tobiko.neutron.floating_network
image = CONF.tobiko.nova.image
flavor = CONF.tobiko.nova.flavor
internal_network_fixture = base.InternalNetworkFixture
internal_network = None
@ -42,12 +45,66 @@ class FloatingIPFixture(heat.HeatStackFixture):
self.internal_network_fixture).wait_for_outputs()
class FloatingIPTest(base.NeutronTest):
"""Tests server connectivity"""
floating_ip_fixture = FloatingIPFixture
@classmethod
def setUpClass(cls):
super(FloatingIPTest, cls).setUpClass()
stack = tobiko.setup_fixture(cls.floating_ip_fixture)
outputs = stack.wait_for_outputs()
cls.floating_ip_address = outputs.floating_ip_address
cls.mtu = stack.internal_network.mtu
def test_ping(self):
ping.ping_until_received(self.floating_ip_address).assert_replied()
def test_ping_with_mtu_packet(self):
ping.ping_until_received(self.floating_ip_address,
packet_size=self.mtu,
fragmentation=False).assert_replied()
def test_ping_with_oversized_packet(self):
# Wait for VM to get ready
ping.ping_until_received(self.floating_ip_address)
# Send 5 over-sized packets
ping.ping(self.floating_ip_address, packet_size=self.mtu + 1,
fragmentation=False, count=5,
check=False).assert_not_replied()
class FloatingIPWithPortSecurityFixture(FloatingIPFixture):
port_security_enabled = True
class FloatingIPWithSecurityGroupFixture(FloatingIPFixture):
port_security_enabled = True
class FloatingIPWithPortSecurityTest(base.NeutronTest):
floating_ip_fixture = FloatingIPFixture
floating_ip_with_securtity_fixture = FloatingIPWithPortSecurityFixture
@classmethod
def setUpClass(cls):
super(FloatingIPWithPortSecurityTest, cls).setUpClass()
# Setup VM with port security
stack = tobiko.setup_fixture(cls.floating_ip_with_securtity_fixture)
outputs = stack.wait_for_outputs()
cls.floating_ip_address_with_security = outputs.floating_ip_address
# Setup VM without port security
stack = tobiko.setup_fixture(cls.floating_ip_fixture)
outputs = stack.wait_for_outputs()
cls.floating_ip_address = outputs.floating_ip_address
def test_ping(self):
ping.ping_until_received(self.floating_ip_address).assert_replied()
ping.ping(self.floating_ip_address_with_security,
count=5).assert_not_replied()
class FloatingIPWithSecurityGroupFixture(FloatingIPWithPortSecurityFixture):
security_groups_fixture = base.SecurityGroupsFixture
security_groups = None
@ -62,29 +119,5 @@ class FloatingIPWithSecurityGroupFixture(FloatingIPFixture):
self.security_groups_fixture).wait_for_outputs()
class FloatingIPTest(base.NeutronTest):
"""Tests server connectivity"""
def test_ping_floating_ip(self, fixture_type=FloatingIPFixture):
"""Validates connectivity to a server post upgrade."""
stack = self.setup_fixture(fixture_type)
network.assert_ping(stack.outputs.floating_ip_address)
def test_ping_floating_ip_with_port_security(
self, fixture_type=FloatingIPWithPortSecurityFixture):
"""Validates connectivity to a server post upgrade."""
stack = self.setup_fixture(fixture_type)
network.assert_ping(stack.outputs.floating_ip_address,
should_fail=True)
def test_ping_floating_ip_with_security_group(
self, fixture_type=FloatingIPWithSecurityGroupFixture):
"""Validates connectivity to a server post upgrade."""
stack = self.setup_fixture(fixture_type)
network.assert_ping(stack.outputs.floating_ip_address)
def test_ping_with_oversize_packet(self, fixture_type=FloatingIPFixture):
stack = self.setup_fixture(fixture_type)
network.assert_ping(stack.outputs.floating_ip_address,
packet_size=stack.internal_network.mtu + 1,
fragmentation=False, should_fail=True)
class FloatingIPWithSecurityGroupTest(FloatingIPTest):
floating_ip_fixture = FloatingIPWithSecurityGroupFixture