Refactor topology nodes discovery
Change-Id: I698f612c182e4f6179ea8c53528df36b2f8f8738
This commit is contained in:
parent
efe9febf9a
commit
f5a703b77d
196
tobiko/openstack/topology/_address.py
Normal file
196
tobiko/openstack/topology/_address.py
Normal file
@ -0,0 +1,196 @@
|
||||
# Copyright 2020 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import socket
|
||||
import typing
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
|
||||
import tobiko
|
||||
from tobiko.shell import ssh
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def list_addresses(obj,
|
||||
ip_version: typing.Optional[int] = None,
|
||||
port: typing.Union[int, str, None] = None,
|
||||
ssh_config: bool = False) -> \
|
||||
typing.List[netaddr.IPAddress]:
|
||||
if isinstance(obj, tobiko.Selection):
|
||||
addresses = obj
|
||||
elif isinstance(obj, netaddr.IPAddress):
|
||||
addresses = tobiko.select([obj])
|
||||
elif isinstance(obj, str):
|
||||
addresses = tobiko.select(
|
||||
list_host_addresses(obj,
|
||||
ip_version=ip_version,
|
||||
port=port,
|
||||
ssh_config=ssh_config))
|
||||
elif isinstance(obj, collections.Sequence):
|
||||
addresses = tobiko.Selection()
|
||||
for item in iter(obj):
|
||||
addresses.extend(list_addresses(item))
|
||||
|
||||
if addresses and ip_version is not None:
|
||||
addresses = addresses.with_attributes(version=ip_version)
|
||||
return addresses
|
||||
|
||||
|
||||
@functools.lru_cache()
|
||||
def list_host_addresses(host: str,
|
||||
ip_version: typing.Optional[int] = None,
|
||||
port: typing.Union[int, str, None] = None,
|
||||
ssh_config: bool = False) -> \
|
||||
typing.List[netaddr.IPAddress]:
|
||||
|
||||
if not port:
|
||||
if ssh_config:
|
||||
port = 22 # use the default port for SSH protocol
|
||||
else:
|
||||
port = 0
|
||||
|
||||
addresses = []
|
||||
hosts = [host]
|
||||
resolved = set()
|
||||
while hosts:
|
||||
host = hosts.pop()
|
||||
if host in resolved:
|
||||
LOG.debug(f"Cyclic address resolution detected for host {host}")
|
||||
continue # already resolved
|
||||
|
||||
resolved.add(host) # avoid resolving it again
|
||||
address = parse_ip_address(host)
|
||||
if address:
|
||||
addresses.append(address)
|
||||
continue
|
||||
|
||||
# use socket host address resolution to get IP addresses
|
||||
addresses.extend(resolv_host_addresses(host=host,
|
||||
port=port,
|
||||
ip_version=ip_version))
|
||||
if ssh_config:
|
||||
# get additional socket addresses from SSH configuration
|
||||
hosts.extend(list_ssh_hostconfig_hostnames(host))
|
||||
|
||||
if [host] != [str(address) for address in addresses]:
|
||||
LOG.debug(f"Host '{host}' addresses resolved as: {addresses}")
|
||||
return addresses
|
||||
|
||||
|
||||
def parse_ip_address(host: str) -> typing.Optional[netaddr.IPAddress]:
|
||||
try:
|
||||
return netaddr.IPAddress(host)
|
||||
except (netaddr.AddrFormatError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
ADDRESS_FAMILIES = {
|
||||
4: socket.AF_INET,
|
||||
6: socket.AF_INET6,
|
||||
None: socket.AF_UNSPEC
|
||||
}
|
||||
|
||||
|
||||
# pylint: disable=no-member
|
||||
AddressFamily = socket.AddressFamily
|
||||
# pylint: enable=no-member
|
||||
|
||||
|
||||
def get_address_family(ip_version: typing.Optional[int] = None) -> \
|
||||
AddressFamily:
|
||||
try:
|
||||
return ADDRESS_FAMILIES[ip_version]
|
||||
except KeyError:
|
||||
pass
|
||||
raise ValueError(f"{ip_version!r} is an invalid value for getting address "
|
||||
"family")
|
||||
|
||||
|
||||
IP_VERSIONS = {
|
||||
socket.AF_INET: 4,
|
||||
socket.AF_INET6: 6,
|
||||
}
|
||||
|
||||
|
||||
def get_ip_version(family: AddressFamily) -> int:
|
||||
try:
|
||||
return IP_VERSIONS[family]
|
||||
except KeyError:
|
||||
pass
|
||||
raise ValueError(f"{family!r} is an invalid value for getting IP version")
|
||||
|
||||
|
||||
def resolv_host_addresses(host: str,
|
||||
port: typing.Union[int, str] = 0,
|
||||
ip_version: typing.Optional[int] = None) -> \
|
||||
typing.List[netaddr.IPAddress]:
|
||||
|
||||
family = get_address_family(ip_version)
|
||||
proto = socket.AI_CANONNAME | socket.IPPROTO_TCP
|
||||
LOG.debug(f"Resolve IP addresses for host '{host}' "
|
||||
f"(port={port}, family={family}, proto={proto})'...")
|
||||
try:
|
||||
addrinfo = socket.getaddrinfo(host, port, family=family, proto=proto)
|
||||
except socket.gaierror as ex:
|
||||
LOG.debug(f"Can't resolve IP addresses for host '{host}': {ex}")
|
||||
return []
|
||||
|
||||
addresses = []
|
||||
for _family, _, _, canonical_name, sockaddr in addrinfo:
|
||||
if family != socket.AF_UNSPEC and family != _family:
|
||||
LOG.error(f"Resolved address family '{_family}' 'of address "
|
||||
f"'{sockaddr}' is not {family} "
|
||||
f"(canonical_name={canonical_name}")
|
||||
continue
|
||||
|
||||
address = parse_ip_address(sockaddr[0])
|
||||
if address is None:
|
||||
LOG.error(f"Resolved address '{sockaddr[0]}' is not a valid IP "
|
||||
f"address (canonical_name={canonical_name})")
|
||||
continue
|
||||
|
||||
if ip_version and ip_version != address.version:
|
||||
LOG.error(f"Resolved IP address version '{address.version}' of "
|
||||
f"'{address}' is not {ip_version} "
|
||||
f"(canonical_name={canonical_name})")
|
||||
continue
|
||||
|
||||
addresses.append(address)
|
||||
LOG.debug(f"IP address for host '{host}' has been resolved as "
|
||||
f"'{address}' (canonical_name={canonical_name})")
|
||||
|
||||
if not addresses:
|
||||
LOG.debug(f"Host name '{host}' resolved to any IP address.")
|
||||
|
||||
return addresses
|
||||
|
||||
|
||||
def list_ssh_hostconfig_hostnames(host: str) -> typing.List[str]:
|
||||
hosts: typing.List[str] = [host]
|
||||
hostnames: typing.List[str] = []
|
||||
while hosts:
|
||||
hostname = ssh.ssh_host_config(hosts.pop()).hostname
|
||||
if (hostname is not None and
|
||||
host != hostname and
|
||||
hostname not in hostnames):
|
||||
LOG.debug(f"Found hostname '{hostname}' for '{host}' in SSH "
|
||||
"configuration")
|
||||
hostnames.append(hostname)
|
||||
return hostnames
|
26
tobiko/openstack/topology/_config.py
Normal file
26
tobiko/openstack/topology/_config.py
Normal file
@ -0,0 +1,26 @@
|
||||
# Copyright 2020 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import tobiko
|
||||
|
||||
|
||||
class OpenStackTopologyConfig(tobiko.SharedFixture):
|
||||
|
||||
conf = None
|
||||
|
||||
def setup_fixture(self):
|
||||
from tobiko import config
|
||||
CONF = config.CONF
|
||||
self.conf = CONF.tobiko.topology
|
144
tobiko/openstack/topology/_connection.py
Normal file
144
tobiko/openstack/topology/_connection.py
Normal file
@ -0,0 +1,144 @@
|
||||
# Copyright 2020 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import collections
|
||||
import typing
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
|
||||
import tobiko
|
||||
from tobiko.openstack.topology import _config
|
||||
from tobiko.shell import ssh
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class UreachableSSHServer(tobiko.TobikoException):
|
||||
message = ("Unable to reach SSH server through any address: {addresses}. "
|
||||
"Failures: {failures}")
|
||||
|
||||
|
||||
class SSHConnection(object):
|
||||
|
||||
def __init__(self,
|
||||
address: netaddr.IPAddress,
|
||||
ssh_client: typing.Optional[ssh.SSHClientFixture] = None,
|
||||
failure: typing.Optional[Exception] = None):
|
||||
self.address = address
|
||||
self.ssh_client = ssh_client
|
||||
self.failure = failure
|
||||
|
||||
def __repr__(self) -> str:
|
||||
attributes = ", ".join(f"{n}={v!r}"
|
||||
for n, v in self._iter_attributes())
|
||||
return f"{type(self).__name__}({attributes})"
|
||||
|
||||
def _iter_attributes(self):
|
||||
yield 'address', self.address
|
||||
if self.ssh_client is not None:
|
||||
yield 'ssh_client', self.ssh_client
|
||||
if self.failure is not None:
|
||||
yield 'failure', self.failure
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return (self.failure is None and
|
||||
self.ssh_client is not None)
|
||||
|
||||
|
||||
class SSHConnectionManager(tobiko.SharedFixture):
|
||||
|
||||
config = tobiko.required_setup_fixture(_config.OpenStackTopologyConfig)
|
||||
|
||||
def __init__(self):
|
||||
super(SSHConnectionManager, self).__init__()
|
||||
self._connections: typing.Dict[netaddr.IPAddress, SSHConnection] = (
|
||||
collections.OrderedDict())
|
||||
|
||||
def cleanup_fixture(self):
|
||||
connections = list(self._connections.values())
|
||||
self._connections.clear()
|
||||
for connection in connections:
|
||||
connection.close()
|
||||
|
||||
def connect(self,
|
||||
addresses: typing.List[netaddr.IPAddress],
|
||||
**connect_parameters) -> ssh.SSHClientFixture:
|
||||
if not addresses:
|
||||
raise ValueError(f"'addresses' list is empty: {addresses}")
|
||||
|
||||
connections = tobiko.select(self.list_connections(addresses))
|
||||
try:
|
||||
return connections.with_attributes(is_valid=True).first.ssh_client
|
||||
except tobiko.ObjectNotFound:
|
||||
pass
|
||||
|
||||
for connection in connections.with_attributes(failure=None):
|
||||
# connection not tried yet
|
||||
LOG.debug("Establishing SSH connection to "
|
||||
f"'{connection.address}'")
|
||||
try:
|
||||
ssh_client = self.ssh_client(connection.address,
|
||||
**connect_parameters)
|
||||
ssh_client.connect(retry_count=1, connection_attempts=1)
|
||||
except Exception as ex:
|
||||
LOG.debug("Failed establishing SSH connect to "
|
||||
f"'{connection.address}'.", exc_info=1)
|
||||
# avoid re-checking again later the same address
|
||||
connection.failure = ex
|
||||
continue
|
||||
else:
|
||||
# cache valid connection SSH client for later use
|
||||
connection.ssh_client = ssh_client
|
||||
assert connection.is_valid
|
||||
return ssh_client
|
||||
|
||||
failures = '\n'.join(str(connection.failure)
|
||||
for connection in connections)
|
||||
raise UreachableSSHServer(addresses=addresses,
|
||||
failures=failures)
|
||||
|
||||
def list_connections(self, addresses: typing.List[netaddr.IPAddress]) -> \
|
||||
typing.List[SSHConnection]:
|
||||
connections = []
|
||||
for address in addresses:
|
||||
connections.append(self.get_connection(address))
|
||||
return connections
|
||||
|
||||
def get_connection(self, address: netaddr.IPAddress):
|
||||
tobiko.check_valid_type(address, netaddr.IPAddress)
|
||||
return self._connections.setdefault(address,
|
||||
SSHConnection(address))
|
||||
|
||||
def ssh_client(self, address, username=None, port=None,
|
||||
key_filename=None, **ssh_parameters):
|
||||
username = username or self.config.conf.username
|
||||
port = port or self.config.conf.port
|
||||
key_filename = key_filename or self.config.conf.key_file
|
||||
return ssh.ssh_client(host=str(address),
|
||||
username=username,
|
||||
key_filename=key_filename,
|
||||
**ssh_parameters)
|
||||
|
||||
|
||||
SSH_CONNECTIONS = SSHConnectionManager()
|
||||
|
||||
|
||||
def ssh_connect(addresses: typing.List[netaddr.IPAddress],
|
||||
manager: typing.Optional[SSHConnectionManager] = None,
|
||||
**connect_parameters) -> ssh.SSHClientFixture:
|
||||
manager = manager or SSH_CONNECTIONS
|
||||
return manager.connect(addresses=addresses, **connect_parameters)
|
@ -14,11 +14,9 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import collections
|
||||
import socket
|
||||
import typing # noqa
|
||||
import weakref
|
||||
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
import six
|
||||
@ -28,11 +26,13 @@ import tobiko
|
||||
from tobiko import docker
|
||||
from tobiko import podman
|
||||
from tobiko.shell import ip
|
||||
from tobiko.shell import ping
|
||||
from tobiko.shell import sh
|
||||
from tobiko.shell import ssh
|
||||
from tobiko.openstack import nova
|
||||
from tobiko.openstack import keystone
|
||||
from tobiko.openstack.topology import _address
|
||||
from tobiko.openstack.topology import _config
|
||||
from tobiko.openstack.topology import _connection
|
||||
from tobiko.openstack.topology import _exception
|
||||
|
||||
|
||||
@ -103,13 +103,13 @@ class OpenStackTopologyNode(object):
|
||||
_docker_client = None
|
||||
_podman_client = None
|
||||
|
||||
def __init__(self, topology, name: str, public_ip,
|
||||
ssh_client):
|
||||
def __init__(self, topology, name: str, ssh_client: ssh.SSHClientFixture,
|
||||
addresses: typing.List[netaddr.IPAddress]):
|
||||
self._topology = weakref.ref(topology)
|
||||
self.name: str = name
|
||||
self.public_ip = public_ip
|
||||
self.name = name
|
||||
self.ssh_client = ssh_client
|
||||
self.groups: typing.Set[str] = set()
|
||||
self.addresses: typing.List[netaddr.IPAddress] = list(addresses)
|
||||
|
||||
@property
|
||||
def topology(self):
|
||||
@ -118,6 +118,10 @@ class OpenStackTopologyNode(object):
|
||||
def add_group(self, group: str):
|
||||
self.groups.add(group)
|
||||
|
||||
@property
|
||||
def public_ip(self):
|
||||
return self.addresses[0]
|
||||
|
||||
@property
|
||||
def ssh_parameters(self):
|
||||
return self.ssh_client.setup_connect_parameters()
|
||||
@ -143,19 +147,9 @@ class OpenStackTopologyNode(object):
|
||||
name=self.name)
|
||||
|
||||
|
||||
class OpenStackTopologyConfig(tobiko.SharedFixture):
|
||||
|
||||
conf = None
|
||||
|
||||
def setup_fixture(self):
|
||||
from tobiko import config
|
||||
CONF = config.CONF
|
||||
self.conf = CONF.tobiko.topology
|
||||
|
||||
|
||||
class OpenStackTopology(tobiko.SharedFixture):
|
||||
|
||||
config = tobiko.required_setup_fixture(OpenStackTopologyConfig)
|
||||
config = tobiko.required_setup_fixture(_config.OpenStackTopologyConfig)
|
||||
|
||||
agent_to_service_name_mappings = {
|
||||
'neutron-dhcp-agent': 'devstack@q-dhcp',
|
||||
@ -166,23 +160,27 @@ class OpenStackTopology(tobiko.SharedFixture):
|
||||
|
||||
has_containers = False
|
||||
|
||||
_connections = tobiko.required_setup_fixture(
|
||||
_connection.SSHConnectionManager)
|
||||
|
||||
def __init__(self):
|
||||
super(OpenStackTopology, self).__init__()
|
||||
self._reachable_ips = set()
|
||||
self._unreachable_ips = set()
|
||||
self._nodes_by_name = collections.OrderedDict()
|
||||
self._nodes_by_ips = collections.OrderedDict()
|
||||
self._nodes_by_group = collections.OrderedDict()
|
||||
self._names: typing.Dict[str, OpenStackTopologyNode] = (
|
||||
collections.OrderedDict())
|
||||
self._groups: typing.Dict[str, tobiko.Selection] = (
|
||||
collections.OrderedDict())
|
||||
self._addresses: typing.Dict[netaddr.IPAddress,
|
||||
OpenStackTopologyNode] = (
|
||||
collections.OrderedDict())
|
||||
|
||||
def setup_fixture(self):
|
||||
self.discover_nodes()
|
||||
|
||||
def cleanup_fixture(self):
|
||||
self._reachable_ips.clear()
|
||||
self._unreachable_ips.clear()
|
||||
self._nodes_by_name.clear()
|
||||
self._nodes_by_ips.clear()
|
||||
self._nodes_by_group.clear()
|
||||
tobiko.cleanup_fixture(self._connections)
|
||||
self._names.clear()
|
||||
self._groups.clear()
|
||||
self._addresses.clear()
|
||||
|
||||
def get_agent_service_name(self, agent_name):
|
||||
try:
|
||||
@ -212,51 +210,70 @@ class OpenStackTopology(tobiko.SharedFixture):
|
||||
address=hypervisor.host_ip,
|
||||
group='compute')
|
||||
|
||||
def add_node(self, hostname=None, address=None, group=None,
|
||||
ssh_client=None) -> OpenStackTopologyNode:
|
||||
name = hostname and node_name_from_hostname(hostname) or None
|
||||
ips = set()
|
||||
if address:
|
||||
ips.update(self._ips(address))
|
||||
def add_node(self,
|
||||
hostname: typing.Optional[str] = None,
|
||||
address: typing.Optional[str] = None,
|
||||
group: typing.Optional[str] = None,
|
||||
ssh_client: typing.Optional[ssh.SSHClientFixture] = None) \
|
||||
-> OpenStackTopologyNode:
|
||||
if hostname:
|
||||
ips.update(self._ips(hostname))
|
||||
ips = tobiko.select(ips)
|
||||
name = node_name_from_hostname(hostname)
|
||||
else:
|
||||
name = None
|
||||
|
||||
addresses: typing.List[netaddr.IPAddress] = []
|
||||
if address:
|
||||
# add manually configure addresses first
|
||||
addresses.extend(self._list_addresses(address))
|
||||
if hostname:
|
||||
# detect more addresses from the hostname
|
||||
addresses.extend(self._list_addresses(hostname))
|
||||
if ssh_client is not None:
|
||||
# detect all global addresses from remote server
|
||||
addresses.extend(self._list_addresses_from_host(
|
||||
ssh_client=ssh_client))
|
||||
addresses = tobiko.select(remove_duplications(addresses))
|
||||
|
||||
try:
|
||||
node = self.get_node(name=name, address=ips)
|
||||
node = self.get_node(name=name, address=addresses)
|
||||
except _exception.NoSuchOpenStackTopologyNode:
|
||||
node = self._add_node(hostname=hostname, ips=ips,
|
||||
ssh_client=ssh_client)
|
||||
node = None
|
||||
|
||||
node = node or self._add_node(addresses=addresses,
|
||||
hostname=hostname,
|
||||
ssh_client=ssh_client)
|
||||
|
||||
if group:
|
||||
# Add group anyway enven if the node hasn't been added
|
||||
# Add group anyway even if the node hasn't been added
|
||||
group_nodes = self.add_group(group=group)
|
||||
if node:
|
||||
if node and node not in group_nodes:
|
||||
group_nodes.append(node)
|
||||
node.add_group(group=group)
|
||||
|
||||
return node
|
||||
|
||||
def _add_node(self, ips, hostname=None, ssh_client=None):
|
||||
public_ip = self._public_ip(ips, ssh_client=ssh_client)
|
||||
if public_ip is None:
|
||||
LOG.debug("Unable to SSH connect to any node IP address: %s"
|
||||
','.join(str(ip_address) for ip_address in ips))
|
||||
return None
|
||||
def _add_node(self,
|
||||
addresses: typing.List[netaddr.IPAddress],
|
||||
hostname: str = None,
|
||||
ssh_client: typing.Optional[ssh.SSHClientFixture] = None):
|
||||
if ssh_client is None:
|
||||
ssh_client = self._ssh_connect(addresses=addresses)
|
||||
addresses.extend(self._list_addresses_from_host(ssh_client=ssh_client))
|
||||
addresses = tobiko.select(remove_duplications(addresses))
|
||||
|
||||
# I need to get a name for the new node
|
||||
ssh_client = ssh_client or self._ssh_client(public_ip)
|
||||
hostname = hostname or sh.get_hostname(ssh_client=ssh_client)
|
||||
name = node_name_from_hostname(hostname)
|
||||
try:
|
||||
node = self._nodes_by_name[name]
|
||||
node = self._names[name]
|
||||
except KeyError:
|
||||
self._nodes_by_name[name] = node = self.create_node(
|
||||
name=name, public_ip=public_ip, ssh_client=ssh_client)
|
||||
other = self._nodes_by_ips.setdefault(public_ip, node)
|
||||
if node is not other:
|
||||
LOG.error("Two nodes have the same IP address (%s): %r, %r",
|
||||
public_ip, node.name, other.name)
|
||||
self._names[name] = node = self.create_node(name=name,
|
||||
ssh_client=ssh_client,
|
||||
addresses=addresses)
|
||||
for address in addresses:
|
||||
address_node = self._addresses.setdefault(address, node)
|
||||
if address_node is not node:
|
||||
LOG.error(f"Address '{address}' of node '{name}' is already "
|
||||
f"used by node '{address_node.name}'")
|
||||
return node
|
||||
|
||||
def get_node(self, name=None, hostname=None, address=None):
|
||||
@ -266,184 +283,107 @@ class OpenStackTopology(tobiko.SharedFixture):
|
||||
tobiko.check_valid_type(name, six.string_types)
|
||||
details['name'] = name
|
||||
try:
|
||||
return self._nodes_by_name[name]
|
||||
return self._names[name]
|
||||
except KeyError:
|
||||
pass
|
||||
if address:
|
||||
details['address'] = address
|
||||
for ip_address in self._ips(address):
|
||||
for address in self._list_addresses(address):
|
||||
try:
|
||||
return self._nodes_by_ips[ip_address]
|
||||
return self._addresses[address]
|
||||
except KeyError:
|
||||
pass
|
||||
raise _exception.NoSuchOpenStackTopologyNode(details=details)
|
||||
|
||||
def create_node(self, name, public_ip, ssh_client, **kwargs):
|
||||
def create_node(self, name, ssh_client, **kwargs):
|
||||
return OpenStackTopologyNode(topology=self, name=name,
|
||||
public_ip=public_ip,
|
||||
ssh_client=ssh_client, **kwargs)
|
||||
|
||||
@property
|
||||
def nodes(self):
|
||||
return tobiko.select(self.get_node(name)
|
||||
for name in self._nodes_by_name)
|
||||
for name in self._names)
|
||||
|
||||
def add_group(self, group):
|
||||
def add_group(self, group: str) -> tobiko.Selection:
|
||||
try:
|
||||
return self._nodes_by_group[group]
|
||||
return self._groups[group]
|
||||
except KeyError:
|
||||
self._nodes_by_group[group] = nodes = self.create_group()
|
||||
self._groups[group] = nodes = self.create_group()
|
||||
return nodes
|
||||
|
||||
def create_group(self):
|
||||
def create_group(self) -> tobiko.Selection:
|
||||
return tobiko.Selection()
|
||||
|
||||
def get_group(self, group):
|
||||
def get_group(self, group) -> tobiko.Selection:
|
||||
try:
|
||||
return self._nodes_by_group[group]
|
||||
return self._groups[group]
|
||||
except KeyError as ex:
|
||||
raise _exception.NoSuchOpenStackTopologyNodeGroup(
|
||||
group=group) from ex
|
||||
|
||||
def get_groups(self, groups):
|
||||
nodes = []
|
||||
for i in groups:
|
||||
nodes.extend(self.get_group(i))
|
||||
def get_groups(self, groups) -> tobiko.Selection:
|
||||
nodes = tobiko.Selection()
|
||||
for group in groups:
|
||||
nodes.extend(self.get_group(group))
|
||||
return nodes
|
||||
|
||||
@property
|
||||
def groups(self):
|
||||
return list(self._nodes_by_group)
|
||||
def groups(self) -> typing.List[str]:
|
||||
return list(self._groups)
|
||||
|
||||
def _ssh_client(self, address, username=None, port=None,
|
||||
key_filename=None, **ssh_parameters):
|
||||
username = username or self.config.conf.username
|
||||
port = port or self.config.conf.port
|
||||
key_filename = key_filename or self.config.conf.key_file
|
||||
return ssh.ssh_client(host=str(address),
|
||||
username=username,
|
||||
key_filename=key_filename,
|
||||
**ssh_parameters)
|
||||
def _ssh_connect(self, addresses: typing.List[netaddr.IPAddress],
|
||||
**connect_params) -> ssh.SSHClientFixture:
|
||||
|
||||
def _public_ip(self, ips, ssh_client=None):
|
||||
reachable_ip = self._reachable_ip(ips)
|
||||
if reachable_ip:
|
||||
return reachable_ip
|
||||
|
||||
if not ssh_client:
|
||||
# Try connecting via other nodes to get target node IP
|
||||
# addresses
|
||||
proxy_client = None
|
||||
try:
|
||||
return _connection.ssh_connect(addresses, **connect_params)
|
||||
except _connection.UreachableSSHServer:
|
||||
for proxy_node in self.nodes:
|
||||
proxy_client = proxy_node.ssh_client
|
||||
if proxy_client:
|
||||
internal_ip = self._reachable_ip(ips,
|
||||
proxy_client=proxy_client)
|
||||
if internal_ip:
|
||||
ssh_client = self._ssh_client(
|
||||
internal_ip, proxy_client=proxy_client)
|
||||
break
|
||||
if ssh_client:
|
||||
break
|
||||
LOG.debug("Try connecting through a proxy node "
|
||||
f"'{proxy_node.name}'")
|
||||
try:
|
||||
return self._ssh_connect_with_proxy_client(
|
||||
addresses, proxy_client, **connect_params)
|
||||
except _connection.UreachableSSHServer:
|
||||
pass
|
||||
raise
|
||||
|
||||
if ssh_client:
|
||||
# Connect via SSH to to get target node IP addresses
|
||||
ips = self._ips_from_host(ssh_client=ssh_client)
|
||||
reachable_ip = self._reachable_ip(ips)
|
||||
if reachable_ip:
|
||||
return reachable_ip
|
||||
|
||||
LOG.warning('Unable to reach remote host via any IP address: %s',
|
||||
', '.join(str(a) for a in ips))
|
||||
return None
|
||||
|
||||
def _reachable_ip(self, ips, proxy_client=None, **kwargs):
|
||||
reachable = None
|
||||
if proxy_client:
|
||||
untested_ips = ips
|
||||
else:
|
||||
# Exclude unreachable addresses
|
||||
untested_ips = list()
|
||||
for address in ips:
|
||||
if address not in self._unreachable_ips:
|
||||
if address in self._reachable_ips:
|
||||
# Will take result from the first one of marked already
|
||||
# marked as reachable
|
||||
reachable = reachable or address
|
||||
else:
|
||||
# Will later search for results between the other IPs
|
||||
untested_ips.append(address)
|
||||
|
||||
for address in untested_ips:
|
||||
if reachable is None:
|
||||
try:
|
||||
received = ping.ping(address, count=1, timeout=5.,
|
||||
ssh_client=proxy_client,
|
||||
**kwargs).received
|
||||
except ping.PingFailed:
|
||||
pass
|
||||
else:
|
||||
if received:
|
||||
reachable = address
|
||||
# Mark IP as reachable
|
||||
self._reachable_ips.add(address)
|
||||
continue
|
||||
|
||||
# Mark IP as unreachable
|
||||
self._unreachable_ips.add(address)
|
||||
|
||||
return reachable
|
||||
def _ssh_connect_with_proxy_client(self, addresses, proxy_client,
|
||||
**connect_params) -> \
|
||||
ssh.SSHClientFixture:
|
||||
ssh_client = _connection.ssh_connect(addresses,
|
||||
proxy_client=proxy_client,
|
||||
**connect_params)
|
||||
addresses = self._list_addresses_from_host(ssh_client=ssh_client)
|
||||
try:
|
||||
LOG.debug("Try connecting through an address that doesn't require "
|
||||
"an SSH proxy host")
|
||||
return _connection.ssh_connect(addresses, **connect_params)
|
||||
except _connection.UreachableSSHServer:
|
||||
return ssh_client
|
||||
|
||||
@property
|
||||
def ip_version(self):
|
||||
def ip_version(self) -> typing.Optional[int]:
|
||||
ip_version = self.config.conf.ip_version
|
||||
return ip_version and int(ip_version) or None
|
||||
|
||||
def _ips_from_host(self, **kwargs):
|
||||
return ip.list_ip_addresses(ip_version=self.ip_version,
|
||||
scope='global', **kwargs)
|
||||
def _list_addresses_from_host(self, ssh_client: ssh.SSHClientFixture):
|
||||
return ip.list_ip_addresses(ssh_client=ssh_client,
|
||||
ip_version=self.ip_version,
|
||||
scope='global')
|
||||
|
||||
def _ips(self, obj):
|
||||
if isinstance(obj, tobiko.Selection):
|
||||
ips = obj
|
||||
elif isinstance(obj, netaddr.IPAddress):
|
||||
ips = tobiko.select([obj])
|
||||
elif isinstance(obj, six.string_types):
|
||||
try:
|
||||
ips = tobiko.select([netaddr.IPAddress(obj)])
|
||||
except (netaddr.AddrFormatError, ValueError):
|
||||
ips = resolve_host_ips(obj)
|
||||
else:
|
||||
for item in iter(obj):
|
||||
tobiko.check_valid_type(item, netaddr.IPAddress)
|
||||
ips = tobiko.select(obj)
|
||||
|
||||
if ips and self.ip_version:
|
||||
ips = ips.with_attributes(version=self.ip_version)
|
||||
return ips
|
||||
|
||||
|
||||
def resolve_host_ips(host, port=0):
|
||||
tobiko.check_valid_type(host, six.string_types)
|
||||
LOG.debug('Calling getaddrinfo with host %r', host)
|
||||
ips = tobiko.Selection()
|
||||
try:
|
||||
addrinfo = socket.getaddrinfo(host, port, 0, 0,
|
||||
socket.AI_CANONNAME | socket.IPPROTO_TCP)
|
||||
except socket.gaierror:
|
||||
LOG.exception('Error calling getaddrinfo for host %r', host)
|
||||
else:
|
||||
for _, _, _, canonical_name, sockaddr in addrinfo:
|
||||
try:
|
||||
ips.append(netaddr.IPAddress(sockaddr[0]))
|
||||
except netaddr.AddrFormatError as ex:
|
||||
LOG.error("Invalid sockaddr for host %r: %r -> %r (%s)",
|
||||
host, canonical_name, sockaddr, ex)
|
||||
else:
|
||||
LOG.debug("IP address for host %r: %r -> %r",
|
||||
host, canonical_name, sockaddr)
|
||||
return ips
|
||||
def _list_addresses(self, obj) -> typing.List[netaddr.IPAddress]:
|
||||
return _address.list_addresses(obj,
|
||||
ip_version=self.ip_version,
|
||||
ssh_config=True)
|
||||
|
||||
|
||||
def node_name_from_hostname(hostname):
|
||||
return hostname.split('.', 1)[0].lower()
|
||||
|
||||
|
||||
def remove_duplications(items: typing.List) -> typing.List:
|
||||
# use all items as dictionary keys to remove duplications
|
||||
mapping = collections.OrderedDict((k, None) for k in items)
|
||||
return list(mapping.keys())
|
||||
|
@ -34,8 +34,6 @@ class TripleoTopologyTest(test_topology.OpenStackTopologyTest):
|
||||
self.assertEqual(name, node.name)
|
||||
nodes = self.topology.get_group('undercloud')
|
||||
self.assertEqual([node], nodes)
|
||||
host_config = tripleo.undercloud_host_config()
|
||||
self.assertEqual(host_config.hostname, str(node.public_ip))
|
||||
|
||||
@tripleo.skip_if_missing_overcloud
|
||||
def test_overcloud_group(self):
|
||||
|
Loading…
Reference in New Issue
Block a user