From 086ebb7658e396cf20440e3b79da34d5e95a51ba Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Wed, 8 Jul 2015 17:14:37 +0800 Subject: [PATCH] Enable HDFS HA in Cloudera plugin If HDFS_JOURNALNODE roles are selected, HDFS is assumed enabled. enable_nn_ha will be called to do the work. We add the involved implementation and cluster validation codes. We also modified the CM client codes, for we put enable_nn_ha method in wrong class before. Partial-implements: blueprint cdh-ha-support Change-Id: Id5c47d485e53b867d93ea9b4c94367587dc93c2a --- sahara/plugins/cdh/client/services.py | 94 ++++++++++----------- sahara/plugins/cdh/cloudera_utils.py | 2 +- sahara/plugins/cdh/plugin_utils.py | 1 + sahara/plugins/cdh/v5_4_0/cloudera_utils.py | 26 ++++++ sahara/plugins/cdh/v5_4_0/config_helper.py | 2 + sahara/plugins/cdh/v5_4_0/deploy.py | 4 + sahara/plugins/cdh/v5_4_0/edp_engine.py | 7 +- sahara/plugins/cdh/v5_4_0/plugin_utils.py | 7 +- sahara/plugins/cdh/v5_4_0/validation.py | 27 +++++- sahara/plugins/cdh/v5_4_0/versionhandler.py | 3 +- 10 files changed, 120 insertions(+), 53 deletions(-) diff --git a/sahara/plugins/cdh/client/services.py b/sahara/plugins/cdh/client/services.py index 5cfc763e..9fb14fa9 100644 --- a/sahara/plugins/cdh/client/services.py +++ b/sahara/plugins/cdh/client/services.py @@ -374,53 +374,6 @@ class ApiService(types.BaseApiResource): """ return self._cmd('hiveCreateHiveUserDir') - -class ApiServiceSetupInfo(ApiService): - _ATTRIBUTES = { - 'name': None, - 'type': None, - 'config': types.Attr(types.ApiConfig), - 'roles': types.Attr(roles.ApiRole), - } - - def __init__(self, name=None, type=None, - config=None, roles=None): - # The BaseApiObject expects a resource_root, which we don't care about - resource_root = None - # Unfortunately, the json key is called "type". So our input arg - # needs to be called "type" as well, despite it being a python keyword. - types.BaseApiObject.init(self, None, locals()) - - def set_config(self, config): - """Set the service configuration - - :param config: A dictionary of config key/value - """ - if self.config is None: - self.config = {} - self.config.update(types.config_to_api_list(config)) - - def add_role_info(self, role_name, role_type, host_id, config=None): - """Add a role info - - The role will be created along with the service setup. - - :param role_name: Role name - :param role_type: Role type - :param host_id: The host where the role should run - :param config: (Optional) A dictionary of role config values - """ - if self.roles is None: - self.roles = [] - api_config_list = (config is not None - and types.config_to_api_list(config) - or None) - self.roles.append({ - 'name': role_name, - 'type': role_type, - 'hostRef': {'hostId': host_id}, - 'config': api_config_list}) - def enable_nn_ha(self, active_name, standby_host_id, nameservice, jns, standby_name_dir_list=None, qj_name=None, standby_name=None, active_fc_name=None, @@ -501,3 +454,50 @@ class ApiServiceSetupInfo(ApiService): jns=jns ) return self._cmd('hdfsEnableNnHa', data=args, api_version=6) + + +class ApiServiceSetupInfo(ApiService): + _ATTRIBUTES = { + 'name': None, + 'type': None, + 'config': types.Attr(types.ApiConfig), + 'roles': types.Attr(roles.ApiRole), + } + + def __init__(self, name=None, type=None, + config=None, roles=None): + # The BaseApiObject expects a resource_root, which we don't care about + resource_root = None + # Unfortunately, the json key is called "type". So our input arg + # needs to be called "type" as well, despite it being a python keyword. + types.BaseApiObject.init(self, None, locals()) + + def set_config(self, config): + """Set the service configuration + + :param config: A dictionary of config key/value + """ + if self.config is None: + self.config = {} + self.config.update(types.config_to_api_list(config)) + + def add_role_info(self, role_name, role_type, host_id, config=None): + """Add a role info + + The role will be created along with the service setup. + + :param role_name: Role name + :param role_type: Role type + :param host_id: The host where the role should run + :param config: (Optional) A dictionary of role config values + """ + if self.roles is None: + self.roles = [] + api_config_list = (config is not None + and types.config_to_api_list(config) + or None) + self.roles.append({ + 'name': role_name, + 'type': role_type, + 'hostRef': {'hostId': host_id}, + 'config': api_config_list}) diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py index 1201e66e..fdd4ec5e 100644 --- a/sahara/plugins/cdh/cloudera_utils.py +++ b/sahara/plugins/cdh/cloudera_utils.py @@ -274,7 +274,7 @@ class ClouderaUtils(object): self._add_role(instance, role, cluster) def _add_role(self, instance, process, cluster): - if process in ['CLOUDERA_MANAGER']: + if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE']: return process = self.pu.convert_role_showname(process) diff --git a/sahara/plugins/cdh/plugin_utils.py b/sahara/plugins/cdh/plugin_utils.py index de629cc1..2afde3f9 100644 --- a/sahara/plugins/cdh/plugin_utils.py +++ b/sahara/plugins/cdh/plugin_utils.py @@ -148,6 +148,7 @@ class AbstractPluginUtils(object): 'CLOUDERA_MANAGER': 'MANAGER', 'HDFS_NAMENODE': 'NAMENODE', 'HDFS_DATANODE': 'DATANODE', + 'HDFS_JOURNALNODE': 'JOURNALNODE', 'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE', 'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER', 'YARN_NODEMANAGER': 'NODEMANAGER', diff --git a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py index 38984100..6cb79849 100644 --- a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py +++ b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py @@ -66,6 +66,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils): SENTRY_SERVICE_NAME = 'sentry01' KMS_SERVICE_NAME = 'kms01' CM_API_VERSION = 8 + NAME_SERVICE = 'nameservice01' def __init__(self): cu.ClouderaUtils.__init__(self) @@ -94,6 +95,8 @@ class ClouderaUtilsV540(cu.ClouderaUtils): return cm_cluster.get_service(self.IMPALA_SERVICE_NAME) elif role in ['KMS']: return cm_cluster.get_service(self.KMS_SERVICE_NAME) + elif role in ['JOURNALNODE']: + return cm_cluster.get_service(self.HDFS_SERVICE_NAME) else: return super(ClouderaUtilsV540, self).get_service_by_role( role, cluster, instance) @@ -417,3 +420,26 @@ class ClouderaUtilsV540(cu.ClouderaUtils): all_confs = _merge_dicts(all_confs, ng_default_confs) return all_confs.get(service, {}) + + @cpo.event_wrapper( + True, step=_("Enable NameNode HA"), param=('cluster', 1)) + @cu.cloudera_cmd + def enable_namenode_ha(self, cluster): + standby_nn = self.pu.get_secondarynamenode(cluster) + standby_nn_host_name = standby_nn.fqdn() + jns = self.pu.get_jns(cluster) + jn_list = [] + for index, jn in enumerate(jns): + jn_host_name = jn.fqdn() + jn_list.append({'jnHostId': jn_host_name, + 'jnName': 'JN%i' % index, + 'jnEditsDir': '/dfs/jn' + }) + cm_cluster = self.get_cloudera_cluster(cluster) + hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME) + nn = hdfs.get_roles_by_type('NAMENODE')[0] + + yield hdfs.enable_nn_ha(active_name=nn.name, + standby_host_id=standby_nn_host_name, + nameservice=self.NAME_SERVICE, jns=jn_list + ) diff --git a/sahara/plugins/cdh/v5_4_0/config_helper.py b/sahara/plugins/cdh/v5_4_0/config_helper.py index 948debc9..bc6d4047 100644 --- a/sahara/plugins/cdh/v5_4_0/config_helper.py +++ b/sahara/plugins/cdh/v5_4_0/config_helper.py @@ -214,6 +214,7 @@ namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json') datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json') secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json') hdfs_gateway_confs = _load_json(path_to_config + "hdfs-gateway.json") +journalnode_confs = _load_json(path_to_config + 'hdfs-journalnode.json') yarn_confs = _load_json(path_to_config + 'yarn-service.json') resourcemanager_confs = _load_json( path_to_config + 'yarn-resourcemanager.json') @@ -291,6 +292,7 @@ def _get_ng_plugin_configs(): cfg += _init_configs(datanode_confs, 'DATANODE', 'node') cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node') cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node') + cfg += _init_configs(journalnode_confs, 'JOURNALNODE', 'node') cfg += _init_configs(yarn_confs, 'YARN', 'cluster') cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node') cfg += _init_configs(yarn_gateway_confs, 'YARN_GATEWAY', 'node') diff --git a/sahara/plugins/cdh/v5_4_0/deploy.py b/sahara/plugins/cdh/v5_4_0/deploy.py index 952f8c02..ab0212d7 100644 --- a/sahara/plugins/cdh/v5_4_0/deploy.py +++ b/sahara/plugins/cdh/v5_4_0/deploy.py @@ -167,6 +167,9 @@ def start_cluster(cluster): CU.pu.configure_swift(cluster) + if len(CU.pu.get_jns(cluster)) > 0: + CU.enable_namenode_ha(cluster) + _finish_cluster_starting(cluster) @@ -199,6 +202,7 @@ def get_open_ports(node_group): 'IMPALA_STATESTORE': [25010, 24000], 'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000], 'KMS': [16000, 16001], + 'JOURNALNODE': [8480, 8481, 8485] } for process in node_group.node_processes: diff --git a/sahara/plugins/cdh/v5_4_0/edp_engine.py b/sahara/plugins/cdh/v5_4_0/edp_engine.py index e93096ba..a6fee175 100644 --- a/sahara/plugins/cdh/v5_4_0/edp_engine.py +++ b/sahara/plugins/cdh/v5_4_0/edp_engine.py @@ -39,8 +39,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): return 'http://%s:11000/oozie' % oozie_ip def get_name_node_uri(self, cluster): - namenode_ip = CU.pu.get_namenode(cluster).fqdn() - return 'hdfs://%s:8020' % namenode_ip + if len(CU.pu.get_jns(cluster)) > 0: + return 'hdfs://%s' % CU.NAME_SERVICE + else: + namenode_ip = CU.pu.get_namenode(cluster).fqdn() + return 'hdfs://%s:8020' % namenode_ip def get_resource_manager_uri(self, cluster): resourcemanager_ip = CU.pu.get_resourcemanager(cluster).fqdn() diff --git a/sahara/plugins/cdh/v5_4_0/plugin_utils.py b/sahara/plugins/cdh/v5_4_0/plugin_utils.py index 7831b046..ffd85626 100644 --- a/sahara/plugins/cdh/v5_4_0/plugin_utils.py +++ b/sahara/plugins/cdh/v5_4_0/plugin_utils.py @@ -45,6 +45,7 @@ class PluginUtilsV540(pu.AbstractPluginUtils): 'HOSTMONITOR': 'HM', 'IMPALAD': 'ID', 'JOBHISTORY': 'JS', + 'JOURNALNODE': 'JN', 'KMS': 'KMS', 'MASTER': 'M', 'NAMENODE': 'NN', @@ -94,6 +95,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils): def get_kms(self, cluster): return u.get_instances(cluster, 'KMS') + def get_jns(self, cluster): + return u.get_instances(cluster, 'HDFS_JOURNALNODE') + def convert_process_configs(self, configs): p_dict = { "CLOUDERA": ['MANAGER'], @@ -122,7 +126,8 @@ class PluginUtilsV540(pu.AbstractPluginUtils): "SQOOP": ['SQOOP_SERVER'], "KMS": ['KMS'], 'YARN_GATEWAY': ['YARN_GATEWAY'], - 'HDFS_GATEWAY': ['HDFS_GATEWAY'] + 'HDFS_GATEWAY': ['HDFS_GATEWAY'], + "JOURNALNODE": ['JOURNALNODE'] } if isinstance(configs, res.Resource): configs = configs.to_dict() diff --git a/sahara/plugins/cdh/v5_4_0/validation.py b/sahara/plugins/cdh/v5_4_0/validation.py index c02e69cb..24ee9aa6 100644 --- a/sahara/plugins/cdh/v5_4_0/validation.py +++ b/sahara/plugins/cdh/v5_4_0/validation.py @@ -28,6 +28,7 @@ def validate_cluster_creating(cluster): raise ex.InvalidComponentCountException('CLOUDERA_MANAGER', 1, mng_count) + zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER') nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE') if nn_count != 1: raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count) @@ -44,6 +45,27 @@ def validate_cluster_creating(cluster): 'HDFS_DATANODE', replicas, dn_count, _('Number of datanodes must be not less than dfs_replication.')) + jn_count = _get_inst_count(cluster, 'HDFS_JOURNALNODE') + if jn_count > 0: + if jn_count < 3: + raise ex.InvalidComponentCountException('HDFS_JOURNALNODE', + _('not less than 3'), + jn_count) + if not jn_count % 2: + raise ex.InvalidComponentCountException('HDFS_JOURNALNODE', + _('be odd'), jn_count) + if zk_count < 1: + raise ex.RequiredServiceMissingException('ZOOKEEPER', + required_by='HDFS HA') + if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster): + raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE ' + 'should be enabled ' + 'in affinity_mask.')) + if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster): + raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE ' + 'should be enabled ' + 'in affinity_mask.')) + rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER') if rm_count > 1: raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER', @@ -125,7 +147,6 @@ def validate_cluster_creating(cluster): hbm_count = _get_inst_count(cluster, 'HBASE_MASTER') hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER') - zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER') if hbm_count >= 1: if zk_count < 1: @@ -271,3 +292,7 @@ def _get_scalable_processes(): def _get_inst_count(cluster, process): return sum([ng.count for ng in u.get_node_groups(cluster, process)]) + + +def _get_anti_affinity(cluster): + return cluster.anti_affinity diff --git a/sahara/plugins/cdh/v5_4_0/versionhandler.py b/sahara/plugins/cdh/v5_4_0/versionhandler.py index f6a95fe9..2483bb9f 100644 --- a/sahara/plugins/cdh/v5_4_0/versionhandler.py +++ b/sahara/plugins/cdh/v5_4_0/versionhandler.py @@ -67,7 +67,8 @@ class VersionHandler(avm.AbstractVersionHandler): "SENTRY": ['SENTRY_SERVER'], "KMS": ['KMS'], "YARN_GATEWAY": [], - "HDFS_GATEWAY": [] + "HDFS_GATEWAY": [], + "JOURNALNODE": ['HDFS_JOURNALNODE'] } def validate(self, cluster):