Create module to capture output of ss command

There are several places where we try to get information from the ss
command, so it makes sense to have a common module that captures it.

This patch only adds the tcp_listening handler; more functionality
will follow in subsequent patches.

Change-Id: I15e8d1fc68946672d599863150da9406908522a4
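
A minimal usage sketch of the new helper (the controller node and port
are illustrative; only ss.tcp_listening is introduced by this patch):

    from tobiko.openstack import topology
    from tobiko.shell import ss

    node = topology.list_openstack_nodes(group='controller')[0]
    socs = ss.tcp_listening(port=6641, ssh_client=node.ssh_client)
    for soc in socs:
        print(soc['local_ip'], soc['local_port'], soc['process'])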
Alex Katz 2022-08-29 02:08:28 +03:00
parent 458bbab794
commit f773893c69
4 changed files with 186 additions and 33 deletions


@@ -0,0 +1,4 @@
---
prelude: >
features:
- Add parser for ``ss`` command line tool


@@ -20,7 +20,6 @@ from tobiko.openstack.tests import _neutron
 from tobiko.openstack.tests import _nova
 InvalidDBConnString = _neutron.InvalidDBConnString
-ParsingError = _neutron.ParsingError
 RAFTStatusError = _neutron.RAFTStatusError
 test_neutron_agents_are_alive = _neutron.test_neutron_agents_are_alive
 test_ovn_dbs_validations = _neutron.test_ovn_dbs_validations


@@ -14,6 +14,7 @@ from tobiko.openstack import neutron
 from tobiko.openstack import topology
 from tobiko.shell import ip
 from tobiko.shell import sh
+from tobiko.shell import ss
 from tobiko.tripleo import _overcloud
 from tobiko.tripleo import pacemaker
@@ -94,13 +95,14 @@ def ovn_dbs_vip_bindings(test_case):
         addrs, port = parse_ips_from_db_connections(ovn_conn_str[db])
         if _overcloud.is_ovn_using_raft():
             addrs.append(netaddr.IPAddress('0.0.0.0'))
+            addrs.append(netaddr.IPAddress('::'))
         for node in topology.list_openstack_nodes(group='controller'):
-            socs = get_ovn_db_socket_info(node.hostname, port)
+            socs = ss.tcp_listening(port=port, ssh_client=node.ssh_client)
             if sockets_centrallized and not socs:
                 continue
             test_case.assertEqual(1, len(socs))
-            test_case.assertIn(socs[0]['addr'], addrs)
-            test_case.assertEqual(socs[0]['process'], 'ovsdb-server')
+            test_case.assertIn(socs[0]['local_ip'], addrs)
+            test_case.assertEqual(socs[0]['process'][0], 'ovsdb-server')
             if sockets_centrallized:
                 test_case.assertFalse(found_centralized)
                 found_centralized = True
@@ -187,32 +189,6 @@ def parse_ips_from_db_connections(con_str):
     return addrs, ref_port
-
-
-class ParsingError(tobiko.TobikoException):
-    pass
-
-
-def get_ovn_db_socket_info(hostname, port):
-    """Parse SS output for details about open port"""
-    socs = []
-    cmd = 'ss -Hp state listening sport = {}'.format(port)
-    node_ssh = topology.get_openstack_node(hostname=hostname).ssh_client
-    output = sh.execute(cmd, ssh_client=node_ssh, sudo=True).stdout
-    for soc_details in output.splitlines():
-        try:
-            _, _, _, con_tuple, _, process_info = soc_details.split()
-            addr = netaddr.IPAddress(con_tuple.split(':')[0])
-            proc = process_info.split('"')[1]
-        except (ValueError, AttributeError, IndexError) as ex:
-            msg = 'Fail getting socket infornation from "{}"'.format(
-                soc_details)
-            LOG.error(msg)
-            raise ParsingError(message=msg) from ex
-        LOG.debug('Parsed "{}" ip address and "{}" process name from "{}"'.
-                  format(addr, proc, soc_details))
-        socs.append({'addr': addr, 'process': proc})
-    return socs


 def ovn_dbs_are_synchronized(test_case):
     """Check that OVN DBs are syncronized across all controller nodes"""
     db_sync_status = get_ovn_db_sync_status()
@@ -437,10 +413,11 @@ def test_raft_cluster():
         test_case.assertTrue(leader_found)
         for node in topology.list_openstack_nodes(group='controller'):
            node_ips = ip.list_ip_addresses(ssh_client=node.ssh_client)
-            socs = get_ovn_db_socket_info(node.hostname, cluster_ports[db])
+            socs = ss.tcp_listening(port=cluster_ports[db],
+                                    ssh_client=node.ssh_client)
             test_case.assertEqual(1, len(socs))
-            test_case.assertIn(socs[0]['addr'], node_ips)
-            test_case.assertEqual(socs[0]['process'], 'ovsdb-server')
+            test_case.assertIn(socs[0]['local_ip'], node_ips)
+            test_case.assertEqual(socs[0]['process'][0], 'ovsdb-server')


 def test_ovs_bridges_mac_table_size():
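
For reference, the shape of the parsed socket data changes with this
patch (values below are illustrative): get_ovn_db_socket_info() returned
e.g. {'addr': IPAddress('172.17.1.10'), 'process': 'ovsdb-server'}, while
ss.tcp_listening() returns one SockData dict per socket, e.g.:

    {'recv_q': '0', 'send_q': '10',
     'local_ip': IPAddress('172.17.1.10'), 'local_port': '6641',
     'remote_ip': IPAddress('0.0.0.0'), 'remote_port': '*',
     'process': ['ovsdb-server']}

hence the switch from socs[0]['addr'] to socs[0]['local_ip'] and from
socs[0]['process'] to socs[0]['process'][0].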

tobiko/shell/ss.py (new file, 173 lines)

@@ -0,0 +1,173 @@
# Copyright (c) 2022 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import

import netaddr
from oslo_log import log
import typing  # noqa

import tobiko
from tobiko.shell import sh
from tobiko.shell import ssh


class SocketLookupError(tobiko.TobikoException):
    message = ('ss command "{cmd}" failed with the error "{err}"')


class SockHeader():
    def __init__(self, header_str: str):
        self.header_str = header_str
        self.header: typing.List[str] = []
        self._parse_header()

    def _parse_header(self):
        if 'Netid' in self.header_str:
            self.header.append('protocol')
        if 'State' in self.header_str:
            self.header.append('state')
        if 'Recv-Q' in self.header_str:
            self.header.append('recv_q')
        if 'Send-Q' in self.header_str:
            self.header.append('send_q')
        if 'Local Address:Port' in self.header_str:
            self.header.append('local')
        if 'Peer Address:Port' in self.header_str:
            self.header.append('remote')
        if 'Process' in self.header_str:
            self.header.append('process')

    def __len__(self):
        return len(self.header)

    def __iter__(self):
        for elem in self.header:
            yield elem
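
# Illustrative mapping (hypothetical header line, not part of the module):
# SockHeader('Recv-Q Send-Q Local Address:Port Peer Address:Port Process')
# yields ['recv_q', 'send_q', 'local', 'remote', 'process'].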


class SockLine(str):
    pass


class SockData(dict):
    pass


LOG = log.getLogger(__name__)


def _ss(params: str = '',
        ssh_client: ssh.SSHClientFixture = None,
        parser: typing.Callable[[SockHeader, SockLine], SockData] = None,
        **execute_params) -> typing.List[SockData]:
    execute_params.update({'sudo': True})
    sockets = []
    command_line = "ss -np {}".format(params)
    try:
        stdout = sh.execute(command_line,
                            ssh_client=ssh_client,
                            **execute_params).stdout
    except sh.ShellCommandFailed as ex:
        if ex.stdout.startswith('Error'):
            raise SocketLookupError(cmd=command_line, err=ex.stderr) from ex
        if ex.exit_status > 0:
            raise
    parsed_header = False
    for line in stdout.splitlines():
        if not parsed_header:
            headers = SockHeader(line)
            parsed_header = True
            continue
        sock_info = SockLine(line.strip())
        if parser:
            try:
                sockets.append(parser(headers, sock_info))
            except ValueError as ex:
                LOG.error(str(ex))
                continue
        else:
            sockets.append(SockData({'raw_data': sock_info}))
    return sockets


def get_processes(processes: str) -> typing.List[str]:
    """Parse process names from ss output

    The simplest example of the processes suffix in ss output is:
        users:(("httpd",pid=735448,fd=11))
    but it can be a bit more complex:
        users:(("httpd",pid=4969,fd=53),("httpd",pid=3328,fd=53))
    The function returns the list of all process names: ['httpd', 'httpd']
    """
    stack = []
    process_list = []
    nested = False
    for idx, symbol in enumerate(processes):
        if symbol == '(':
            stack.append(idx)
            nested = True
        elif symbol == ')' and len(stack) == 1:
            # pop the matching '(' so sibling groups at the same nesting
            # level are each recursed into as well
            process_list.extend(get_processes(processes[stack.pop() + 1:idx]))
        elif symbol == ')':
            stack.pop()
    if not nested:
        process_list.append(processes.split('"', 2)[1])
    return process_list
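
# Illustrative check (hypothetical values, not part of the module):
# >>> get_processes('users:(("httpd",pid=4969,fd=53),("httpd",pid=3328,fd=53))')
# ['httpd', 'httpd']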


def parse_tcp_socket(headers: SockHeader,
                     sock_info: SockLine) -> SockData:
    socket_details = SockData()
    sock_data = sock_info.split()
    if len(headers) != len(sock_data):
        msg = 'Unable to parse line: "{}"'.format(sock_info)
        raise ValueError(msg)
    for idx, header in enumerate(headers):
        if not header:
            continue
        if header == 'local' or header == 'remote':
            ip, port = sock_data[idx].strip().rsplit(':', 1)
            if ip == '*':
                ip = '0'
            socket_details['{}_ip'.format(header)] = netaddr.IPAddress(
                ip.strip(']['))
            socket_details['{}_port'.format(header)] = port
        elif header == 'process':
            try:
                socket_details[header] = get_processes(sock_data[idx])
            except IndexError as ex:
                msg = 'Unable to parse processes part of the line: {}'.format(
                    sock_info)
                raise ValueError(msg) from ex
        else:
            socket_details[header] = sock_data[idx]
    return socket_details
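
# Illustrative parse (hypothetical values, not part of the module): under the
# header 'Recv-Q Send-Q Local Address:Port Peer Address:Port Process', the line
#     '0 10 172.17.1.10:6641 *:* users:(("ovsdb-server",pid=7,fd=18))'
# yields recv_q='0', send_q='10', local_ip=IPAddress('172.17.1.10'),
# local_port='6641', remote_ip=IPAddress('0.0.0.0'), remote_port='*' and
# process=['ovsdb-server'].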


def tcp_listening(address: str = '',
                  port: str = '',
                  **exec_params) -> typing.List[SockData]:
    params = '-t state listening'
    if port:
        params += ' sport {}'.format(port)
    if address:
        params += ' src {}'.format(address)
    return _ss(params=params, parser=parse_tcp_socket, **exec_params)
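
# A minimal usage sketch (assumed context: "node" is a Tobiko topology node
# with an SSH client; the port value is illustrative):
#     socs = tcp_listening(port=6641, ssh_client=node.ssh_client)
#     assert socs[0]['process'][0] == 'ovsdb-server'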