Add wrapper for IP command

Change-Id: I5bbd4f1fa0c4f495228efbf5796d8a03959f19f2
This commit is contained in:
Federico Ressi 2019-10-14 12:54:39 +02:00
parent 9cb3538d4c
commit 9fc9eb4112
11 changed files with 225 additions and 42 deletions

View File

@ -23,6 +23,7 @@ import six
from six.moves.urllib import parse
import tobiko
from tobiko.shell import ip
from tobiko.shell import ping
from tobiko.shell import sh
from tobiko.shell import ssh
@ -185,7 +186,7 @@ class OpenStackTopology(tobiko.SharedFixture):
public_ip = self._public_ip(ips, ssh_client=ssh_client)
if public_ip is None:
LOG.debug("Unable to SSH connect to any node IP address: %s"
','.join(str(ip) for ip in ips))
','.join(str(ip_address) for ip_address in ips))
return None
# I need to get a name for the new node
@ -214,9 +215,9 @@ class OpenStackTopology(tobiko.SharedFixture):
pass
if address:
details['address'] = address
for ip in self._ips(address):
for ip_address in self._ips(address):
try:
return self._nodes_by_ips[ip]
return self._nodes_by_ips[ip_address]
except KeyError:
pass
raise _exception.NoSuchOpenStackTopologyNode(details=details)
@ -251,12 +252,12 @@ class OpenStackTopology(tobiko.SharedFixture):
def groups(self):
return list(self._nodes_by_group)
def _ssh_client(self, ip, username=None, port=None, key_filename=None,
**ssh_parameters):
def _ssh_client(self, address, username=None, port=None,
key_filename=None, **ssh_parameters):
username = username or self.config.conf.username
port = port or self.config.conf.port
key_filename = key_filename or self.config.conf.key_file
return ssh.ssh_client(host=str(ip),
return ssh.ssh_client(host=str(address),
username=username,
key_filename=key_filename,
**ssh_parameters)
@ -300,33 +301,33 @@ class OpenStackTopology(tobiko.SharedFixture):
else:
# Exclude unreachable addresses
untested_ips = list()
for ip in ips:
if ip not in self._unreachable_ips:
if ip in self._reachable_ips:
for address in ips:
if address not in self._unreachable_ips:
if address in self._reachable_ips:
# Will take result from the first one of marked already
# marked as reachable
reachable = reachable or ip
reachable = reachable or address
else:
# Will later search for results between the other IPs
untested_ips.append(ip)
untested_ips.append(address)
for ip in untested_ips:
for address in untested_ips:
if reachable is None:
try:
received = ping.ping(ip, count=1, timeout=5.,
received = ping.ping(address, count=1, timeout=5.,
ssh_client=proxy_client,
**kwargs).received
except ping.PingFailed:
pass
else:
if received:
reachable = ip
reachable = address
# Mark IP as reachable
self._reachable_ips.add(ip)
self._reachable_ips.add(address)
continue
# Mark IP as unreachable
self._unreachable_ips.add(ip)
self._unreachable_ips.add(address)
return reachable
@ -336,7 +337,7 @@ class OpenStackTopology(tobiko.SharedFixture):
return ip_version and int(ip_version) or None
def _ips_from_host(self, **kwargs):
return sh.list_ip_addresses(ip_version=self.ip_version, **kwargs)
return ip.list_ip_addresses(ip_version=self.ip_version, **kwargs)
def _ips(self, obj):
if isinstance(obj, tobiko.Selection):

View File

@ -18,7 +18,7 @@ from __future__ import absolute_import
import netaddr
import tobiko
from tobiko.shell.sh import _execute
from tobiko.shell import sh
class IfconfigError(tobiko.TobikoException):
@ -61,8 +61,8 @@ def list_ip_addresses(ip_version=None, **execute_params):
def execute_ifconfig(*ifconfig_args, **execute_params):
command = ('/sbin/ifconfig',) + ifconfig_args
result = _execute.execute(command, stdin=False, stdout=True, stderr=True,
expect_exit_status=None, **execute_params)
result = sh.execute(command, stdin=False, stdout=True, stderr=True,
expect_exit_status=None, **execute_params)
if result.exit_status or not result.stdout:
raise IfconfigError(error=result.stderr)
return result.stdout

79
tobiko/shell/ip.py Normal file
View File

@ -0,0 +1,79 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import netaddr
import tobiko
from tobiko.shell import sh
class IpError(tobiko.TobikoException):
message = ("Unable to get IP addresses from host "
"(exit_status={exit_status!r}): {error!s}")
INETS = {
4: ['inet'],
6: ['inet6'],
None: ['inet', 'inet6']
}
def list_ip_addresses(ip_version=None, **execute_params):
inets = INETS.get(ip_version)
if inets is None:
error = "invalid IP version: {!r}".format(ip_version)
raise IpError(error=error)
output = execute_ip(['-o', 'address', 'list'], **execute_params)
ips = tobiko.Selection()
if output:
for line in output.splitlines():
fields = line.strip().split()
inet = fields[2]
if inet in inets:
address = fields[3]
if '/' in address:
address, _ = address.split('/', 1)
ips.append(netaddr.IPAddress(address))
return ips
def list_network_namespaces(**execute_params):
output = execute_ip(['-o', 'netns', 'list'], **execute_params)
namespaces = list()
if output:
for line in output.splitlines():
fields = line.strip().split()
namespace = fields[0]
namespaces.append(namespace)
return namespaces
def execute_ip(ifconfig_args, network_namespace=None, sudo=None,
**execute_params):
command = ['/sbin/ip'] + ifconfig_args
if network_namespace:
if sudo is None:
sudo = True
command = ['/sbin/ip', 'netns', 'exec', network_namespace] + command
result = sh.execute(command, stdin=False, stdout=True, stderr=True,
expect_exit_status=None, sudo=sudo, **execute_params)
if result.exit_status:
raise IpError(error=result.stderr, exit_status=result.exit_status)
return result.stdout

View File

@ -19,7 +19,6 @@ from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _execute
from tobiko.shell.sh import _hostname
from tobiko.shell.sh import _ifconfig
from tobiko.shell.sh import _io
from tobiko.shell.sh import _local
from tobiko.shell.sh import _process
@ -42,10 +41,6 @@ ShellExecuteResult = _execute.ShellExecuteResult
HostNameError = _hostname.HostnameError
get_hostname = _hostname.get_hostname
IfconfigError = _ifconfig.IfconfigError
list_ip_addresses = _ifconfig.list_ip_addresses
execute_ifconfig = _ifconfig.execute_ifconfig
join_chunks = _io.join_chunks
local_execute = _local.local_execute

View File

@ -54,11 +54,11 @@ def local_execute(command, environment=None, timeout=None, shell=None,
def local_process(command, environment=None, current_dir=None, timeout=None,
shell=None, stdin=None, stdout=None, stderr=True):
shell=None, stdin=None, stdout=None, stderr=True, sudo=None):
return LocalShellProcessFixture(
command=command, environment=environment, current_dir=current_dir,
timeout=timeout, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr)
stderr=stderr, sudo=sudo)
class LocalExecutePathFixture(_path.ExecutePathFixture):

View File

@ -34,9 +34,11 @@ MAX_TIMEOUT = 3600. # 1 hour
def process(command=None, environment=None, timeout=None, shell=None,
stdin=None, stdout=None, stderr=None, ssh_client=None, **kwargs):
stdin=None, stdout=None, stderr=None, ssh_client=None, sudo=None,
**kwargs):
kwargs.update(command=command, environment=environment, timeout=timeout,
shell=shell, stdin=stdin, stdout=stdout, stderr=stderr)
shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
sudo=sudo)
try:
from tobiko.shell.sh import _ssh
from tobiko.shell import ssh
@ -76,6 +78,7 @@ class ShellProcessParameters(Parameters):
stderr = True
buffer_size = io.DEFAULT_BUFFER_SIZE
poll_interval = 1.
sudo = None
class ShellProcessFixture(tobiko.SharedFixture):
@ -127,6 +130,14 @@ class ShellProcessFixture(tobiko.SharedFixture):
command = shell + [str(command)]
else:
command = _command.shell_command(command)
sudo = self.parameters.sudo
if sudo:
if sudo is True:
sudo = default_sudo_command()
else:
sudo = _command.shell_command(sudo)
command = sudo + command
self.command = command
def setup_timeout(self):
@ -483,4 +494,10 @@ def str_from_stream(stream):
def default_shell_command():
from tobiko import config
CONF = config.CONF
return _command.shell_command(CONF.tobiko.shell.command)
return _command.shell_command(CONF.tobiko.shell.sudo)
def default_sudo_command():
from tobiko import config
CONF = config.CONF
return _command.shell_command(CONF.tobiko.shell.sudo)

