Enable HDFS NameNode High Availability with HDP 2.0.6 plugin
Extend the HDP 2.0.6 plugin to include the setup and configuration of HDFS
NameNode High Availability after the cluster has been created and configured.

Change-Id: I32116d42453bd3470b65bf863cf493f27301dab7
Implements: blueprint hdp-plugin-enable-hdfs-ha
parent 67ecb73577
commit 6894d85bb8
@@ -193,6 +193,24 @@ Sahara supports multi region deployment. In this case, each instance of Sahara
should have the ``os_region_name=<region>`` property set in the
configuration file.

Hadoop HDFS High Availability
-----------------------------
Hadoop HDFS High Availability (HDFS HA) uses two NameNodes in an active/standby
architecture to ensure that HDFS continues to work even when the active
NameNode fails. High Availability is achieved with a set of JournalNodes and
ZooKeeper servers, along with ZooKeeper Failover Controllers (ZKFC) and
additional configuration changes to HDFS and the other services that use HDFS.

Currently HDFS HA is supported only with the HDP 2.0.6 plugin. The feature is
enabled through a ``cluster_configs`` parameter in the cluster's JSON:

.. sourcecode:: cfg

    "cluster_configs": {
        "HDFSHA": {
            "hdfs.nnha": true
        }
    }
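
For orientation, the sketch below is a non-normative example (every name and ID
in it is a placeholder, not something defined by this change) showing where
``cluster_configs`` sits inside a complete cluster-create request body:

.. sourcecode:: python

    # Hypothetical cluster-create body; the template and image UUIDs are
    # placeholders that must be replaced with real values.
    cluster_body = {
        "name": "hdp-ha-cluster",
        "plugin_name": "hdp",
        "hadoop_version": "2.0.6",
        "cluster_template_id": "<template-uuid>",
        "default_image_id": "<image-uuid>",
        "cluster_configs": {
            "HDFSHA": {
                "hdfs.nnha": True
            }
        }
    }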

Plugin Capabilities
-------------------
The table below provides a plugin capability matrix:

@@ -85,6 +85,38 @@ tags: 'hdp' and '<hdp version>' (e.g. '1.3.2').
Also in the Image Registry you will need to specify the username for an image.
The username specified should be 'cloud-user'.

HDFS NameNode High Availability
-------------------------------
HDFS NameNode High Availability (using the Quorum Journal Manager) can be
deployed automatically with HDP 2.0.6. Currently the only way to deploy it is
through the command line client (python-saharaclient) or the Sahara REST API,
by adding the following ``cluster_configs`` parameter to the cluster's JSON:

.. sourcecode:: cfg

    "cluster_configs": {
        "HDFSHA": {
            "hdfs.nnha": true
        }
    }
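
As a rough illustration only (the endpoint, port, project ID, token and UUIDs
below are assumptions made for the sketch, not values defined by this change),
sending such a request to the Sahara REST API with the ``requests`` library
could look like this:

.. sourcecode:: python

    # Hedged sketch: the Sahara endpoint, project ID, token and UUIDs are
    # placeholders and must be replaced with real deployment values.
    import json
    import requests

    sahara_url = "http://controller:8386/v1.1/<project-id>"   # assumed endpoint
    headers = {"X-Auth-Token": "<keystone-token>",
               "Content-Type": "application/json"}

    cluster = {
        "name": "hdp-ha",
        "plugin_name": "hdp",
        "hadoop_version": "2.0.6",
        "cluster_template_id": "<template-uuid>",
        "default_image_id": "<image-uuid>",
        "cluster_configs": {"HDFSHA": {"hdfs.nnha": True}},
    }

    resp = requests.post(sahara_url + "/clusters", headers=headers,
                         data=json.dumps(cluster))
    resp.raise_for_status()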

Support for deploying NameNode High Availability through the Sahara Dashboard
will be added in the future.

NameNode High Availability is deployed using 2 NameNodes, one active and one
standby. The NameNodes use a set of JOURNALNODE and ZOOKEEPER_SERVER processes
to ensure the necessary synchronization.

A typical highly available HDP 2.0.6 cluster uses 2 separate NameNodes, at
least 3 JOURNALNODE processes and at least 3 ZOOKEEPER_SERVER processes.

When HDFS NameNode High Availability is enabled, the plugin performs the
following additional validations (an example node-group layout that satisfies
them is sketched after the list):

* Ensure the existence of 2 NAMENODE processes in the cluster
* Ensure the existence of at least 3 JOURNALNODE processes in the cluster
* Ensure the existence of at least 3 ZOOKEEPER_SERVER processes in the cluster
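
The following non-normative sketch (group names, flavors and counts are
illustrative assumptions) shows one node-group layout that satisfies these
checks, providing 2 NAMENODE, 3 JOURNALNODE and 3 ZOOKEEPER_SERVER processes:

.. sourcecode:: python

    # Hypothetical node groups for an HDFS HA cluster; flavor IDs are
    # placeholders.
    node_groups = [
        {"name": "master-nn", "flavor_id": "<flavor>", "count": 2,
         "node_processes": ["NAMENODE", "ZOOKEEPER_SERVER", "JOURNALNODE"]},
        {"name": "master-rm", "flavor_id": "<flavor>", "count": 1,
         "node_processes": ["RESOURCEMANAGER", "HISTORYSERVER",
                            "AMBARI_SERVER", "ZOOKEEPER_SERVER",
                            "JOURNALNODE"]},
        {"name": "workers", "flavor_id": "<flavor>", "count": 3,
         "node_processes": ["DATANODE", "NODEMANAGER", "HDFS_CLIENT",
                            "MAPREDUCE2_CLIENT"]},
    ]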


Limitations
-----------
The HDP plugin currently has the following limitations:

@@ -98,3 +98,18 @@ class HadoopProvisionError(e.SaharaException):
        self.message = self.base_message % message

        super(HadoopProvisionError, self).__init__()


class NameNodeHAConfigurationError(e.SaharaException):
    """Exception indicating that hdp-2.0.6 HDFS HA failed.

    A message indicating the reason for failure must be provided.
    """

    base_message = _("NameNode High Availability: %s")

    def __init__(self, message):
        self.code = "NAMENODE_HIGHAVAILABILITY_CONFIGURATION_FAILED"
        self.message = self.base_message % message

        super(NameNodeHAConfigurationError, self).__init__()

@@ -26,6 +26,7 @@ from sahara.plugins.hdp import saharautils as utils
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
from sahara.plugins import provisioning as p
from sahara.topology import topology_helper as th
from sahara.utils import general as g


conductor = conductor.API

@@ -69,6 +70,34 @@ class AmbariPlugin(p.ProvisioningPluginBase):
        # add service urls
        self._set_cluster_info(cluster, cluster_spec)

        # check if HDFS HA is enabled; set it up if so
        if cluster_spec.is_hdfs_ha_enabled(cluster):
            cluster = g.change_cluster_status(cluster, "Configuring HA")
            self.configure_hdfs_ha(cluster)

    def configure_hdfs_ha(self, cluster):
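        # Gather everything needed to drive the HA setup: the parsed cluster
        # spec, the provisioned servers and the Ambari connection info, then
        # hand off to the Ambari client's setup_hdfs_ha().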
        version = cluster.hadoop_version
        handler = self.version_factory.get_version_handler(version)

        cluster_spec = handler.get_cluster_spec(
            cluster, self._map_to_user_inputs(
                version, cluster.cluster_configs))
        hosts = self._get_servers(cluster)
        ambari_info = self.get_ambari_info(cluster_spec)
        self.cluster_ambari_mapping[cluster.name] = ambari_info
        rpm = self._get_rpm_uri(cluster_spec)

        servers = []
        for host in hosts:
            host_role = utils.get_host_role(host)
            servers.append(
                h.HadoopServer(host, cluster_spec.node_groups[host_role],
                               ambari_rpm=rpm))

        ambari_client = handler.get_ambari_client()
        ambari_client.setup_hdfs_ha(cluster_spec, servers, ambari_info,
                                    cluster.name)

    def _get_servers(self, cluster):
        servers = []
        if hasattr(cluster, 'node_groups') and cluster.node_groups is not None:

@@ -112,6 +112,14 @@ class ClusterSpec(object):

        return components

    def is_hdfs_ha_enabled(self, cluster):
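        # HDFS HA is only recognized for HDP 2.0.6, and only when the
        # HDFSHA/hdfs.nnha flag is explicitly set to True in cluster_configs.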
        if self.version == '2.0.6':
            if cluster.cluster_configs.get('HDFSHA', False):
                if cluster.cluster_configs.HDFSHA.get('hdfs.nnha',
                                                      False) is True:
                    return True
        return False

    def _parse_services(self, template_json):
        handler = (vhf.VersionHandlerFactory.get_instance().
                   get_version_handler(self.version))

@@ -198,6 +198,49 @@ class HadoopServer(object):
        # running)
        r.execute_command('ambari-agent restart', run_as_root=True)

    @saharautils.inject_remote('r')
    def set_namenode_safemode(self, jh, r):
        r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
                          "hdfs dfsadmin -safemode enter'".format(jh),
                          run_as_root=True)

    @saharautils.inject_remote('r')
    def save_namenode_namespace(self, jh, r):
        r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
                          "hdfs dfsadmin -saveNamespace'".format(jh),
                          run_as_root=True)

    @saharautils.inject_remote('r')
    def initialize_shared_edits(self, jh, r):
        r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
                          "hdfs namenode -initializeSharedEdits'".format(jh),
                          run_as_root=True)

    @saharautils.inject_remote('r')
    def format_zookeeper_fc(self, jh, r):
        r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
                          "hdfs zkfc -formatZK'".format(jh),
                          run_as_root=True)

    @saharautils.inject_remote('r')
    def bootstrap_standby_namenode(self, jh, r):
        r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
                          "hdfs namenode -bootstrapStandby'".format(jh),
                          run_as_root=True)

    @saharautils.inject_remote('r')
    def install_httpfs(self, r):
        r.execute_command("yum -y install hadoop-httpfs", run_as_root=True)

    @saharautils.inject_remote('r')
    def start_httpfs(self, r):
        r.execute_command("service hadoop-httpfs start", run_as_root=True)

    @saharautils.inject_remote('r')
    def write_hue_temp_file(self, filename, content, r):
        r.execute_command("echo %s > %s" % (content, filename),
                          run_as_root=True)

    def _log(self, buf):
        LOG.debug(buf)

@@ -3517,6 +3517,20 @@
                    "scope": "cluster"
                }
            ]
        },
        {
            "tag": "hdfsha",
            "properties": [
                {
                    "applicable_target": "HDFSHA",
                    "config_type": "boolean",
                    "default_value": false,
                    "description": "Enable HDFS NameNode High Availability",
                    "is_optional": true,
                    "name": "hdfs.nnha",
                    "scope": "cluster"
                }
            ]
        }
    ]
}

@@ -44,6 +44,16 @@
                "name" : "HDFS_CLIENT",
                "type" : "CLIENT",
                "cardinality" : "1+"
            },
            {
                "name" : "JOURNALNODE",
                "type" : "MASTER",
                "cardinality" : "1+"
            },
            {
                "name" : "ZKFC",
                "type" : "MASTER",
                "cardinality" : "1+"
            }
        ],
        "configurations" : [

@@ -1824,6 +1834,12 @@
            { "name": "oozie.service.ProxyUserService.proxyuser.hue.hosts", "value": "*" },
            { "name": "oozie.service.ProxyUserService.proxyuser.hue.groups", "value": "*" }
        ]
    },
    {
        "name" : "hdfsha",
        "properties" : [
            { "name": "hdfs.nnha", "value": "false"}
        ]
    }
]
}

@@ -130,10 +130,36 @@ class HdfsService(Service):
        return 'HDFS'

    def validate(self, cluster_spec, cluster):
        # check for a single NAMENODE
        count = cluster_spec.get_deployed_node_group_count('NAMENODE')
        if count != 1:
            raise ex.InvalidComponentCountException('NAMENODE', 1, count)
        # Check NAMENODE and HDFS HA constraints
        nn_count = cluster_spec.get_deployed_node_group_count('NAMENODE')
        jn_count = cluster_spec.get_deployed_node_group_count('JOURNALNODE')
        zkfc_count = cluster_spec.get_deployed_node_group_count('ZKFC')

        if cluster_spec.is_hdfs_ha_enabled(cluster):
            if nn_count != 2:
                raise ex.NameNodeHAConfigurationError(
                    "Hadoop cluster with HDFS HA enabled requires "
                    "2 NAMENODE. Actual NAMENODE count is %s" % nn_count)
            # Check the number of journalnodes
            if not (jn_count >= 3 and (jn_count % 2 == 1)):
                raise ex.NameNodeHAConfigurationError(
                    "JOURNALNODE count should be an odd number "
                    "greater than or equal to 3 for NameNode High "
                    "Availability. Actual JOURNALNODE count is %s" % jn_count)
        else:
            if nn_count != 1:
                raise ex.InvalidComponentCountException('NAMENODE', 1,
                                                        nn_count)
            # make sure that JOURNALNODE is only used when HDFS HA is enabled
            if jn_count > 0:
                raise ex.NameNodeHAConfigurationError(
                    "JOURNALNODE can only be added when "
                    "NameNode High Availability is enabled.")
            # make sure that ZKFC is only used when HDFS HA is enabled
            if zkfc_count > 0:
                raise ex.NameNodeHAConfigurationError(
                    "ZKFC can only be added when "
                    "NameNode High Availability is enabled.")

    def finalize_configuration(self, cluster_spec):
        nn_hosts = cluster_spec.determine_component_hosts('NAMENODE')

@@ -630,6 +656,15 @@ class ZookeeperService(Service):
            raise ex.InvalidComponentCountException(
                'ZOOKEEPER_SERVER', '1+', count)

        # check if HDFS HA is enabled
        if cluster_spec.is_hdfs_ha_enabled(cluster):
            # check if we have an odd number of zookeeper_servers >= 3
            if not (count >= 3 and (count % 2 == 1)):
                raise ex.NameNodeHAConfigurationError(
                    "ZOOKEEPER_SERVER count should be an odd number "
                    "greater than or equal to 3 for NameNode High "
                    "Availability. Actual ZOOKEEPER_SERVER count is %s"
                    % count)

    def is_mandatory(self):
        return True

@@ -1070,6 +1105,17 @@ class HueService(Service):
            r.write_file_to('/etc/hue/conf/hue.ini',
                            hue_ini,
                            True)
            # update hue.ini if HDFS HA is enabled and restart hadoop-httpfs;
            # /tmp/hueini-hdfsha is written by the versionhandler when HDFS HA
            # is enabled
            r.execute_command('[ -f /tmp/hueini-hdfsha ] && sed -i '
                              '"s/hdfs.*.:8020/hdfs:\\/\\/`cat '
                              '/tmp/hueini-hdfsha`/g" /etc/hue/conf/hue.ini',
                              run_as_root=True)
            r.execute_command('[ -f /tmp/hueini-hdfsha ] && sed -i '
                              '"s/http.*.\\/webhdfs\\/v1\\//http:\\/\\'
                              '/localhost:14000\\/webhdfs\\/v1\\//g" '
                              '/etc/hue/conf/hue.ini', run_as_root=True)

            LOG.info(_LI('Uninstalling Shell, if it is installed '
                         'on {0}').format(instance.fqdn()))

@@ -1094,10 +1140,15 @@ class HueService(Service):
            else:
                cmd = ''

            cmd += '/etc/init.d/hue restart'
            cmd += 'service hue start'

            r.execute_command(cmd, run_as_root=True)

            # start httpfs if HDFS HA is enabled
            r.execute_command('[ -f /tmp/hueini-hdfsha ] &&'
                              'service hadoop-httpfs start',
                              run_as_root=True)

    def finalize_configuration(self, cluster_spec):
        # add Hue-specific properties to the core-site file ideally only on
        # the following nodes:

@@ -365,7 +365,8 @@ class AmbariClient(object):
        started = True
        for items in json_result['items']:
            status = items['Tasks']['status']
            if status == 'FAILED' or status == 'ABORTED':
            if (status == 'FAILED' or status == 'ABORTED' or
                    status == 'TIMEDOUT'):
                return False
            else:
                if status != 'COMPLETED':

@@ -756,7 +757,370 @@ class AmbariClient(object):
                              cluster_spec, ambari_info):
        started_services = self._get_services_in_state(
            cluster_name, ambari_info, 'STARTED')

        for service in cluster_spec.services:
            if service.deployed and service.name not in started_services:
                service.pre_service_start(cluster_spec, ambari_info,
                                          started_services)

    def setup_hdfs_ha(self, cluster_spec, servers, ambari_info, name):
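        # Overall sequence: build the HA cluster map, enter safemode and save
        # the namespace on the active namenode, start the journalnodes,
        # disable secondary namenodes, push HA versions of hdfs-site and
        # core-site (plus hbase-site when HBase is present), initialize
        # shared edits, start ZooKeeper and both namenodes, enable ZKFC for
        # automatic failover, then return components to the INSTALLED state
        # to avoid stale-config warnings and set up Hue/httpfs when needed.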

        # Get HA cluster map
        hac = self._hdfs_ha_cluster_map(cluster_spec, servers,
                                        ambari_info, name)

        # start active namenode in order to format and save namespace
        self._hdfs_ha_update_host_component(hac, hac['nn_active'],
                                            'NAMENODE', 'STARTED')

        hac['server_active'].set_namenode_safemode(hac['java_home'])
        hac['server_active'].save_namenode_namespace(hac['java_home'])

        # shut down active namenode
        self._hdfs_ha_update_host_component(hac, hac['nn_active'],
                                            'NAMENODE', 'INSTALLED')

        # Install HDFS_CLIENT on namenodes, to be used later for updating
        # HDFS configs
        if hac['nn_active'] not in hac['hdfsc_hosts']:
            self._hdfs_ha_add_host_component(hac, hac['nn_active'],
                                             'HDFS_CLIENT')
        if hac['nn_standby'] not in hac['hdfsc_hosts']:
            self._hdfs_ha_add_host_component(hac, hac['nn_standby'],
                                             'HDFS_CLIENT')

        # start the journalnodes
        for jn in hac['jn_hosts']:
            self._hdfs_ha_update_host_component(hac, jn,
                                                'JOURNALNODE', 'STARTED')

        # disable any secondary namenodes
        for snn in hac['snn_hosts']:
            self._hdfs_ha_update_host_component(hac, snn,
                                                'SECONDARY_NAMENODE',
                                                'DISABLED')

        # get hdfs-site config tag
        hdfs_site_tag = self._hdfs_ha_get_config_tag(hac, 'hdfs-site')

        # get hdfs-site config
        hdfs_site = self._hdfs_ha_get_config(hac, 'hdfs-site', hdfs_site_tag)

        # update hdfs-site with HDFS HA properties
        hdfs_site_ha = self._hdfs_ha_update_hdfs_site(hac, hdfs_site)

        # put new hdfs-site config
        self._hdfs_ha_put_config(hac, 'hdfs-site', hac['config_ver'],
                                 hdfs_site_ha)

        # get core-site tag
        core_site_tag = self._hdfs_ha_get_config_tag(hac, 'core-site')

        # get core-site config
        core_site = self._hdfs_ha_get_config(hac, 'core-site', core_site_tag)

        # update core-site with HDFS HA properties
        core_site_ha = self._hdfs_ha_update_core_site(hac, core_site)

        # put new HA core-site config
        self._hdfs_ha_put_config(hac, 'core-site', hac['config_ver'],
                                 core_site_ha)

        # update hbase-site if HBase is installed
        if hac['hbase_hosts']:
            hbase_site_tag = self._hdfs_ha_get_config_tag(hac, 'hbase-site')
            hbase_site = self._hdfs_ha_get_config(hac, 'hbase-site',
                                                  hbase_site_tag)
            hbase_site_ha = self._hdfs_ha_update_hbase_site(hac, hbase_site)
            self._hdfs_ha_put_config(hac, 'hbase-site', hac['config_ver'],
                                     hbase_site_ha)

        # force the deployment of HDFS HA configs on namenodes by
        # re-installing hdfs-client
        self._hdfs_ha_update_host_component(hac, hac['nn_active'],
                                            'HDFS_CLIENT', 'INSTALLED')
        self._hdfs_ha_update_host_component(hac, hac['nn_standby'],
                                            'HDFS_CLIENT', 'INSTALLED')

        # initialize shared edits on the active namenode
        hac['server_active'].initialize_shared_edits(hac['java_home'])

        # start zookeeper servers
        for zk in hac['zk_hosts']:
            self._hdfs_ha_update_host_component(hac, zk,
                                                'ZOOKEEPER_SERVER', 'STARTED')

        # start active namenode
        self._hdfs_ha_update_host_component(hac, hac['nn_active'],
                                            'NAMENODE', 'STARTED')

        # set up active namenode automatic failover
        hac['server_active'].format_zookeeper_fc(hac['java_home'])

        # format standby namenode
        hac['server_standby'].bootstrap_standby_namenode(hac['java_home'])

        # start namenode process on standby namenode
        self._hdfs_ha_update_host_component(hac, hac['nn_standby'],
                                            'NAMENODE', 'STARTED')

        # add, install and start ZKFC on namenodes for automatic fail-over
        for nn in hac['nn_hosts']:
            self._hdfs_ha_add_host_component(hac, nn, 'ZKFC')
            self._hdfs_ha_update_host_component(hac, nn, 'ZKFC', 'INSTALLED')
            self._hdfs_ha_update_host_component(hac, nn, 'ZKFC', 'STARTED')

        # delete any secondary namenodes
        for snn in hac['snn_hosts']:
            self._hdfs_ha_delete_host_component(hac, snn, 'SECONDARY_NAMENODE')

        # stop journalnodes and namenodes before terminating;
        # not doing so causes warnings in Ambari for stale config
        for jn in hac['jn_hosts']:
            self._hdfs_ha_update_host_component(hac, jn, 'JOURNALNODE',
                                                'INSTALLED')
        for nn in hac['nn_hosts']:
            self._hdfs_ha_update_host_component(hac, nn, 'NAMENODE',
                                                'INSTALLED')

        # install httpfs and write temp file if HUE is installed
        if hac['hue_host']:
            self._hdfs_ha_setup_hue(hac)

    def _hdfs_ha_cluster_map(self, cluster_spec, servers, ambari_info, name):

        hacluster = {}

        hacluster['name'] = name

        hacluster['config_ver'] = 'v2'

        # set JAVA_HOME
        global_config = cluster_spec.configurations.get('global', None)
        global_config_jh = (global_config.get('java64_home', None) or
                            global_config.get('java_home', None) if
                            global_config else None)
        hacluster['java_home'] = global_config_jh or '/opt/jdk1.6.0_31'

        # set namenode ports
        hacluster['nn_rpc'] = '8020'
        hacluster['nn_ui'] = '50070'

        hacluster['ambari_info'] = ambari_info

        # get host lists
        hacluster['nn_hosts'] = [x.fqdn().lower() for x in
                                 cluster_spec.determine_component_hosts(
                                     'NAMENODE')]
        hacluster['snn_hosts'] = [x.fqdn().lower() for x in
                                  cluster_spec.determine_component_hosts(
                                      'SECONDARY_NAMENODE')]
        hacluster['jn_hosts'] = [x.fqdn().lower() for x in
                                 cluster_spec.determine_component_hosts(
                                     'JOURNALNODE')]
        hacluster['zk_hosts'] = [x.fqdn().lower() for x in
                                 cluster_spec.determine_component_hosts(
                                     'ZOOKEEPER_SERVER')]
        hacluster['hdfsc_hosts'] = [x.fqdn().lower() for x in
                                    cluster_spec.determine_component_hosts(
                                        'HDFS_CLIENT')]
        hacluster['hbase_hosts'] = [x.fqdn().lower() for x in
                                    cluster_spec.determine_component_hosts(
                                        'HBASE_MASTER')]
        hacluster['hue_host'] = [x.fqdn().lower() for x in
                                 cluster_spec.determine_component_hosts('HUE')]

        # get servers for remote command execution;
        # consider hacluster['nn_hosts'][0] as the active namenode
        hacluster['nn_active'] = hacluster['nn_hosts'][0]
        hacluster['nn_standby'] = hacluster['nn_hosts'][1]
        # get the 2 namenode servers and the hue server
        for server in servers:
            if server.instance.fqdn().lower() == hacluster['nn_active']:
                hacluster['server_active'] = server
            if server.instance.fqdn().lower() == hacluster['nn_standby']:
                hacluster['server_standby'] = server
            if hacluster['hue_host']:
                if server.instance.fqdn().lower() == hacluster['hue_host'][0]:
                    hacluster['server_hue'] = server

        return hacluster

    def _hdfs_ha_delete_host_component(self, hac, host, component):

        delete_service_component_url = ('http://{0}/api/v1/clusters/{1}/hosts'
                                        '/{2}/host_components/{3}').format(
            hac['ambari_info'].get_address(),
            hac['name'], host, component)

        result = self._delete(delete_service_component_url, hac['ambari_info'])
        if result.status_code != 200:
            LOG.error(_LE('Configuring HDFS HA failed. %s'), result.text)
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_add_host_component(self, hac, host, component):
        add_host_component_url = ('http://{0}/api/v1/clusters/{1}'
                                  '/hosts/{2}/host_components/{3}').format(
            hac['ambari_info'].get_address(),
            hac['name'], host, component)

        result = self._post(add_host_component_url, hac['ambari_info'])
        if result.status_code != 201:
            LOG.error(_LE('Configuring HDFS HA failed. %s'), result.text)
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_update_host_component(self, hac, host, component, state):

        update_host_component_url = ('http://{0}/api/v1/clusters/{1}'
                                     '/hosts/{2}/host_components/{3}').format(
            hac['ambari_info'].get_address(),
            hac['name'], host, component)
        component_state = {"HostRoles": {"state": state}}
        body = json.dumps(component_state)

        result = self._put(update_host_component_url,
                           hac['ambari_info'], data=body)

        if result.status_code == 202:
            json_result = json.loads(result.text)
            request_id = json_result['Requests']['id']
            success = self._wait_for_async_request(self._get_async_request_uri(
                hac['ambari_info'], hac['name'], request_id),
                hac['ambari_info'])
            if success:
                LOG.info(_LI("HDFS-HA: Host component updated successfully: "
                             "{0} {1}").format(host, component))
            else:
                LOG.critical(_LC("HDFS-HA: Host component update failed: "
                                 "{0} {1}").format(host, component))
                raise ex.NameNodeHAConfigurationError(
                    'Configuring HDFS HA failed. %s' % result.text)
        elif result.status_code != 200:
            LOG.error(
                _LE('Configuring HDFS HA failed. {0}').format(result.text))
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_get_config_tag(self, hac, config_name):

        config_url = ('http://{0}/api/v1/clusters/{1}'
                      '/configurations?type={2}').format(
            hac['ambari_info'].get_address(), hac['name'],
            config_name)

        result = self._get(config_url, hac['ambari_info'])
        if result.status_code == 200:
            json_result = json.loads(result.text)
            items = json_result['items']
            return items[0]['tag']
        else:
            LOG.error(
                _LE('Configuring HDFS HA failed. {0}').format(result.text))
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_get_config(self, hac, config_name, tag):

        config_url = ('http://{0}/api/v1/clusters/{1}'
                      '/configurations?type={2}&tag={3}').format(
            hac['ambari_info'].get_address(), hac['name'],
            config_name, tag)

        result = self._get(config_url, hac['ambari_info'])
        if result.status_code == 200:
            json_result = json.loads(result.text)
            items = json_result['items']
            return items[0]['properties']
        else:
            LOG.error(
                _LE('Configuring HDFS HA failed. {0}').format(result.text))
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_put_config(self, hac, config_name, tag, properties):

        config_url = ('http://{0}/api/v1/clusters/{1}').format(
            hac['ambari_info'].get_address(), hac['name'])

        body = {}
        clusters = {}
        body['Clusters'] = clusters
        body['Clusters']['desired_config'] = {}
        body['Clusters']['desired_config']['type'] = config_name
        body['Clusters']['desired_config']['tag'] = tag
        body['Clusters']['desired_config']['properties'] = properties

        LOG.debug(("body: %s") % (body))

        result = self._put(config_url, hac['ambari_info'],
                           data=json.dumps(body))
        if result.status_code != 200:
            LOG.error(
                _LE('Configuring HDFS HA failed. {0}').format(result.text))
            raise ex.NameNodeHAConfigurationError(
                'Configuring HDFS HA failed. %s' % result.text)

    def _hdfs_ha_update_hdfs_site(self, hac, hdfs_site):
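        # Adds the standard HDFS HA properties: the nameservice and its two
        # namenodes, per-namenode RPC/HTTP addresses, the qjournal shared
        # edits directory, the failover proxy provider, the fencing method
        # and the automatic-failover flag.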
        hdfs_site['dfs.nameservices'] = hac['name']

        hdfs_site['dfs.ha.namenodes.{0}'.format(
            hac['name'])] = hac['nn_active'] + ',' + hac['nn_standby']

        hdfs_site['dfs.namenode.rpc-address.{0}.{1}'.format(
            hac['name'], hac['nn_active'])] = '{0}:{1}'.format(
            hac['nn_active'], hac['nn_rpc'])
        hdfs_site['dfs.namenode.rpc-address.{0}.{1}'.format(
            hac['name'], hac['nn_standby'])] = '{0}:{1}'.format(
            hac['nn_standby'], hac['nn_rpc'])
        hdfs_site['dfs.namenode.http-address.{0}.{1}'.format(
            hac['name'], hac['nn_active'])] = '{0}:{1}'.format(
            hac['nn_active'], hac['nn_ui'])
        hdfs_site['dfs.namenode.http-address.{0}.{1}'.format(
            hac['name'], hac['nn_standby'])] = '{0}:{1}'.format(
            hac['nn_standby'], hac['nn_ui'])

        qjournal = ';'.join([x + ':8485' for x in hac['jn_hosts']])
        hdfs_site['dfs.namenode.shared.edits.dir'] = ('qjournal://{0}/{1}'.
                                                      format(qjournal,
                                                             hac['name']))

        hdfs_site['dfs.client.failover.proxy.provider.{0}'.format(
            hac['name'])] = ("org.apache.hadoop.hdfs.server.namenode.ha."
                             "ConfiguredFailoverProxyProvider")

        hdfs_site['dfs.ha.fencing.methods'] = 'shell(/bin/true)'

        hdfs_site['dfs.ha.automatic-failover.enabled'] = 'true'

        return hdfs_site

    def _hdfs_ha_update_core_site(self, hac, core_site):

        core_site['fs.defaultFS'] = 'hdfs://{0}'.format(hac['name'])
        core_site['ha.zookeeper.quorum'] = '{0}'.format(
            ','.join([x + ':2181' for x in hac['zk_hosts']]))

        # if HUE is installed add some httpfs configs
        if hac['hue_host']:
            core_site['hadoop.proxyuser.httpfs.groups'] = '*'
            core_site['hadoop.proxyuser.httpfs.hosts'] = '*'

        return core_site

    def _hdfs_ha_update_hbase_site(self, hac, hbase_site):

        hbase_site['hbase.rootdir'] = 'hdfs://{0}/apps/hbase/data'.format(
            hac['name'])
        return hbase_site

    def _hdfs_ha_setup_hue(self, hac):

        hac['server_hue'].install_httpfs()

        # write a temp file and use it when starting HUE with HDFS HA enabled
        hac['server_hue'].write_hue_temp_file('/tmp/hueini-hdfsha',
                                              hac['name'])

@@ -60,10 +60,13 @@ class InstanceInfo(object):


class TestCluster(object):
    def __init__(self, node_groups):
    def __init__(self, node_groups, cluster_configs=None):
        self.plugin_name = 'hdp'
        self.hadoop_version = None
        self.cluster_configs = {}
        if cluster_configs:
            self.cluster_configs = cluster_configs
        else:
            self.cluster_configs = {}
        self.node_groups = node_groups
        self.default_image_id = '11111'

@@ -16,6 +16,7 @@
import mock
import pkg_resources as pkg

from sahara.conductor import resource as rsc
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp import clusterspec as cs
from sahara.plugins.hdp.versions.version_2_0_6 import services as s2

@@ -1177,6 +1178,92 @@ class ClusterSpecTestForHDP2(sahara_base.SaharaTestCase):
            # expected
            pass

    def test_validate_hdfs_ha(self, patched):
        server1 = base.TestServer('host1', 'slave', '11111', 3,
                                  '111.11.1111', '222.22.2222')
        server2 = base.TestServer('host2', 'master', '11112', 3,
                                  '111.11.1112', '222.22.2223')
        server3 = base.TestServer('host3', 'master', '11113', 3,
                                  '111.11.1113', '222.22.2224')

        node_group1 = TestNodeGroup(
            'slave', [server1], ["DATANODE", "NODEMANAGER", "HDFS_CLIENT",
                                 "MAPREDUCE2_CLIENT"], 1)
        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "ZOOKEEPER_SERVER",
                                   "JOURNALNODE"], 1)
        node_group3 = TestNodeGroup(
            'master2', [server3], ["RESOURCEMANAGER", "HISTORYSERVER",
                                   "ZOOKEEPER_SERVER", "AMBARI_SERVER",
                                   "JOURNALNODE"], 1)

        # Set up a cluster_configs resource with HDFS HA enabled
        cc = {'HDFSHA': {'hdfs.nnha': True}}
        cc_r = rsc.Resource(cc)
        cluster_config = base.create_clusterspec(hdp_version='2.0.6')

        # Test namenodes
        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)
        # should fail due to the missing second namenode
        self.assertRaises(ex.NameNodeHAConfigurationError,
                          cluster_config.create_operational_config,
                          cluster1, [])

        # Test journalnodes
        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "ZOOKEEPER_SERVER"], 2)
        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)
        # should fail because the JOURNALNODE count is not an odd number >= 3
        self.assertRaises(ex.NameNodeHAConfigurationError,
                          cluster_config.create_operational_config,
                          cluster1, [])

        # Test zookeepers
        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "JOURNALNODE"], 2)
        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)
        # should fail because the ZOOKEEPER_SERVER count is not an odd
        # number >= 3
        self.assertRaises(ex.NameNodeHAConfigurationError,
                          cluster_config.create_operational_config,
                          cluster1, [])

        # should validate successfully now
        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "JOURNALNODE",
                                   "ZOOKEEPER_SERVER"], 2)
        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)
        cluster_config.create_operational_config(cluster1, [])

        # Test when HDFS HA is disabled
        cc = {'HDFSHA': {'hdfs.nnha': False}}
        cc_r = rsc.Resource(cc)

        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "JOURNALNODE",
                                   "ZOOKEEPER_SERVER"], 1)
        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)

        # should fail due to using JOURNALNODE when HDFS HA is not enabled
        self.assertRaises(ex.NameNodeHAConfigurationError,
                          cluster_config.create_operational_config,
                          cluster1, [])

        node_group2 = TestNodeGroup(
            'master1', [server2], ["NAMENODE", "ZKFC", "ZOOKEEPER_SERVER"], 1)

        cluster1 = base.TestCluster([node_group1, node_group2, node_group3],
                                    cc_r)

        # should fail due to using ZKFC when HDFS HA is not enabled
        self.assertRaises(ex.NameNodeHAConfigurationError,
                          cluster_config.create_operational_config,
                          cluster1, [])

    def test_validate_yarn(self, patched):
        server = base.TestServer('host1', 'slave', '11111', 3,
                                 '111.11.1111', '222.22.2222')

@@ -1725,7 +1812,7 @@ class ClusterSpecTestForHDP2(sahara_base.SaharaTestCase):
        for component in service.components:
            found_components[component.name] = component

        self.assertEqual(4, len(found_components))
        self.assertEqual(6, len(found_components))
        self._assert_component('NAMENODE', 'MASTER', "1",
                               found_components['NAMENODE'])
        self._assert_component('DATANODE', 'SLAVE', "1+",

@@ -1734,6 +1821,10 @@
                               found_components['SECONDARY_NAMENODE'])
        self._assert_component('HDFS_CLIENT', 'CLIENT', "1+",
                               found_components['HDFS_CLIENT'])
        self._assert_component('JOURNALNODE', 'MASTER', "1+",
                               found_components['JOURNALNODE'])
        self._assert_component('ZKFC', 'MASTER', "1+",
                               found_components['ZKFC'])
        # TODO(jspeidel) config

    def _assert_mrv2(self, service):

@@ -1899,7 +1990,7 @@
        self.assertEqual(cardinality, component.cardinality)

    def _assert_configurations(self, configurations):
        self.assertEqual(16, len(configurations))
        self.assertEqual(17, len(configurations))
        self.assertIn('global', configurations)
        self.assertIn('core-site', configurations)
        self.assertIn('yarn-site', configurations)

@@ -1916,6 +2007,7 @@
        self.assertIn('hue-hdfs-site', configurations)
        self.assertIn('hue-webhcat-site', configurations)
        self.assertIn('hue-oozie-site', configurations)
        self.assertIn('hdfsha', configurations)


class TestNodeGroup(object):