diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py index c805435026..e84cd9467a 100644 --- a/sahara/plugins/hdp/ambariplugin.py +++ b/sahara/plugins/hdp/ambariplugin.py @@ -174,9 +174,6 @@ class AmbariPlugin(p.ProvisioningPluginBase): def convert_props_to_template(self, props): raise NotImplementedError('not yet supported') - def _spawn(self, description, func, *args, **kwargs): - context.spawn(description, func, *args, **kwargs) - def _provision_cluster(self, name, cluster_spec, ambari_info, servers, version): # TODO(jspeidel): encapsulate in another class @@ -186,10 +183,12 @@ class AmbariPlugin(p.ProvisioningPluginBase): servers[0].cluster_id, _("Provision cluster via Ambari"), len(servers)) - for server in servers: - self._spawn( - "hdp-provision-instance-%s" % server.instance.hostname(), - server.provision_ambari, ambari_info, cluster_spec) + with context.ThreadGroup() as tg: + for server in servers: + tg.spawn( + "hdp-provision-instance-%s" % + server.instance.hostname(), + server.provision_ambari, ambari_info, cluster_spec) handler = self.version_factory.get_version_handler(version) ambari_client = handler.get_ambari_client() @@ -325,9 +324,11 @@ class AmbariPlugin(p.ProvisioningPluginBase): cpo.add_provisioning_step( cluster.id, _("Provision cluster via Ambari"), len(servers)) - for server in servers: - self._spawn('Ambari provisioning thread', - server.provision_ambari, ambari_info, cluster_spec) + with context.ThreadGroup() as tg: + for server in servers: + tg.spawn('Ambari provisioning thread', + server.provision_ambari, + ambari_info, cluster_spec) ambari_client.configure_scaled_cluster_instances( cluster.name, cluster_spec, self._get_num_hosts(cluster),