Add paramiko SSH client integration

Add the ability of executing commands on remote hosts
using paramiko SSH client.

- Implement fixture to load ~/.ssh/config file to lookup
  for local configuration before connecting to node.
  Supported options are:
  * User
  * Hostname
  * Port
  * IdentityFile
  * ProxyJump
  * ProxyCommand

- Iplement fixture to connect a paramiko.SSHClient to
  remote host using parameters from ~/.ssh/confing and
  tobiko.conf

- Implement remote execution function to implement same
  interface than the local one.

Change-Id: I767f7515c1418cff1f42a662bd4dac8d9e7abae9
This commit is contained in:
Federico Ressi 2019-05-14 16:32:57 +02:00
parent ede95b9b42
commit 7d0f8e92f4
9 changed files with 687 additions and 22 deletions

View File

@ -27,8 +27,9 @@ LOG = log.getLogger(__name__)
CONFIG_MODULES = ['tobiko.openstack.keystone.config',
'tobiko.openstack.neutron.config',
'tobiko.openstack.nova.config',
'tobiko.shell.sh.config',
'tobiko.shell.ping.config']
'tobiko.shell.paramiko.config',
'tobiko.shell.ping.config',
'tobiko.shell.sh.config']
CONFIG_DIRS = [os.getcwd(),
os.path.expanduser("~/.tobiko"),

View File

@ -0,0 +1,25 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from tobiko.shell.paramiko import _config
from tobiko.shell.paramiko import _client
SSHHostConfig = _config.SSHHostConfig
SSHClientFixture = _client.SSHClientFixture
ssh_client = _client.ssh_client
ssh_proxy_client = _client.ssh_proxy_client

View File

@ -0,0 +1,276 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import socket
import time
import paramiko
from oslo_log import log
import tobiko
from tobiko.shell.paramiko import _config
LOG = log.getLogger(__name__)
SSH_CONNECT_PARAMETERS = [
'hostname', 'port', 'username', 'password', 'pkey', 'key_filename',
'timeout', 'allow_agent', 'look_for_keys', 'compress', 'banner_timeout',
'auth_timeout', 'passphrase']
class SSHClientFixture(tobiko.SharedFixture):
host = None
username = None
port = 22
client = None
paramiko_conf = tobiko.required_setup_fixture(
_config.SSHParamikoConfFixture)
ssh_config = tobiko.required_setup_fixture(_config.SSHConfigFixture)
host_config = None
proxy_client = None
proxy_command = None
proxy_sock = None
def __init__(self, host=None, proxy_client=None, **connect_parameters):
super(SSHClientFixture, self).__init__()
if host:
self.host = host
if proxy_client:
self.proxy_client = proxy_client
invalid_parameters = sorted([
name
for name in connect_parameters
if name not in SSH_CONNECT_PARAMETERS])
if invalid_parameters:
message = "Invalid SSH connection parameters: {!s}".format(
', '.join(invalid_parameters))
raise ValueError(message)
self.connect_parameters = connect_parameters
def setup_fixture(self):
self.setup_host_config()
self.setup_connect_parameters()
self.setup_proxy_sock()
self.setup_ssh_client()
def setup_host_config(self):
host = self.host
if not host:
message = 'Invalid host: {!r}'.format(host)
raise ValueError(message)
self.host_config = self.ssh_config.lookup(host)
def setup_connect_parameters(self):
# Import missing parameters from below objects:
# - self.host_config
# - self
missing_parameters = [name
for name in SSH_CONNECT_PARAMETERS
if self.connect_parameters.get(name) is None]
for obj in [self.host_config, self]:
parameters = {}
for name in missing_parameters:
value = getattr(obj, name, None)
if value is not None:
parameters[name] = value
if parameters:
LOG.debug("Got connect parameters for host %r from object %r: "
" %r", self.host, obj, parameters)
self.connect_parameters.update(parameters)
missing_parameters = [name
for name in missing_parameters
if name not in parameters]
hostname = self.connect_parameters.get('hostname')
if not hostname:
message = "Invalid hostname: {!r}".format(hostname)
raise ValueError(message)
key_filename = self.connect_parameters.get('key_filename')
if key_filename:
key_filename = os.path.expanduser(key_filename)
if not os.path.exists(key_filename):
message = "key_filename {!r} doesn't exist".format(hostname)
raise ValueError(message)
self.connect_parameters['key_filename'] = key_filename
def setup_proxy_sock(self):
proxy_command = (self.host_config.proxy_command or self.proxy_command)
proxy_client = self.proxy_client
if proxy_client:
proxy_command = proxy_command or 'nc {hostname!r} {port!r}'
elif not proxy_command:
return
parameters = self.connect_parameters
proxy_command = proxy_command.format(
hostname=parameters['hostname'],
port=parameters.get('port', 22))
if proxy_client:
if isinstance(proxy_client, SSHClientFixture):
# Connect to proxy server
proxy_client = tobiko.setup_fixture(proxy_client).client
# Open proxy channel
LOG.debug("Execute proxy command on proxy host: %r", proxy_command)
self.proxy_sock = proxy_client.get_transport().open_session()
self.addCleanup(self.cleanup_proxy_sock)
self.proxy_sock.exec_command(proxy_command)
else:
LOG.debug("Execute proxy command on local host: %r", proxy_command)
self.proxy_sock = paramiko.ProxyCommand(proxy_command)
self.addCleanup(self.cleanup_proxy_sock)
def setup_ssh_client(self):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.load_system_host_keys()
now = time.time()
timeout = now + self.connect_timeout
sleep_time = self.connect_sleep_time
while True:
message = "time left {!s} seconds".format(timeout - now)
try:
self._connect_client(client, message=message)
break
except (EOFError, socket.error, socket.timeout,
paramiko.SSHException):
now = time.time()
if now + sleep_time >= timeout:
raise
LOG.debug('Retry to connect to %r in %d seconds',
self.connect_login, sleep_time)
time.sleep(sleep_time)
now = time.time()
if now >= timeout:
raise
sleep_time += self.connect_sleep_time_increment
def _connect_client(self, client, message=None):
"""Returns an ssh connection to the specified host."""
parameters = self.connect_parameters
extra_info = ''
if message:
extra_info = ' (' + message + ')'
LOG.info("Creating SSH connection to %r%s...", self.connect_login,
extra_info)
try:
client.connect(sock=self.proxy_sock, **parameters)
except Exception as ex:
LOG.debug("Error connecting to %s%s: %s", self.connect_login,
extra_info, ex)
raise
else:
self.client = client
self.addCleanup(self.cleanup_ssh_client, client)
LOG.info("SSH connection to %s successfully created",
self.connect_login)
def cleanup_ssh_client(self):
if self.client:
self.client = None
self.client.close()
def cleanup_proxy_sock(self):
proxy_sock = self.proxy_sock
if proxy_sock:
self.proxy_sock = None
proxy_sock.close()
@property
def connect_timeout(self):
return self.paramiko_conf.connect_timeout
@property
def connect_sleep_time(self):
return self.paramiko_conf.connect_sleep_time
@property
def connect_sleep_time_increment(self):
return self.paramiko_conf.connect_sleep_time_increment
@property
def connect_login(self):
login = self.connect_parameters['hostname']
port = self.connect_parameters.get('port', None)
if port:
login = ':'.join([login, str(port)])
username = self.connect_parameters.get('username', None)
if username:
login = "@".join([username, login])
return login
class SSHClientManager(object):
ssh_config = tobiko.required_setup_fixture(_config.SSHConfigFixture)
paramiko_conf = tobiko.required_setup_fixture(
_config.SSHParamikoConfFixture)
def __init__(self):
self.clients = {}
def get_client(self, host, username=None, port=None, proxy_jump=None):
host_config = self.ssh_config.lookup(host)
hostname = host_config.hostname
port = port or host_config.port
username = username or host_config.username
proxy_jump = proxy_jump or host_config.proxy_jump
host_key = hostname, port, username, proxy_jump
client = self.clients.get(host_key)
if not client:
proxy_client = None
if proxy_jump:
proxy_client = self.get_client(proxy_jump)
self.clients[host_key] = client = SSHClientFixture(
host=host, hostname=hostname, port=port, username=username,
proxy_client=proxy_client)
return client
@property
def proxy_client(self):
proxy_jump = self.paramiko_conf.proxy_jump
if proxy_jump:
return self.get_client(proxy_jump)
else:
return None
CLIENTS = SSHClientManager()
def ssh_client(host, port=None, username=None, proxy_jump=None,
manager=None):
manager = manager or CLIENTS
return manager.get_client(host=host, port=port, username=username,
proxy_jump=proxy_jump)
def ssh_proxy_client(manager=None):
manager = manager or CLIENTS
return manager.proxy_client

View File

@ -0,0 +1,129 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import collections
import os
from oslo_log import log
import paramiko
import tobiko
LOG = log.getLogger(__name__)
class SSHParamikoConfFixture(tobiko.SharedFixture):
conf = None
def setup_fixture(self):
from tobiko import config
CONF = config.CONF
self.conf = CONF.tobiko.paramiko
def __getattr__(self, name):
return getattr(self.conf, name)
class SSHConfigFixture(tobiko.SharedFixture):
paramiko_conf = tobiko.required_setup_fixture(SSHParamikoConfFixture)
_config_file = None
config = None
def __init__(self, config_file=None):
super(SSHConfigFixture, self).__init__()
if config_file:
self._config_file = config_file
def setup_fixture(self):
self.setup_ssh_config()
def setup_ssh_config(self):
self.config = paramiko.SSHConfig()
config_file = self.config_file
if config_file:
LOG.debug("Loading %r config file...", config_file)
config_file = os.path.expanduser(config_file)
if os.path.exists(config_file):
with open(config_file) as f:
self.config.parse(f)
LOG.debug("File %r parsed.", config_file)
@property
def config_file(self):
return self._config_file or self.paramiko_conf.config_file
def lookup(self, host):
host_config = SSHHostConfig(host=host,
ssh_config=self,
host_config=self.config.lookup(host))
LOG.debug('Lookup SSH config for for host %r:\n%r', host, host_config)
return host_config
def __repr__(self):
return "{class_name!s}(config_file={config_file!r})".format(
class_name=type(self).__name__, config_file=self.config_file)
class SSHHostConfig(collections.namedtuple('SSHHostConfig', ['host',
'ssh_config',
'host_config'])):
paramiko_conf = tobiko.required_setup_fixture(SSHParamikoConfFixture)
@property
def username(self):
return self.host_config.get('user')
@property
def hostname(self):
return self.host_config.get('hostname', self.host)
@property
def key_filename(self):
return self.host_config.get('identityfile',
self.paramiko_conf.key_file)
@property
def proxy_jump(self):
proxy_jump = (self.host_config.get('proxyjump') or
self.paramiko_conf.proxy_jump)
if not proxy_jump:
return None
proxy_hostname = self.ssh_config.lookup(proxy_jump).hostname
if ({proxy_jump, proxy_hostname} & {self.host, self.hostname}):
# Avoid recursive proxy jump definition
return None
return proxy_jump
@property
def proxy_command(self):
return self.host_config.get('proxycommand',
self.paramiko_conf.proxy_command)
def __getattr__(self, name):
try:
return self.host_config[name]
except KeyError:
pass
message = "{!r} object has no attribute {!r}".format(self, name)
raise AttributeError(message)

View File

@ -0,0 +1,45 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from oslo_config import cfg
def register_tobiko_options(conf):
conf.register_opts(
group=cfg.OptGroup('paramiko'),
opts=[cfg.StrOpt('config_file',
default='~/.ssh/config',
help="Default user SSH configuration file"),
cfg.StrOpt('key_file',
default='~/.ssh/id_rsa',
help="Default SSH private key file"),
cfg.IntOpt('connect_timeout',
default=120,
help="SSH connect timeout in seconds"),
cfg.IntOpt('connect_sleep_time',
default=1,
help=("Seconds to wait after every failed SSH "
"connection attempt")),
cfg.IntOpt('connect_sleep_time_increment',
default=1,
help=("Incremental seconds to wait after every "
"failed SSH connection attempt")),
cfg.StrOpt('proxy_jump',
default=None,
help="Default SSH proxy server"),
cfg.StrOpt('proxy_command',
default=None,
help="Default proxy command"),
])

View File

@ -16,19 +16,24 @@
from __future__ import absolute_import
import collections
import select
import subprocess
import sys
import time
from oslo_log import log
import six
import tobiko
from tobiko.shell import paramiko
from tobiko.shell.sh import _exception
LOG = log.getLogger(__name__)
def execute(command, timeout=None, shell=None, check=True, ssh_client=None):
def execute(command, stdin=None, environment=None, timeout=None, shell=None,
check=True, ssh_client=None):
"""Execute command inside a remote or local shell
:param command: command argument list
@ -51,15 +56,25 @@ def execute(command, timeout=None, shell=None, check=True, ssh_client=None):
:raises ShellCommandError: when command execution terminates with non-zero
exit status.
"""
if timeout:
timeout = float(timeout)
if ssh_client:
result = execute_remote_command(command=command, timeout=timeout,
shell=shell, ssh_client=ssh_client)
if isinstance(command, six.string_types):
command = command.split()
else:
result = execute_local_command(command=command, timeout=timeout,
shell=shell)
command = [str(a) for a in command]
ssh_client = ssh_client or paramiko.ssh_proxy_client()
if ssh_client:
result = execute_remote_command(command=command, stdin=stdin,
environment=environment,
timeout=timeout, shell=shell,
ssh_client=ssh_client)
else:
result = execute_local_command(command=command, stdin=stdin,
environment=environment,
timeout=timeout, shell=shell)
if result.exit_status == 0:
LOG.debug("Command %r succeeded:\n"
@ -82,31 +97,126 @@ def execute(command, timeout=None, shell=None, check=True, ssh_client=None):
return result
def execute_remote_command(command, ssh_client, timeout=None, shell=None):
def execute_remote_command(command, ssh_client, stdin=None, timeout=None,
shell=None, environment=None):
"""Execute command on a remote host using SSH client"""
raise NotImplementedError
if shell:
command = shell.split() + [str(subprocess.list2cmdline(command))]
if isinstance(ssh_client, paramiko.SSHClientFixture):
# Connect to fixture
ssh_client = tobiko.setup_fixture(ssh_client).client
transport = ssh_client.get_transport()
with transport.open_session() as channel:
if environment:
channel.update_environment(environment)
channel.exec_command(subprocess.list2cmdline(command))
stdout, stderr = comunicate_ssh_channel(channel, stdin=stdin,
timeout=timeout)
if channel.exit_status_ready():
exit_status = channel.recv_exit_status()
else:
exit_status = None
return ShellExecuteResult(command=command, timeout=timeout,
stdout=stdout, stderr=stderr,
exit_status=exit_status)
def execute_local_command(command, timeout=None, shell=None):
def comunicate_ssh_channel(ssh_channel, stdin=None, chunk_size=None,
timeout=None, sleep_time=None, read_stdout=True,
read_stderr=True):
if read_stdout:
rlist = [ssh_channel]
else:
rlist = []
if not stdin:
ssh_channel.shutdown_write()
stdin = None
wlist = []
else:
wlist = [ssh_channel]
if not isinstance(stdin, six.binary_type):
stdin = stdin.encode()
chunk_size = chunk_size or 1024
sleep_time = sleep_time or 1.
timeout = timeout or float("inf")
start = time.time()
stdout = None
stderr = None
while True:
chunk_timeout = min(sleep_time, timeout - (time.time() - start))
if chunk_timeout < 0.:
LOG.debug('Timed out reading from SSH channel: %r', ssh_channel)
break
ssh_channel.settimeout(chunk_timeout)
if read_stdout and ssh_channel.recv_ready():
chunk = ssh_channel.recv(chunk_size)
if stdout:
stdout += chunk
else:
stdout = chunk
if not chunk:
LOG.debug("STDOUT channel closed by peer on SSH channel %r",
ssh_channel)
read_stdout = False
elif read_stderr and ssh_channel.recv_stderr_ready():
chunk = ssh_channel.recv_stderr(chunk_size)
if stderr:
stderr += chunk
else:
stderr = chunk
if not chunk:
LOG.debug("STDERR channel closed by peer on SSH channel %r",
ssh_channel)
read_stderr = False
elif ssh_channel.exit_status_ready():
break
elif stdin and ssh_channel.send_ready():
sent_bytes = ssh_channel.send(stdin[:chunk_size])
stdin = stdin[sent_bytes:] or None
if not stdin:
LOG.debug('shutdown_write() on SSH channel: %r', ssh_channel)
ssh_channel.shutdown_write()
else:
select.select(rlist, wlist, rlist or wlist, chunk_timeout)
if stdout:
if not isinstance(stdout, six.string_types):
stdout = stdout.decode()
else:
stdout = ''
if stderr:
if not isinstance(stderr, six.string_types):
stderr = stderr.decode()
else:
stderr = ''
return stdout, stderr
def execute_local_command(command, stdin=None, environment=None, timeout=None,
shell=None):
"""Execute command on local host using local shell"""
LOG.debug("Executing command %r on local host (timeout=%r)...",
command, timeout)
stdin = stdin or None
if not shell:
from tobiko import config
CONF = config.CONF
shell = CONF.tobiko.shell.command
if isinstance(command, six.string_types):
command = command.split()
else:
command = [str(a) for a in command]
if shell:
command = shell.split() + [str(subprocess.list2cmdline(command))]
process = subprocess.Popen(command,
universal_newlines=True,
env=environment,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@ -118,14 +228,15 @@ def execute_local_command(command, timeout=None, shell=None):
# Wait for process execution while reading STDERR and STDOUT streams
if timeout:
try:
stdout, stderr = process.communicate(timeout=timeout)
stdout, stderr = process.communicate(input=stdin,
timeout=timeout)
except subprocess.TimeoutExpired:
# At this state I expect the process to be still running
# therefore it has to be kill later after calling poll()
LOG.exception("Command %r timeout expired.", command)
stdout = stderr = None
stdout = stderr = ''
else:
stdout, stderr = process.communicate()
stdout, stderr = process.communicate(input=stdin)
# Check process termination status
exit_status = process.poll()

View File

@ -61,6 +61,15 @@ class ExecuteTest(testtools.TestCase):
timeout=None, exit_status=0, stdout='', stderr='something\n'),
result)
def test_execute_reading_from_stdin(self):
result = sh.execute('cat', shell='/bin/sh -c', stdin='some input\n')
self.assertEqual(
sh.ShellExecuteResult(
command=['/bin/sh', '-c', 'cat'],
timeout=None, exit_status=0, stdout='some input\n',
stderr=''),
result)
def test_execute_failing_command(self):
ex = self.assertRaises(sh.ShellCommandFailed, sh.execute, 'exit 15',
shell='/bin/sh -c')
@ -88,18 +97,19 @@ class ExecuteTest(testtools.TestCase):
self.assertEqual(['/bin/sh', '-c', 'echo something >&2; exit 7'],
ex.command)
@unittest.skipIf(sys.version_info < (3, 3),
'timeout not implemented for Python version < 3.3')
def test_execute_with_timeout(self):
result = sh.execute('true', timeout=30., shell='/bin/sh -c')
expected_timeout = None if sys.version_info < (3, 3) else 30.
self.assertEqual(
sh.ShellExecuteResult(
command=['/bin/sh', '-c', 'true'],
timeout=expected_timeout, exit_status=0, stdout='',
timeout=30, exit_status=0, stdout='',
stderr=''),
result)
@unittest.skipIf(sys.version_info < (3, 3),
'not implemented for Python version < 3.3')
'timeout not implemented for Python version < 3.3')
def test_execute_with_timeout_expired(self):
ex = self.assertRaises(sh.ShellTimeoutExpired, sh.execute,
'echo out; echo err >&2; sleep 30',

View File

View File

@ -0,0 +1,68 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import mock
import paramiko
import tobiko
from tobiko import config
from tobiko.shell import paramiko as ssh
from tobiko.tests import unit
CONF = config.CONF
class SSHClientFixtureTest(unit.TobikoUnitTest):
fixture = tobiko.required_fixture(ssh.SSHClientFixture)
expected_host = None
expected_proxy_client = None
def setUp(self):
super(SSHClientFixtureTest, self).setUp()
tobiko.cleanup_fixture(self.fixture)
self.ssh_client = mock.MagicMock(specs=paramiko.SSHClient)
self.patch('paramiko.SSHClient', return_value=self.ssh_client)
def test_init(self):
fixture = self.fixture
self.assertIs(self.expected_host, fixture.host)
self.assertIs(self.expected_proxy_client, fixture.proxy_client)
self.assertEqual({}, fixture.connect_parameters)
self.assertIsNone(fixture.host_config)
def test_setup(self):
fixture = self.fixture
if not self.expected_host:
fixture.host = 'some-host'
fixture.username = 'some-username'
fixture.setUp()
ssh_config_file = os.path.expanduser(CONF.tobiko.paramiko.config_file)
if os.path.exists(ssh_config_file):
with open(ssh_config_file) as f:
ssh_config = paramiko.SSHConfig()
ssh_config.parse(f)
expected_host_config = ssh_config.lookup(fixture.host)
else:
expected_host_config = {'hostname': fixture.host}
self.assertEqual(fixture.host, fixture.host_config.host)
self.assertEqual(expected_host_config, fixture.host_config.host_config)