Enable YARN ResourceManager HA in CDH plugin

We add the code for the YARN RM HA implementation. If
YARN_STANDBYRM role is enabled in cluster, we will do some
validation, and call CM API enable_rm_ha to enable RM HA in the
cluster.

Partial-implements: blueprint cdh-ha-support
Change-Id: I5562a310ef5b6fffa3439b28db2ceb212cc6286f
This commit is contained in:
Ken Chen 2015-08-14 00:20:24 +08:00
parent 7d68e386e3
commit b144c7ee11
9 changed files with 81 additions and 5 deletions

View File

@ -455,6 +455,25 @@ class ApiService(types.BaseApiResource):
)
return self._cmd('hdfsEnableNnHa', data=args, api_version=6)
def enable_rm_ha(self, new_rm_host_id, zk_service_name=None):
    """Enable high availability for a YARN ResourceManager.

    @param new_rm_host_id: id of the host where the second ResourceManager
                           will be added.
    @param zk_service_name: Name of the ZooKeeper service to use for auto-
                            failover. If YARN service depends on a
                            ZooKeeper service then that ZooKeeper service
                            will be used for auto-failover and in that case
                            this parameter can be omitted.
    @return: Reference to the submitted command.
    @since: API v6
    """
    args = dict(
        newRmHostId=new_rm_host_id,
        zkServiceName=zk_service_name
    )
    # Pass api_version=6 explicitly: the command exists only from CM API v6
    # onward (matching the @since tag above and the hdfsEnableNnHa sibling,
    # which also pins api_version=6).
    return self._cmd('enableRmHa', data=args, api_version=6)
class ApiServiceSetupInfo(ApiService):
_ATTRIBUTES = {

View File

@ -274,7 +274,8 @@ class ClouderaUtils(object):
self._add_role(instance, role, cluster)
def _add_role(self, instance, process, cluster):
if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE']:
if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE',
'YARN_STANDBYRM']:
return
process = self.pu.convert_role_showname(process)

View File

@ -178,6 +178,8 @@ class AbstractPluginUtils(object):
return res.Resource(configs)
def convert_role_showname(self, showname):
# Yarn ResourceManager and Standby ResourceManager will
# be converted to ResourceManager.
name_dict = {
'CLOUDERA_MANAGER': 'MANAGER',
'HDFS_NAMENODE': 'NAMENODE',
@ -185,6 +187,7 @@ class AbstractPluginUtils(object):
'HDFS_JOURNALNODE': 'JOURNALNODE',
'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE',
'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER',
'YARN_STANDBYRM': 'RESOURCEMANAGER',
'YARN_NODEMANAGER': 'NODEMANAGER',
'YARN_JOBHISTORY': 'JOBHISTORY',
'OOZIE_SERVER': 'OOZIE_SERVER',

View File

@ -81,6 +81,8 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
else:
return super(ClouderaUtilsV540, self).get_service_by_role(
role, cluster, instance)
@ -427,3 +429,13 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list
)
@cpo.event_wrapper(
    True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
    """Turn on ResourceManager HA for the cluster's YARN service.

    Looks up the instance carrying the YARN_STANDBYRM process and asks
    Cloudera Manager to enable RM HA with that host as the second
    ResourceManager. Yields the submitted CM command so the cloudera_cmd
    decorator can wait on it.
    """
    standby = self.pu.get_stdb_rm(cluster)
    standby_fqdn = standby.fqdn()
    cm_cluster = self.get_cloudera_cluster(cluster)
    yarn_service = cm_cluster.get_service(self.YARN_SERVICE_NAME)
    # NOTE(review): the standby host's FQDN is passed as new_rm_host_id;
    # presumably CM resolves it to a host id — confirm against deployment.
    yield yarn_service.enable_rm_ha(new_rm_host_id=standby_fqdn)

View File

@ -169,6 +169,9 @@ def start_cluster(cluster):
if len(CU.pu.get_jns(cluster)) > 0:
CU.enable_namenode_ha(cluster)
if CU.pu.get_stdb_rm(cluster):
CU.enable_resourcemanager_ha(cluster)
_finish_cluster_starting(cluster)
@ -182,6 +185,7 @@ def get_open_ports(node_group):
'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083],

View File

@ -98,6 +98,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
def get_jns(self, cluster):
    """Return the cluster instances running the HDFS_JOURNALNODE process."""
    jn_process = 'HDFS_JOURNALNODE'
    return u.get_instances(cluster, jn_process)
def get_stdb_rm(self, cluster):
    """Return the cluster instance assigned the YARN_STANDBYRM process."""
    standby_process = 'YARN_STANDBYRM'
    return u.get_instance(cluster, standby_process)
def convert_process_configs(self, configs):
p_dict = {
"CLOUDERA": ['MANAGER'],

View File

@ -60,17 +60,35 @@ def validate_cluster_creating(cluster):
if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE '
'should be enabled '
'in affinity_mask.'))
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE '
'should be enabled '
'in affinity_mask.'))
'in anti_affinity.'))
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
stdb_rm_count = _get_inst_count(cluster, 'YARN_STANDBYRM')
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException('YARN_STANDBYRM',
_('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException('YARN_RESOURCEMANAGER',
required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='RM HA')
if 'YARN_RESOURCEMANAGER' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in anti_affinity.'))
if 'YARN_STANDBYRM' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be enabled in anti_affinity.'))
hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY',

View File

@ -70,7 +70,8 @@ class VersionHandler(avm.AbstractVersionHandler):
"KMS": ['KMS'],
"YARN_GATEWAY": [],
"HDFS_GATEWAY": [],
"JOURNALNODE": ['HDFS_JOURNALNODE']
"JOURNALNODE": ['HDFS_JOURNALNODE'],
"STANDBYRESOURCEMANAGER": ['YARN_STANDBYRM']
}
def validate(self, cluster):

View File

@ -115,7 +115,7 @@ class HadoopProvisionError(e.SaharaException):
class NameNodeHAConfigurationError(e.SaharaException):
"""Exception indicating that hdp-2.0.6 HDFS HA failed.
"""Exception indicating that hdp or cdh HDFS HA failed.
A message indicating the reason for failure must be provided.
"""
@ -127,3 +127,18 @@ class NameNodeHAConfigurationError(e.SaharaException):
self.message = self.base_message % message
super(NameNodeHAConfigurationError, self).__init__()
class ResourceManagerHAConfigurationError(e.SaharaException):
    """Indicates that enabling YARN ResourceManager HA failed (CDH plugin).

    A message indicating the reason for failure must be provided.
    """

    base_message = _("ResourceManager High Availability: %s")

    def __init__(self, message):
        # Same pattern as NameNodeHAConfigurationError above: record a
        # machine-readable code and a formatted human-readable message
        # before delegating to the SaharaException initializer.
        self.message = self.base_message % message
        self.code = "RESOURCEMANAGER_HIGHAVAILABILITY_CONFIGURATION_FAILED"
        super(ResourceManagerHAConfigurationError, self).__init__()