Merge "Enable YARN ResourceManager HA in CDH plugin"

This commit is contained in:
Jenkins 2015-08-27 08:26:01 +00:00 committed by Gerrit Code Review
commit 32d8be795f
9 changed files with 81 additions and 5 deletions

View File

@ -455,6 +455,25 @@ class ApiService(types.BaseApiResource):
) )
return self._cmd('hdfsEnableNnHa', data=args, api_version=6) return self._cmd('hdfsEnableNnHa', data=args, api_version=6)
def enable_rm_ha(self, new_rm_host_id, zk_service_name=None):
"""Enable high availability for a YARN ResourceManager.
@param new_rm_host_id: id of the host where the second ResourceManager
will be added.
@param zk_service_name: Name of the ZooKeeper service to use for auto-
failover. If YARN service depends on a
ZooKeeper service then that ZooKeeper service
will be used for auto-failover and in that case
this parameter can be omitted.
@return: Reference to the submitted command.
@since: API v6
"""
args = dict(
newRmHostId=new_rm_host_id,
zkServiceName=zk_service_name
)
return self._cmd('enableRmHa', data=args)
class ApiServiceSetupInfo(ApiService): class ApiServiceSetupInfo(ApiService):
_ATTRIBUTES = { _ATTRIBUTES = {

View File

@ -274,7 +274,8 @@ class ClouderaUtils(object):
self._add_role(instance, role, cluster) self._add_role(instance, role, cluster)
def _add_role(self, instance, process, cluster): def _add_role(self, instance, process, cluster):
if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE']: if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE',
'YARN_STANDBYRM']:
return return
process = self.pu.convert_role_showname(process) process = self.pu.convert_role_showname(process)

View File

@ -178,6 +178,8 @@ class AbstractPluginUtils(object):
return res.Resource(configs) return res.Resource(configs)
def convert_role_showname(self, showname): def convert_role_showname(self, showname):
# Yarn ResourceManager and Standby ResourceManager will
# be converted to ResourceManager.
name_dict = { name_dict = {
'CLOUDERA_MANAGER': 'MANAGER', 'CLOUDERA_MANAGER': 'MANAGER',
'HDFS_NAMENODE': 'NAMENODE', 'HDFS_NAMENODE': 'NAMENODE',
@ -185,6 +187,7 @@ class AbstractPluginUtils(object):
'HDFS_JOURNALNODE': 'JOURNALNODE', 'HDFS_JOURNALNODE': 'JOURNALNODE',
'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE', 'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE',
'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER', 'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER',
'YARN_STANDBYRM': 'RESOURCEMANAGER',
'YARN_NODEMANAGER': 'NODEMANAGER', 'YARN_NODEMANAGER': 'NODEMANAGER',
'YARN_JOBHISTORY': 'JOBHISTORY', 'YARN_JOBHISTORY': 'JOBHISTORY',
'OOZIE_SERVER': 'OOZIE_SERVER', 'OOZIE_SERVER': 'OOZIE_SERVER',

View File

@ -81,6 +81,8 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
return cm_cluster.get_service(self.KMS_SERVICE_NAME) return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']: elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME) return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
else: else:
return super(ClouderaUtilsV540, self).get_service_by_role( return super(ClouderaUtilsV540, self).get_service_by_role(
role, cluster, instance) role, cluster, instance)
@ -427,3 +429,13 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
standby_host_id=standby_nn_host_name, standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list nameservice=self.NAME_SERVICE, jns=jn_list
) )
@cpo.event_wrapper(
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
new_rm = self.pu.get_stdb_rm(cluster)
new_rm_host_name = new_rm.fqdn()
cm_cluster = self.get_cloudera_cluster(cluster)
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)

View File

@ -169,6 +169,9 @@ def start_cluster(cluster):
if len(CU.pu.get_jns(cluster)) > 0: if len(CU.pu.get_jns(cluster)) > 0:
CU.enable_namenode_ha(cluster) CU.enable_namenode_ha(cluster)
if CU.pu.get_stdb_rm(cluster):
CU.enable_resourcemanager_ha(cluster)
_finish_cluster_starting(cluster) _finish_cluster_starting(cluster)
@ -182,6 +185,7 @@ def get_open_ports(node_group):
'HDFS_SECONDARYNAMENODE': [50090, 50495], 'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020], 'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088], 'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042], 'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888], 'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083], 'HIVE_METASTORE': [9083],

View File

@ -98,6 +98,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
def get_jns(self, cluster): def get_jns(self, cluster):
return u.get_instances(cluster, 'HDFS_JOURNALNODE') return u.get_instances(cluster, 'HDFS_JOURNALNODE')
def get_stdb_rm(self, cluster):
return u.get_instance(cluster, 'YARN_STANDBYRM')
def convert_process_configs(self, configs): def convert_process_configs(self, configs):
p_dict = { p_dict = {
"CLOUDERA": ['MANAGER'], "CLOUDERA": ['MANAGER'],

View File

@ -60,17 +60,35 @@ def validate_cluster_creating(cluster):
if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster): if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE ' raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE '
'should be enabled ' 'should be enabled '
'in affinity_mask.')) 'in anti_affinity.'))
if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster): if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE ' raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE '
'should be enabled ' 'should be enabled '
'in affinity_mask.')) 'in anti_affinity.'))
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER') rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1: if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER', raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count) _('0 or 1'), rm_count)
stdb_rm_count = _get_inst_count(cluster, 'YARN_STANDBYRM')
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException('YARN_STANDBYRM',
_('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException('YARN_RESOURCEMANAGER',
required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='RM HA')
if 'YARN_RESOURCEMANAGER' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in anti_affinity.'))
if 'YARN_STANDBYRM' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be enabled in anti_affinity.'))
hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY') hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1: if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY', raise ex.InvalidComponentCountException('YARN_JOBHISTORY',

View File

@ -70,7 +70,8 @@ class VersionHandler(avm.AbstractVersionHandler):
"KMS": ['KMS'], "KMS": ['KMS'],
"YARN_GATEWAY": [], "YARN_GATEWAY": [],
"HDFS_GATEWAY": [], "HDFS_GATEWAY": [],
"JOURNALNODE": ['HDFS_JOURNALNODE'] "JOURNALNODE": ['HDFS_JOURNALNODE'],
"STANDBYRESOURCEMANAGER": ['YARN_STANDBYRM']
} }
def validate(self, cluster): def validate(self, cluster):

View File

@ -115,7 +115,7 @@ class HadoopProvisionError(e.SaharaException):
class NameNodeHAConfigurationError(e.SaharaException): class NameNodeHAConfigurationError(e.SaharaException):
"""Exception indicating that hdp-2.0.6 HDFS HA failed. """Exception indicating that hdp or cdh HDFS HA failed.
A message indicating the reason for failure must be provided. A message indicating the reason for failure must be provided.
""" """
@ -127,3 +127,18 @@ class NameNodeHAConfigurationError(e.SaharaException):
self.message = self.base_message % message self.message = self.base_message % message
super(NameNodeHAConfigurationError, self).__init__() super(NameNodeHAConfigurationError, self).__init__()
class ResourceManagerHAConfigurationError(e.SaharaException):
"""Exception indicating that cdh YARN HA failed.
A message indicating the reason for failure must be provided.
"""
base_message = _("ResourceManager High Availability: %s")
def __init__(self, message):
self.code = "RESOURCEMANAGER_HIGHAVAILABILITY_CONFIGURATION_FAILED"
self.message = self.base_message % message
super(ResourceManagerHAConfigurationError, self).__init__()