Use ThreadGroup instead of separate threads

ThreadGroup can be used for provision_ambari execution. In that case
we will wait until successful execution of provision_ambari for all
servers.

Change-Id: I60913824e15add981857caef7d5bc94cae5f9512
Closes-bug: 1448073
This commit is contained in:
Vitaly Gridnev 2015-04-24 13:50:55 +03:00
parent 4ec5e20e2b
commit fbcf8d979e

View File

@ -174,9 +174,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def convert_props_to_template(self, props):
raise NotImplementedError('not yet supported')
def _spawn(self, description, func, *args, **kwargs):
context.spawn(description, func, *args, **kwargs)
def _provision_cluster(self, name, cluster_spec, ambari_info,
servers, version):
# TODO(jspeidel): encapsulate in another class
@ -186,12 +183,14 @@ class AmbariPlugin(p.ProvisioningPluginBase):
servers[0].cluster_id,
_("Provision cluster via Ambari"), len(servers))
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
self._spawn(
"hdp-provision-instance-%s" % server.instance.hostname(),
server.provision_ambari, ambari_info, cluster_spec)
with context.ThreadGroup() as tg:
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
tg.spawn(
"hdp-provision-instance-%s" %
server.instance.hostname(),
server.provision_ambari, ambari_info, cluster_spec)
handler = self.version_factory.get_version_handler(version)
ambari_client = handler.get_ambari_client()
@ -327,11 +326,13 @@ class AmbariPlugin(p.ProvisioningPluginBase):
cpo.add_provisioning_step(
cluster.id, _("Provision cluster via Ambari"), len(servers))
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
self._spawn('Ambari provisioning thread',
server.provision_ambari, ambari_info, cluster_spec)
with context.ThreadGroup() as tg:
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
tg.spawn('Ambari provisioning thread',
server.provision_ambari,
ambari_info, cluster_spec)
ambari_client.configure_scaled_cluster_instances(
cluster.name, cluster_spec, self._get_num_hosts(cluster),