diff --git a/releasenotes/notes/Added-parser-for-`ss`-command-ac426b522eb8adbe.yaml b/releasenotes/notes/Added-parser-for-`ss`-command-ac426b522eb8adbe.yaml new file mode 100644 index 000000000..ab5753865 --- /dev/null +++ b/releasenotes/notes/Added-parser-for-`ss`-command-ac426b522eb8adbe.yaml @@ -0,0 +1,4 @@ +--- +prelude: > +features: + - Add parser for ``ss`` command line tool diff --git a/tobiko/openstack/tests/__init__.py b/tobiko/openstack/tests/__init__.py index c2505760a..9e12d5e7c 100644 --- a/tobiko/openstack/tests/__init__.py +++ b/tobiko/openstack/tests/__init__.py @@ -20,7 +20,6 @@ from tobiko.openstack.tests import _neutron from tobiko.openstack.tests import _nova InvalidDBConnString = _neutron.InvalidDBConnString -ParsingError = _neutron.ParsingError RAFTStatusError = _neutron.RAFTStatusError test_neutron_agents_are_alive = _neutron.test_neutron_agents_are_alive test_ovn_dbs_validations = _neutron.test_ovn_dbs_validations diff --git a/tobiko/openstack/tests/_neutron.py b/tobiko/openstack/tests/_neutron.py index 3bc15aec0..503b3dc9b 100644 --- a/tobiko/openstack/tests/_neutron.py +++ b/tobiko/openstack/tests/_neutron.py @@ -14,6 +14,7 @@ from tobiko.openstack import neutron from tobiko.openstack import topology from tobiko.shell import ip from tobiko.shell import sh +from tobiko.shell import ss from tobiko.tripleo import _overcloud from tobiko.tripleo import pacemaker @@ -94,13 +95,14 @@ def ovn_dbs_vip_bindings(test_case): addrs, port = parse_ips_from_db_connections(ovn_conn_str[db]) if _overcloud.is_ovn_using_raft(): addrs.append(netaddr.IPAddress('0.0.0.0')) + addrs.append(netaddr.IPAddress('::')) for node in topology.list_openstack_nodes(group='controller'): - socs = get_ovn_db_socket_info(node.hostname, port) + socs = ss.tcp_listening(port=port, ssh_client=node.ssh_client) if sockets_centrallized and not socs: continue test_case.assertEqual(1, len(socs)) - test_case.assertIn(socs[0]['addr'], addrs) - test_case.assertEqual(socs[0]['process'], 'ovsdb-server') + test_case.assertIn(socs[0]['local_ip'], addrs) + test_case.assertEqual(socs[0]['process'][0], 'ovsdb-server') if sockets_centrallized: test_case.assertFalse(found_centralized) found_centralized = True @@ -187,32 +189,6 @@ def parse_ips_from_db_connections(con_str): return addrs, ref_port -class ParsingError(tobiko.TobikoException): - pass - - -def get_ovn_db_socket_info(hostname, port): - """Parse SS output for details about open port""" - socs = [] - cmd = 'ss -Hp state listening sport = {}'.format(port) - node_ssh = topology.get_openstack_node(hostname=hostname).ssh_client - output = sh.execute(cmd, ssh_client=node_ssh, sudo=True).stdout - for soc_details in output.splitlines(): - try: - _, _, _, con_tuple, _, process_info = soc_details.split() - addr = netaddr.IPAddress(con_tuple.split(':')[0]) - proc = process_info.split('"')[1] - except (ValueError, AttributeError, IndexError) as ex: - msg = 'Fail getting socket infornation from "{}"'.format( - soc_details) - LOG.error(msg) - raise ParsingError(message=msg) from ex - LOG.debug('Parsed "{}" ip address and "{}" process name from "{}"'. - format(addr, proc, soc_details)) - socs.append({'addr': addr, 'process': proc}) - return socs - - def ovn_dbs_are_synchronized(test_case): """Check that OVN DBs are syncronized across all controller nodes""" db_sync_status = get_ovn_db_sync_status() @@ -437,10 +413,11 @@ def test_raft_cluster(): test_case.assertTrue(leader_found) for node in topology.list_openstack_nodes(group='controller'): node_ips = ip.list_ip_addresses(ssh_client=node.ssh_client) - socs = get_ovn_db_socket_info(node.hostname, cluster_ports[db]) + socs = ss.tcp_listening(port=cluster_ports[db], + ssh_client=node.ssh_client) test_case.assertEqual(1, len(socs)) - test_case.assertIn(socs[0]['addr'], node_ips) - test_case.assertEqual(socs[0]['process'], 'ovsdb-server') + test_case.assertIn(socs[0]['local_ip'], node_ips) + test_case.assertEqual(socs[0]['process'][0], 'ovsdb-server') def test_ovs_bridges_mac_table_size(): diff --git a/tobiko/shell/ss.py b/tobiko/shell/ss.py new file mode 100644 index 000000000..899670367 --- /dev/null +++ b/tobiko/shell/ss.py @@ -0,0 +1,173 @@ +# Copyright (c) 2022 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import netaddr +from oslo_log import log +import typing # noqa + +import tobiko +from tobiko.shell import sh +from tobiko.shell import ssh + + +class SocketLookupError(tobiko.TobikoException): + message = ('ss command "{cmd}" failed with the error "{err}"') + + +class SockHeader(): + + def __init__(self, header_str: str): + self.header_str = header_str + self.header: typing.List[str] = [] + self._parse_header() + + def _parse_header(self): + if 'Netid' in self.header_str: + self.header.append('protocol') + if 'State' in self.header_str: + self.header.append('state') + if 'Recv-Q' in self.header_str: + self.header.append('recv_q') + if 'Send-Q' in self.header_str: + self.header.append('send_q') + if 'Local Address:Port' in self.header_str: + self.header.append('local') + if 'Peer Address:Port' in self.header_str: + self.header.append('remote') + if 'Process' in self.header_str: + self.header.append('process') + + def __len__(self): + return len(self.header) + + def __iter__(self): + for elem in self.header: + yield elem + + +class SockLine(str): + pass + + +class SockData(dict): + pass + + +LOG = log.getLogger(__name__) + + +def _ss(params: str = '', + ssh_client: ssh.SSHClientFixture = None, + parser: typing.Callable[[SockHeader, SockLine], SockData] = None, + **execute_params) -> typing.List[SockData]: + execute_params.update({'sudo': True}) + sockets = [] + command_line = "ss -np {}".format(params) + try: + stdout = sh.execute(command_line, + ssh_client=ssh_client, + **execute_params).stdout + except sh.ShellCommandFailed as ex: + if ex.stdout.startswith('Error'): + raise SocketLookupError(cmd=command_line, err=ex.stderr) from ex + if ex.exit_status > 0: + raise + parsed_header = False + for line in stdout.splitlines(): + if not parsed_header: + headers = SockHeader(line) + parsed_header = True + continue + sock_info = SockLine(line.strip()) + if parser: + try: + sockets.append(parser(headers, sock_info)) + except ValueError as ex: + LOG.error(str(ex)) + continue + else: + sockets.append(SockData({'raw_data': sock_info})) + return sockets + + +def get_processes(processes: str) -> typing.List[str]: + """Parse processes names from ss output + + The simpliest example of the proccesses suffix in ss output: + + users:(("httpd",pid=735448,fd=11)) + + But it can be a bit more complex + + users:(("httpd",pid=4969,fd=53),("httpd",pid=3328,fd=53)) + + Function return the list of all processes names ['httpd', 'httpd'] + """ + stack = [] + process_list = [] + nested = False + for idx, symbol in enumerate(processes): + if symbol == '(': + stack.append(idx) + nested = True + elif symbol == ')' and len(stack) == 1: + process_list.extend(get_processes(processes[stack[0]+1:idx])) + elif symbol == ')': + stack.pop() + if not nested: + process_list.append(processes.split('"', 2)[1]) + return process_list + + +def parse_tcp_socket(headers: SockHeader, + sock_info: SockLine) -> SockData: + socket_details = SockData() + sock_data = sock_info.split() + if len(headers) != len(sock_data): + msg = 'Unable to parse line: "{}"'.format(sock_info) + raise ValueError(msg) + for idx, header in enumerate(headers): + if not header: + continue + if header == 'local' or header == 'remote': + ip, port = sock_data[idx].strip().rsplit(':', 1) + if ip == '*': + ip = '0' + socket_details['{}_ip'.format(header)] = netaddr.IPAddress( + ip.strip('][')) + socket_details['{}_port'.format(header)] = port + elif header == 'process': + try: + socket_details[header] = get_processes(sock_data[idx]) + except IndexError as ex: + msg = 'Unable to parse processes part of the line: {}'.format( + sock_info) + raise ValueError(msg) from ex + else: + socket_details[header] = sock_data[idx] + return socket_details + + +def tcp_listening(address: str = '', + port: str = '', + **exec_params) -> typing.List[SockData]: + params = '-t state listening' + if port: + params += ' sport {}'.format(port) + if address: + params += ' src {}'.format(address) + return _ss(params=params, parser=parse_tcp_socket, **exec_params)