[HDP] Nameservice awareness for NNHA case

With the addition of NNHA in Kilo, Oozie continued to be pointed at only
one namenode's IP. This change directs EDP jobs to the nameservice,
which defaults to the cluster's name as sent to HDP.

As it is intended primarily for backport, to allow Sahara's EDP to
function in the NNHA case, this change takes a minimal-path approach to
resolving this issue, which can be supplemented or replaced by a more
robust solution for nameservice configuration and load balancing for all
components as time permits.

Change-Id: Icc937fcb534427f752d6db788ac10a934dfbfd4c
Closes-bug: 1470841
This commit is contained in:
Ethan Gafford 2015-07-06 17:13:17 -04:00
parent a470fa6aec
commit aced37e13f
5 changed files with 61 additions and 22 deletions

View File

@ -209,7 +209,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
info = {} info = {}
for service in cluster_spec.services: for service in cluster_spec.services:
if service.deployed: if service.deployed:
service.register_service_urls(cluster_spec, info) service.register_service_urls(cluster_spec, info, cluster)
conductor.cluster_update(context.ctx(), cluster, {'info': info}) conductor.cluster_update(context.ctx(), cluster, {'info': info})

View File

@ -24,7 +24,8 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
return 'hdfs' return 'hdfs'
def get_name_node_uri(self, cluster): def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode'] hdfs = cluster['info']['HDFS']
return hdfs.get('NameService', hdfs['NameNode'])
def get_oozie_server_uri(self, cluster): def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/" return cluster['info']['JobFlow']['Oozie'] + "/oozie/"

View File

