Merge "Add provisioning steps to Storm plugin"

This commit is contained in:
Jenkins 2015-02-23 21:05:46 +00:00 committed by Gerrit Code Review
commit bc911a3acb

View File

@ -27,6 +27,7 @@ from sahara.plugins import provisioning as p
from sahara.plugins.storm import config_helper as c_helper
from sahara.plugins.storm import run_scripts as run
from sahara.plugins import utils
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import remote
conductor = conductor.API
@ -90,10 +91,7 @@ class StormProvider(p.ProvisioningPluginBase):
# start storm master
if sm_instance:
with remote.get_remote(sm_instance) as r:
run.start_storm_nimbus_and_ui(r)
LOG.info(_LI("Storm master at '%s' has been started"),
sm_instance.hostname())
self._start_storm_master(sm_instance)
# start storm slaves
self._start_slave_processes(sl_instances)
@ -134,22 +132,45 @@ class StormProvider(p.ProvisioningPluginBase):
return extra
@cpo.event_wrapper(
True, step=utils.start_process_event_message("StormMaster"))
def _start_storm_master(self, sm_instance):
with remote.get_remote(sm_instance) as r:
run.start_storm_nimbus_and_ui(r)
LOG.info(_LI("Storm master at '%s' has been started"),
sm_instance.hostname())
def _start_slave_processes(self, sl_instances):
if len(sl_instances) == 0:
return
cpo.add_provisioning_step(
sl_instances[0].cluster_id,
utils.start_process_event_message("Slave"), len(sl_instances))
with context.ThreadGroup() as tg:
for i in sl_instances:
tg.spawn('storm-start-sl-%s' % i.instance_name,
self._start_slaves, i)
@cpo.event_wrapper(True)
def _start_slaves(self, instance):
with instance.remote() as r:
run.start_storm_supervisor(r)
def _start_zookeeper_processes(self, zk_instances):
if len(zk_instances) == 0:
return
cpo.add_provisioning_step(
zk_instances[0].cluster_id,
utils.start_process_event_message("Zookeeper"), len(zk_instances))
with context.ThreadGroup() as tg:
for i in zk_instances:
tg.spawn('storm-start-zk-%s' % i.instance_name,
self._start_zookeeper, i)
@cpo.event_wrapper(True)
def _start_zookeeper(self, instance):
with instance.remote() as r:
run.start_zookeeper(r)
@ -164,6 +185,9 @@ class StormProvider(p.ProvisioningPluginBase):
def _push_configs_to_nodes(self, cluster, extra, new_instances):
all_instances = utils.get_instances(cluster)
cpo.add_provisioning_step(
cluster.id, _("Push configs to nodes"), len(all_instances))
with context.ThreadGroup() as tg:
for instance in all_instances:
if instance in new_instances:
@ -186,6 +210,7 @@ class StormProvider(p.ProvisioningPluginBase):
return stream
@cpo.event_wrapper(True)
def _push_configs_to_new_node(self, cluster, extra, instance):
ng_extra = extra[instance.node_group.id]
@ -212,6 +237,7 @@ class StormProvider(p.ProvisioningPluginBase):
if 'supervisor' in node_processes:
self._push_supervisor_configs(r, files_supervisor)
@cpo.event_wrapper(True)
def _push_configs_to_existing_node(self, cluster, extra, instance):
node_processes = instance.node_group.node_processes
need_storm_update = ('nimbus' in node_processes or