Merge "Added rack awareness in HDP plugin"

Jenkins 2016-09-05 13:17:24 +00:00 committed by Gerrit Code Review
commit 518d7e3141
7 changed files with 133 additions and 24 deletions

View File

@ -0,0 +1,3 @@
---
features:
- Added rack awareness feature for HDP plugin

View File

@ -20,7 +20,7 @@ from requests import auth
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import decomission_helper as d_helper
from sahara.plugins.ambari import requests_helper as r_helper
from sahara.plugins import exceptions as p_exc
@ -204,14 +204,14 @@ class AmbariClient(object):
    def decommission_nodemanagers(self, cluster_name, instances):
        url = self._base_url + "/clusters/%s/requests" % cluster_name
        data = d_helper.build_nodemanager_decommission_request(cluster_name,
        data = r_helper.build_nodemanager_decommission_request(cluster_name,
                                                               instances)
        resp = self.post(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)

    def decommission_datanodes(self, cluster_name, instances):
        url = self._base_url + "/clusters/%s/requests" % cluster_name
        data = d_helper.build_datanode_decommission_request(cluster_name,
        data = r_helper.build_datanode_decommission_request(cluster_name,
                                                            instances)
        resp = self.post(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)
@ -237,17 +237,29 @@ class AmbariClient(object):
    def restart_namenode(self, cluster_name, instance):
        url = self._base_url + "/clusters/%s/requests" % cluster_name
        data = d_helper.build_namenode_restart_request(cluster_name, instance)
        data = r_helper.build_namenode_restart_request(cluster_name, instance)
        resp = self.post(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)

    def restart_resourcemanager(self, cluster_name, instance):
        url = self._base_url + "/clusters/%s/requests" % cluster_name
        data = d_helper.build_resourcemanager_restart_request(cluster_name,
        data = r_helper.build_resourcemanager_restart_request(cluster_name,
                                                              instance)
        resp = self.post(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)

    def restart_service(self, cluster_name, service_name):
        url = self._base_url + "/clusters/{}/services/{}".format(
            cluster_name, service_name)
        data = r_helper.build_stop_service_request(service_name)
        resp = self.put(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)

        data = r_helper.build_start_service_request(service_name)
        resp = self.put(url, data=jsonutils.dumps(data))
        self.wait_ambari_request(self.req_id(resp), cluster_name)

    def delete_host(self, cluster_name, instance):
        url = self._base_url + "/clusters/%s/hosts/%s" % (cluster_name,
                                                          instance.fqdn())
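
The new restart_service call above drives the whole service through Ambari's state machine: one PUT moves the service to INSTALLED (stopped) and, after that asynchronous request finishes, a second PUT moves it back to STARTED. The rough standalone sketch below shows the same sequence issued directly against the Ambari REST API; the endpoint, credentials and cluster/service names are placeholders, and the plugin itself goes through AmbariClient and requests_helper rather than calling requests directly.

# Illustrative only: the stop-then-start sequence that restart_service issues,
# written as direct calls with the requests library. Endpoint, credentials and
# the cluster/service names are hypothetical placeholders.
import requests

AMBARI = "http://ambari.example.com:8080/api/v1"
AUTH = ("admin", "admin")                  # placeholder credentials
HEADERS = {"X-Requested-By": "sahara"}     # CSRF header Ambari expects on PUT/POST


def _set_service_state(cluster, service, state, context):
    body = {"RequestInfo": {"context": context},
            "Body": {"ServiceInfo": {"state": state}}}
    url = "%s/clusters/%s/services/%s" % (AMBARI, cluster, service)
    resp = requests.put(url, json=body, auth=AUTH, headers=HEADERS)
    resp.raise_for_status()
    return resp


# Stop, then start; in the real code AmbariClient.wait_ambari_request() polls
# the returned asynchronous request between the two calls.
_set_service_state("cluster-1", "HDFS", "INSTALLED",
                   "Restart HDFS service (stopping)")
_set_service_state("cluster-1", "HDFS", "STARTED",
                   "Restart HDFS service (starting)")
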
@ -283,6 +295,17 @@ class AmbariClient(object):
        resp = self.put(url, data=jsonutils.dumps(data))
        self.check_response(resp)

    def set_rack_info_for_instance(self, cluster_name, instance, rack_name):
        url = self._base_url + "/clusters/%s/hosts/%s" % (
            cluster_name, instance.fqdn())
        data = {
            "Hosts": {
                "rack_info": rack_name
            }
        }
        resp = self.put(url, data=jsonutils.dumps(data))
        self.check_response(resp)

    def get_request_info(self, cluster_name, request_id):
        url = self._base_url + ("/clusters/%s/requests/%s" %
                                (cluster_name, request_id))
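
set_rack_info_for_instance is a thin wrapper over Ambari's host resource: it PUTs the desired rack path under Hosts/rack_info for the instance's FQDN. A minimal sketch of the equivalent raw call, with made-up cluster, host and rack values, assuming the default Ambari endpoint:

# Minimal sketch of the underlying REST call; all names are placeholders and
# the plugin normally goes through AmbariClient.set_rack_info_for_instance().
import json
import requests

url = ("http://ambari.example.com:8080/api/v1"
       "/clusters/cluster-1/hosts/worker-001.example.com")
payload = {"Hosts": {"rack_info": "/rack-01"}}   # rack path from the topology map

resp = requests.put(url, data=json.dumps(payload),
                    auth=("admin", "admin"),
                    headers={"X-Requested-By": "sahara"})
resp.raise_for_status()
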

View File

@ -25,6 +25,7 @@ HDFS_SERVICE = "HDFS"
HIVE_SERVICE = "Hive"
KAFKA_SERVICE = "Kafka"
KNOX_SERVICE = "Knox"
MAPREDUCE2_SERVICE = "MAPREDUCE2"
OOZIE_SERVICE = "Oozie"
RANGER_SERVICE = "Ranger"
SLIDER_SERVICE = "Slider"

View File

@ -29,6 +29,7 @@ from sahara.plugins.ambari import configs
from sahara.plugins.ambari import ha_helper
from sahara.plugins import kerberos
from sahara.plugins import utils as plugin_utils
from sahara.topology import topology_helper as t_helper
from sahara.utils import poll_utils
@ -343,12 +344,15 @@ def _build_ambari_cluster_template(cluster):
    if kerberos.is_kerberos_security_enabled(cluster):
        cl_tmpl["credentials"] = _get_credentials(cluster)
        cl_tmpl["security"] = {"type": "KERBEROS"}
    topology = _configure_topology_data(cluster)
    for ng in cluster.node_groups:
        for instance in ng.instances:
            host = {"fqdn": instance.fqdn()}
            if t_helper.is_data_locality_enabled():
                host["rack_info"] = topology[instance.instance_name]
            cl_tmpl["host_groups"].append({
                "name": instance.instance_name,
                "hosts": [{"fqdn": instance.fqdn()}]
                "hosts": [host]
            })
    return cl_tmpl
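
With data locality enabled, every host entry in the generated cluster-creation template now carries the rack path next to the FQDN. An illustrative host_groups element (instance name, FQDN and rack are hypothetical) looks like this:

# Illustrative entry appended by _build_ambari_cluster_template() when
# t_helper.is_data_locality_enabled() is true; the names are made up.
host_group = {
    "name": "cluster-1-worker-001",
    "hosts": [
        {
            "fqdn": "cluster-1-worker-001.example.com",
            "rack_info": "/rack-01",   # topology[instance.instance_name]
        }
    ],
}
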
@ -468,18 +472,12 @@ def decommission_datanodes(cluster, instances):
def restart_namenode(cluster, instance):
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
    with _get_ambari_client(cluster) as client:
        client.restart_namenode(cluster.name, instance)


def restart_resourcemanager(cluster, instance):
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
    with _get_ambari_client(cluster) as client:
        client.restart_resourcemanager(cluster.name, instance)
@ -493,6 +491,11 @@ def restart_nns_and_rms(cluster):
        restart_resourcemanager(cluster, rm)


def restart_service(cluster, service_name):
    with _get_ambari_client(cluster) as client:
        client.restart_service(cluster.name, service_name)


def remove_services_from_hosts(cluster, instances):
    for inst in instances:
        LOG.debug("Stopping and removing processes from host %s" % inst.fqdn())
@ -536,6 +539,29 @@ def _get_ambari_client(cluster):
    return ambari_client.AmbariClient(ambari, password=password)


def _configure_topology_data(cluster):
    if not t_helper.is_data_locality_enabled():
        return {}

    LOG.warning(_LW("Node group awareness is not implemented in YARN yet "
                    "so enable_hypervisor_awareness set to False "
                    "explicitly"))
    return t_helper.generate_topology_map(cluster, is_node_awareness=False)


def configure_rack_awareness(cluster, instances):
    if not t_helper.is_data_locality_enabled():
        return

    topology = _configure_topology_data(cluster)
    with _get_ambari_client(cluster) as client:
        for inst in instances:
            client.set_rack_info_for_instance(
                cluster.name, inst, topology[inst.instance_name])
        client.restart_service(cluster.name, p_common.HDFS_SERVICE)
        client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE)


def add_hadoop_swift_jar(instances):
    new_jar = "/opt/hadoop-openstack.jar"
    for inst in instances:
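
configure_rack_awareness only does work when data locality is enabled: it reuses the same topology map as the cluster template, pushes a rack path per added instance via set_rack_info_for_instance, and then restarts the HDFS and MAPREDUCE2 services so the running daemons pick up the new mapping. Assuming generate_topology_map() resolves instances by name, the map it consumes is essentially a flat dict such as:

# Hypothetical topology map consumed by configure_rack_awareness(); the keys
# are instance names (looked up via inst.instance_name) and the values are
# rack paths taken from the configured topology.
topology = {
    "cluster-1-worker-001": "/rack-01",
    "cluster-1-worker-002": "/rack-01",
    "cluster-1-worker-003": "/rack-02",
}

# Per scaled-out instance the plugin then issues roughly:
#     client.set_rack_info_for_instance(cluster.name, inst,
#                                       topology[inst.instance_name])
# followed by restart_service() for HDFS and MAPREDUCE2.
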

View File

@ -191,6 +191,7 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
        deploy.add_new_hosts(cluster, instances)
        deploy.manage_config_groups(cluster, instances)
        deploy.manage_host_components(cluster, instances)
        deploy.configure_rack_awareness(cluster, instances)
        swift_helper.install_ssl_certs(instances)
        deploy.add_hadoop_swift_jar(instances)

View File

@ -55,6 +55,17 @@ _COMMON_RESTART_TEMPLATE = {
    ]
}

_COMMON_RESTART_SERVICE_TEMPLATE = {
    "RequestInfo": {
        "context": "",
    },
    "Body": {
        "ServiceInfo": {
            "state": ""
        }
    }
}


def build_datanode_decommission_request(cluster_name, instances):
    tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE)
@ -116,3 +127,19 @@ def build_resourcemanager_restart_request(cluster_name, rm_instance):
    tmpl["Requests/resource_filters"][0]["hosts"] = rm_instance.fqdn()
    return tmpl


def build_stop_service_request(service_name):
    tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE)
    tmpl["RequestInfo"]["context"] = (
        "Restart %s service (stopping)" % service_name)
    tmpl["Body"]["ServiceInfo"]["state"] = "INSTALLED"
    return tmpl


def build_start_service_request(service_name):
    tmpl = copy.deepcopy(_COMMON_RESTART_SERVICE_TEMPLATE)
    tmpl["RequestInfo"]["context"] = (
        "Restart %s service (starting)" % service_name)
    tmpl["Body"]["ServiceInfo"]["state"] = "STARTED"
    return tmpl

View File

@ -15,14 +15,14 @@
import mock

from sahara.plugins.ambari import decomission_helper
from sahara.plugins.ambari import requests_helper
from sahara.tests.unit import base


class DecommissionHelperTestCase(base.SaharaTestCase):
class RequestsHelperTestCase(base.SaharaTestCase):

    def setUp(self):
        super(DecommissionHelperTestCase, self).setUp()
        super(RequestsHelperTestCase, self).setUp()
        self.i1 = mock.MagicMock()
        self.i1.fqdn.return_value = "i1"
@ -33,8 +33,8 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
        c_name = "c1"
        instances = [self.i1, self.i2]
        res = decomission_helper.build_datanode_decommission_request(c_name,
                                                                     instances)
        res = requests_helper.build_datanode_decommission_request(c_name,
                                                                  instances)
        self.assertEqual("i1,i2",
                         res["RequestInfo"]["parameters"]["excluded_hosts"])
        self.assertEqual("c1",
@ -44,7 +44,7 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
        c_name = "c1"
        instances = [self.i1, self.i2]
        res = decomission_helper.build_nodemanager_decommission_request(
        res = requests_helper.build_nodemanager_decommission_request(
            c_name, instances)

        self.assertEqual("i1,i2",
@ -53,16 +53,44 @@ class DecommissionHelperTestCase(base.SaharaTestCase):
                         res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_namenode_restart_request(self):
        res = decomission_helper.build_namenode_restart_request("c1", self.i1)
        res = requests_helper.build_namenode_restart_request("c1", self.i1)

        self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
        self.assertEqual("c1",
                         res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_resourcemanager_restart_request(self):
        res = decomission_helper.build_resourcemanager_restart_request("c1",
                                                                       self.i1)
        res = requests_helper.build_resourcemanager_restart_request("c1",
                                                                    self.i1)

        self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
        self.assertEqual("c1",
                         res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_stop_service_request(self):
        res = requests_helper.build_stop_service_request("HDFS")
        expected = {
            "RequestInfo": {
                "context": "Restart HDFS service (stopping)",
            },
            "Body": {
                "ServiceInfo": {
                    "state": "INSTALLED"
                }
            }
        }
        self.assertEqual(res, expected)

    def test_build_start_service_request(self):
        res = requests_helper.build_start_service_request("HDFS")
        expected = {
            "RequestInfo": {
                "context": "Restart HDFS service (starting)",
            },
            "Body": {
                "ServiceInfo": {
                    "state": "STARTED"
                }
            }
        }
        self.assertEqual(res, expected)