Implement threaded SSH for provisioning and Vanilla plugin

* SSH commands are now run in parallel, both for instance configuration
  during provisioning and for configuration done by the Vanilla plugin

* Introduced throttling for SSH connections (see the sketch below)

Change-Id: I88a68642f3649fdab8666e36b4ab226815b72320
Dmitry Mescheryakov 2013-09-26 20:20:33 +04:00
parent 2d4743cfd9
commit 48f4cf66fd
6 changed files with 174 additions and 81 deletions
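
Taken together, the changes below add a context-aware ThreadGroup in savanna.context for fanning work out to greenthreads, and throttle every remote SSH operation with two semaphores: a global one sized by global_remote_threshold and a per-cluster one, stored on the request context, sized by cluster_remote_threshold. A minimal sketch of the resulting calling pattern, assuming the savanna.context and savanna.utils.remote modules from this commit; configure_instance(), configure_cluster_nodes() and the shell command are hypothetical placeholders:

    # Sketch only -- not part of the commit. Illustrates how callers combine
    # the new ThreadGroup with the throttled remote helper.
    from savanna import context
    from savanna.utils import remote


    def configure_instance(instance):
        # Entering the remote helper acquires the per-cluster and global
        # remote semaphores and holds them for the whole block.
        with remote.get_remote(instance) as r:
            r.execute_command('echo configured > /tmp/marker')


    def configure_cluster_nodes(instances):
        # One greenthread per instance; each thread runs with a clone of the
        # caller's context, and ThreadGroup.__exit__ waits for all of them.
        with context.ThreadGroup() as tg:
            for instance in instances:
                tg.spawn('configure-%s' % instance.instance_name,
                         configure_instance, instance)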


@@ -17,10 +17,14 @@ import threading
import eventlet
from eventlet import corolocal
from eventlet import semaphore
from oslo.config import cfg
from savanna.openstack.common import log as logging
from savanna.openstack.common import threadgroup
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -34,6 +38,7 @@ class Context(object):
username=None,
tenant_name=None,
is_admin=None,
remote_semaphore=None,
**kwargs):
if kwargs:
LOG.warn('Arguments dropped when creating context: %s', kwargs)
@@ -44,6 +49,8 @@ class Context(object):
self.username = username
self.tenant_name = tenant_name
self.is_admin = is_admin
self.remote_semaphore = remote_semaphore or semaphore.Semaphore(
CONF.cluster_remote_threshold)
def clone(self):
return Context(
@@ -53,7 +60,8 @@ class Context(object):
self.service_catalog,
self.username,
self.tenant_name,
self.is_admin)
self.is_admin,
self.remote_semaphore)
def to_dict(self):
return {
@@ -101,19 +109,39 @@ def set_ctx(new_ctx):
_CTXS._curr_ctxs[ident] = new_ctx
def _wrapper(ctx, thread_description, func, *args, **kwargs):
try:
set_ctx(ctx)
func(*args, **kwargs)
except Exception as e:
LOG.exception("Thread '%s' fails with exception: '%s'"
% (thread_description, e))
finally:
set_ctx(None)
def spawn(thread_description, func, *args, **kwargs):
ctx = current().clone()
eventlet.spawn(_wrapper, current().clone(), thread_description,
func, *args, **kwargs)
def wrapper(ctx, func, *args, **kwargs):
try:
set_ctx(ctx)
func(*args, **kwargs)
set_ctx(None)
except Exception as e:
LOG.exception("Thread '%s' fails with exception: '%s'"
% (thread_description, e))
eventlet.spawn(wrapper, ctx, func, *args, **kwargs)
class ThreadGroup(object):
def __init__(self, thread_pool_size=1000):
self.tg = threadgroup.ThreadGroup(thread_pool_size)
def spawn(self, thread_description, func, *args, **kwargs):
self.tg.add_thread(_wrapper, current().clone(), thread_description,
func, *args, **kwargs)
def wait(self):
self.tg.wait()
def __enter__(self):
return self
def __exit__(self, *ex):
if not any(ex):
self.tg.wait()
def sleep(seconds=0):
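
For reference, the two entry points added above have different semantics: spawn() is fire-and-forget (exceptions are only logged by _wrapper), while ThreadGroup used as a context manager waits for all spawned threads on a clean exit and skips the wait if the body raised. Both pass a clone of the caller's context into the thread, so context.current() keeps working there. A small sketch under those assumptions; report() is a hypothetical helper, and a context is assumed to have been set (e.g. via set_ctx()) before fan_out() runs:

    # Sketch only -- demonstrates spawn() vs. ThreadGroup semantics.
    from savanna import context


    def report(label):
        # Works because _wrapper called set_ctx() with a clone of the
        # spawning thread's context.
        print('%s: tenant %s' % (label, context.current().tenant_name))


    def fan_out():
        # Fire-and-forget: nothing waits for this thread; a failure inside
        # report() would only show up in the log.
        context.spawn('report-detached', report, 'detached')

        # Grouped: __exit__ blocks until all three threads finish, but only
        # when the with-body exits without an exception.
        with context.ThreadGroup() as tg:
            for i in range(3):
                tg.spawn('report-%d' % i, report, 'grouped')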


@@ -30,6 +30,7 @@ from savanna.plugins import base as plugins_base
from savanna.service import periodic
from savanna.utils import api as api_utils
from savanna.utils import patches
from savanna.utils import remote
LOG = log.getLogger(__name__)
@@ -97,6 +98,7 @@ def make_app():
plugins_base.setup_plugins()
periodic.setup(app)
remote.setup_remote()
def make_json_error(ex):
status_code = (ex.code


@@ -110,10 +110,9 @@ class VanillaProvider(p.ProvisioningPluginBase):
pass
def configure_cluster(self, cluster):
self._push_configs_to_nodes(cluster)
self._write_hadoop_user_keys(utils.get_instances(cluster),
cluster.management_private_key,
cluster.management_public_key)
instances = utils.get_instances(cluster)
self._setup_instances(cluster, instances)
def start_cluster(self, cluster):
nn_instance = utils.get_namenode(cluster)
@@ -223,10 +222,8 @@ class VanillaProvider(p.ProvisioningPluginBase):
self._validate_additional_ng_scaling(cluster, additional)
def scale_cluster(self, cluster, instances):
self._push_configs_to_nodes(cluster, instances=instances)
self._write_hadoop_user_keys(instances,
cluster.management_private_key,
cluster.management_public_key)
self._setup_instances(cluster, instances)
run.refresh_nodes(remote.get_remote(
utils.get_namenode(cluster)), "dfsadmin")
jt = utils.get_jobtracker(cluster)
@@ -241,38 +238,55 @@ class VanillaProvider(p.ProvisioningPluginBase):
if "tasktracker" in i.node_group.node_processes:
run.start_process(r, "tasktracker")
def _push_configs_to_nodes(self, cluster, instances=None):
def _setup_instances(self, cluster, instances):
passwd_mysql = uuidutils.generate_uuid() \
if utils.get_hiveserver(cluster) else None
extra = self._extract_configs_to_extra(cluster, passwd_mysql)
self._push_configs_to_nodes(cluster, extra, instances)
self._configure_master_nodes(cluster, extra, passwd_mysql)
if instances is None:
instances = utils.get_instances(cluster)
def _push_configs_to_nodes(self, cluster, extra, instances):
with context.ThreadGroup() as tg:
for instance in instances:
tg.spawn('vanilla-configure-%s' % instance.instance_name,
self._push_configs_to_node, cluster, extra, instance)
for inst in instances:
ng_extra = extra[inst.node_group.id]
files = {
'/etc/hadoop/core-site.xml': ng_extra['xml']['core-site'],
'/etc/hadoop/mapred-site.xml': ng_extra['xml']['mapred-site'],
'/etc/hadoop/hdfs-site.xml': ng_extra['xml']['hdfs-site'],
'/tmp/savanna-hadoop-init.sh': ng_extra['setup_script']
}
with remote.get_remote(inst) as r:
# TODO(aignatov): sudo chown is wrong solution. But it works.
r.execute_command(
'sudo chown -R $USER:$USER /etc/hadoop'
)
r.execute_command(
'sudo chown -R $USER:$USER /opt/oozie/conf'
)
r.write_files_to(files)
r.execute_command(
'sudo chmod 0500 /tmp/savanna-hadoop-init.sh'
)
r.execute_command(
'sudo /tmp/savanna-hadoop-init.sh '
'>> /tmp/savanna-hadoop-init.log 2>&1')
def _push_configs_to_node(self, cluster, extra, instance):
ng_extra = extra[instance.node_group.id]
files = {
'/etc/hadoop/core-site.xml': ng_extra['xml']['core-site'],
'/etc/hadoop/mapred-site.xml': ng_extra['xml']['mapred-site'],
'/etc/hadoop/hdfs-site.xml': ng_extra['xml']['hdfs-site'],
'/tmp/savanna-hadoop-init.sh': ng_extra['setup_script'],
'id_rsa': cluster.management_private_key,
'authorized_keys': cluster.management_public_key
}
key_cmd = 'sudo mkdir -p /home/hadoop/.ssh/; ' \
'sudo mv id_rsa authorized_keys /home/hadoop/.ssh ; ' \
'sudo chown -R hadoop:hadoop /home/hadoop/.ssh; ' \
'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}'
with remote.get_remote(instance) as r:
# TODO(aignatov): sudo chown is wrong solution. But it works.
r.execute_command(
'sudo chown -R $USER:$USER /etc/hadoop'
)
r.execute_command(
'sudo chown -R $USER:$USER /opt/oozie/conf'
)
r.write_files_to(files)
r.execute_command(
'sudo chmod 0500 /tmp/savanna-hadoop-init.sh'
)
r.execute_command(
'sudo /tmp/savanna-hadoop-init.sh '
'>> /tmp/savanna-hadoop-init.log 2>&1')
r.execute_command(key_cmd)
def _configure_master_nodes(self, cluster, extra, passwd_mysql):
nn = utils.get_namenode(cluster)
jt = utils.get_jobtracker(cluster)
@@ -340,22 +354,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {'info': info})
def _write_hadoop_user_keys(self, instances, private_key, public_key):
files = {
'id_rsa': private_key,
'authorized_keys': public_key
}
mv_cmd = 'sudo mkdir -p /home/hadoop/.ssh/; ' \
'sudo mv id_rsa authorized_keys /home/hadoop/.ssh ; ' \
'sudo chown -R hadoop:hadoop /home/hadoop/.ssh; ' \
'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}'
for instance in instances:
with remote.get_remote(instance) as r:
r.write_files_to(files)
r.execute_command(mv_cmd)
def _get_scalable_processes(self):
return ["datanode", "tasktracker"]


@@ -380,16 +380,24 @@ def _configure_instances(cluster):
* setup passwordless login
* etc.
"""
hosts = _generate_etc_hosts(cluster)
for node_group in cluster.node_groups:
for instance in node_group.instances:
LOG.debug('Configuring instance %s' % instance.instance_name)
with instance.remote as r:
r.write_file_to('etc-hosts', hosts)
r.execute_command('sudo mv etc-hosts /etc/hosts')
hosts_file = _generate_etc_hosts(cluster)
r.execute_command('sudo chown $USER:$USER .ssh/id_rsa')
r.execute_command('chmod 400 .ssh/id_rsa')
with context.ThreadGroup() as tg:
for node_group in cluster.node_groups:
for instance in node_group.instances:
tg.spawn("configure-instance-%s" % instance.instance_name,
_configure_instance, instance, hosts_file)
def _configure_instance(instance, hosts_file):
LOG.debug('Configuring instance %s' % instance.instance_name)
with instance.remote as r:
r.write_file_to('etc-hosts', hosts_file)
r.execute_command('sudo mv etc-hosts /etc/hosts')
r.execute_command('sudo chown $USER:$USER .ssh/id_rsa')
r.execute_command('chmod 400 .ssh/id_rsa')
def _generate_etc_hosts(cluster):


@@ -28,11 +28,14 @@ class TestAttachVolume(models_test_base.DbTestCase):
@mock.patch('savanna.utils.remote.InstanceInteropHelper._get_conn_params')
@mock.patch('savanna.utils.procutils.start_subprocess')
@mock.patch('savanna.utils.procutils.run_in_subprocess')
@mock.patch('savanna.utils.remote._acquire_remote_semaphore')
@mock.patch('savanna.utils.remote._release_remote_semaphore')
@mock.patch('savanna.utils.openstack.nova.get_node_group_image_username')
@mock.patch(
'savanna.utils.remote.BulkInstanceInteropHelper.execute_command')
def test_mount_volume(self, p_ex_cmd, p_get_username,
run_in_sub, start_sub, get_conn_params, p_close):
_acq, _rel, run_in_sub, start_sub, get_conn_params,
p_close):
p_get_username.return_value = 'root'
instance = r.InstanceResource({'instance_id': '123454321',


@@ -34,9 +34,12 @@ implementations which are run in a separate process.
import logging
import time
from eventlet import semaphore
from eventlet import timeout as e_timeout
from oslo.config import cfg
import paramiko
from savanna import context
from savanna import exceptions as ex
from savanna.openstack.common import excutils
from savanna.utils import crypto
@@ -47,6 +50,20 @@ from savanna.utils import procutils
LOG = logging.getLogger(__name__)
remote_opts = [
cfg.IntOpt('global_remote_threshold', default=100,
help='Maximum number of remote operations that will '
'be running at the same time. Note that each '
'remote operation requires its own process to'
'run.'),
cfg.IntOpt('cluster_remote_threshold', default=70,
help='The same as global_remote_threshold, but for '
'a single cluster.'),
]
CONF = cfg.CONF
CONF.register_opts(remote_opts)
_ssh = None
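
Both thresholds are ordinary oslo.config integer options registered without an explicit group, so they land in [DEFAULT] and can be tuned by operators in the service configuration file (typically savanna.conf); the snippet below simply restates the defaults from this commit:

    [DEFAULT]
    # Process-wide cap on concurrent remote (SSH) operations.
    global_remote_threshold = 100
    # Cap on concurrent remote operations per cluster.
    cluster_remote_threshold = 70
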
@@ -152,6 +169,25 @@ def _execute_on_vm_interactive(cmd, matcher):
channel.close()
_global_remote_semaphore = None
def setup_remote():
global _global_remote_semaphore
_global_remote_semaphore = semaphore.Semaphore(
CONF.global_remote_threshold)
def _acquire_remote_semaphore():
context.current().remote_semaphore.acquire()
_global_remote_semaphore.acquire()
def _release_remote_semaphore():
_global_remote_semaphore.release()
context.current().remote_semaphore.release()
class InstanceInteropHelper(object):
def __init__(self, instance):
self.instance = instance
@@ -159,11 +195,19 @@ class InstanceInteropHelper(object):
self.instance.node_group)
def __enter__(self):
self.bulk = BulkInstanceInteropHelper(self.instance, self.username)
return self.bulk
_acquire_remote_semaphore()
try:
self.bulk = BulkInstanceInteropHelper(self.instance, self.username)
return self.bulk
except Exception:
with excutils.save_and_reraise_exception():
_release_remote_semaphore()
def __exit__(self, *exc_info):
self.bulk.close()
try:
self.bulk.close()
finally:
_release_remote_semaphore()
def _get_conn_params(self):
return (self.instance.management_ip, self.username,
@@ -182,7 +226,7 @@ class InstanceInteropHelper(object):
finally:
procutils.shutdown_subprocess(proc, _cleanup)
def _run_t(self, func, timeout, *args, **kwargs):
def _run_with_log(self, func, timeout, *args, **kwargs):
start_time = time.time()
try:
with e_timeout.Timeout(timeout):
@@ -191,6 +235,13 @@ class InstanceInteropHelper(object):
self._log_command('%s took %.1f seconds to complete' % (
func.__name__, time.time() - start_time))
def _run_s(self, func, timeout, *args, **kwargs):
_acquire_remote_semaphore()
try:
return self._run_with_log(func, timeout, *args, **kwargs)
finally:
_release_remote_semaphore()
def execute_command(self, cmd, get_stderr=False, raise_when_error=True,
timeout=300):
"""Execute specified command remotely using existing ssh connection.
@@ -198,7 +249,7 @@ class InstanceInteropHelper(object):
Return exit code, stdout data and stderr data of the executed command.
"""
self._log_command('Executing "%s"' % cmd)
return self._run_t(_execute_command, timeout, cmd, get_stderr,
return self._run_s(_execute_command, timeout, cmd, get_stderr,
raise_when_error)
def write_file_to(self, remote_file, data, timeout=120):
@@ -206,25 +257,25 @@ class InstanceInteropHelper(object):
data to it.
"""
self._log_command('Writing file "%s"' % remote_file)
self._run_t(_write_file_to, timeout, remote_file, data)
self._run_s(_write_file_to, timeout, remote_file, data)
def write_files_to(self, files, timeout=120):
"""Copy file->data dictionary in a single ssh connection.
"""
self._log_command('Writing files "%s"' % files.keys())
self._run_t(_write_files_to, timeout, files)
self._run_s(_write_files_to, timeout, files)
def read_file_from(self, remote_file, timeout=120):
"""Read remote file from the specified host and return given data."""
self._log_command('Reading file "%s"' % remote_file)
return self._run_t(_read_file_from, timeout, remote_file)
return self._run_s(_read_file_from, timeout, remote_file)
def replace_remote_string(self, remote_file, old_str, new_str,
timeout=120):
"""Replaces strings in remote file using sed command."""
self._log_command('In file "%s" replacing string "%s" '
'with "%s"' % (remote_file, old_str, new_str))
self._run_t(_replace_remote_string, timeout, remote_file, old_str,
self._run_s(_replace_remote_string, timeout, remote_file, old_str,
new_str)
def execute_on_vm_interactive(self, cmd, matcher, timeout=1800):
@@ -242,7 +293,7 @@ class InstanceInteropHelper(object):
otherwise.
"""
self._log_command('Executing interactively "%s"' % cmd)
self._run_t(_execute_on_vm_interactive, timeout, cmd, matcher)
self._run_s(_execute_on_vm_interactive, timeout, cmd, matcher)
def _log_command(self, str):
LOG.debug('[%s] %s' % (self.instance.instance_name, str))
@@ -269,3 +320,6 @@ class BulkInstanceInteropHelper(InstanceInteropHelper):
def _run(self, func, *args, **kwargs):
return procutils.run_in_subprocess(self.proc, func, args, kwargs)
def _run_s(self, func, timeout, *args, **kwargs):
return self._run_with_log(func, timeout, *args, **kwargs)
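
The override above is deliberate: BulkInstanceInteropHelper skips the semaphores in _run_s because InstanceInteropHelper.__enter__ has already acquired them and __exit__ releases them, so one acquisition covers the whole 'with' block. A short usage sketch (file names and shell commands are placeholders):

    # Sketch only -- contrasts per-call throttling with bulk mode.
    from savanna.utils import remote


    def touch_markers(instance):
        # Single call: execute_command() -> _run_s() acquires and releases
        # the global and per-cluster semaphores around this one operation.
        remote.get_remote(instance).execute_command('touch /tmp/one-off')

        # Bulk mode: __enter__ acquires the semaphores once and reuses a
        # single SSH subprocess for all three calls; __exit__ closes it and
        # releases the semaphores even if a command fails.
        with remote.get_remote(instance) as r:
            r.write_file_to('/tmp/batch-marker', 'hello')
            r.execute_command('chmod 600 /tmp/batch-marker')
            r.execute_command('cat /tmp/batch-marker')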