View File

@ -48,19 +48,19 @@ def ssh_execute(ssh_client, command, environment=None, timeout=None,
def ssh_process(command, environment=None, current_dir=None, timeout=None,
shell=None, stdin=None, stdout=None, stderr=None,
ssh_client=None):
ssh_client=None, sudo=None):
if ssh_client is None:
ssh_client = ssh.ssh_proxy_client()
if ssh_client:
return SSHShellProcessFixture(
command=command, environment=environment, current_dir=current_dir,
timeout=timeout, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, ssh_client=ssh_client)
stderr=stderr, ssh_client=ssh_client, sudo=sudo)
else:
return _local.local_process(
command=command, environment=environment, current_dir=current_dir,
timeout=timeout, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr)
stderr=stderr, sudo=sudo)
class SSHShellProcessParameters(_process.ShellProcessParameters):

View File

@ -24,7 +24,11 @@ OPTIONS = [
cfg.StrOpt('command',
default='/bin/sh -c',
help="Default shell command used for executing "
"local commands")
"local commands"),
cfg.StrOpt('sudo',
default='sudo',
help="Default sudo command used for executing "
"commands as superuser or another user")
]

View File

@ -19,7 +19,7 @@ import netaddr
import testtools
import tobiko
from tobiko.shell import sh
from tobiko.shell import ifconfig
from tobiko.shell import ssh
from tobiko.openstack import stacks
@ -33,7 +33,8 @@ class IfconfigTest(testtools.TestCase):
stacks.UbuntuServerStackFixture)
def test_list_ip_addresses(self, ip_version=None, **execute_params):
ips = sh.list_ip_addresses(ip_version=ip_version, **execute_params)
ips = ifconfig.list_ip_addresses(ip_version=ip_version,
**execute_params)
self.assertIsInstance(ips, tobiko.Selection)
for ip in ips:
self.assertIsInstance(ip, netaddr.IPAddress)

