deb-sahara/sahara/utils/ssh_remote.py
Nikolay Starodubtsev ce62f364cd Rewrite log levels and messages
This is an initial commit to make sahara logs relevant to new
logging spec.
We need to change some log levels to become closer to the spec, also
we need to change some log messages (e.g. for INFO level).

partially-implement bp new-style-logging

Change-Id: I8e7b7c4c3f375648d2999b1d0e19b68390cc22a8
2015-03-12 17:16:06 +03:00

810 lines
27 KiB
Python

# Copyright (c) 2013 Mirantis Inc.
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper methods for executing commands on nodes via SSH.
The main access point is method get_remote(instance), it returns
InstanceInteropHelper object which does the actual work. See the
class for the list of available methods.
It is a context manager, so it could be used with 'with' statement
like that:
with get_remote(instance) as r:
r.execute_command(...)
Note that the module offloads the ssh calls to a child process.
It was implemented that way because we found no way to run paramiko
and eventlet together. The private high-level module methods are
implementations which are run in a separate process.
"""
import os
import shlex
import sys
import threading
import time
import uuid
from eventlet.green import subprocess as e_subprocess
from eventlet import semaphore
from eventlet import timeout as e_timeout
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import paramiko
import requests
from requests import adapters
import six
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.utils import crypto
from sahara.utils import hashabledict as h
from sahara.utils.openstack import base
from sahara.utils.openstack import neutron
from sahara.utils import procutils
from sahara.utils import remote
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_ssh = None
_proxy_ssh = None
_sessions = {}
INFRA = None
_global_remote_semaphore = None
def _connect(host, username, private_key, proxy_command=None,
gateway_host=None, gateway_image_username=None):
global _ssh
global _proxy_ssh
LOG.debug('Creating SSH connection')
if type(private_key) in [str, unicode]:
private_key = crypto.to_paramiko_private_key(private_key)
_ssh = paramiko.SSHClient()
_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
proxy = None
if proxy_command:
LOG.debug('Creating proxy using command: {command}'.format(
command=proxy_command))
proxy = paramiko.ProxyCommand(proxy_command)
if gateway_host:
_proxy_ssh = paramiko.SSHClient()
_proxy_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
LOG.debug('Connecting to proxy gateway at: {gateway}'.format(
gateway=gateway_host))
_proxy_ssh.connect(gateway_host, username=gateway_image_username,
pkey=private_key, sock=proxy)
proxy = _proxy_ssh.get_transport().open_session()
proxy.exec_command("nc {0} 22".format(host))
_ssh.connect(host, username=username, pkey=private_key, sock=proxy)
def _cleanup():
global _ssh
global _proxy_ssh
_ssh.close()
if _proxy_ssh:
_proxy_ssh.close()
def _read_paramimko_stream(recv_func):
result = ''
buf = recv_func(1024)
while buf != '':
result += buf
buf = recv_func(1024)
return result
def _escape_quotes(command):
command = command.replace('\\', '\\\\')
command = command.replace('"', '\\"')
command = command.replace('`', '\\`')
return command
def _execute_command(cmd, run_as_root=False, get_stderr=False,
raise_when_error=True):
global _ssh
chan = _ssh.get_transport().open_session()
if run_as_root:
chan.exec_command('sudo bash -c "%s"' % _escape_quotes(cmd))
else:
chan.exec_command(cmd)
# todo(dmitryme): that could hang if stderr buffer overflows
stdout = _read_paramimko_stream(chan.recv)
stderr = _read_paramimko_stream(chan.recv_stderr)
ret_code = chan.recv_exit_status()
if ret_code and raise_when_error:
raise ex.RemoteCommandException(cmd=cmd, ret_code=ret_code,
stdout=stdout, stderr=stderr)
if get_stderr:
return ret_code, stdout, stderr
else:
return ret_code, stdout
def _execute_command_interactive(cmd, run_as_root=False):
global _ssh
chan = _ssh.get_transport().open_session()
if run_as_root:
chan.exec_command('sudo bash -c "%s"' % _escape_quotes(cmd))
else:
chan.exec_command(cmd)
_proxy_shell(chan)
_ssh.close()
def _proxy_shell(chan):
def readall():
while True:
d = sys.stdin.read(1)
if not d or chan.exit_status_ready():
break
chan.send(d)
reader = threading.Thread(target=readall)
reader.start()
while True:
data = chan.recv(256)
if not data or chan.exit_status_ready():
break
sys.stdout.write(data)
sys.stdout.flush()
def _get_http_client(host, port, proxy_command=None, gateway_host=None,
gateway_username=None, gateway_private_key=None):
global _sessions
_http_session = _sessions.get((host, port), None)
LOG.debug('Cached HTTP session for {host}:{port} is {session}'.format(
host=host, port=port, session=_http_session))
if not _http_session:
if gateway_host:
_http_session = _get_proxy_gateway_http_session(
gateway_host, gateway_username,
gateway_private_key, host, port, proxy_command)
LOG.debug('Created ssh proxied HTTP session for {host}:{port}'
.format(host=host, port=port))
elif proxy_command:
# can return a new session here because it actually uses
# the same adapter (and same connection pools) for a given
# host and port tuple
_http_session = _get_proxied_http_session(
proxy_command, host, port=port)
LOG.debug('Created proxied HTTP session for {host}:{port}'
.format(host=host, port=port))
else:
# need to cache the sessions that are not proxied through
# HTTPRemoteWrapper so that a new session with a new HTTPAdapter
# and associated pools is not recreated for each HTTP invocation
_http_session = requests.Session()
LOG.debug('Created standard HTTP session for {host}:{port}'
.format(host=host, port=port))
adapter = requests.adapters.HTTPAdapter()
for prefix in ['http://', 'https://']:
_http_session.mount(prefix + '%s:%s' % (host, port),
adapter)
LOG.debug('Caching session {session} for {host}:{port}'
.format(session=_http_session, host=host, port=port))
_sessions[(host, port)] = _http_session
return _http_session
def _write_fl(sftp, remote_file, data):
fl = sftp.file(remote_file, 'w')
fl.write(data)
fl.close()
def _append_fl(sftp, remote_file, data):
fl = sftp.file(remote_file, 'a')
fl.write(data)
fl.close()
def _write_file(sftp, remote_file, data, run_as_root):
if run_as_root:
temp_file = 'temp-file-%s' % six.text_type(uuid.uuid4())
_write_fl(sftp, temp_file, data)
_execute_command(
'mv %s %s' % (temp_file, remote_file), run_as_root=True)
else:
_write_fl(sftp, remote_file, data)
def _append_file(sftp, remote_file, data, run_as_root):
if run_as_root:
temp_file = 'temp-file-%s' % six.text_type(uuid.uuid4())
_write_fl(sftp, temp_file, data)
_execute_command(
'cat %s >> %s' % (temp_file, remote_file), run_as_root=True)
_execute_command('rm -f %s' % temp_file)
else:
_append_fl(sftp, remote_file, data)
def _write_file_to(remote_file, data, run_as_root=False):
global _ssh
_write_file(_ssh.open_sftp(), remote_file, data, run_as_root)
def _write_files_to(files, run_as_root=False):
global _ssh
sftp = _ssh.open_sftp()
for fl, data in six.iteritems(files):
_write_file(sftp, fl, data, run_as_root)
def _append_to_file(remote_file, data, run_as_root=False):
global _ssh
_append_file(_ssh.open_sftp(), remote_file, data, run_as_root)
def _append_to_files(files, run_as_root=False):
global _ssh
sftp = _ssh.open_sftp()
for fl, data in six.iteritems(files):
_append_file(sftp, fl, data, run_as_root)
def _read_file(sftp, remote_file):
fl = sftp.file(remote_file, 'r')
data = fl.read()
fl.close()
return data
def _read_file_from(remote_file, run_as_root=False):
global _ssh
fl = remote_file
if run_as_root:
fl = 'temp-file-%s' % (six.text_type(uuid.uuid4()))
_execute_command('cp %s %s' % (remote_file, fl), run_as_root=True)
try:
return _read_file(_ssh.open_sftp(), fl)
except IOError:
LOG.error(_LE("Can't read file {filename}").format(
filename=remote_file))
raise
finally:
if run_as_root:
_execute_command(
'rm %s' % fl, run_as_root=True, raise_when_error=False)
def _replace_remote_string(remote_file, old_str, new_str):
old_str = old_str.replace("\'", "\''")
new_str = new_str.replace("\'", "\''")
cmd = "sudo sed -i 's,%s,%s,g' %s" % (old_str, new_str, remote_file)
_execute_command(cmd)
def _execute_on_vm_interactive(cmd, matcher):
global _ssh
buf = ''
channel = _ssh.invoke_shell()
LOG.debug('Channel is {channel}'.format(channel=channel))
try:
LOG.debug('Sending cmd {command}'.format(command=cmd))
channel.send(cmd + '\n')
while not matcher.is_eof(buf):
buf += channel.recv(4096)
response = matcher.get_response(buf)
if response is not None:
channel.send(response + '\n')
buf = ''
finally:
LOG.debug('Closing channel')
channel.close()
def _acquire_remote_semaphore():
context.current().remote_semaphore.acquire()
_global_remote_semaphore.acquire()
def _release_remote_semaphore():
_global_remote_semaphore.release()
context.current().remote_semaphore.release()
def _get_proxied_http_session(proxy_command, host, port=None):
session = requests.Session()
adapter = ProxiedHTTPAdapter(
_simple_exec_func(shlex.split(proxy_command)), host, port)
session.mount('http://{0}:{1}'.format(host, adapter.port), adapter)
return session
def _get_proxy_gateway_http_session(gateway_host, gateway_username,
gateway_private_key, host, port=None,
proxy_command=None):
session = requests.Session()
adapter = ProxiedHTTPAdapter(
_proxy_gateway_func(gateway_host, gateway_username,
gateway_private_key, host,
port, proxy_command),
host, port)
session.mount('http://{0}:{1}'.format(host, port), adapter)
return session
def _simple_exec_func(cmd):
def func():
return e_subprocess.Popen(cmd,
stdin=e_subprocess.PIPE,
stdout=e_subprocess.PIPE,
stderr=e_subprocess.PIPE)
return func
def _proxy_gateway_func(gateway_host, gateway_username,
gateway_private_key, host,
port, proxy_command):
def func():
proc = procutils.start_subprocess()
try:
conn_params = (gateway_host, gateway_username, gateway_private_key,
proxy_command, None, None)
procutils.run_in_subprocess(proc, _connect, conn_params)
cmd = "nc {host} {port}".format(host=host, port=port)
procutils.run_in_subprocess(
proc, _execute_command_interactive, (cmd,), interactive=True)
return proc
except Exception:
with excutils.save_and_reraise_exception():
procutils.shutdown_subprocess(proc, _cleanup)
return func
class ProxiedHTTPAdapter(adapters.HTTPAdapter):
def __init__(self, create_process_func, host, port):
super(ProxiedHTTPAdapter, self).__init__()
LOG.debug('HTTP adapter created for {host}:{port}'.format(host=host,
port=port))
self.create_process_func = create_process_func
self.port = port
self.host = host
def get_connection(self, url, proxies=None):
pool_conn = (
super(ProxiedHTTPAdapter, self).get_connection(url, proxies))
if hasattr(pool_conn, '_get_conn'):
http_conn = pool_conn._get_conn()
if http_conn.sock is None:
if hasattr(http_conn, 'connect'):
sock = self._connect()
LOG.debug('HTTP connection {connection} getting new '
'netcat socket {socket}'.format(
connection=http_conn, socket=sock))
http_conn.sock = sock
else:
if hasattr(http_conn.sock, 'is_netcat_socket'):
LOG.debug('Pooled http connection has existing '
'netcat socket. resetting pipe')
http_conn.sock.reset()
pool_conn._put_conn(http_conn)
return pool_conn
def close(self):
LOG.debug('Closing HTTP adapter for {host}:{port}'
.format(host=self.host, port=self.port))
super(ProxiedHTTPAdapter, self).close()
def _connect(self):
LOG.debug('Returning netcat socket for {host}:{port}'
.format(host=self.host, port=self.port))
rootwrap_command = CONF.rootwrap_command if CONF.use_rootwrap else ''
return NetcatSocket(self.create_process_func, rootwrap_command)
class NetcatSocket(object):
def _create_process(self):
self.process = self.create_process_func()
def __init__(self, create_process_func, rootwrap_command=None):
self.create_process_func = create_process_func
self.rootwrap_command = rootwrap_command
self._create_process()
def send(self, content):
try:
self.process.stdin.write(content)
self.process.stdin.flush()
except IOError as e:
raise ex.SystemError(e)
return len(content)
def sendall(self, content):
return self.send(content)
def makefile(self, mode, *arg):
if mode.startswith('r'):
return self.process.stdout
if mode.startswith('w'):
return self.process.stdin
raise ex.IncorrectStateError(_("Unknown file mode %s") % mode)
def recv(self, size):
try:
return os.read(self.process.stdout.fileno(), size)
except IOError as e:
raise ex.SystemError(e)
def _terminate(self):
if self.rootwrap_command:
os.system('{0} kill {1}'.format(self.rootwrap_command,
self.process.pid))
else:
self.process.terminate()
def close(self):
LOG.debug('Socket close called')
self._terminate()
def settimeout(self, timeout):
pass
def fileno(self):
return self.process.stdin.fileno()
def is_netcat_socket(self):
return True
def reset(self):
self._terminate()
self._create_process()
class InstanceInteropHelper(remote.Remote):
def __init__(self, instance):
self.instance = instance
def __enter__(self):
_acquire_remote_semaphore()
try:
self.bulk = BulkInstanceInteropHelper(self.instance)
return self.bulk
except Exception:
with excutils.save_and_reraise_exception():
_release_remote_semaphore()
def __exit__(self, *exc_info):
try:
self.bulk.close()
finally:
_release_remote_semaphore()
def get_neutron_info(self, instance=None):
if not instance:
instance = self.instance
neutron_info = h.HashableDict()
neutron_info['network'] = instance.cluster.neutron_management_network
ctx = context.current()
neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network')
neutron_info['token'] = ctx.auth_token
neutron_info['tenant'] = ctx.tenant_name
neutron_info['host'] = instance.management_ip
LOG.debug('Returning neutron info: {info}'.format(info=neutron_info))
return neutron_info
def _build_proxy_command(self, command, instance=None, port=None,
info=None, rootwrap_command=None):
# Accepted keywords in the proxy command template:
# {host}, {port}, {tenant_id}, {network_id}, {router_id}
keywords = {}
if not info:
info = self.get_neutron_info(instance)
keywords['tenant_id'] = context.current().tenant_id
keywords['network_id'] = info['network']
# Query Neutron only if needed
if '{router_id}' in command:
client = neutron.NeutronClient(info['network'], info['uri'],
info['token'], info['tenant'])
keywords['router_id'] = client.get_router()
keywords['host'] = instance.management_ip
keywords['port'] = port
try:
command = command.format(**keywords)
except KeyError as e:
LOG.error(_LE('Invalid keyword in proxy_command: {result}').format(
result=e))
# Do not give more details to the end-user
raise ex.SystemError('Misconfiguration')
if rootwrap_command:
command = '{0} {1}'.format(rootwrap_command, command)
return command
def _get_conn_params(self):
host_ng = self.instance.node_group
cluster = host_ng.cluster
access_instance = self.instance
proxy_gateway_node = cluster.get_proxy_gateway_node()
gateway_host = None
gateway_image_username = None
if proxy_gateway_node and not host_ng.is_proxy_gateway:
access_instance = proxy_gateway_node
gateway_host = proxy_gateway_node.management_ip
ng = proxy_gateway_node.node_group
gateway_image_username = ng.image_username
proxy_command = None
if CONF.proxy_command:
# Build a session through a user-defined socket
proxy_command = CONF.proxy_command
elif CONF.use_namespaces and not CONF.use_floating_ips:
# Build a session through a netcat socket in the Neutron namespace
proxy_command = (
'ip netns exec qrouter-{router_id} nc {host} {port}')
# proxy_command is currently a template, turn it into a real command
# i.e. dereference {host}, {port}, etc.
if proxy_command:
rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
proxy_command = self._build_proxy_command(
proxy_command, instance=access_instance, port=22,
info=None, rootwrap_command=rootwrap)
return (self.instance.management_ip,
host_ng.image_username,
cluster.management_private_key,
proxy_command,
gateway_host,
gateway_image_username)
def _run(self, func, *args, **kwargs):
proc = procutils.start_subprocess()
try:
procutils.run_in_subprocess(proc, _connect,
self._get_conn_params())
return procutils.run_in_subprocess(proc, func, args, kwargs)
except Exception:
with excutils.save_and_reraise_exception():
procutils.shutdown_subprocess(proc, _cleanup)
finally:
procutils.shutdown_subprocess(proc, _cleanup)
def _run_with_log(self, func, timeout, *args, **kwargs):
start_time = time.time()
try:
with e_timeout.Timeout(timeout, ex.TimeoutException(timeout)):
return self._run(func, *args, **kwargs)
finally:
self._log_command('%s took %.1f seconds to complete' % (
func.__name__, time.time() - start_time))
def _run_s(self, func, timeout, *args, **kwargs):
_acquire_remote_semaphore()
try:
return self._run_with_log(func, timeout, *args, **kwargs)
finally:
_release_remote_semaphore()
def get_http_client(self, port, info=None):
self._log_command('Retrieving HTTP session for {0}:{1}'.format(
self.instance.management_ip, port))
host_ng = self.instance.node_group
cluster = host_ng.cluster
access_instance = self.instance
access_port = port
proxy_gateway_node = cluster.get_proxy_gateway_node()
gateway_host = None
gateway_username = None
gateway_private_key = None
if proxy_gateway_node and not host_ng.is_proxy_gateway:
access_instance = proxy_gateway_node
access_port = 22
gateway_host = proxy_gateway_node.management_ip
gateway_username = proxy_gateway_node.node_group.image_username
gateway_private_key = cluster.management_private_key
proxy_command = None
if CONF.proxy_command:
# Build a session through a user-defined socket
proxy_command = CONF.proxy_command
elif info or (CONF.use_namespaces and not CONF.use_floating_ips):
# need neutron info
if not info:
info = self.get_neutron_info(access_instance)
# Build a session through a netcat socket in the Neutron namespace
proxy_command = (
'ip netns exec qrouter-{router_id} nc {host} {port}')
# proxy_command is currently a template, turn it into a real command
# i.e. dereference {host}, {port}, etc.
if proxy_command:
rootwrap = CONF.rootwrap_command if CONF.use_rootwrap else ''
proxy_command = self._build_proxy_command(
proxy_command, instance=access_instance, port=access_port,
info=info, rootwrap_command=rootwrap)
return _get_http_client(self.instance.management_ip, port,
proxy_command, gateway_host,
gateway_username,
gateway_private_key)
def close_http_session(self, port):
global _sessions
host = self.instance.management_ip
self._log_command(_("Closing HTTP session for %(host)s:%(port)s") % {
'host': host, 'port': port})
session = _sessions.get((host, port), None)
if session is None:
raise ex.NotFoundException(
_('Session for %(host)s:%(port)s not cached') % {
'host': host, 'port': port})
session.close()
del _sessions[(host, port)]
def execute_command(self, cmd, run_as_root=False, get_stderr=False,
raise_when_error=True, timeout=300):
self._log_command('Executing "%s"' % cmd)
return self._run_s(_execute_command, timeout, cmd, run_as_root,
get_stderr, raise_when_error)
def write_file_to(self, remote_file, data, run_as_root=False, timeout=120):
self._log_command('Writing file "%s"' % remote_file)
self._run_s(_write_file_to, timeout, remote_file, data, run_as_root)
def write_files_to(self, files, run_as_root=False, timeout=120):
self._log_command('Writing files "%s"' % files.keys())
self._run_s(_write_files_to, timeout, files, run_as_root)
def append_to_file(self, r_file, data, run_as_root=False, timeout=120):
self._log_command('Appending to file "%s"' % r_file)
self._run_s(_append_to_file, timeout, r_file, data, run_as_root)
def append_to_files(self, files, run_as_root=False, timeout=120):
self._log_command('Appending to files "%s"' % files.keys())
self._run_s(_append_to_files, timeout, files, run_as_root)
def read_file_from(self, remote_file, run_as_root=False, timeout=120):
self._log_command('Reading file "%s"' % remote_file)
return self._run_s(_read_file_from, timeout, remote_file, run_as_root)
def replace_remote_string(self, remote_file, old_str, new_str,
timeout=120):
self._log_command('In file "%s" replacing string "%s" '
'with "%s"' % (remote_file, old_str, new_str))
self._run_s(_replace_remote_string, timeout, remote_file, old_str,
new_str)
def execute_on_vm_interactive(self, cmd, matcher, timeout=1800):
"""Runs given command and responds to prompts.
'cmd' is a command to execute.
'matcher' is an object which provides responses on command's
prompts. It should have two methods implemented:
* get_response(buf) - returns response on prompt if it is
found in 'buf' string, which is a part of command output.
If no prompt is found, the method should return None.
* is_eof(buf) - returns True if current 'buf' indicates that
the command is finished. False should be returned
otherwise.
"""
self._log_command('Executing interactively "%s"' % cmd)
self._run_s(_execute_on_vm_interactive, timeout, cmd, matcher)
def _log_command(self, str):
LOG.debug('[{instance}] {command}'.format(
instance=self.instance.instance_name, command=str))
class BulkInstanceInteropHelper(InstanceInteropHelper):
def __init__(self, instance):
super(BulkInstanceInteropHelper, self).__init__(instance)
self.proc = procutils.start_subprocess()
try:
procutils.run_in_subprocess(self.proc, _connect,
self._get_conn_params())
except Exception:
with excutils.save_and_reraise_exception():
procutils.shutdown_subprocess(self.proc, _cleanup)
def close(self):
procutils.shutdown_subprocess(self.proc, _cleanup)
def _run(self, func, *args, **kwargs):
return procutils.run_in_subprocess(self.proc, func, args, kwargs)
def _run_s(self, func, timeout, *args, **kwargs):
return self._run_with_log(func, timeout, *args, **kwargs)
class SshRemoteDriver(remote.RemoteDriver):
def get_type_and_version(self):
return "ssh.1.0"
def setup_remote(self, engine):
global _global_remote_semaphore
global INFRA
_global_remote_semaphore = semaphore.Semaphore(
CONF.global_remote_threshold)
INFRA = engine
def get_remote(self, instance):
return InstanceInteropHelper(instance)
def get_userdata_template(self):
# SSH does not need any instance customization
return ""