@ -67,7 +67,7 @@ class Service(object):
def register_user_input_handlers(self, ui_handlers): def register_user_input_handlers(self, ui_handlers):
pass pass
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
return url_info return url_info
def pre_service_start(self, cluster_spec, ambari_info, started_services): def pre_service_start(self, cluster_spec, ambari_info, started_services):
@ -178,7 +178,7 @@ class HdfsService(Service):
global_config['dfs_data_dir'] = self._generate_storage_path( global_config['dfs_data_dir'] = self._generate_storage_path(
common_paths, '/hadoop/hdfs/data') common_paths, '/hadoop/hdfs/data')
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
namenode_ip = cluster_spec.determine_component_hosts( namenode_ip = cluster_spec.determine_component_hosts(
'NAMENODE').pop().management_ip 'NAMENODE').pop().management_ip
@ -267,7 +267,7 @@ class MapReduceService(Service):
if 'HISTORYSERVER' not in ng.components: if 'HISTORYSERVER' not in ng.components:
ng.components.append('HISTORYSERVER') ng.components.append('HISTORYSERVER')
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
jobtracker_ip = cluster_spec.determine_component_hosts( jobtracker_ip = cluster_spec.determine_component_hosts(
'JOBTRACKER').pop().management_ip 'JOBTRACKER').pop().management_ip
@ -483,7 +483,7 @@ class HBaseService(Service):
if count != 1: if count != 1:
raise ex.InvalidComponentCountException('HBASE_MASTER', 1, count) raise ex.InvalidComponentCountException('HBASE_MASTER', 1, count)
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
master_ip = cluster_spec.determine_component_hosts( master_ip = cluster_spec.determine_component_hosts(
'HBASE_MASTER').pop().management_ip 'HBASE_MASTER').pop().management_ip
@ -617,7 +617,7 @@ class OozieService(Service):
if 'MAPREDUCE_CLIENT' not in components: if 'MAPREDUCE_CLIENT' not in components:
components.append('MAPREDUCE_CLIENT') components.append('MAPREDUCE_CLIENT')
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
oozie_ip = cluster_spec.determine_component_hosts( oozie_ip = cluster_spec.determine_component_hosts(
'OOZIE_SERVER').pop().management_ip 'OOZIE_SERVER').pop().management_ip
port = self._get_port_from_cluster_spec(cluster_spec, 'oozie-site', port = self._get_port_from_cluster_spec(cluster_spec, 'oozie-site',
@ -686,7 +686,7 @@ class AmbariService(Service):
if count != 1: if count != 1:
raise ex.InvalidComponentCountException('AMBARI_SERVER', 1, count) raise ex.InvalidComponentCountException('AMBARI_SERVER', 1, count)
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
ambari_ip = cluster_spec.determine_component_hosts( ambari_ip = cluster_spec.determine_component_hosts(
'AMBARI_SERVER').pop().management_ip 'AMBARI_SERVER').pop().management_ip

View File

@ -72,7 +72,7 @@ class Service(object):
def register_user_input_handlers(self, ui_handlers): def register_user_input_handlers(self, ui_handlers):
pass pass
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
return url_info return url_info
def pre_service_start(self, cluster_spec, ambari_info, started_services): def pre_service_start(self, cluster_spec, ambari_info, started_services):
@ -207,7 +207,7 @@ class HdfsService(Service):
self._generate_storage_path( self._generate_storage_path(
common_paths, '/hadoop/hdfs/data')) common_paths, '/hadoop/hdfs/data'))
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
namenode_ip = cluster_spec.determine_component_hosts( namenode_ip = cluster_spec.determine_component_hosts(
'NAMENODE').pop().management_ip 'NAMENODE').pop().management_ip
@ -220,6 +220,9 @@ class HdfsService(Service):
'Web UI': 'http://%s:%s' % (namenode_ip, ui_port), 'Web UI': 'http://%s:%s' % (namenode_ip, ui_port),
'NameNode': 'hdfs://%s:%s' % (namenode_ip, nn_port) 'NameNode': 'hdfs://%s:%s' % (namenode_ip, nn_port)
} }
if cluster_spec.is_hdfs_ha_enabled(cluster):
url_info['HDFS'].update({
'NameService': 'hdfs://%s' % cluster.name})
return url_info return url_info
def finalize_ng_components(self, cluster_spec): def finalize_ng_components(self, cluster_spec):
@ -269,7 +272,7 @@ class MapReduce2Service(Service):
for prop in th.vm_awareness_mapred_config(): for prop in th.vm_awareness_mapred_config():
mapred_site_config[prop['name']] = prop['value'] mapred_site_config[prop['name']] = prop['value']
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
historyserver_ip = cluster_spec.determine_component_hosts( historyserver_ip = cluster_spec.determine_component_hosts(
'HISTORYSERVER').pop().management_ip 'HISTORYSERVER').pop().management_ip
@ -348,7 +351,7 @@ class YarnService(Service):
self._generate_storage_path(common_paths, self._generate_storage_path(common_paths,
'/hadoop/yarn/local')) '/hadoop/yarn/local'))
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
resourcemgr_ip = cluster_spec.determine_component_hosts( resourcemgr_ip = cluster_spec.determine_component_hosts(
'RESOURCEMANAGER').pop().management_ip 'RESOURCEMANAGER').pop().management_ip
@ -563,7 +566,7 @@ class HBaseService(Service):
if count != 1: if count != 1:
raise ex.InvalidComponentCountException('HBASE_MASTER', 1, count) raise ex.InvalidComponentCountException('HBASE_MASTER', 1, count)
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
master_ip = cluster_spec.determine_component_hosts( master_ip = cluster_spec.determine_component_hosts(
'HBASE_MASTER').pop().management_ip 'HBASE_MASTER').pop().management_ip
@ -724,7 +727,7 @@ class OozieService(Service):
if 'MAPREDUCE2_CLIENT' not in components: if 'MAPREDUCE2_CLIENT' not in components:
components.append('MAPREDUCE2_CLIENT') components.append('MAPREDUCE2_CLIENT')
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
oozie_ip = cluster_spec.determine_component_hosts( oozie_ip = cluster_spec.determine_component_hosts(
'OOZIE_SERVER').pop().management_ip 'OOZIE_SERVER').pop().management_ip
port = self._get_port_from_cluster_spec(cluster_spec, 'oozie-site', port = self._get_port_from_cluster_spec(cluster_spec, 'oozie-site',
@ -790,7 +793,7 @@ class AmbariService(Service):
if count != 1: if count != 1:
raise ex.InvalidComponentCountException('AMBARI_SERVER', 1, count) raise ex.InvalidComponentCountException('AMBARI_SERVER', 1, count)
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
ambari_ip = cluster_spec.determine_component_hosts( ambari_ip = cluster_spec.determine_component_hosts(
'AMBARI_SERVER').pop().management_ip 'AMBARI_SERVER').pop().management_ip
@ -1180,7 +1183,7 @@ class HueService(Service):
self._merge_configurations(cluster_spec, 'hue-oozie-site', self._merge_configurations(cluster_spec, 'hue-oozie-site',
'oozie-site') 'oozie-site')
def register_service_urls(self, cluster_spec, url_info): def register_service_urls(self, cluster_spec, url_info, cluster):
hosts = cluster_spec.determine_component_hosts('HUE') hosts = cluster_spec.determine_component_hosts('HUE')
if hosts is not None: if hosts is not None:

View File

@ -60,8 +60,10 @@ class ServicesTest(base.SaharaTestCase):
instance_mock.management_ip = '127.0.0.1' instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock( cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock]) return_value=[instance_mock])
cluster = mock.Mock(cluster_configs={}, name="hdp")
url_info = {} url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info) url_info = service.register_service_urls(cluster_spec, url_info,
cluster)
self.assertEqual(url_info['HDFS']['Web UI'], self.assertEqual(url_info['HDFS']['Web UI'],
'http://127.0.0.1:10070') 'http://127.0.0.1:10070')
self.assertEqual(url_info['HDFS']['NameNode'], self.assertEqual(url_info['HDFS']['NameNode'],
@ -83,13 +85,43 @@ class ServicesTest(base.SaharaTestCase):
instance_mock.management_ip = '127.0.0.1' instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock( cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock]) return_value=[instance_mock])
cluster = mock.Mock(cluster_configs={}, name="hdp")
url_info = {} url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info) url_info = service.register_service_urls(cluster_spec, url_info,
cluster)
self.assertEqual(url_info['HDFS']['Web UI'], self.assertEqual(url_info['HDFS']['Web UI'],
'http://127.0.0.1:10070') 'http://127.0.0.1:10070')
self.assertEqual(url_info['HDFS']['NameNode'], self.assertEqual(url_info['HDFS']['NameNode'],
'hdfs://127.0.0.1:9020') 'hdfs://127.0.0.1:9020')
def test_hdp2_ha_hdfs_service_register_urls(self):
s = self.get_services_processor('2.0.6')
service = s.create_service('HDFS')
cluster_spec = mock.Mock()
cluster_spec.configurations = {
'core-site': {
'fs.defaultFS': 'hdfs://not_expected.com:9020'
},
'hdfs-site': {
'dfs.namenode.http-address': 'http://not_expected.com:10070'
}
}
instance_mock = mock.Mock()
instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock])
cluster = mock.Mock(cluster_configs={'HDFSHA': {'hdfs.nnha': True}})
cluster.name = "hdp-cluster"
url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info,
cluster)
self.assertEqual(url_info['HDFS']['Web UI'],
'http://127.0.0.1:10070')
self.assertEqual(url_info['HDFS']['NameNode'],
'hdfs://127.0.0.1:9020')
self.assertEqual(url_info['HDFS']['NameService'],
'hdfs://hdp-cluster')
def test_create_mr_service(self): def test_create_mr_service(self):
s = self.get_services_processor() s = self.get_services_processor()
service = s.create_service('MAPREDUCE') service = s.create_service('MAPREDUCE')
@ -126,7 +158,8 @@ class ServicesTest(base.SaharaTestCase):
cluster_spec.determine_component_hosts = mock.Mock( cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock]) return_value=[instance_mock])
url_info = {} url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info) url_info = service.register_service_urls(cluster_spec, url_info,
mock.Mock())
self.assertEqual(url_info['MapReduce']['Web UI'], self.assertEqual(url_info['MapReduce']['Web UI'],
'http://127.0.0.1:10030') 'http://127.0.0.1:10030')
self.assertEqual(url_info['MapReduce']['JobTracker'], self.assertEqual(url_info['MapReduce']['JobTracker'],
@ -149,7 +182,8 @@ class ServicesTest(base.SaharaTestCase):
cluster_spec.determine_component_hosts = mock.Mock( cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock]) return_value=[instance_mock])
url_info = {} url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info) url_info = service.register_service_urls(cluster_spec, url_info,
mock.Mock())
self.assertEqual(url_info['MapReduce2']['Web UI'], self.assertEqual(url_info['MapReduce2']['Web UI'],
'http://127.0.0.1:10030') 'http://127.0.0.1:10030')
self.assertEqual(url_info['MapReduce2']['History Server'], self.assertEqual(url_info['MapReduce2']['History Server'],
@ -210,7 +244,8 @@ class ServicesTest(base.SaharaTestCase):
cluster_spec.determine_component_hosts = mock.Mock( cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock]) return_value=[instance_mock])
url_info = {} url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info) url_info = service.register_service_urls(cluster_spec, url_info,
mock.Mock())
self.assertEqual('http://127.0.0.1:21000', self.assertEqual('http://127.0.0.1:21000',
url_info['JobFlow']['Oozie']) url_info['JobFlow']['Oozie'])
@ -795,7 +830,7 @@ class ServicesTest(base.SaharaTestCase):
service = s.create_service('HBASE') service = s.create_service('HBASE')
url_info = {} url_info = {}
service.register_service_urls(cluster_spec, url_info) service.register_service_urls(cluster_spec, url_info, mock.Mock())
self.assertEqual(1, len(url_info)) self.assertEqual(1, len(url_info))
self.assertEqual(6, len(url_info['HBase'])) self.assertEqual(6, len(url_info['HBase']))
self.assertEqual('http://222.22.2222:60010/master-status', self.assertEqual('http://222.22.2222:60010/master-status',