View File

@ -0,0 +1,85 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import netaddr
import six
import testtools
import tobiko
from tobiko.shell import ip
from tobiko.shell import ssh
from tobiko.openstack import stacks
class IpTest(testtools.TestCase):
centos_stack = tobiko.required_setup_fixture(
stacks.CentosServerStackFixture)
cirros_stack = tobiko.required_setup_fixture(
stacks.CirrosServerStackFixture)
ubuntu_stack = tobiko.required_setup_fixture(
stacks.UbuntuServerStackFixture)
def test_list_ip_addresses(self, ip_version=None, **execute_params):
ips = ip.list_ip_addresses(ip_version=ip_version, **execute_params)
self.assertIsInstance(ips, tobiko.Selection)
for ip_address in ips:
self.assertIsInstance(ip_address, netaddr.IPAddress)
if ip_version:
self.assertEqual(ips.with_attributes(version=ip_version), ips)
def test_list_ip_addresses_with_ipv4(self):
self.test_list_ip_addresses(ip_version=4)
def test_list_ip_addresses_with_ipv6(self):
self.test_list_ip_addresses(ip_version=6)
def test_list_ip_addresses_with_centos_server(self):
self.test_list_ip_addresses(ssh_client=self.centos_stack.ssh_client)
def test_list_ip_addresses_with_cirros_server(self):
self.test_list_ip_addresses(ssh_client=self.cirros_stack.ssh_client)
def test_list_ip_addresses_with_ubuntu_server(self):
self.test_list_ip_addresses(ssh_client=self.ubuntu_stack.ssh_client)
def test_list_ip_addresses_with_proxy_ssh_client(self):
ssh_client = ssh.ssh_proxy_client()
if ssh_client is None:
self.skip('SSH proxy server not configured')
self.test_list_ip_addresses(ssh_client=ssh_client)
def test_list_namespaces(self, **execute_params):
namespaces = ip.list_network_namespaces(**execute_params)
self.assertIsInstance(namespaces, list)
for namespace in namespaces:
self.assertIsInstance(namespace, six.string_types)
self.test_list_ip_addresses(network_namespace=namespace)
def test_list_namespaces_with_centos_server(self):
self.test_list_namespaces(ssh_client=self.centos_stack.ssh_client)
def test_list_namespaces_with_ubuntu_server(self):
self.test_list_namespaces(ssh_client=self.ubuntu_stack.ssh_client)
def test_list_namespaces_with_proxy_ssh_client(self):
ssh_client = ssh.ssh_proxy_client()
if ssh_client is None:
self.skip('SSH proxy server not configured')
self.test_list_namespaces(ssh_client=ssh_client)

View File

@ -19,7 +19,7 @@ import testtools
import tobiko
from tobiko.shell import ping
from tobiko.shell import sh
from tobiko.shell import ip
from tobiko.openstack import neutron
from tobiko.openstack import stacks
@ -32,9 +32,10 @@ class PortTest(testtools.TestCase):
def test_port_ips(self):
port = self.stack.port_details
server_ips = sh.list_ip_addresses(ssh_client=self.stack.ssh_client)
for port_ip in neutron.list_port_ip_addresses(port=port):
self.assertIn(port_ip, server_ips)
server_addresses = ip.list_ip_addresses(
ssh_client=self.stack.ssh_client)
for address in neutron.list_port_ip_addresses(port=port):
self.assertIn(address, server_addresses)
def test_port_network(self):
port = self.stack.port_details
@ -55,8 +56,8 @@ class PortTest(testtools.TestCase):
for port in ports:
self.assertEqual(network_id, port['network_id'])
self.assertEqual(device_id, port['device_id'])
for ip in neutron.list_port_ip_addresses(port=port):
ping.ping(host=ip,
for address in neutron.list_port_ip_addresses(port=port):
ping.ping(host=address,
ssh_client=self.stack.ssh_client).assert_replied()
def test_ping_inner_gateway_ip(self):