diff --git a/sahara/plugins/cdh/client/services.py b/sahara/plugins/cdh/client/services.py index 9fb14fa9..14708aac 100644 --- a/sahara/plugins/cdh/client/services.py +++ b/sahara/plugins/cdh/client/services.py @@ -455,6 +455,25 @@ class ApiService(types.BaseApiResource): ) return self._cmd('hdfsEnableNnHa', data=args, api_version=6) + def enable_rm_ha(self, new_rm_host_id, zk_service_name=None): + """Enable high availability for a YARN ResourceManager. + + @param new_rm_host_id: id of the host where the second ResourceManager + will be added. + @param zk_service_name: Name of the ZooKeeper service to use for auto- + failover. If YARN service depends on a + ZooKeeper service then that ZooKeeper service + will be used for auto-failover and in that case + this parameter can be omitted. + @return: Reference to the submitted command. + @since: API v6 + """ + args = dict( + newRmHostId=new_rm_host_id, + zkServiceName=zk_service_name + ) + return self._cmd('enableRmHa', data=args) + class ApiServiceSetupInfo(ApiService): _ATTRIBUTES = { diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py index fdd4ec5e..438795ad 100644 --- a/sahara/plugins/cdh/cloudera_utils.py +++ b/sahara/plugins/cdh/cloudera_utils.py @@ -274,7 +274,8 @@ class ClouderaUtils(object): self._add_role(instance, role, cluster) def _add_role(self, instance, process, cluster): - if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE']: + if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE', + 'YARN_STANDBYRM']: return process = self.pu.convert_role_showname(process) diff --git a/sahara/plugins/cdh/plugin_utils.py b/sahara/plugins/cdh/plugin_utils.py index ffeb60f4..b8481115 100644 --- a/sahara/plugins/cdh/plugin_utils.py +++ b/sahara/plugins/cdh/plugin_utils.py @@ -178,6 +178,8 @@ class AbstractPluginUtils(object): return res.Resource(configs) def convert_role_showname(self, showname): + # Yarn ResourceManager and Standby ResourceManager will + # be converted to ResourceManager. name_dict = { 'CLOUDERA_MANAGER': 'MANAGER', 'HDFS_NAMENODE': 'NAMENODE', @@ -185,6 +187,7 @@ class AbstractPluginUtils(object): 'HDFS_JOURNALNODE': 'JOURNALNODE', 'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE', 'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER', + 'YARN_STANDBYRM': 'RESOURCEMANAGER', 'YARN_NODEMANAGER': 'NODEMANAGER', 'YARN_JOBHISTORY': 'JOBHISTORY', 'OOZIE_SERVER': 'OOZIE_SERVER', diff --git a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py index 8ea14946..54ec974b 100644 --- a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py +++ b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py @@ -81,6 +81,8 @@ class ClouderaUtilsV540(cu.ClouderaUtils): return cm_cluster.get_service(self.KMS_SERVICE_NAME) elif role in ['JOURNALNODE']: return cm_cluster.get_service(self.HDFS_SERVICE_NAME) + elif role in ['YARN_STANDBYRM']: + return cm_cluster.get_service(self.YARN_SERVICE_NAME) else: return super(ClouderaUtilsV540, self).get_service_by_role( role, cluster, instance) @@ -427,3 +429,13 @@ class ClouderaUtilsV540(cu.ClouderaUtils): standby_host_id=standby_nn_host_name, nameservice=self.NAME_SERVICE, jns=jn_list ) + + @cpo.event_wrapper( + True, step=_("Enable ResourceManager HA"), param=('cluster', 1)) + @cu.cloudera_cmd + def enable_resourcemanager_ha(self, cluster): + new_rm = self.pu.get_stdb_rm(cluster) + new_rm_host_name = new_rm.fqdn() + cm_cluster = self.get_cloudera_cluster(cluster) + yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME) + yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name) diff --git a/sahara/plugins/cdh/v5_4_0/deploy.py b/sahara/plugins/cdh/v5_4_0/deploy.py index f90347c6..d91b59e8 100644 --- a/sahara/plugins/cdh/v5_4_0/deploy.py +++ b/sahara/plugins/cdh/v5_4_0/deploy.py @@ -169,6 +169,9 @@ def start_cluster(cluster): if len(CU.pu.get_jns(cluster)) > 0: CU.enable_namenode_ha(cluster) + if CU.pu.get_stdb_rm(cluster): + CU.enable_resourcemanager_ha(cluster) + _finish_cluster_starting(cluster) @@ -182,6 +185,7 @@ def get_open_ports(node_group): 'HDFS_SECONDARYNAMENODE': [50090, 50495], 'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020], 'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088], + 'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088], 'YARN_NODEMANAGER': [8040, 8041, 8042], 'YARN_JOBHISTORY': [10020, 19888], 'HIVE_METASTORE': [9083], diff --git a/sahara/plugins/cdh/v5_4_0/plugin_utils.py b/sahara/plugins/cdh/v5_4_0/plugin_utils.py index ffd85626..f6420cef 100644 --- a/sahara/plugins/cdh/v5_4_0/plugin_utils.py +++ b/sahara/plugins/cdh/v5_4_0/plugin_utils.py @@ -98,6 +98,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils): def get_jns(self, cluster): return u.get_instances(cluster, 'HDFS_JOURNALNODE') + def get_stdb_rm(self, cluster): + return u.get_instance(cluster, 'YARN_STANDBYRM') + def convert_process_configs(self, configs): p_dict = { "CLOUDERA": ['MANAGER'], diff --git a/sahara/plugins/cdh/v5_4_0/validation.py b/sahara/plugins/cdh/v5_4_0/validation.py index 5eed84f6..e6c3ee1d 100644 --- a/sahara/plugins/cdh/v5_4_0/validation.py +++ b/sahara/plugins/cdh/v5_4_0/validation.py @@ -60,17 +60,35 @@ def validate_cluster_creating(cluster): if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster): raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE ' 'should be enabled ' - 'in affinity_mask.')) + 'in anti_affinity.')) if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster): raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE ' 'should be enabled ' - 'in affinity_mask.')) + 'in anti_affinity.')) rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER') if rm_count > 1: raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER', _('0 or 1'), rm_count) + stdb_rm_count = _get_inst_count(cluster, 'YARN_STANDBYRM') + if stdb_rm_count > 1: + raise ex.InvalidComponentCountException('YARN_STANDBYRM', + _('0 or 1'), stdb_rm_count) + if stdb_rm_count > 0: + if rm_count < 1: + raise ex.RequiredServiceMissingException('YARN_RESOURCEMANAGER', + required_by='RM HA') + if zk_count < 1: + raise ex.RequiredServiceMissingException('ZOOKEEPER', + required_by='RM HA') + if 'YARN_RESOURCEMANAGER' not in _get_anti_affinity(cluster): + raise ex.ResourceManagerHAConfigurationError( + _('YARN_RESOURCEMANAGER should be enabled in anti_affinity.')) + if 'YARN_STANDBYRM' not in _get_anti_affinity(cluster): + raise ex.ResourceManagerHAConfigurationError( + _('YARN_STANDBYRM should be enabled in anti_affinity.')) + hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY') if hs_count > 1: raise ex.InvalidComponentCountException('YARN_JOBHISTORY', diff --git a/sahara/plugins/cdh/v5_4_0/versionhandler.py b/sahara/plugins/cdh/v5_4_0/versionhandler.py index aa65932e..ef311ec4 100644 --- a/sahara/plugins/cdh/v5_4_0/versionhandler.py +++ b/sahara/plugins/cdh/v5_4_0/versionhandler.py @@ -70,7 +70,8 @@ class VersionHandler(avm.AbstractVersionHandler): "KMS": ['KMS'], "YARN_GATEWAY": [], "HDFS_GATEWAY": [], - "JOURNALNODE": ['HDFS_JOURNALNODE'] + "JOURNALNODE": ['HDFS_JOURNALNODE'], + "STANDBYRESOURCEMANAGER": ['YARN_STANDBYRM'] } def validate(self, cluster): diff --git a/sahara/plugins/exceptions.py b/sahara/plugins/exceptions.py index bafb65ec..77a27d71 100644 --- a/sahara/plugins/exceptions.py +++ b/sahara/plugins/exceptions.py @@ -115,7 +115,7 @@ class HadoopProvisionError(e.SaharaException): class NameNodeHAConfigurationError(e.SaharaException): - """Exception indicating that hdp-2.0.6 HDFS HA failed. + """Exception indicating that hdp or cdh HDFS HA failed. A message indicating the reason for failure must be provided. """ @@ -127,3 +127,18 @@ class NameNodeHAConfigurationError(e.SaharaException): self.message = self.base_message % message super(NameNodeHAConfigurationError, self).__init__() + + +class ResourceManagerHAConfigurationError(e.SaharaException): + """Exception indicating that cdh YARN HA failed. + + A message indicating the reason for failure must be provided. + """ + + base_message = _("ResourceManager High Availability: %s") + + def __init__(self, message): + self.code = "RESOURCEMANAGER_HIGHAVAILABILITY_CONFIGURATION_FAILED" + self.message = self.base_message % message + + super(ResourceManagerHAConfigurationError, self).__init__()