Update Neutron port scenario test case
Introduce new ping helper methods Change-Id: Ibaae77b2b4bb075ceffbaa748429a6a114921b22
This commit is contained in:
parent
be17623c11
commit
fb456a4ca5
@ -15,6 +15,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from tobiko.shell.ping import _assert
|
||||||
from tobiko.shell.ping import _exception
|
from tobiko.shell.ping import _exception
|
||||||
from tobiko.shell.ping import _interface
|
from tobiko.shell.ping import _interface
|
||||||
from tobiko.shell.ping import _parameters
|
from tobiko.shell.ping import _parameters
|
||||||
@ -22,6 +23,10 @@ from tobiko.shell.ping import _ping
|
|||||||
from tobiko.shell.ping import _statistics
|
from tobiko.shell.ping import _statistics
|
||||||
|
|
||||||
|
|
||||||
|
assert_reachable_ips = _assert.assert_reachable_ips
|
||||||
|
get_reachable_ips = _assert.get_reachable_ips
|
||||||
|
get_unreachable_ips = _assert.get_unreachable_ips
|
||||||
|
|
||||||
PingException = _exception.PingException
|
PingException = _exception.PingException
|
||||||
PingError = _exception.PingError
|
PingError = _exception.PingError
|
||||||
LocalPingError = _exception.LocalPingError
|
LocalPingError = _exception.LocalPingError
|
||||||
|
38
tobiko/shell/ping/_assert.py
Normal file
38
tobiko/shell/ping/_assert.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
# Copyright (c) 2019 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import tobiko
|
||||||
|
from tobiko.shell.ping import _ping
|
||||||
|
|
||||||
|
|
||||||
|
def assert_reachable_ips(target_ips, **params):
|
||||||
|
unreachable_ips = get_unreachable_ips(target_ips, **params)
|
||||||
|
if unreachable_ips:
|
||||||
|
tobiko.fail("Unable to reach IP address(es): {!r}", unreachable_ips)
|
||||||
|
|
||||||
|
|
||||||
|
def get_reachable_ips(target_ips, **params):
|
||||||
|
return tobiko.select(address
|
||||||
|
for address in target_ips
|
||||||
|
if _ping.ping(address, **params).received)
|
||||||
|
|
||||||
|
|
||||||
|
def get_unreachable_ips(target_ips, **params):
|
||||||
|
reachable_ips = get_reachable_ips(target_ips, **params)
|
||||||
|
return tobiko.select(address
|
||||||
|
for address in target_ips
|
||||||
|
if address not in reachable_ips)
|
@ -31,58 +31,53 @@ class PortTest(testtools.TestCase):
|
|||||||
stack = tobiko.required_setup_fixture(stacks.CirrosServerStackFixture)
|
stack = tobiko.required_setup_fixture(stacks.CirrosServerStackFixture)
|
||||||
|
|
||||||
def test_port_ips(self):
|
def test_port_ips(self):
|
||||||
port = self.stack.port_details
|
server_ips = ip.list_ip_addresses(scope='global',
|
||||||
server_addresses = ip.list_ip_addresses(
|
ssh_client=self.stack.ssh_client)
|
||||||
ssh_client=self.stack.ssh_client)
|
port_ips = neutron.list_port_ip_addresses(port=self.stack.port_details)
|
||||||
for address in neutron.list_port_ip_addresses(port=port):
|
self.assertFalse(set(port_ips) - set(server_ips))
|
||||||
self.assertIn(address, server_addresses)
|
|
||||||
|
|
||||||
def test_port_network(self):
|
def test_port_network(self):
|
||||||
port = self.stack.port_details
|
|
||||||
self.assertEqual(self.stack.network_stack.network_id,
|
self.assertEqual(self.stack.network_stack.network_id,
|
||||||
port['network_id'])
|
self.stack.port_details['network_id'])
|
||||||
|
|
||||||
def test_port_subnets(self):
|
def test_port_subnets(self):
|
||||||
port_subnets = {fixed_ip['subnet_id']
|
port_subnets = [fixed_ip['subnet_id']
|
||||||
for fixed_ip in self.stack.port_details['fixed_ips']}
|
for fixed_ip in self.stack.port_details['fixed_ips']]
|
||||||
subnets = set(self.stack.network_stack.network_details['subnets'])
|
network_subnets = self.stack.network_stack.network_details['subnets']
|
||||||
self.assertEqual(port_subnets, subnets)
|
self.assertEqual(set(network_subnets), set(port_subnets))
|
||||||
|
|
||||||
def test_ping_subnet_gateways(self):
|
def test_ping_subnet_gateways(self):
|
||||||
subnet_ids = self.stack.network_stack.network_details['subnets']
|
network_id = self.stack.network_stack.network_id
|
||||||
subnet_gateway_ips = [
|
subnets = neutron.list_subnets(network_id=network_id)
|
||||||
netaddr.IPAddress(neutron.get_subnet(subnet_id)['gateway_ip'])
|
gateway_ips = [netaddr.IPAddress(subnet['gateway_ip'])
|
||||||
for subnet_id in subnet_ids]
|
for subnet in subnets]
|
||||||
reachable_gateway_ips = [
|
ping.assert_reachable_ips(gateway_ips,
|
||||||
gateway_ip
|
ssh_client=self.stack.ssh_client)
|
||||||
for gateway_ip in subnet_gateway_ips
|
|
||||||
if ping.ping(gateway_ip,
|
|
||||||
ssh_client=self.stack.ssh_client).received]
|
|
||||||
self.assertEqual(subnet_gateway_ips, reachable_gateway_ips)
|
|
||||||
|
|
||||||
def test_ping_port(self, network_id=None, device_id=None):
|
def test_ping_port(self, network_id=None, device_id=None):
|
||||||
network_id = network_id or self.stack.network_stack.network_id
|
network_id = network_id or self.stack.network_stack.network_id
|
||||||
device_id = device_id or self.stack.server_id
|
device_id = device_id or self.stack.server_id
|
||||||
ports = neutron.list_ports(network_id=network_id,
|
ports = neutron.list_ports(network_id=network_id,
|
||||||
device_id=device_id)
|
device_id=device_id)
|
||||||
|
port_ips = set()
|
||||||
for port in ports:
|
for port in ports:
|
||||||
self.assertEqual(network_id, port['network_id'])
|
self.assertEqual(network_id, port['network_id'])
|
||||||
self.assertEqual(device_id, port['device_id'])
|
self.assertEqual(device_id, port['device_id'])
|
||||||
for address in neutron.list_port_ip_addresses(port=port):
|
port_ips.update(neutron.list_port_ip_addresses(port=port))
|
||||||
ping.ping(host=address,
|
ping.assert_reachable_ips(port_ips,
|
||||||
ssh_client=self.stack.ssh_client).assert_replied()
|
ssh_client=self.stack.ssh_client)
|
||||||
|
|
||||||
def test_ping_inner_gateway_ip(self):
|
def test_ping_inner_gateway_ip(self):
|
||||||
if not self.stack.network_stack.has_gateway:
|
if not self.stack.network_stack.has_gateway:
|
||||||
self.skip('Server network has no gateway router')
|
self.skip('Server network has no gateway router')
|
||||||
self.test_ping_port(device_id=self.stack.network_stack.gateway_id)
|
self.test_ping_port(device_id=self.stack.network_stack.gateway_id)
|
||||||
|
|
||||||
def test_ping_outer_gateway_ip(self):
|
def test_ping_outer_gateway_ips(self):
|
||||||
if not self.stack.network_stack.has_gateway:
|
if not self.stack.network_stack.has_gateway:
|
||||||
self.skip('Server network has no gateway router')
|
self.skip('Server network has no gateway router')
|
||||||
self.test_ping_port(
|
ping.assert_reachable_ips(
|
||||||
device_id=self.stack.network_stack.gateway_id,
|
self.stack.network_stack.external_gateway_ips,
|
||||||
network_id=self.stack.network_stack.gateway_network_id)
|
ssh_client=self.stack.ssh_client)
|
||||||
|
|
||||||
|
|
||||||
# --- Test la-h3 extension ----------------------------------------------------
|
# --- Test la-h3 extension ----------------------------------------------------
|
||||||
|
Loading…
x
Reference in New Issue
Block a user