From 9f8c2197c01927fb5e9f3bb4f1a351487754f257 Mon Sep 17 00:00:00 2001
From: Michael Ionkin
Date: Fri, 26 Aug 2016 14:14:04 +0300
Subject: [PATCH] Added rack awareness in the HDP plugin

Rack awareness is added to the HDP plugin to support Hadoop's data
locality feature. Also renamed decomission_helper.py to
requests_helper.py.

Note: Ambari allows rack info to be set through the cluster creation
blueprint, but that does not work for scaling. That is why we manually
add rack info for new instances (when scaling) through the Ambari API,
and why we manually restart the HDFS and MAPREDUCE2 services after
scaling.

closes-bug: 1618831
Change-Id: I2a696ea534d0a93f7e88c5e22ef99b008248c875
---
 ...ck_awareness_for_hdp-6e3d44468cc141a5.yaml |  3 ++
 sahara/plugins/ambari/client.py               | 33 +++++++++++--
 sahara/plugins/ambari/common.py               |  1 +
 sahara/plugins/ambari/deploy.py               | 46 +++++++++++++++----
 sahara/plugins/ambari/plugin.py               |  1 +
 ...comission_helper.py => requests_helper.py} | 27 +++++++++++
 ...sion_helper.py => test_requests_helper.py} | 46 +++++++++++++++----
 sahara/topology/topology_helper.py            |  4 ++
 8 files changed, 137 insertions(+), 24 deletions(-)
 create mode 100644 releasenotes/notes/rack_awareness_for_hdp-6e3d44468cc141a5.yaml
 rename sahara/plugins/ambari/{decomission_helper.py => requests_helper.py} (83%)
 rename sahara/tests/unit/plugins/ambari/{test_decommission_helper.py => test_requests_helper.py} (62%)
diff --git a/releasenotes/notes/rack_awareness_for_hdp-6e3d44468cc141a5.yaml b/releasenotes/notes/rack_awareness_for_hdp-6e3d44468cc141a5.yaml new file mode 100644 index 0000000000..967fa8b97e --- /dev/null +++ b/releasenotes/notes/rack_awareness_for_hdp-6e3d44468cc141a5.yaml @@ -0,0 +1,3 @@ +--- +features: + - Added rack awareness feature for HDP plugin diff --git a/sahara/plugins/ambari/client.py b/sahara/plugins/ambari/client.py index 50621702de..28a9063ee7 100644 --- a/sahara/plugins/ambari/client.py +++ b/sahara/plugins/ambari/client.py @@ -20,7 +20,7 @@ from requests import auth from sahara import context from sahara.i18n import _ -from sahara.plugins.ambari import decomission_helper as d_helper +from sahara.plugins.ambari import requests_helper as r_helper from sahara.plugins import exceptions as p_exc @@ -204,14 +204,14 @@ class AmbariClient(object): def decommission_nodemanagers(self, cluster_name, instances): url = self._base_url + "/clusters/%s/requests" % cluster_name - data = d_helper.build_nodemanager_decommission_request(cluster_name, + data = r_helper.build_nodemanager_decommission_request(cluster_name, instances) resp = self.post(url, data=jsonutils.dumps(data)) self.wait_ambari_request(self.req_id(resp), cluster_name) def decommission_datanodes(self, cluster_name, instances): url = self._base_url + "/clusters/%s/requests" % cluster_name - data = d_helper.build_datanode_decommission_request(cluster_name, + data = r_helper.build_datanode_decommission_request(cluster_name, instances) resp = self.post(url, data=jsonutils.dumps(data)) self.wait_ambari_request(self.req_id(resp), cluster_name) @@ -237,17 +237,29 @@ class AmbariClient(object): def restart_namenode(self, cluster_name, instance): url = self._base_url + "/clusters/%s/requests" % cluster_name - data = d_helper.build_namenode_restart_request(cluster_name, instance) + data = r_helper.build_namenode_restart_request(cluster_name, instance) resp = self.post(url, data=jsonutils.dumps(data)) self.wait_ambari_request(self.req_id(resp), cluster_name) def restart_resourcemanager(self, cluster_name, instance): url = self._base_url + 
"/clusters/%s/requests" % cluster_name - data = d_helper.build_resourcemanager_restart_request(cluster_name, + data = r_helper.build_resourcemanager_restart_request(cluster_name, instance) resp = self.post(url, data=jsonutils.dumps(data)) self.wait_ambari_request(self.req_id(resp), cluster_name) + def restart_service(self, cluster_name, service_name): + url = self._base_url + "/clusters/{}/services/{}".format( + cluster_name, service_name) + + data = r_helper.build_stop_service_request(service_name) + resp = self.put(url, data=jsonutils.dumps(data)) + self.wait_ambari_request(self.req_id(resp), cluster_name) + + data = r_helper.build_start_service_request(service_name) + resp = self.put(url, data=jsonutils.dumps(data)) + self.wait_ambari_request(self.req_id(resp), cluster_name) + def delete_host(self, cluster_name, instance): url = self._base_url + "/clusters/%s/hosts/%s" % (cluster_name, instance.fqdn()) @@ -283,6 +295,17 @@ class AmbariClient(object): resp = self.put(url, data=jsonutils.dumps(data)) self.check_response(resp) + def set_rack_info_for_instance(self, cluster_name, instance, rack_name): + url = self._base_url + "/clusters/%s/hosts/%s" % ( + cluster_name, instance.fqdn()) + data = { + "Hosts": { + "rack_info": rack_name + } + } + resp = self.put(url, data=jsonutils.dumps(data)) + self.check_response(resp) + def get_request_info(self, cluster_name, request_id): url = self._base_url + ("/clusters/%s/requests/%s" % (cluster_name, request_id)) diff --git a/sahara/plugins/ambari/common.py b/sahara/plugins/ambari/common.py index 84a26eb67c..825085834f 100644 --- a/sahara/plugins/ambari/common.py +++ b/sahara/plugins/ambari/common.py @@ -25,6 +25,7 @@ HDFS_SERVICE = "HDFS" HIVE_SERVICE = "Hive" KAFKA_SERVICE = "Kafka" KNOX_SERVICE = "Knox" +MAPREDUCE2_SERVICE = "MAPREDUCE2" OOZIE_SERVICE = "Oozie" RANGER_SERVICE = "Ranger" SLIDER_SERVICE = "Slider" diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py index 590c79280c..bc0f1ac52a 100644 --- a/sahara/plugins/ambari/deploy.py +++ b/sahara/plugins/ambari/deploy.py @@ -29,6 +29,7 @@ from sahara.plugins.ambari import configs from sahara.plugins.ambari import ha_helper from sahara.plugins import kerberos from sahara.plugins import utils as plugin_utils +from sahara.topology import topology_helper as t_helper from sahara.utils import poll_utils @@ -342,12 +343,15 @@ def _build_ambari_cluster_template(cluster): if kerberos.is_kerberos_security_enabled(cluster): cl_tmpl["credentials"] = _get_credentials(cluster) cl_tmpl["security"] = {"type": "KERBEROS"} - + topology = _configure_topology_data(cluster) for ng in cluster.node_groups: for instance in ng.instances: + host = {"fqdn": instance.fqdn()} + if t_helper.is_data_locality_enabled(): + host["rack_info"] = topology[instance.instance_name] cl_tmpl["host_groups"].append({ "name": instance.instance_name, - "hosts": [{"fqdn": instance.fqdn()}] + "hosts": [host] }) return cl_tmpl @@ -467,18 +471,12 @@ def decommission_datanodes(cluster, instances): def restart_namenode(cluster, instance): - ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) - password = cluster.extra["ambari_password"] - - with ambari_client.AmbariClient(ambari, password=password) as client: + with _get_ambari_client(cluster) as client: client.restart_namenode(cluster.name, instance) def restart_resourcemanager(cluster, instance): - ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) - password = cluster.extra["ambari_password"] - - with 
ambari_client.AmbariClient(ambari, password=password) as client: + with _get_ambari_client(cluster) as client: client.restart_resourcemanager(cluster.name, instance) @@ -492,6 +490,11 @@ def restart_nns_and_rms(cluster): restart_resourcemanager(cluster, rm) +def restart_service(cluster, service_name): + with _get_ambari_client(cluster) as client: + client.restart_service(cluster.name, service_name) + + def remove_services_from_hosts(cluster, instances): for inst in instances: LOG.debug("Stopping and removing processes from host %s" % inst.fqdn()) @@ -535,6 +538,29 @@ def _get_ambari_client(cluster): return ambari_client.AmbariClient(ambari, password=password) +def _configure_topology_data(cluster): + if not t_helper.is_data_locality_enabled(): + return {} + + LOG.warning(_LW("Node group awareness is not implemented in YARN yet " + "so enable_hypervisor_awareness set to False " + "explicitly")) + return t_helper.generate_topology_map(cluster, is_node_awareness=False) + + +def configure_rack_awareness(cluster, instances): + if not t_helper.is_data_locality_enabled(): + return + + topology = _configure_topology_data(cluster) + with _get_ambari_client(cluster) as client: + for inst in instances: + client.set_rack_info_for_instance( + cluster.name, inst, topology[inst.instance_name]) + client.restart_service(cluster.name, p_common.HDFS_SERVICE) + client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE) + + def add_hadoop_swift_jar(instances): new_jar = "/opt/hadoop-openstack.jar" for inst in instances: diff --git a/sahara/plugins/ambari/plugin.py b/sahara/plugins/ambari/plugin.py index e074ee15e3..469dbc7418 100644 --- a/sahara/plugins/ambari/plugin.py +++ b/sahara/plugins/ambari/plugin.py @@ -191,6 +191,7 @@ class AmbariPluginProvider(p.ProvisioningPluginBase): deploy.add_new_hosts(cluster, instances) deploy.manage_config_groups(cluster, instances) deploy.manage_host_components(cluster, instances) + deploy.configure_rack_awareness(cluster, instances) swift_helper.install_ssl_certs(instances) deploy.add_hadoop_swift_jar(instances) diff --git a/sahara/plugins/ambari/decomission_helper.py b/sahara/plugins/ambari/requests_helper.py similarity index 83% rename from sahara/plugins/ambari/decomission_helper.py rename to sahara/plugins/ambari/requests_helper.py index 1e8f28b1c8..9488b993df 100644 --- a/sahara/plugins/ambari/decomission_helper.py +++ b/sahara/plugins/ambari/requests_helper.py @@ -55,6 +55,17 @@ _COMMON_RESTART_TEMPLATE = { ] } +_COMMON_RESTART_SERVICE_TEMPLATE = { + "RequestInfo": { + "context": "", + }, + "Body": { + "ServiceInfo": { + "state": "" + } + } +} + def build_datanode_decommission_request(cluster_name, instances): tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE) @@ -116,3 +127,19 @@ def build_resourcemanager_restart_request(cluster_name, rm_instance): tmpl["Requests/resource_filters"][0]["hosts"] = rm_instance.fqdn() return tmpl + + +def build_stop_service_request(service_name): + tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE) + tmpl["RequestInfo"]["context"] = ( + "Restart %s service (stopping)" % service_name) + tmpl["Body"]["ServiceInfo"]["state"] = "INSTALLED" + return tmpl + + +def build_start_service_request(service_name): + tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE) + tmpl["RequestInfo"]["context"] = ( + "Restart %s service (starting)" % service_name) + tmpl["Body"]["ServiceInfo"]["state"] = "STARTED" + return tmpl diff --git a/sahara/tests/unit/plugins/ambari/test_decommission_helper.py 
b/sahara/tests/unit/plugins/ambari/test_requests_helper.py similarity index 62% rename from sahara/tests/unit/plugins/ambari/test_decommission_helper.py rename to sahara/tests/unit/plugins/ambari/test_requests_helper.py index 4b685bb0fc..46c1d7e7bd 100644 --- a/sahara/tests/unit/plugins/ambari/test_decommission_helper.py +++ b/sahara/tests/unit/plugins/ambari/test_requests_helper.py @@ -15,14 +15,14 @@ import mock -from sahara.plugins.ambari import decomission_helper +from sahara.plugins.ambari import requests_helper from sahara.tests.unit import base -class DecommissionHelperTestCase(base.SaharaTestCase): +class RequestsHelperTestCase(base.SaharaTestCase): def setUp(self): - super(DecommissionHelperTestCase, self).setUp() + super(RequestsHelperTestCase, self).setUp() self.i1 = mock.MagicMock() self.i1.fqdn.return_value = "i1" @@ -33,8 +33,8 @@ class DecommissionHelperTestCase(base.SaharaTestCase): c_name = "c1" instances = [self.i1, self.i2] - res = decomission_helper.build_datanode_decommission_request(c_name, - instances) + res = requests_helper.build_datanode_decommission_request(c_name, + instances) self.assertEqual("i1,i2", res["RequestInfo"]["parameters"]["excluded_hosts"]) self.assertEqual("c1", @@ -44,7 +44,7 @@ class DecommissionHelperTestCase(base.SaharaTestCase): c_name = "c1" instances = [self.i1, self.i2] - res = decomission_helper.build_nodemanager_decommission_request( + res = requests_helper.build_nodemanager_decommission_request( c_name, instances) self.assertEqual("i1,i2", @@ -53,16 +53,44 @@ class DecommissionHelperTestCase(base.SaharaTestCase): res["RequestInfo"]["operation_level"]["cluster_name"]) def test_build_namenode_restart_request(self): - res = decomission_helper.build_namenode_restart_request("c1", self.i1) + res = requests_helper.build_namenode_restart_request("c1", self.i1) self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"]) self.assertEqual("c1", res["RequestInfo"]["operation_level"]["cluster_name"]) def test_build_resourcemanager_restart_request(self): - res = decomission_helper.build_resourcemanager_restart_request("c1", - self.i1) + res = requests_helper.build_resourcemanager_restart_request("c1", + self.i1) self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"]) self.assertEqual("c1", res["RequestInfo"]["operation_level"]["cluster_name"]) + + def test_build_stop_service_request(self): + res = requests_helper.build_stop_service_request("HDFS") + expected = { + "RequestInfo": { + "context": "Restart HDFS service (stopping)", + }, + "Body": { + "ServiceInfo": { + "state": "INSTALLED" + } + } + } + self.assertEqual(res, expected) + + def test_build_start_service_request(self): + res = requests_helper.build_start_service_request("HDFS") + expected = { + "RequestInfo": { + "context": "Restart HDFS service (starting)", + }, + "Body": { + "ServiceInfo": { + "state": "STARTED" + } + } + } + self.assertEqual(res, expected) diff --git a/sahara/topology/topology_helper.py b/sahara/topology/topology_helper.py index 96a3dc82a0..01a4d02e52 100644 --- a/sahara/topology/topology_helper.py +++ b/sahara/topology/topology_helper.py @@ -162,3 +162,7 @@ def vm_awareness_mapred_config(): def vm_awareness_all_config(): return vm_awareness_core_config() + vm_awareness_mapred_config() + + +def is_data_locality_enabled(): + return CONF.enable_data_locality
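
For reviewers unfamiliar with the Ambari REST API, the sketch below shows the raw HTTP traffic that the new AmbariClient.set_rack_info_for_instance() and AmbariClient.restart_service() methods generate for a scaled-out host. It is a minimal illustration only, not part of the patch: the Ambari URL, credentials, cluster name, host FQDN and rack path are made-up placeholders, and the real flow goes through AmbariClient, which also polls each returned Ambari request via wait_ambari_request() instead of firing and forgetting.

# Hedged sketch of the Ambari API calls performed for a newly added host.
# AMBARI_URL, AUTH, CLUSTER, HOST_FQDN and RACK are hypothetical values.
import json

import requests

AMBARI_URL = "http://ambari.example.com:8080/api/v1"  # assumed endpoint
AUTH = ("admin", "admin")                              # assumed credentials
HEADERS = {"X-Requested-By": "sahara"}  # Ambari requires this on write calls

CLUSTER = "cluster-1"
HOST_FQDN = "new-worker-1.example.com"
RACK = "/rack-01"

# 1) Tell Ambari which rack the new host lives in
#    (what set_rack_info_for_instance does).
requests.put("%s/clusters/%s/hosts/%s" % (AMBARI_URL, CLUSTER, HOST_FQDN),
             data=json.dumps({"Hosts": {"rack_info": RACK}}),
             auth=AUTH, headers=HEADERS)

# 2) Stop and start HDFS and MAPREDUCE2 so they pick up the new topology
#    (what restart_service does, using the requests_helper templates:
#    state INSTALLED stops a service, state STARTED starts it).
for service in ("HDFS", "MAPREDUCE2"):
    for state, phase in (("INSTALLED", "stopping"), ("STARTED", "starting")):
        body = {
            "RequestInfo": {
                "context": "Restart %s service (%s)" % (service, phase)},
            "Body": {"ServiceInfo": {"state": state}},
        }
        requests.put("%s/clusters/%s/services/%s" % (AMBARI_URL, CLUSTER,
                                                     service),
                     data=json.dumps(body), auth=AUTH, headers=HEADERS)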
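
Both the blueprint path (_build_ambari_cluster_template at cluster creation) and the scaling path (configure_rack_awareness) look the rack name up in the map returned by t_helper.generate_topology_map(cluster, is_node_awareness=False). As an illustration only (the actual keys come from the cluster's instances and the rack paths from the operator's topology configuration), the lookup the patch relies on is a plain dict access:

# Hypothetical topology map shape; generate_topology_map() builds the real
# one from the cluster instances and the configured topology data.
topology = {
    "worker-1": "/rack-01",
    "worker-2": "/rack-01",
    "worker-3": "/rack-02",
}

# How the patch uses it:
# - at creation: host["rack_info"] = topology[instance.instance_name]
# - at scaling:  client.set_rack_info_for_instance(
#                    cluster.name, inst, topology[inst.instance_name])
rack = topology["worker-1"]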