diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py
index a02bb2d779..c805435026 100644
--- a/sahara/plugins/hdp/ambariplugin.py
+++ b/sahara/plugins/hdp/ambariplugin.py
@@ -26,6 +26,7 @@ from sahara.plugins.hdp import saharautils as utils
 from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
 from sahara.plugins import provisioning as p
 from sahara.topology import topology_helper as th
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import general as g
@@ -75,6 +76,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
             cluster = g.change_cluster_status(cluster, "Configuring HA")
             self.configure_hdfs_ha(cluster)
 
+    @cpo.event_wrapper(
+        True, step=_("Add configurations to cluster"), param=('cluster', 1))
     def configure_hdfs_ha(self, cluster):
         version = cluster.hadoop_version
         handler = self.version_factory.get_version_handler(version)
@@ -178,6 +181,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
                                            servers, version):
         # TODO(jspeidel): encapsulate in another class
 
+        if servers:
+            cpo.add_provisioning_step(
+                servers[0].cluster_id,
+                _("Provision cluster via Ambari"), len(servers))
+
         for server in servers:
             self._spawn(
                 "hdp-provision-instance-%s" % server.instance.hostname(),
@@ -314,6 +322,9 @@ class AmbariPlugin(p.ProvisioningPluginBase):
         ambari_info = self.get_ambari_info(cluster_spec)
         self._update_ambari_info_credentials(cluster_spec, ambari_info)
 
+        cpo.add_provisioning_step(
+            cluster.id, _("Provision cluster via Ambari"), len(servers))
+
         for server in servers:
             self._spawn('Ambari provisioning thread',
                         server.provision_ambari, ambari_info, cluster_spec)
@@ -397,6 +408,9 @@ class AmbariPlugin(p.ProvisioningPluginBase):
     def _configure_topology_for_cluster(self, cluster, servers):
         if CONF.enable_data_locality:
+            cpo.add_provisioning_step(
+                cluster.id, _("Enable data locality for cluster"),
+                len(servers))
             topology_data = th.generate_topology_map(
                 cluster, CONF.enable_hypervisor_awareness)
             topology_str = "\n".join(
@@ -426,3 +440,6 @@ class AmbariInfo(object):
     def get_cluster(self):
         sahara_instance = self.host.sahara_instance
         return sahara_instance.cluster
+
+    def get_event_info(self):
+        return self.host.sahara_instance
diff --git a/sahara/plugins/hdp/hadoopserver.py b/sahara/plugins/hdp/hadoopserver.py
index 997473c2e4..a103a9885c 100644
--- a/sahara/plugins/hdp/hadoopserver.py
+++ b/sahara/plugins/hdp/hadoopserver.py
@@ -21,6 +21,7 @@ from sahara.i18n import _
 from sahara.i18n import _LI
 from sahara.plugins import exceptions as ex
 from sahara.plugins.hdp import saharautils
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import files as f
@@ -46,6 +47,14 @@ class HadoopServer(object):
         self.node_group = node_group
         self.ambari_rpm = ambari_rpm or AMBARI_RPM
 
+    def get_event_info(self):
+        return self.instance
+
+    @property
+    def cluster_id(self):
+        return self.instance.cluster_id
+
+    @cpo.event_wrapper(True, param=('self', 0))
     def provision_ambari(self, ambari_info, cluster_spec):
         self.install_rpms()
         global_config = cluster_spec.configurations['global']
@@ -87,6 +96,7 @@ class HadoopServer(object):
             raise ex.HadoopProvisionError(
                 _('Failed to install Hortonworks Ambari'))
 
+    @cpo.event_wrapper(True, param=('self', 0))
     @saharautils.inject_remote('r')
     def install_swift_integration(self, r):
         LOG.debug(
@@ -112,6 +122,7 @@ class HadoopServer(object):
             raise ex.HadoopProvisionError(
                 _('Failed to install Hadoop Swift integration'))
 
+    @cpo.event_wrapper(True, param=('self', 0))
     @saharautils.inject_remote('r')
     def configure_topology(self, topology_str, r):
         r.write_file_to(
diff --git a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
index 7608d86ce1..f89805008f 100644
--- a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
+++ b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
@@ -31,6 +31,7 @@ from sahara.plugins.hdp import configprovider as cfgprov
 from sahara.plugins.hdp.versions import abstractversionhandler as avm
 from sahara.plugins.hdp.versions.version_1_3_2 import edp_engine
 from sahara.plugins.hdp.versions.version_1_3_2 import services
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import general as g
 from sahara import version
@@ -125,6 +126,11 @@ class VersionHandler(avm.AbstractVersionHandler):
         return node_processes
 
     def install_swift_integration(self, servers):
+        if servers:
+            cpo.add_provisioning_step(
+                servers[0].cluster_id, _("Install Swift integration"),
+                len(servers))
+
         for server in servers:
             server.install_swift_integration()
@@ -233,6 +239,8 @@ class AmbariClient(object):
             raise ex.HadoopProvisionError(
                 _('Failed to add cluster: %s') % result.text)
 
+    @cpo.event_wrapper(True, step=_("Add configurations to cluster"),
+                       param=('ambari_info', 2))
     def _add_configurations_to_cluster(
             self, cluster_spec, ambari_info, name):
@@ -283,6 +291,8 @@ class AmbariClient(object):
                 _('Failed to set configurations on cluster: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add services to cluster"), param=('ambari_info', 2))
     def _add_services_to_cluster(self, cluster_spec, ambari_info, name):
         services = cluster_spec.services
         add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
@@ -299,6 +309,8 @@ class AmbariClient(object):
                 _('Failed to add services to cluster: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add components to services"), param=('ambari_info', 2))
     def _add_components_to_services(self, cluster_spec, ambari_info, name):
         add_component_url = ('http://{0}/api/v1/clusters/{1}/services/{'
                              '2}/components/{3}')
@@ -317,6 +329,8 @@ class AmbariClient(object):
                 _('Failed to add components to services: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add hosts and components"), param=('ambari_info', 3))
     def _add_hosts_and_components(
             self, cluster_spec, servers, ambari_info, name):
@@ -352,6 +366,8 @@ class AmbariClient(object):
                     _('Failed to add host component: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Install services"), param=('ambari_info', 2))
     def _install_services(self, cluster_name, ambari_info):
         ambari_address = ambari_info.get_address()
@@ -426,6 +442,8 @@ class AmbariClient(object):
                     'state.'))
         LOG.info(_LI('Ambari cluster state finalized.'))
 
+    @cpo.event_wrapper(
+        True, step=_("Start services"), param=('ambari_info', 3))
     def start_services(self, cluster_name, cluster_spec, ambari_info):
         start_url = ('http://{0}/api/v1/clusters/{1}/services?ServiceInfo/'
                      'state=INSTALLED'.format(
@@ -557,6 +575,8 @@ class AmbariClient(object):
                 'components in scaled instances. status'
                 ' code returned = {0}').format(result.status))
 
+    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
+                       param=('ambari_info', 2))
     @g.await_process(
         3600, 5, _("Ambari agents registering with server"), _check_ambari)
     def wait_for_host_registrations(self, num_hosts, ambari_info):
@@ -636,6 +656,8 @@ class AmbariClient(object):
         self._install_and_start_components(
             name, servers, ambari_info, cluster_spec)
 
+    @cpo.event_wrapper(
+        True, step=_("Decommission nodes"), param=('cluster', 1))
     def decommission_cluster_instances(self, cluster, clusterspec, instances,
                                        ambari_info):
         raise exc.InvalidDataException(_('The HDP plugin does not support '
diff --git a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
index d96f33e60c..6c3b15b720 100644
--- a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
+++ b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
@@ -32,6 +32,7 @@ from sahara.plugins.hdp import configprovider as cfgprov
 from sahara.plugins.hdp.versions import abstractversionhandler as avm
 from sahara.plugins.hdp.versions.version_2_0_6 import edp_engine
 from sahara.plugins.hdp.versions.version_2_0_6 import services
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import general as g
 from sahara import version
@@ -115,6 +116,11 @@ class VersionHandler(avm.AbstractVersionHandler):
         return node_processes
 
     def install_swift_integration(self, servers):
+        if servers:
+            cpo.add_provisioning_step(
+                servers[0].cluster_id, _("Install swift integration"),
+                len(servers))
+
         for server in servers:
             server.install_swift_integration()
@@ -209,6 +215,8 @@ class AmbariClient(object):
             raise ex.HadoopProvisionError(
                 _('Failed to add cluster: %s') % result.text)
 
+    @cpo.event_wrapper(True, step=_("Add configurations to cluster"),
+                       param=('ambari_info', 2))
     def _add_configurations_to_cluster(
             self, cluster_spec, ambari_info, name):
@@ -259,6 +267,8 @@ class AmbariClient(object):
                 _('Failed to set configurations on cluster: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add services to cluster"), param=('ambari_info', 2))
     def _add_services_to_cluster(self, cluster_spec, ambari_info, name):
         services = cluster_spec.services
         add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
@@ -276,6 +286,8 @@ class AmbariClient(object):
                 _('Failed to add services to cluster: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add components to services"), param=('ambari_info', 2))
     def _add_components_to_services(self, cluster_spec, ambari_info, name):
         add_component_url = ('http://{0}/api/v1/clusters/{1}/services/{'
                              '2}/components/{3}')
@@ -295,6 +307,8 @@ class AmbariClient(object):
                 _('Failed to add components to services: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Add host and components"), param=('ambari_info', 3))
     def _add_hosts_and_components(
             self, cluster_spec, servers, ambari_info, name):
@@ -333,6 +347,8 @@ class AmbariClient(object):
                     _('Failed to add host component: %s') % result.text)
 
+    @cpo.event_wrapper(
+        True, step=_("Install services"), param=('ambari_info', 2))
     def _install_services(self, cluster_name, ambari_info):
         ambari_address = ambari_info.get_address()
         install_url = ('http://{0}/api/v1/clusters/{'
@@ -410,6 +426,8 @@ class AmbariClient(object):
                 _('Unable to finalize Ambari state.'))
         LOG.info(_LI('Ambari cluster state finalized.'))
 
+    @cpo.event_wrapper(
+        True, step=_("Start services"), param=('ambari_info', 3))
     def start_services(self, cluster_name, cluster_spec, ambari_info):
         start_url = ('http://{0}/api/v1/clusters/{1}/services?ServiceInfo/'
                      'state=INSTALLED'.format(
@@ -541,6 +559,8 @@ class AmbariClient(object):
                 'components in scaled instances. status'
                 ' code returned = {0}').format(result.status))
 
+    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
+                       param=('ambari_info', 2))
     @g.await_process(
         3600, 5, _("Ambari agents registering with server"), _check_ambari)
     def wait_for_host_registrations(self, num_hosts, ambari_info):
@@ -620,6 +640,8 @@ class AmbariClient(object):
         self._install_and_start_components(
             name, servers, ambari_info, cluster_spec)
 
+    @cpo.event_wrapper(
+        True, step=_("Decommission nodes"), param=('cluster', 1))
     def decommission_cluster_instances(self, cluster, clusterspec, instances,
                                        ambari_info):
diff --git a/sahara/utils/cluster_progress_ops.py b/sahara/utils/cluster_progress_ops.py
index fd24862117..4b25d36482 100644
--- a/sahara/utils/cluster_progress_ops.py
+++ b/sahara/utils/cluster_progress_ops.py
@@ -196,8 +196,16 @@ def _get_info_from_cluster(arg):
     return None
 
 
+def _get_event_info(arg):
+    try:
+        return arg.get_event_info()
+    except AttributeError:
+        return None
+
+
 def _get_info_from_obj(arg):
-    functions = [_get_info_from_instance, _get_info_from_cluster]
+    functions = [_get_info_from_instance, _get_info_from_cluster,
+                 _get_event_info]
 
     for func in functions:
         value = func(arg)
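
Note on the pattern this patch applies: cpo.add_provisioning_step(cluster_id, step_name, total) registers a named provisioning step that expects one event per affected instance, and cpo.event_wrapper(...) records an event for each call of the decorated method, using the argument named by param (for example ('self', 0) or ('ambari_info', 2)) to decide which instance the event belongs to; objects such as HadoopServer and AmbariInfo now expose get_event_info() so the wrapper can resolve that instance. The snippet below is a minimal, self-contained sketch of that flow built on simplified stand-in helpers written only for illustration; it is not the actual sahara.utils.cluster_progress_ops implementation, and the stand-in names (mark_successful_on_exit, _EVENT_LOG, Server) are assumptions of the sketch.

# Illustrative sketch only: simplified stand-ins for Sahara's
# cluster_progress_ops helpers, showing how the decorators added in this
# patch are expected to behave. Not the real Sahara implementation.
import functools

_EVENT_LOG = []  # hypothetical in-memory stand-in for Sahara's event log


def add_provisioning_step(cluster_id, step_name, total):
    # Declare a named step that expects `total` per-instance events.
    _EVENT_LOG.append(('step', cluster_id, step_name, total))


def event_wrapper(mark_successful_on_exit, step=None, param=None):
    # Record one event per call of the wrapped function. `param` names the
    # argument that identifies the event target, e.g. ('self', 0) or
    # ('ambari_info', 2); the target may expose get_event_info() to point
    # at the instance the event should be attached to.
    def decorator(func):
        @functools.wraps(func)
        def handler(*args, **kwargs):
            target = kwargs.get(param[0], args[param[1]]) if param else None
            info = getattr(target, 'get_event_info', lambda: target)()
            try:
                result = func(*args, **kwargs)
                if mark_successful_on_exit:
                    _EVENT_LOG.append(('ok', step or func.__name__, info))
                return result
            except Exception:
                _EVENT_LOG.append(('failed', step or func.__name__, info))
                raise
        return handler
    return decorator


class Server(object):
    cluster_id = 'demo-cluster'

    def get_event_info(self):
        return 'server-1'  # the instance to report the event against

    @event_wrapper(True, step='Install Swift integration', param=('self', 0))
    def install_swift_integration(self):
        pass  # real installation work would happen here


servers = [Server()]
add_provisioning_step(servers[0].cluster_id,
                      'Install Swift integration', len(servers))
for server in servers:
    server.install_swift_integration()
print(_EVENT_LOG)

Running the sketch prints one 'step' record followed by one 'ok' event per server, which mirrors how the decorated HDP plugin methods above report per-instance progress against the steps they register.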