Merge "Enable HDFS HA in Cloudera plugin"
This commit is contained in:
commit
b0e05d6337
@ -374,53 +374,6 @@ class ApiService(types.BaseApiResource):
|
|||||||
"""
|
"""
|
||||||
return self._cmd('hiveCreateHiveUserDir')
|
return self._cmd('hiveCreateHiveUserDir')
|
||||||
|
|
||||||
|
|
||||||
class ApiServiceSetupInfo(ApiService):
|
|
||||||
_ATTRIBUTES = {
|
|
||||||
'name': None,
|
|
||||||
'type': None,
|
|
||||||
'config': types.Attr(types.ApiConfig),
|
|
||||||
'roles': types.Attr(roles.ApiRole),
|
|
||||||
}
|
|
||||||
|
|
||||||
def __init__(self, name=None, type=None,
|
|
||||||
config=None, roles=None):
|
|
||||||
# The BaseApiObject expects a resource_root, which we don't care about
|
|
||||||
resource_root = None
|
|
||||||
# Unfortunately, the json key is called "type". So our input arg
|
|
||||||
# needs to be called "type" as well, despite it being a python keyword.
|
|
||||||
types.BaseApiObject.init(self, None, locals())
|
|
||||||
|
|
||||||
def set_config(self, config):
|
|
||||||
"""Set the service configuration
|
|
||||||
|
|
||||||
:param config: A dictionary of config key/value
|
|
||||||
"""
|
|
||||||
if self.config is None:
|
|
||||||
self.config = {}
|
|
||||||
self.config.update(types.config_to_api_list(config))
|
|
||||||
|
|
||||||
def add_role_info(self, role_name, role_type, host_id, config=None):
|
|
||||||
"""Add a role info
|
|
||||||
|
|
||||||
The role will be created along with the service setup.
|
|
||||||
|
|
||||||
:param role_name: Role name
|
|
||||||
:param role_type: Role type
|
|
||||||
:param host_id: The host where the role should run
|
|
||||||
:param config: (Optional) A dictionary of role config values
|
|
||||||
"""
|
|
||||||
if self.roles is None:
|
|
||||||
self.roles = []
|
|
||||||
api_config_list = (config is not None
|
|
||||||
and types.config_to_api_list(config)
|
|
||||||
or None)
|
|
||||||
self.roles.append({
|
|
||||||
'name': role_name,
|
|
||||||
'type': role_type,
|
|
||||||
'hostRef': {'hostId': host_id},
|
|
||||||
'config': api_config_list})
|
|
||||||
|
|
||||||
def enable_nn_ha(self, active_name, standby_host_id, nameservice, jns,
|
def enable_nn_ha(self, active_name, standby_host_id, nameservice, jns,
|
||||||
standby_name_dir_list=None, qj_name=None,
|
standby_name_dir_list=None, qj_name=None,
|
||||||
standby_name=None, active_fc_name=None,
|
standby_name=None, active_fc_name=None,
|
||||||
@ -501,3 +454,50 @@ class ApiServiceSetupInfo(ApiService):
|
|||||||
jns=jns
|
jns=jns
|
||||||
)
|
)
|
||||||
return self._cmd('hdfsEnableNnHa', data=args, api_version=6)
|
return self._cmd('hdfsEnableNnHa', data=args, api_version=6)
|
||||||
|
|
||||||
|
|
||||||
|
class ApiServiceSetupInfo(ApiService):
|
||||||
|
_ATTRIBUTES = {
|
||||||
|
'name': None,
|
||||||
|
'type': None,
|
||||||
|
'config': types.Attr(types.ApiConfig),
|
||||||
|
'roles': types.Attr(roles.ApiRole),
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, name=None, type=None,
|
||||||
|
config=None, roles=None):
|
||||||
|
# The BaseApiObject expects a resource_root, which we don't care about
|
||||||
|
resource_root = None
|
||||||
|
# Unfortunately, the json key is called "type". So our input arg
|
||||||
|
# needs to be called "type" as well, despite it being a python keyword.
|
||||||
|
types.BaseApiObject.init(self, None, locals())
|
||||||
|
|
||||||
|
def set_config(self, config):
|
||||||
|
"""Set the service configuration
|
||||||
|
|
||||||
|
:param config: A dictionary of config key/value
|
||||||
|
"""
|
||||||
|
if self.config is None:
|
||||||
|
self.config = {}
|
||||||
|
self.config.update(types.config_to_api_list(config))
|
||||||
|
|
||||||
|
def add_role_info(self, role_name, role_type, host_id, config=None):
|
||||||
|
"""Add a role info
|
||||||
|
|
||||||
|
The role will be created along with the service setup.
|
||||||
|
|
||||||
|
:param role_name: Role name
|
||||||
|
:param role_type: Role type
|
||||||
|
:param host_id: The host where the role should run
|
||||||
|
:param config: (Optional) A dictionary of role config values
|
||||||
|
"""
|
||||||
|
if self.roles is None:
|
||||||
|
self.roles = []
|
||||||
|
api_config_list = (config is not None
|
||||||
|
and types.config_to_api_list(config)
|
||||||
|
or None)
|
||||||
|
self.roles.append({
|
||||||
|
'name': role_name,
|
||||||
|
'type': role_type,
|
||||||
|
'hostRef': {'hostId': host_id},
|
||||||
|
'config': api_config_list})
|
||||||
|
@ -274,7 +274,7 @@ class ClouderaUtils(object):
|
|||||||
self._add_role(instance, role, cluster)
|
self._add_role(instance, role, cluster)
|
||||||
|
|
||||||
def _add_role(self, instance, process, cluster):
|
def _add_role(self, instance, process, cluster):
|
||||||
if process in ['CLOUDERA_MANAGER']:
|
if process in ['CLOUDERA_MANAGER', 'HDFS_JOURNALNODE']:
|
||||||
return
|
return
|
||||||
|
|
||||||
process = self.pu.convert_role_showname(process)
|
process = self.pu.convert_role_showname(process)
|
||||||
|
@ -148,6 +148,7 @@ class AbstractPluginUtils(object):
|
|||||||
'CLOUDERA_MANAGER': 'MANAGER',
|
'CLOUDERA_MANAGER': 'MANAGER',
|
||||||
'HDFS_NAMENODE': 'NAMENODE',
|
'HDFS_NAMENODE': 'NAMENODE',
|
||||||
'HDFS_DATANODE': 'DATANODE',
|
'HDFS_DATANODE': 'DATANODE',
|
||||||
|
'HDFS_JOURNALNODE': 'JOURNALNODE',
|
||||||
'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE',
|
'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE',
|
||||||
'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER',
|
'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER',
|
||||||
'YARN_NODEMANAGER': 'NODEMANAGER',
|
'YARN_NODEMANAGER': 'NODEMANAGER',
|
||||||
|
@ -50,6 +50,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
|
|||||||
SENTRY_SERVICE_NAME = 'sentry01'
|
SENTRY_SERVICE_NAME = 'sentry01'
|
||||||
KMS_SERVICE_NAME = 'kms01'
|
KMS_SERVICE_NAME = 'kms01'
|
||||||
CM_API_VERSION = 8
|
CM_API_VERSION = 8
|
||||||
|
NAME_SERVICE = 'nameservice01'
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
cu.ClouderaUtils.__init__(self)
|
cu.ClouderaUtils.__init__(self)
|
||||||
@ -78,6 +79,8 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
|
|||||||
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
|
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
|
||||||
elif role in ['KMS']:
|
elif role in ['KMS']:
|
||||||
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
|
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
|
||||||
|
elif role in ['JOURNALNODE']:
|
||||||
|
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
|
||||||
else:
|
else:
|
||||||
return super(ClouderaUtilsV540, self).get_service_by_role(
|
return super(ClouderaUtilsV540, self).get_service_by_role(
|
||||||
role, cluster, instance)
|
role, cluster, instance)
|
||||||
@ -401,3 +404,26 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
|
|||||||
all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
|
all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
|
||||||
|
|
||||||
return all_confs.get(service, {})
|
return all_confs.get(service, {})
|
||||||
|
|
||||||
|
@cpo.event_wrapper(
|
||||||
|
True, step=_("Enable NameNode HA"), param=('cluster', 1))
|
||||||
|
@cu.cloudera_cmd
|
||||||
|
def enable_namenode_ha(self, cluster):
|
||||||
|
standby_nn = self.pu.get_secondarynamenode(cluster)
|
||||||
|
standby_nn_host_name = standby_nn.fqdn()
|
||||||
|
jns = self.pu.get_jns(cluster)
|
||||||
|
jn_list = []
|
||||||
|
for index, jn in enumerate(jns):
|
||||||
|
jn_host_name = jn.fqdn()
|
||||||
|
jn_list.append({'jnHostId': jn_host_name,
|
||||||
|
'jnName': 'JN%i' % index,
|
||||||
|
'jnEditsDir': '/dfs/jn'
|
||||||
|
})
|
||||||
|
cm_cluster = self.get_cloudera_cluster(cluster)
|
||||||
|
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
|
||||||
|
nn = hdfs.get_roles_by_type('NAMENODE')[0]
|
||||||
|
|
||||||
|
yield hdfs.enable_nn_ha(active_name=nn.name,
|
||||||
|
standby_host_id=standby_nn_host_name,
|
||||||
|
nameservice=self.NAME_SERVICE, jns=jn_list
|
||||||
|
)
|
||||||
|
@ -214,6 +214,7 @@ namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json')
|
|||||||
datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
|
datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
|
||||||
secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
|
secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
|
||||||
hdfs_gateway_confs = _load_json(path_to_config + "hdfs-gateway.json")
|
hdfs_gateway_confs = _load_json(path_to_config + "hdfs-gateway.json")
|
||||||
|
journalnode_confs = _load_json(path_to_config + 'hdfs-journalnode.json')
|
||||||
yarn_confs = _load_json(path_to_config + 'yarn-service.json')
|
yarn_confs = _load_json(path_to_config + 'yarn-service.json')
|
||||||
resourcemanager_confs = _load_json(
|
resourcemanager_confs = _load_json(
|
||||||
path_to_config + 'yarn-resourcemanager.json')
|
path_to_config + 'yarn-resourcemanager.json')
|
||||||
@ -291,6 +292,7 @@ def _get_ng_plugin_configs():
|
|||||||
cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
|
cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
|
||||||
cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node')
|
cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node')
|
||||||
cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
|
cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
|
||||||
|
cfg += _init_configs(journalnode_confs, 'JOURNALNODE', 'node')
|
||||||
cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
|
cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
|
||||||
cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
|
cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
|
||||||
cfg += _init_configs(yarn_gateway_confs, 'YARN_GATEWAY', 'node')
|
cfg += _init_configs(yarn_gateway_confs, 'YARN_GATEWAY', 'node')
|
||||||
|
@ -167,6 +167,9 @@ def start_cluster(cluster):
|
|||||||
|
|
||||||
CU.pu.configure_swift(cluster)
|
CU.pu.configure_swift(cluster)
|
||||||
|
|
||||||
|
if len(CU.pu.get_jns(cluster)) > 0:
|
||||||
|
CU.enable_namenode_ha(cluster)
|
||||||
|
|
||||||
_finish_cluster_starting(cluster)
|
_finish_cluster_starting(cluster)
|
||||||
|
|
||||||
|
|
||||||
@ -199,6 +202,7 @@ def get_open_ports(node_group):
|
|||||||
'IMPALA_STATESTORE': [25010, 24000],
|
'IMPALA_STATESTORE': [25010, 24000],
|
||||||
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
|
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
|
||||||
'KMS': [16000, 16001],
|
'KMS': [16000, 16001],
|
||||||
|
'JOURNALNODE': [8480, 8481, 8485]
|
||||||
}
|
}
|
||||||
|
|
||||||
for process in node_group.node_processes:
|
for process in node_group.node_processes:
|
||||||
|
@ -39,8 +39,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
|||||||
return 'http://%s:11000/oozie' % oozie_ip
|
return 'http://%s:11000/oozie' % oozie_ip
|
||||||
|
|
||||||
def get_name_node_uri(self, cluster):
|
def get_name_node_uri(self, cluster):
|
||||||
namenode_ip = CU.pu.get_namenode(cluster).fqdn()
|
if len(CU.pu.get_jns(cluster)) > 0:
|
||||||
return 'hdfs://%s:8020' % namenode_ip
|
return 'hdfs://%s' % CU.NAME_SERVICE
|
||||||
|
else:
|
||||||
|
namenode_ip = CU.pu.get_namenode(cluster).fqdn()
|
||||||
|
return 'hdfs://%s:8020' % namenode_ip
|
||||||
|
|
||||||
def get_resource_manager_uri(self, cluster):
|
def get_resource_manager_uri(self, cluster):
|
||||||
resourcemanager_ip = CU.pu.get_resourcemanager(cluster).fqdn()
|
resourcemanager_ip = CU.pu.get_resourcemanager(cluster).fqdn()
|
||||||
|
@ -45,6 +45,7 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
|
|||||||
'HOSTMONITOR': 'HM',
|
'HOSTMONITOR': 'HM',
|
||||||
'IMPALAD': 'ID',
|
'IMPALAD': 'ID',
|
||||||
'JOBHISTORY': 'JS',
|
'JOBHISTORY': 'JS',
|
||||||
|
'JOURNALNODE': 'JN',
|
||||||
'KMS': 'KMS',
|
'KMS': 'KMS',
|
||||||
'MASTER': 'M',
|
'MASTER': 'M',
|
||||||
'NAMENODE': 'NN',
|
'NAMENODE': 'NN',
|
||||||
@ -94,6 +95,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
|
|||||||
def get_kms(self, cluster):
|
def get_kms(self, cluster):
|
||||||
return u.get_instances(cluster, 'KMS')
|
return u.get_instances(cluster, 'KMS')
|
||||||
|
|
||||||
|
def get_jns(self, cluster):
|
||||||
|
return u.get_instances(cluster, 'HDFS_JOURNALNODE')
|
||||||
|
|
||||||
def convert_process_configs(self, configs):
|
def convert_process_configs(self, configs):
|
||||||
p_dict = {
|
p_dict = {
|
||||||
"CLOUDERA": ['MANAGER'],
|
"CLOUDERA": ['MANAGER'],
|
||||||
@ -122,7 +126,8 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
|
|||||||
"SQOOP": ['SQOOP_SERVER'],
|
"SQOOP": ['SQOOP_SERVER'],
|
||||||
"KMS": ['KMS'],
|
"KMS": ['KMS'],
|
||||||
'YARN_GATEWAY': ['YARN_GATEWAY'],
|
'YARN_GATEWAY': ['YARN_GATEWAY'],
|
||||||
'HDFS_GATEWAY': ['HDFS_GATEWAY']
|
'HDFS_GATEWAY': ['HDFS_GATEWAY'],
|
||||||
|
"JOURNALNODE": ['JOURNALNODE']
|
||||||
}
|
}
|
||||||
if isinstance(configs, res.Resource):
|
if isinstance(configs, res.Resource):
|
||||||
configs = configs.to_dict()
|
configs = configs.to_dict()
|
||||||
|
@ -28,6 +28,7 @@ def validate_cluster_creating(cluster):
|
|||||||
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
|
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
|
||||||
1, mng_count)
|
1, mng_count)
|
||||||
|
|
||||||
|
zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER')
|
||||||
nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE')
|
nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE')
|
||||||
if nn_count != 1:
|
if nn_count != 1:
|
||||||
raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count)
|
raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count)
|
||||||
@ -44,6 +45,27 @@ def validate_cluster_creating(cluster):
|
|||||||
'HDFS_DATANODE', replicas, dn_count,
|
'HDFS_DATANODE', replicas, dn_count,
|
||||||
_('Number of datanodes must be not less than dfs_replication.'))
|
_('Number of datanodes must be not less than dfs_replication.'))
|
||||||
|
|
||||||
|
jn_count = _get_inst_count(cluster, 'HDFS_JOURNALNODE')
|
||||||
|
if jn_count > 0:
|
||||||
|
if jn_count < 3:
|
||||||
|
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
|
||||||
|
_('not less than 3'),
|
||||||
|
jn_count)
|
||||||
|
if not jn_count % 2:
|
||||||
|
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
|
||||||
|
_('be odd'), jn_count)
|
||||||
|
if zk_count < 1:
|
||||||
|
raise ex.RequiredServiceMissingException('ZOOKEEPER',
|
||||||
|
required_by='HDFS HA')
|
||||||
|
if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster):
|
||||||
|
raise ex.NameNodeHAConfigurationError(_('HDFS_SECONDARYNAMENODE '
|
||||||
|
'should be enabled '
|
||||||
|
'in affinity_mask.'))
|
||||||
|
if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster):
|
||||||
|
raise ex.NameNodeHAConfigurationError(_('HDFS_NAMENODE '
|
||||||
|
'should be enabled '
|
||||||
|
'in affinity_mask.'))
|
||||||
|
|
||||||
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
|
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
|
||||||
if rm_count > 1:
|
if rm_count > 1:
|
||||||
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
|
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
|
||||||
@ -125,7 +147,6 @@ def validate_cluster_creating(cluster):
|
|||||||
|
|
||||||
hbm_count = _get_inst_count(cluster, 'HBASE_MASTER')
|
hbm_count = _get_inst_count(cluster, 'HBASE_MASTER')
|
||||||
hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER')
|
hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER')
|
||||||
zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER')
|
|
||||||
|
|
||||||
if hbm_count >= 1:
|
if hbm_count >= 1:
|
||||||
if zk_count < 1:
|
if zk_count < 1:
|
||||||
@ -271,3 +292,7 @@ def _get_scalable_processes():
|
|||||||
|
|
||||||
def _get_inst_count(cluster, process):
|
def _get_inst_count(cluster, process):
|
||||||
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
|
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
|
||||||
|
|
||||||
|
|
||||||
|
def _get_anti_affinity(cluster):
|
||||||
|
return cluster.anti_affinity
|
||||||
|
@ -67,7 +67,8 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
"SENTRY": ['SENTRY_SERVER'],
|
"SENTRY": ['SENTRY_SERVER'],
|
||||||
"KMS": ['KMS'],
|
"KMS": ['KMS'],
|
||||||
"YARN_GATEWAY": [],
|
"YARN_GATEWAY": [],
|
||||||
"HDFS_GATEWAY": []
|
"HDFS_GATEWAY": [],
|
||||||
|
"JOURNALNODE": ['HDFS_JOURNALNODE']
|
||||||
}
|
}
|
||||||
|
|
||||||
def validate(self, cluster):
|
def validate(self, cluster):
|
||||||
|
Loading…
Reference in New Issue
Block a user