Merge "Replace empty list with scalable process in scaling"

This commit is contained in:
Jenkins 2015-03-23 11:13:36 +00:00 committed by Gerrit Code Review
commit f6defa6898
4 changed files with 19 additions and 29 deletions

View File

@@ -20,7 +20,6 @@ from oslo_log import log as logging
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as pu
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.plugins.vanilla import utils as vu
@@ -32,42 +31,35 @@ from sahara.utils import general as g
LOG = logging.getLogger(__name__)
def start_all_processes(instances, filternames):
if filternames:
instances = pu.instances_with_services(instances, filternames)
def start_dn_nm_processes(instances):
filternames = ['datanode', 'nodemanager']
instances = pu.instances_with_services(instances, filternames)
if len(instances) == 0:
return
name = pu.start_process_event_message(", ".join(filternames))
cpo.add_provisioning_step(instances[0].cluster_id, name, len(instances))
cpo.add_provisioning_step(
instances[0].cluster_id,
pu.start_process_event_message("DataNodes, NodeManagers"),
len(instances))
with context.ThreadGroup() as tg:
for instance in instances:
processes = set(instance.node_group.node_processes)
if filternames:
processes = processes.intersection(filternames)
if processes:
tg.spawn('vanilla-start-processes-%s' %
instance.instance_name,
_start_processes,
instance, list(processes))
processes = processes.intersection(filternames)
tg.spawn('vanilla-start-processes-%s' % instance.instance_name,
_start_processes, instance, list(processes))
@cpo.event_wrapper(True)
def _start_processes(instance, processes):
with instance.remote() as r:
for process in processes:
if process in ['namenode', 'datanode']:
r.execute_command(
'sudo su - -c "hadoop-daemon.sh start %s" hadoop'
% process)
elif process in ['resourcemanager', 'nodemanager']:
r.execute_command(
'sudo su - -c "yarn-daemon.sh start %s" hadoop' % process)
else:
raise ex.HadoopProvisionError(
_("Process %s is not supported") % process)
if 'datanode' in processes:
r.execute_command(
'sudo su - -c "hadoop-daemon.sh start datanode" hadoop')
if 'nodemanager' in processes:
r.execute_command(
'sudo su - -c "yarn-daemon.sh start nodemanager" hadoop')
def start_hadoop_process(instance, process):

View File

@@ -37,7 +37,7 @@ def scale_cluster(pctx, cluster, instances):
run.refresh_yarn_nodes(cluster)
config.configure_topology_data(pctx, cluster)
run.start_all_processes(instances, [])
run.start_dn_nm_processes(instances)
def _get_instances_with_service(instances, service):

View File

@@ -81,8 +81,7 @@ class VersionHandler(avm.AbstractVersionHandler):
if rm:
run.start_yarn_process(rm, 'resourcemanager')
run.start_all_processes(utils.get_instances(cluster),
['datanode', 'nodemanager'])
run.start_dn_nm_processes(utils.get_instances(cluster))
run.await_datanodes(cluster)

View File

@@ -121,8 +121,7 @@ class VersionHandler(avm.AbstractVersionHandler):
self.start_resourcemanager(cluster)
run.start_all_processes(utils.get_instances(cluster),
['datanode', 'nodemanager'])
run.start_dn_nm_processes(utils.get_instances(cluster))
run.await_datanodes(cluster)