diff --git a/releasenotes/notes/event_log_for_hdp-a114511c477ef16d.yaml b/releasenotes/notes/event_log_for_hdp-a114511c477ef16d.yaml new file mode 100644 index 00000000..8ad1cfc7 --- /dev/null +++ b/releasenotes/notes/event_log_for_hdp-a114511c477ef16d.yaml @@ -0,0 +1,3 @@ +--- +features: + - Added event log for HDP plugin diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py index 07b7508c..17d21368 100644 --- a/sahara/plugins/ambari/deploy.py +++ b/sahara/plugins/ambari/deploy.py @@ -22,6 +22,7 @@ from oslo_utils import uuidutils from sahara import conductor from sahara import context +from sahara.i18n import _ from sahara.i18n import _LW from sahara.plugins.ambari import client as ambari_client from sahara.plugins.ambari import common as p_common @@ -30,6 +31,7 @@ from sahara.plugins.ambari import ha_helper from sahara.plugins import kerberos from sahara.plugins import utils as plugin_utils from sahara.topology import topology_helper as t_helper +from sahara.utils import cluster_progress_ops as cpo from sahara.utils import poll_utils @@ -56,6 +58,8 @@ os_type_map = { } +@cpo.event_wrapper(True, step=_("Set up Ambari management console"), + param=('cluster', 0)) def setup_ambari(cluster): LOG.debug("Set up Ambari management console") ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) @@ -79,6 +83,8 @@ def setup_agents(cluster, instances=None): def _setup_agents(instances, manager_address): + cpo.add_provisioning_step( + instances[0].cluster.id, _("Set up Ambari agents"), len(instances)) with context.ThreadGroup() as tg: for inst in instances: tg.spawn("hwx-agent-setup-%s" % inst.id, @@ -108,6 +114,7 @@ def disable_repos(cluster): _disable_repos_on_inst, inst) +@cpo.event_wrapper(True) def _setup_agent(instance, ambari_address): with instance.remote() as r: sudo = functools.partial(r.execute_command, run_as_root=True) @@ -118,6 +125,8 @@ def _setup_agent(instance, ambari_address): sudo("yum clean all") +@cpo.event_wrapper(True, step=_("Wait Ambari accessible"), + param=('cluster', 0)) def wait_ambari_accessible(cluster): ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) kwargs = {"host": ambari.management_ip, "port": 8080} @@ -162,6 +171,7 @@ def _prepare_ranger(cluster): sudo("rm /tmp/init.sql") +@cpo.event_wrapper(True, step=_("Prepare Hive"), param=('cluster', 0)) def prepare_hive(cluster): hive = plugin_utils.get_instance(cluster, p_common.HIVE_SERVER) if not hive: @@ -175,6 +185,8 @@ def prepare_hive(cluster): '/user/oozie/conf/hive-site.xml" hdfs') +@cpo.event_wrapper(True, step=_("Update default Ambari password"), + param=('cluster', 0)) def update_default_ambari_password(cluster): ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) new_password = uuidutils.generate_uuid() @@ -187,6 +199,8 @@ def update_default_ambari_password(cluster): cluster = conductor.cluster_get(ctx, cluster.id) +@cpo.event_wrapper(True, step=_("Wait registration of hosts"), + param=('cluster', 0)) def wait_host_registration(cluster, instances): with _get_ambari_client(cluster) as client: kwargs = {"client": client, "instances": instances} @@ -202,11 +216,9 @@ def _check_host_registration(client, instances): return True -def set_up_hdp_repos(cluster): - hdp_repo = configs.get_hdp_repo_url(cluster) - hdp_utils_repo = configs.get_hdp_utils_repo_url(cluster) - if not hdp_repo and not hdp_utils_repo: - return +@cpo.event_wrapper(True, step=_("Set up HDP repositories"), + param=('cluster', 0)) +def _set_up_hdp_repos(cluster, hdp_repo, hdp_utils_repo): ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) pv = cluster.hadoop_version repos = repo_id_map[pv] @@ -219,15 +231,20 @@ def set_up_hdp_repos(cluster): hdp_utils_repo) +def set_up_hdp_repos(cluster): + hdp_repo = configs.get_hdp_repo_url(cluster) + hdp_utils_repo = configs.get_hdp_utils_repo_url(cluster) + if hdp_repo or hdp_utils_repo: + _set_up_hdp_repos(cluster, hdp_repo, hdp_utils_repo) + + def get_kdc_server(cluster): return plugin_utils.get_instance( cluster, p_common.AMBARI_SERVER) -def prepare_kerberos(cluster, instances=None): - if not kerberos.is_kerberos_security_enabled(cluster): - return - +@cpo.event_wrapper(True, step=_("Prepare Kerberos"), param=('cluster', 0)) +def _prepare_kerberos(cluster, instances=None): if instances is None: kerberos.deploy_infrastructure(cluster, get_kdc_server(cluster)) kerberos.prepare_policy_files(cluster) @@ -239,6 +256,11 @@ def prepare_kerberos(cluster, instances=None): kerberos.prepare_policy_files(cluster) +def prepare_kerberos(cluster, instances=None): + if kerberos.is_kerberos_security_enabled(cluster): + _prepare_kerberos(cluster, instances) + + def _serialize_mit_kdc_kerberos_env(cluster): return { 'kerberos-env': { @@ -281,6 +303,8 @@ def get_host_group_components(cluster, processes): return result +@cpo.event_wrapper(True, step=_("Create Ambari blueprint"), + param=('cluster', 0)) def create_blueprint(cluster): _prepare_ranger(cluster) cluster = conductor.cluster_get(context.ctx(), cluster.id) @@ -344,7 +368,7 @@ def _build_ambari_cluster_template(cluster): if kerberos.is_kerberos_security_enabled(cluster): cl_tmpl["credentials"] = _get_credentials(cluster) cl_tmpl["security"] = {"type": "KERBEROS"} - topology = _configure_topology_data(cluster) + topology = _get_topology_data(cluster) for ng in cluster.node_groups: for instance in ng.instances: host = {"fqdn": instance.fqdn()} @@ -357,6 +381,7 @@ def _build_ambari_cluster_template(cluster): return cl_tmpl +@cpo.event_wrapper(True, step=_("Start cluster"), param=('cluster', 0)) def start_cluster(cluster): ambari_template = _build_ambari_cluster_template(cluster) with _get_ambari_client(cluster) as client: @@ -364,12 +389,21 @@ def start_cluster(cluster): client.wait_ambari_request(req_id, cluster.name) +@cpo.event_wrapper(True) +def _add_host_to_cluster(instance, client): + client.add_host_to_cluster(instance) + + def add_new_hosts(cluster, instances): with _get_ambari_client(cluster) as client: + cpo.add_provisioning_step( + cluster.id, _("Add new hosts"), len(instances)) for inst in instances: - client.add_host_to_cluster(inst) + _add_host_to_cluster(inst, client) +@cpo.event_wrapper(True, step=_("Generate config groups"), + param=('cluster', 0)) def manage_config_groups(cluster, instances): groups = [] for instance in instances: @@ -378,6 +412,8 @@ def manage_config_groups(cluster, instances): client.create_config_group(cluster, groups) +@cpo.event_wrapper(True, step=_("Cleanup config groups"), + param=('cluster', 0)) def cleanup_config_groups(cluster, instances): to_remove = set() for instance in instances: @@ -394,10 +430,9 @@ def cleanup_config_groups(cluster, instances): client.remove_config_group(cluster, cfg_id) +@cpo.event_wrapper(True, step=_("Regenerate keytabs for Kerberos"), + param=('cluster', 0)) def _regenerate_keytabs(cluster): - if not kerberos.is_kerberos_security_enabled(cluster): - return - with _get_ambari_client(cluster) as client: alias = "kdc.admin.credential" try: @@ -418,7 +453,9 @@ def _regenerate_keytabs(cluster): client.wait_ambari_request(req_id, cluster.name) -def manage_host_components(cluster, instances): +@cpo.event_wrapper(True, step=_("Install services on hosts"), + param=('cluster', 0)) +def _install_services_to_hosts(cluster, instances): requests_ids = [] with _get_ambari_client(cluster) as client: clients = p_common.get_clients(cluster) @@ -432,8 +469,10 @@ def manage_host_components(cluster, instances): instance, service, 'INSTALLED')) client.wait_ambari_requests(requests_ids, cluster.name) - _regenerate_keytabs(cluster) +@cpo.event_wrapper(True, step=_("Start services on hosts"), + param=('cluster', 0)) +def _start_services_on_hosts(cluster, instances): with _get_ambari_client(cluster) as client: # all services added and installed, let's start them requests_ids = [] @@ -447,6 +486,15 @@ def manage_host_components(cluster, instances): client.wait_ambari_requests(requests_ids, cluster.name) +def manage_host_components(cluster, instances): + _install_services_to_hosts(cluster, instances) + if kerberos.is_kerberos_security_enabled(cluster): + _regenerate_keytabs(cluster) + _start_services_on_hosts(cluster, instances) + + +@cpo.event_wrapper(True, step=_("Decommission NodeManagers and DataNodes"), + param=('cluster', 0)) def decommission_hosts(cluster, instances): nodemanager_instances = filter( lambda i: p_common.NODEMANAGER in i.node_group.node_processes, @@ -481,6 +529,8 @@ def restart_resourcemanager(cluster, instance): client.restart_resourcemanager(cluster.name, instance) +@cpo.event_wrapper(True, step=_("Restart NameNodes and ResourceManagers"), + param=('cluster', 0)) def restart_nns_and_rms(cluster): nns = plugin_utils.get_instances(cluster, p_common.NAMENODE) for nn in nns: @@ -496,6 +546,7 @@ def restart_service(cluster, service_name): client.restart_service(cluster.name, service_name) +@cpo.event_wrapper(True, step=_("Remove hosts"), param=('cluster', 0)) def remove_services_from_hosts(cluster, instances): for inst in instances: LOG.debug("Stopping and removing processes from host %s" % inst.fqdn()) @@ -539,7 +590,7 @@ def _get_ambari_client(cluster): return ambari_client.AmbariClient(ambari, password=password) -def _configure_topology_data(cluster): +def _get_topology_data(cluster): if not t_helper.is_data_locality_enabled(): return {} @@ -549,39 +600,60 @@ def _configure_topology_data(cluster): return t_helper.generate_topology_map(cluster, is_node_awareness=False) +@cpo.event_wrapper(True) +def _configure_topology_data(cluster, inst, client): + topology = _get_topology_data(cluster) + client.set_rack_info_for_instance( + cluster.name, inst, topology[inst.instance_name]) + + +@cpo.event_wrapper(True, step=_("Restart HDFS and MAPREDUCE2 services"), + param=('cluster', 0)) +def _restart_hdfs_and_mapred_services(cluster, client): + client.restart_service(cluster.name, p_common.HDFS_SERVICE) + client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE) + + def configure_rack_awareness(cluster, instances): if not t_helper.is_data_locality_enabled(): return - topology = _configure_topology_data(cluster) with _get_ambari_client(cluster) as client: + cpo.add_provisioning_step( + cluster.id, _("Configure rack awareness"), len(instances)) for inst in instances: - client.set_rack_info_for_instance( - cluster.name, inst, topology[inst.instance_name]) - client.restart_service(cluster.name, p_common.HDFS_SERVICE) - client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE) + _configure_topology_data(cluster, inst, client) + _restart_hdfs_and_mapred_services(cluster, client) + + +@cpo.event_wrapper(True) +def _add_hadoop_swift_jar(instance, new_jar): + with instance.remote() as r: + code, out = r.execute_command( + "test -f %s" % new_jar, raise_when_error=False) + if code == 0: + # get ambari hadoop version (e.g.: 2.7.1.2.3.4.0-3485) + code, amb_hadoop_version = r.execute_command( + "sudo hadoop version | grep 'Hadoop' | awk '{print $2}'") + amb_hadoop_version = amb_hadoop_version.strip() + # get special code of ambari hadoop version(e.g.:2.3.4.0-3485) + amb_code = '.'.join(amb_hadoop_version.split('.')[3:]) + origin_jar = ( + "/usr/hdp/{}/hadoop-mapreduce/hadoop-openstack-{}.jar".format( + amb_code, amb_hadoop_version)) + r.execute_command("sudo cp {} {}".format(new_jar, origin_jar)) + else: + LOG.warning(_LW("The {jar_file} file cannot be found " + "in the {dir} directory so Keystone API v3 " + "is not enabled for this cluster.") + .format(jar_file="hadoop-openstack.jar", + dir="/opt")) def add_hadoop_swift_jar(instances): new_jar = "/opt/hadoop-openstack.jar" + cpo.add_provisioning_step(instances[0].cluster.id, + _("Add Hadoop Swift jar to instances"), + len(instances)) for inst in instances: - with inst.remote() as r: - code, out = r.execute_command("test -f %s" % new_jar, - raise_when_error=False) - if code == 0: - # get ambari hadoop version (e.g.: 2.7.1.2.3.4.0-3485) - code, amb_hadoop_version = r.execute_command( - "sudo hadoop version | grep 'Hadoop' | awk '{print $2}'") - amb_hadoop_version = amb_hadoop_version.strip() - # get special code of ambari hadoop version(e.g.:2.3.4.0-3485) - amb_code = '.'.join(amb_hadoop_version.split('.')[3:]) - origin_jar = ( - "/usr/hdp/%s/hadoop-mapreduce/hadoop-openstack-%s.jar" % ( - amb_code, amb_hadoop_version)) - r.execute_command("sudo cp %s %s" % (new_jar, origin_jar)) - else: - LOG.warning(_LW("The {jar_file} file cannot be found " - "in the {dir} directory so Keystone API v3 " - "is not enabled for this cluster.") - .format(jar_file="hadoop-openstack.jar", - dir="/opt")) + _add_hadoop_swift_jar(inst, new_jar) diff --git a/sahara/tests/unit/plugins/ambari/test_deploy.py b/sahara/tests/unit/plugins/ambari/test_deploy.py index 71517cce..2650cc45 100644 --- a/sahara/tests/unit/plugins/ambari/test_deploy.py +++ b/sahara/tests/unit/plugins/ambari/test_deploy.py @@ -21,11 +21,14 @@ from sahara.tests.unit import base class TestDeploy(base.SaharaTestCase): + @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step') + @mock.patch('sahara.utils.cluster.check_cluster_exists') @mock.patch('sahara.plugins.utils.get_instance') @mock.patch('sahara.plugins.ambari.client.AmbariClient.get') @mock.patch('sahara.plugins.ambari.client.AmbariClient.delete') def test_cleanup_config_groups(self, client_delete, client_get, - get_instance): + get_instance, check_cluster_exists, + add_provisioning_step): def response(data): fake = mock.Mock() fake.text = jsonutils.dumps(data) @@ -60,6 +63,8 @@ class TestDeploy(base.SaharaTestCase): ] client_delete.side_effect = [response({})] + check_cluster_exists.return_value = True + deploy.cleanup_config_groups(cl, [inst1]) get_calls = [ mock.call(