fuel-devops/devops/tests/helpers/test_ssh_client.py

1491 lines
47 KiB
Python

# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# pylint: disable=no-self-use
from contextlib import closing
from os.path import basename
import posixpath
import stat
from unittest import TestCase
import mock
import paramiko
# noinspection PyUnresolvedReferences
from six.moves import cStringIO
from six import PY2
from devops.error import DevopsCalledProcessError
from devops.helpers.ssh_client import SSHAuth
from devops.helpers.ssh_client import SSHClient
def gen_private_keys(amount=1):
keys = []
for _ in range(amount):
keys.append(paramiko.RSAKey.generate(1024))
return keys
def gen_public_key(private_key=None):
if private_key is None:
private_key = paramiko.RSAKey.generate(1024)
return '{0} {1}'.format(private_key.get_name(), private_key.get_base64())
host = '127.0.0.1'
port = 22
username = 'user'
password = 'pass'
private_keys = []
command = 'ls ~ '
class TestSSHAuth(TestCase):
def init_checks(self, username=None, password=None, key=None, keys=None):
"""shared positive init checks
:type username: str
:type password: str
:type key: paramiko.RSAKey
:type keys: list
"""
auth = SSHAuth(
username=username,
password=password,
key=key,
keys=keys
)
int_keys = [None]
if key is not None:
int_keys.append(key)
if keys is not None:
for k in keys:
if k not in int_keys:
int_keys.append(k)
self.assertEqual(auth.username, username)
with closing(cStringIO()) as tgt:
auth.enter_password(tgt)
self.assertEqual(tgt.getvalue(), '{}\n'.format(password))
self.assertEqual(
auth.public_key,
gen_public_key(key) if key is not None else None)
_key = (
None if auth.public_key is None else
'<private for pub: {}>'.format(auth.public_key)
)
_keys = []
for k in int_keys:
if k == key:
continue
_keys.append(
'<private for pub: {}>'.format(
gen_public_key(k)) if k is not None else None)
self.assertEqual(
repr(auth),
"{cls}("
"username={username}, "
"password=<*masked*>, "
"key={key}, "
"keys={keys})".format(
cls=SSHAuth.__name__,
username=auth.username,
key=_key,
keys=_keys
)
)
self.assertEqual(
str(auth),
'{cls} for {username}'.format(
cls=SSHAuth.__name__,
username=auth.username,
)
)
def test_init_username_only(self):
self.init_checks(
username=username
)
def test_init_username_password(self):
self.init_checks(
username=username,
password=password
)
def test_init_username_key(self):
self.init_checks(
username=username,
key=gen_private_keys(1).pop()
)
def test_init_username_password_key(self):
self.init_checks(
username=username,
password=password,
key=gen_private_keys(1).pop()
)
def test_init_username_password_keys(self):
self.init_checks(
username=username,
password=password,
keys=gen_private_keys(2)
)
def test_init_username_password_key_keys(self):
self.init_checks(
username=username,
password=password,
key=gen_private_keys(1).pop(),
keys=gen_private_keys(2)
)
@mock.patch('devops.helpers.retry.sleep', autospec=True)
@mock.patch('devops.helpers.ssh_client.logger', autospec=True)
@mock.patch(
'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy')
@mock.patch('paramiko.SSHClient', autospec=True)
class TestSSHClientInit(TestCase):
def init_checks(
self,
client, policy, logger,
host=None, port=22,
username=None, password=None, private_keys=None,
auth=None
):
"""shared checks for positive cases
:type client: mock.Mock
:type policy: mock.Mock
:type logger: mock.Mock
:type host: str
:type port: int
:type username: str
:type password: str
:type private_keys: list
:type auth: SSHAuth
"""
_ssh = mock.call()
ssh = SSHClient(
host=host,
port=port,
username=username,
password=password,
private_keys=private_keys,
auth=auth
)
client.assert_called_once()
policy.assert_called_once()
if auth is None:
if private_keys is None or len(private_keys) == 0:
logger.assert_has_calls((
mock.call.debug(
'SSHClient('
'host={host}, port={port}, username={username}): '
'initialization by username/password/private_keys '
'is deprecated in favor of SSHAuth usage. '
'Please update your code'.format(
host=host, port=port, username=username
)),
mock.call.info(
'{0}:{1}> SSHAuth was made from old style creds: '
'SSHAuth for {2}'.format(host, port, username))
))
else:
logger.assert_has_calls((
mock.call.debug(
'SSHClient('
'host={host}, port={port}, username={username}): '
'initialization by username/password/private_keys '
'is deprecated in favor of SSHAuth usage. '
'Please update your code'.format(
host=host, port=port, username=username
)),
mock.call.debug(
'Main key has been updated, public key is: \n'
'{}'.format(ssh.auth.public_key)),
mock.call.info(
'{0}:{1}> SSHAuth was made from old style creds: '
'SSHAuth for {2}'.format(host, port, username))
))
else:
logger.assert_not_called()
if auth is None:
if private_keys is None or len(private_keys) == 0:
pkey = None
expected_calls = [
_ssh,
_ssh.set_missing_host_key_policy('AutoAddPolicy'),
_ssh.connect(
hostname=host, password=password,
pkey=pkey,
port=port, username=username),
]
else:
pkey = private_keys[0]
expected_calls = [
_ssh,
_ssh.set_missing_host_key_policy('AutoAddPolicy'),
_ssh.connect(
hostname=host, password=password,
pkey=None,
port=port, username=username),
_ssh.connect(
hostname=host, password=password,
pkey=pkey,
port=port, username=username),
]
self.assertIn(expected_calls, client.mock_calls)
self.assertEqual(
ssh.auth,
SSHAuth(
username=username,
password=password,
keys=private_keys
)
)
else:
self.assertEqual(ssh.auth, auth)
sftp = ssh._sftp
self.assertEqual(sftp, client().open_sftp())
self.assertEqual(ssh._ssh, client())
self.assertEqual(ssh.hostname, host)
self.assertEqual(ssh.port, port)
self.assertEqual(
repr(ssh),
'{cls}(host={host}, port={port}, auth={auth!r})'.format(
cls=ssh.__class__.__name__, host=ssh.hostname,
port=ssh.port,
auth=ssh.auth
)
)
def test_init_host(self, client, policy, logger, sleep):
"""Test with host only set"""
self.init_checks(
client, policy, logger,
host=host)
def test_init_alternate_port(self, client, policy, logger, sleep):
"""Test with alternate port"""
self.init_checks(
client, policy, logger,
host=host,
port=2222
)
def test_init_username(self, client, policy, logger, sleep):
"""Test with username only set from creds"""
self.init_checks(
client, policy, logger,
host=host,
username=username
)
def test_init_username_password(self, client, policy, logger, sleep):
"""Test with username and password set from creds"""
self.init_checks(
client, policy, logger,
host=host,
username=username,
password=password
)
def test_init_username_password_empty_keys(
self, client, policy, logger, sleep):
"""Test with username, password and empty keys set from creds"""
self.init_checks(
client, policy, logger,
host=host,
username=username,
password=password,
private_keys=[]
)
def test_init_username_single_key(self, client, policy, logger, sleep):
"""Test with username and single key set from creds"""
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException, mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
self.init_checks(
client, policy, logger,
host=host,
username=username,
private_keys=gen_private_keys(1)
)
def test_init_username_password_single_key(
self, client, policy, logger, sleep):
"""Test with username, password and single key set from creds"""
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException, mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
self.init_checks(
client, policy, logger,
host=host,
username=username,
password=password,
private_keys=gen_private_keys(1)
)
def test_init_username_multiple_keys(self, client, policy, logger, sleep):
"""Test with username and multiple keys set from creds"""
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException, mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
self.init_checks(
client, policy, logger,
host=host,
username=username,
private_keys=gen_private_keys(2)
)
def test_init_username_password_multiple_keys(
self, client, policy, logger, sleep):
"""Test with username, password and multiple keys set from creds"""
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException, mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException, mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
self.init_checks(
client, policy, logger,
host=host,
username=username,
password=password,
private_keys=gen_private_keys(2)
)
def test_init_auth(
self, client, policy, logger, sleep):
self.init_checks(
client, policy, logger,
host=host,
auth=SSHAuth(
username=username,
password=password,
key=gen_private_keys(1).pop()
)
)
def test_init_auth_break(
self, client, policy, logger, sleep):
self.init_checks(
client, policy, logger,
host=host,
username='Invalid',
password='Invalid',
private_keys=gen_private_keys(1),
auth=SSHAuth(
username=username,
password=password,
key=gen_private_keys(1).pop()
)
)
def test_init_context(
self, client, policy, logger, sleep):
with SSHClient(host=host, auth=SSHAuth()) as ssh:
client.assert_called_once()
policy.assert_called_once()
logger.assert_not_called()
self.assertEqual(ssh.auth, SSHAuth())
sftp = ssh._sftp
self.assertEqual(sftp, client().open_sftp())
self.assertEqual(ssh._ssh, client())
self.assertEqual(ssh.hostname, host)
self.assertEqual(ssh.port, port)
def test_init_clear_failed(
self, client, policy, logger, sleep):
"""Test reconnect
:type client: mock.Mock
:type policy: mock.Mock
:type logger: mock.Mock
"""
_ssh = mock.Mock()
_ssh.attach_mock(
mock.Mock(
side_effect=[
Exception('Mocked SSH close()'),
mock.Mock()
]),
'close')
_sftp = mock.Mock()
_sftp.attach_mock(
mock.Mock(
side_effect=[
Exception('Mocked SFTP close()'),
mock.Mock()
]),
'close')
client.return_value = _ssh
_ssh.attach_mock(mock.Mock(return_value=_sftp), 'open_sftp')
ssh = SSHClient(host=host, auth=SSHAuth())
client.assert_called_once()
policy.assert_called_once()
logger.assert_not_called()
self.assertEqual(ssh.auth, SSHAuth())
sftp = ssh._sftp
self.assertEqual(sftp, _sftp)
self.assertEqual(ssh._ssh, _ssh)
self.assertEqual(ssh.hostname, host)
self.assertEqual(ssh.port, port)
logger.reset_mock()
ssh.clear()
logger.assert_has_calls((
mock.call.exception('Could not close ssh connection'),
mock.call.exception('Could not close sftp connection'),
))
def test_init_reconnect(
self, client, policy, logger, sleep):
"""Test reconnect
:type client: mock.Mock
:type policy: mock.Mock
:type logger: mock.Mock
"""
ssh = SSHClient(host=host, auth=SSHAuth())
client.assert_called_once()
policy.assert_called_once()
logger.assert_not_called()
self.assertEqual(ssh.auth, SSHAuth())
sftp = ssh._sftp
self.assertEqual(sftp, client().open_sftp())
self.assertEqual(ssh._ssh, client())
client.reset_mock()
policy.reset_mock()
self.assertEqual(ssh.hostname, host)
self.assertEqual(ssh.port, port)
ssh.reconnect()
_ssh = mock.call()
expected_calls = [
_ssh.close(),
_ssh,
_ssh.set_missing_host_key_policy('AutoAddPolicy'),
_ssh.connect(
hostname='127.0.0.1',
password=None,
pkey=None,
port=22,
username=None),
]
self.assertIn(
expected_calls,
client.mock_calls
)
client.assert_called_once()
policy.assert_called_once()
logger.assert_not_called()
self.assertEqual(ssh.auth, SSHAuth())
sftp = ssh._sftp
self.assertEqual(sftp, client().open_sftp())
self.assertEqual(ssh._ssh, client())
def test_init_password_required(
self, client, policy, logger, sleep):
connect = mock.Mock(side_effect=paramiko.PasswordRequiredException)
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
with self.assertRaises(paramiko.PasswordRequiredException):
SSHClient(host=host, auth=SSHAuth())
logger.assert_has_calls((
mock.call.exception('No password has been set!'),
))
def test_init_password_broken(
self, client, policy, logger, sleep):
connect = mock.Mock(side_effect=paramiko.PasswordRequiredException)
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
with self.assertRaises(paramiko.PasswordRequiredException):
SSHClient(host=host, auth=SSHAuth(password=password))
logger.assert_has_calls((
mock.call.critical(
'Unexpected PasswordRequiredException, '
'when password is set!'
),
))
def test_init_auth_impossible_password(
self, client, policy, logger, sleep):
connect = mock.Mock(side_effect=paramiko.AuthenticationException)
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
with self.assertRaises(paramiko.AuthenticationException):
SSHClient(host=host, auth=SSHAuth(password=password))
logger.assert_has_calls(
(
mock.call.exception(
'Connection using stored authentication info failed!'),
) * 3
)
def test_init_auth_impossible_key(
self, client, policy, logger, sleep):
connect = mock.Mock(side_effect=paramiko.AuthenticationException)
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
with self.assertRaises(paramiko.AuthenticationException):
SSHClient(
host=host,
auth=SSHAuth(key=gen_private_keys(1).pop())
)
logger.assert_has_calls(
(
mock.call.exception(
'Connection using stored authentication info failed!'),
) * 3
)
def test_init_auth_pass_no_key(
self, client, policy, logger, sleep):
connect = mock.Mock(
side_effect=[
paramiko.AuthenticationException,
mock.Mock()
])
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
key = gen_private_keys(1).pop()
ssh = SSHClient(
host=host,
auth=SSHAuth(
username=username,
password=password,
key=key
)
)
client.assert_called_once()
policy.assert_called_once()
logger.assert_has_calls((
mock.call.debug(
'Main key has been updated, public key is: \nNone'),
))
self.assertEqual(
ssh.auth,
SSHAuth(
username=username,
password=password,
keys=[key]
)
)
sftp = ssh._sftp
self.assertEqual(sftp, client().open_sftp())
self.assertEqual(ssh._ssh, client())
def test_init_auth_brute_impossible(
self, client, policy, logger, sleep):
connect = mock.Mock(side_effect=paramiko.AuthenticationException)
_ssh = mock.Mock()
_ssh.attach_mock(connect, 'connect')
client.return_value = _ssh
with self.assertRaises(paramiko.AuthenticationException):
SSHClient(
host=host,
username=username,
private_keys=gen_private_keys(2))
logger.assert_has_calls(
(
mock.call.debug(
'SSHClient('
'host={host}, port={port}, username={username}): '
'initialization by username/password/private_keys '
'is deprecated in favor of SSHAuth usage. '
'Please update your code'.format(
host=host, port=port, username=username
)),
) + (
mock.call.exception(
'Connection using stored authentication info failed!'),
) * 3
)
def test_init_no_sftp(
self, client, policy, logger, sleep):
open_sftp = mock.Mock(side_effect=paramiko.SSHException)
_ssh = mock.Mock()
_ssh.attach_mock(open_sftp, 'open_sftp')
client.return_value = _ssh
ssh = SSHClient(host=host, auth=SSHAuth(password=password))
with self.assertRaises(paramiko.SSHException):
# pylint: disable=pointless-statement
# noinspection PyStatementEffect
ssh._sftp
# pylint: enable=pointless-statement
logger.assert_has_calls((
mock.call.debug('SFTP is not connected, try to connect...'),
mock.call.warning(
'SFTP enable failed! SSH only is accessible.'),
))
def test_init_sftp_repair(
self, client, policy, logger, sleep):
_sftp = mock.Mock()
open_sftp = mock.Mock(
side_effect=[
paramiko.SSHException,
_sftp, _sftp])
_ssh = mock.Mock()
_ssh.attach_mock(open_sftp, 'open_sftp')
client.return_value = _ssh
ssh = SSHClient(host=host, auth=SSHAuth(password=password))
with self.assertRaises(paramiko.SSHException):
# pylint: disable=pointless-statement
# noinspection PyStatementEffect
ssh._sftp
# pylint: enable=pointless-statement
logger.reset_mock()
sftp = ssh._sftp
self.assertEqual(sftp, open_sftp())
logger.assert_has_calls((
mock.call.debug('SFTP is not connected, try to connect...'),
))
@mock.patch('devops.helpers.ssh_client.logger', autospec=True)
@mock.patch(
'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy')
@mock.patch('paramiko.SSHClient', autospec=True)
class TestExecute(TestCase):
@staticmethod
def get_ssh():
"""SSHClient object builder for execution tests
:rtype: SSHClient
"""
return SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
def test_execute_async(self, client, policy, logger):
chan = mock.Mock()
open_session = mock.Mock(return_value=chan)
transport = mock.Mock()
transport.attach_mock(open_session, 'open_session')
get_transport = mock.Mock(return_value=transport)
_ssh = mock.Mock()
_ssh.attach_mock(get_transport, 'get_transport')
client.return_value = _ssh
ssh = self.get_ssh()
result = ssh.execute_async(command=command)
get_transport.assert_called_once()
open_session.assert_called_once()
self.assertIn(chan, result)
chan.assert_has_calls((
mock.call.makefile('wb'),
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('{}\n'.format(command))
))
self.assertIn(
mock.call.debug(
"Executing command: '{}'".format(command.rstrip())),
logger.mock_calls
)
def test_execute_async_sudo(self, client, policy, logger):
chan = mock.Mock()
open_session = mock.Mock(return_value=chan)
transport = mock.Mock()
transport.attach_mock(open_session, 'open_session')
get_transport = mock.Mock(return_value=transport)
_ssh = mock.Mock()
_ssh.attach_mock(get_transport, 'get_transport')
client.return_value = _ssh
ssh = self.get_ssh()
ssh.sudo_mode = True
result = ssh.execute_async(command=command)
get_transport.assert_called_once()
open_session.assert_called_once()
self.assertIn(chan, result)
chan.assert_has_calls((
mock.call.makefile('wb'),
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('sudo -S bash -c "{}\n"'.format(command))
))
self.assertIn(
mock.call.debug(
"Executing command: '{}'".format(command.rstrip())),
logger.mock_calls
)
def test_execute_async_with_sudo(self, client, policy, logger):
chan = mock.Mock()
open_session = mock.Mock(return_value=chan)
transport = mock.Mock()
transport.attach_mock(open_session, 'open_session')
get_transport = mock.Mock(return_value=transport)
_ssh = mock.Mock()
_ssh.attach_mock(get_transport, 'get_transport')
client.return_value = _ssh
ssh = self.get_ssh()
self.assertFalse(ssh.sudo_mode)
with SSHClient.get_sudo(ssh):
self.assertTrue(ssh.sudo_mode)
result = ssh.execute_async(command=command)
self.assertFalse(ssh.sudo_mode)
get_transport.assert_called_once()
open_session.assert_called_once()
self.assertIn(chan, result)
chan.assert_has_calls((
mock.call.makefile('wb'),
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('sudo -S bash -c "{}\n"'.format(command))
))
self.assertIn(
mock.call.debug(
"Executing command: '{}'".format(command.rstrip())),
logger.mock_calls
)
@mock.patch('devops.helpers.ssh_client.SSHAuth.enter_password')
def test_execute_async_sudo_password(
self, enter_password, client, policy, logger):
stdin = mock.Mock(name='stdin')
stdout = mock.Mock(name='stdout')
stdout_channel = mock.Mock()
stdout_channel.configure_mock(closed=False)
stdout.attach_mock(stdout_channel, 'channel')
makefile = mock.Mock(side_effect=[stdin, stdout])
chan = mock.Mock()
chan.attach_mock(makefile, 'makefile')
open_session = mock.Mock(return_value=chan)
transport = mock.Mock()
transport.attach_mock(open_session, 'open_session')
get_transport = mock.Mock(return_value=transport)
_ssh = mock.Mock()
_ssh.attach_mock(get_transport, 'get_transport')
client.return_value = _ssh
ssh = self.get_ssh()
ssh.sudo_mode = True
result = ssh.execute_async(command=command)
get_transport.assert_called_once()
open_session.assert_called_once()
# raise ValueError(closed.mock_calls)
enter_password.assert_called_once_with(stdin)
stdin.assert_has_calls((mock.call.flush(), ))
self.assertIn(chan, result)
chan.assert_has_calls((
mock.call.makefile('wb'),
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('sudo -S bash -c "{}\n"'.format(command))
))
self.assertIn(
mock.call.debug(
"Executing command: '{}'".format(command.rstrip())),
logger.mock_calls
)
def get_patched_execute_async_retval(self, ec=0, stderr_val=True):
stderr = mock.Mock()
stdout = mock.Mock()
stderr_readlines = mock.Mock(
return_value=[b' \n', b'0\n', b'1\n', b' \n'] if stderr_val else []
)
stdout_readlines = mock.Mock(
return_value=[b' \n', b'2\n', b'3\n', b' \n'])
stderr.attach_mock(stderr_readlines, 'readlines')
stdout.attach_mock(stdout_readlines, 'readlines')
exit_code = ec
chan = mock.Mock()
recv_exit_status = mock.Mock(return_value=exit_code)
chan.attach_mock(recv_exit_status, 'recv_exit_status')
return chan, '', stderr, stdout
@mock.patch(
'devops.helpers.ssh_client.SSHClient.execute_async')
def test_execute(self, execute_async, client, policy, logger):
chan, _stdin, stderr, stdout = self.get_patched_execute_async_retval()
execute_async.return_value = chan, _stdin, stderr, stdout
stderr_lst = stderr.readlines()
stdout_lst = stdout.readlines()
expected = {
'exit_code': chan.recv_exit_status(),
'stderr': stderr_lst,
'stdout': stdout_lst}
if PY2:
expected['stderr_str'] = b''.join(stderr_lst).strip()
expected['stdout_str'] = b''.join(stdout_lst).strip()
else:
expected['stderr_str'] = b''.join(stderr_lst).strip().decode(
encoding='utf-8')
expected['stdout_str'] = b''.join(stdout_lst).strip().decode(
encoding='utf-8')
ssh = self.get_ssh()
logger.reset_mock()
result = ssh.execute(command=command, verbose=True)
self.assertEqual(
result,
expected
)
execute_async.assert_called_once_with(command)
chan.assert_has_calls((
mock.call.recv_exit_status(),
mock.call.close()))
logger.assert_has_calls((
mock.call.info(
'{cmd} execution results:\n'
'Exit code: {code}\n'
'STDOUT:\n'
'{stdout}\n'
'STDERR:\n'
'{stderr}'.format(
cmd=command,
code=result['exit_code'],
stdout=result['stdout_str'],
stderr=result['stderr_str']
)),
))
@mock.patch(
'devops.helpers.ssh_client.SSHClient.execute_async')
def test_execute_together(self, execute_async, client, policy, logger):
chan, _stdin, stderr, stdout = self.get_patched_execute_async_retval()
execute_async.return_value = chan, _stdin, stderr, stdout
stderr_lst = stderr.readlines()
stdout_lst = stdout.readlines()
expected = {
'exit_code': chan.recv_exit_status(),
'stderr': stderr_lst,
'stdout': stdout_lst}
if PY2:
expected['stderr_str'] = b''.join(stderr_lst).strip()
expected['stdout_str'] = b''.join(stdout_lst).strip()
else:
expected['stderr_str'] = b''.join(stderr_lst).strip().decode(
encoding='utf-8')
expected['stdout_str'] = b''.join(stdout_lst).strip().decode(
encoding='utf-8')
host2 = '127.0.0.2'
ssh = self.get_ssh()
ssh2 = SSHClient(
host=host2,
port=port,
auth=SSHAuth(
username=username,
password=password
))
remotes = [ssh, ssh2]
SSHClient.execute_together(
remotes=remotes, command=command)
self.assertEqual(execute_async.call_count, len(remotes))
chan.assert_has_calls((
mock.call.recv_exit_status(),
mock.call.close(),
mock.call.recv_exit_status(),
mock.call.close()
))
SSHClient.execute_together(
remotes=remotes, command=command, expected=[1], raise_on_err=False)
with self.assertRaises(DevopsCalledProcessError):
SSHClient.execute_together(
remotes=remotes, command=command, expected=[1])
@mock.patch(
'devops.helpers.ssh_client.SSHClient.execute')
def test_check_call(self, execute, client, policy, logger):
exit_code = 0
return_value = {
'stderr_str': '0\n1',
'stdout_str': '2\n3',
'exit_code': exit_code,
'stderr': [b' \n', b'0\n', b'1\n', b' \n'],
'stdout': [b' \n', b'2\n', b'3\n', b' \n']}
execute.return_value = return_value
verbose = False
ssh = self.get_ssh()
result = ssh.check_call(command=command, verbose=verbose)
execute.assert_called_once_with(command, verbose)
self.assertEqual(result, return_value)
exit_code = 1
return_value['exit_code'] = exit_code
execute.reset_mock()
execute.return_value = return_value
with self.assertRaises(DevopsCalledProcessError):
ssh.check_call(command=command, verbose=verbose)
execute.assert_called_once_with(command, verbose)
@mock.patch(
'devops.helpers.ssh_client.SSHClient.check_call')
def test_check_stderr(self, check_call, client, policy, logger):
return_value = {
'stderr_str': '',
'stdout_str': '2\n3',
'exit_code': 0,
'stderr': [],
'stdout': [b' \n', b'2\n', b'3\n', b' \n']}
check_call.return_value = return_value
verbose = False
raise_on_err = True
ssh = self.get_ssh()
result = ssh.check_stderr(
command=command, verbose=verbose, raise_on_err=raise_on_err)
check_call.assert_called_once_with(
command, verbose, raise_on_err=raise_on_err)
self.assertEqual(result, return_value)
return_value['stderr_str'] = '0\n1'
return_value['stderr'] = [b' \n', b'0\n', b'1\n', b' \n']
check_call.reset_mock()
check_call.return_value = return_value
with self.assertRaises(DevopsCalledProcessError):
ssh.check_stderr(
command=command, verbose=verbose, raise_on_err=raise_on_err)
check_call.assert_called_once_with(
command, verbose, raise_on_err=raise_on_err)
@mock.patch('devops.helpers.ssh_client.logger', autospec=True)
@mock.patch(
'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy')
@mock.patch('paramiko.SSHClient', autospec=True)
@mock.patch('paramiko.Transport', autospec=True)
class TestExecuteThrowHost(TestCase):
@staticmethod
def prepare_execute_through_host(transp, client, exit_code):
intermediate_channel = mock.Mock()
open_channel = mock.Mock(return_value=intermediate_channel)
intermediate_transport = mock.Mock()
intermediate_transport.attach_mock(open_channel, 'open_channel')
get_transport = mock.Mock(return_value=intermediate_transport)
_ssh = mock.Mock()
_ssh.attach_mock(get_transport, 'get_transport')
client.return_value = _ssh
transport = mock.Mock()
transp.return_value = transport
recv_exit_status = mock.Mock(return_value=exit_code)
makefile = mock.Mock()
makefile.attach_mock(mock.Mock(
return_value=[b' \n', b'2\n', b'3\n', b' \n']),
'readlines')
makefile_stderr = mock.Mock()
makefile_stderr.attach_mock(
mock.Mock(return_value=[b' \n', b'0\n', b'1\n', b' \n']),
'readlines')
channel = mock.Mock()
channel.attach_mock(mock.Mock(return_value=makefile), 'makefile')
channel.attach_mock(mock.Mock(
return_value=makefile_stderr), 'makefile_stderr')
channel.attach_mock(recv_exit_status, 'recv_exit_status')
open_session = mock.Mock(return_value=channel)
transport.attach_mock(open_session, 'open_session')
return (
open_session, transport, channel, get_transport,
open_channel, intermediate_channel
)
def test_execute_through_host_no_creds(
self, transp, client, policy, logger):
target = '127.0.0.2'
exit_code = 0
return_value = {
'stderr_str': '0\n1',
'stdout_str': '2\n3',
'exit_code': exit_code,
'stderr': [b' \n', b'0\n', b'1\n', b' \n'],
'stdout': [b' \n', b'2\n', b'3\n', b' \n']}
(
open_session, transport, channel, get_transport,
open_channel, intermediate_channel
) = self.prepare_execute_through_host(
transp, client, exit_code=exit_code)
ssh = SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
result = ssh.execute_through_host(target, command)
self.assertEqual(result, return_value)
get_transport.assert_called_once()
open_channel.assert_called_once()
transp.assert_called_once_with(intermediate_channel)
open_session.assert_called_once()
transport.assert_has_calls((
mock.call.connect(username=username, password=password, pkey=None),
mock.call.open_session()
))
channel.assert_has_calls((
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('ls ~ '),
mock.call.recv_exit_status(),
mock.call.close()
))
def test_execute_through_host_auth(
self, transp, client, policy, logger):
_login = 'cirros'
_password = 'cubswin:)'
target = '127.0.0.2'
exit_code = 0
return_value = {
'stderr_str': '0\n1',
'stdout_str': '2\n3',
'exit_code': exit_code,
'stderr': [b' \n', b'0\n', b'1\n', b' \n'],
'stdout': [b' \n', b'2\n', b'3\n', b' \n']}
(
open_session, transport, channel, get_transport,
open_channel, intermediate_channel
) = self.prepare_execute_through_host(
transp, client, exit_code=exit_code)
ssh = SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
result = ssh.execute_through_host(
target, command,
auth=SSHAuth(username=_login, password=_password))
self.assertEqual(result, return_value)
get_transport.assert_called_once()
open_channel.assert_called_once()
transp.assert_called_once_with(intermediate_channel)
open_session.assert_called_once()
transport.assert_has_calls((
mock.call.connect(username=_login, password=_password, pkey=None),
mock.call.open_session()
))
channel.assert_has_calls((
mock.call.makefile('rb'),
mock.call.makefile_stderr('rb'),
mock.call.exec_command('ls ~ '),
mock.call.recv_exit_status(),
mock.call.close()
))
@mock.patch('devops.helpers.ssh_client.logger', autospec=True)
@mock.patch(
'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy')
@mock.patch('paramiko.SSHClient', autospec=True)
class TestSftp(TestCase):
@staticmethod
def prepare_sftp_file_tests(client):
_ssh = mock.Mock()
client.return_value = _ssh
_sftp = mock.Mock()
open_sftp = mock.Mock(parent=_ssh, return_value=_sftp)
_ssh.attach_mock(open_sftp, 'open_sftp')
ssh = SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
return ssh, _sftp
def test_exists(self, client, policy, logger):
ssh, _sftp = self.prepare_sftp_file_tests(client)
lstat = mock.Mock()
_sftp.attach_mock(lstat, 'lstat')
path = '/etc'
result = ssh.exists(path)
self.assertTrue(result)
lstat.assert_called_once_with(path)
# Negative scenario
lstat.reset_mock()
lstat.side_effect = IOError
result = ssh.exists(path)
self.assertFalse(result)
lstat.assert_called_once_with(path)
def test_isfile(self, client, policy, logger):
class Attrs(object):
def __init__(self, mode):
self.st_mode = mode
ssh, _sftp = self.prepare_sftp_file_tests(client)
lstat = mock.Mock()
_sftp.attach_mock(lstat, 'lstat')
lstat.return_value = Attrs(stat.S_IFREG)
path = '/etc/passwd'
result = ssh.isfile(path)
self.assertTrue(result)
lstat.assert_called_once_with(path)
# Negative scenario
lstat.reset_mock()
lstat.return_value = Attrs(stat.S_IFDIR)
result = ssh.isfile(path)
self.assertFalse(result)
lstat.assert_called_once_with(path)
lstat.reset_mock()
lstat.side_effect = IOError
result = ssh.isfile(path)
self.assertFalse(result)
lstat.assert_called_once_with(path)
def test_isdir(self, client, policy, logger):
class Attrs(object):
def __init__(self, mode):
self.st_mode = mode
ssh, _sftp = self.prepare_sftp_file_tests(client)
lstat = mock.Mock()
_sftp.attach_mock(lstat, 'lstat')
lstat.return_value = Attrs(stat.S_IFDIR)
path = '/etc/passwd'
result = ssh.isdir(path)
self.assertTrue(result)
lstat.assert_called_once_with(path)
# Negative scenario
lstat.reset_mock()
lstat.return_value = Attrs(stat.S_IFREG)
result = ssh.isdir(path)
self.assertFalse(result)
lstat.assert_called_once_with(path)
lstat.reset_mock()
lstat.side_effect = IOError
result = ssh.isdir(path)
self.assertFalse(result)
lstat.assert_called_once_with(path)
@mock.patch('devops.helpers.ssh_client.SSHClient.exists')
@mock.patch('devops.helpers.ssh_client.SSHClient.execute')
def test_mkdir(self, execute, exists, client, policy, logger):
exists.side_effect = [False, True]
path = '~/tst'
ssh = SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
# Path not exists
ssh.mkdir(path)
exists.assert_called_once_with(path)
execute.assert_called_once_with("mkdir -p {}\n".format(path))
# Path exists
exists.reset_mock()
execute.reset_mock()
ssh.mkdir(path)
exists.assert_called_once_with(path)
execute.assert_not_called()
@mock.patch('devops.helpers.ssh_client.SSHClient.execute')
def test_rm_rf(self, execute, client, policy, logger):
path = '~/tst'
ssh = SSHClient(
host=host,
port=port,
auth=SSHAuth(
username=username,
password=password
))
# Path not exists
ssh.rm_rf(path)
execute.assert_called_once_with("rm -rf {}".format(path))
def test_open(self, client, policy, logger):
ssh, _sftp = self.prepare_sftp_file_tests(client)
fopen = mock.Mock(return_value=True)
_sftp.attach_mock(fopen, 'open')
path = '/etc/passwd'
mode = 'r'
result = ssh.open(path)
fopen.assert_called_once_with(path, mode)
self.assertTrue(result)
@mock.patch('devops.helpers.ssh_client.SSHClient.exists')
@mock.patch('os.path.exists', autospec=True)
@mock.patch('devops.helpers.ssh_client.SSHClient.isdir')
@mock.patch('os.path.isdir', autospec=True)
def test_download(
self,
isdir, remote_isdir, exists, remote_exists, client, policy, logger
):
ssh, _sftp = self.prepare_sftp_file_tests(client)
isdir.return_value = True
exists.side_effect = [True, False, False]
remote_isdir.side_effect = [False, False, True]
remote_exists.side_effect = [True, False, False]
dst = '/etc/environment'
target = '/tmp/environment'
result = ssh.download(destination=dst, target=target)
self.assertTrue(result)
isdir.assert_called_once_with(target)
exists.assert_called_once_with(posixpath.join(target, basename(dst)))
remote_isdir.assert_called_once_with(dst)
remote_exists.assert_called_once_with(dst)
_sftp.assert_has_calls((
mock.call.get(dst, posixpath.join(target, basename(dst))),
))
# Negative scenarios
logger.reset_mock()
result = ssh.download(destination=dst, target=target)
logger.assert_has_calls((
mock.call.debug(
"Copying '%s' -> '%s' from remote to local host",
'/etc/environment',
'/tmp/environment'),
mock.call.debug(
"Can't download %s because it doesn't exist",
'/etc/environment'
),
))
self.assertFalse(result)
logger.reset_mock()
result = ssh.download(destination=dst, target=target)
logger.assert_has_calls((
mock.call.debug(
"Copying '%s' -> '%s' from remote to local host",
'/etc/environment',
'/tmp/environment'),
mock.call.debug(
"Can't download %s because it is a directory",
'/etc/environment'
),
))
@mock.patch('devops.helpers.ssh_client.SSHClient.isdir')
@mock.patch('os.path.isdir', autospec=True)
def test_upload_file(
self, isdir, remote_isdir, client, policy, logger
):
ssh, _sftp = self.prepare_sftp_file_tests(client)
isdir.return_value = False
remote_isdir.return_value = False
target = '/etc/environment'
source = '/tmp/environment'
ssh.upload(source=source, target=target)
isdir.assert_called_once_with(source)
remote_isdir.assert_called_once_with(target)
_sftp.assert_has_calls((
mock.call.put(source, target),
))
@mock.patch('devops.helpers.ssh_client.SSHClient.exists')
@mock.patch('devops.helpers.ssh_client.SSHClient.mkdir')
@mock.patch('os.walk')
@mock.patch('devops.helpers.ssh_client.SSHClient.isdir')
@mock.patch('os.path.isdir', autospec=True)
def test_upload_dir(
self,
isdir, remote_isdir, walk, mkdir, exists,
client, policy, logger
):
ssh, _sftp = self.prepare_sftp_file_tests(client)
isdir.return_value = True
remote_isdir.return_value = True
exists.return_value = True
target = '/etc'
source = '/tmp/bash'
filename = 'bashrc'
walk.return_value = (source, '', [filename]),
expected_path = posixpath.join(target, basename(source))
expected_file = posixpath.join(expected_path, filename)
ssh.upload(source=source, target=target)
isdir.assert_called_once_with(source)
remote_isdir.assert_called_once_with(target)
mkdir.assert_called_once_with(expected_path)
exists.assert_called_once_with(expected_file)
_sftp.assert_has_calls((
mock.call.unlink(expected_file),
mock.call.put(posixpath.join(source, filename), expected_file),
))