Enable HDP 2 deployment leveraging the HDP plugin

This commit introduces the ability to deploy HDP 2.0.6 clusters.
Both the basic cluster provisioning functions (create, scale) and
EDP have been implemented.

Some follow-on work is required to complete the integration:

- Topology/data locality support
- Verify cinder support
- Refactoring (there is ample opportunity for extracting parent
  classes for common code, etc.)

I have filed bug #1277653 to track these items.

Most of the items above were deferred from this commit to limit its
scope and size and to allow an iterative approach to achieving full
functionality.

partially implements blueprint hdp-hdp2-support

Change-Id: I1cc483f05b65e1df0fb630d5e819ef23ab407492
Jon Maron 2014-02-07 14:03:48 -05:00
parent 881f484baf
commit 7e9d14b438
23 changed files with 8955 additions and 407 deletions


@@ -128,6 +128,13 @@ Returns the instance object for the host running the Oozie server (this service
*Returns*: The Oozie server instance object
get_resource_manager_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for accessing the MapReduce resource manager (e.g. Hadoop 1.x - the JobTracker, Hadoop 2.x - the YARN ResourceManager)
*Returns*: The resource manager URI
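As a minimal illustration (not part of the SPI contract itself), a Hadoop 2.x version handler would typically satisfy this method by returning the YARN ResourceManager endpoint recorded in the cluster's ``info`` map, which mirrors what the HDP 2.0.6 handler does in this commit (Hadoop 1.x handlers return the JobTracker address instead)::

    def get_resource_manager_uri(self, cluster):
        # Hadoop 2.x: return the YARN ResourceManager address registered
        # for the cluster; Hadoop 1.x handlers return the JobTracker
        # address instead.
        return cluster['info']['Yarn']['ResourceManager']

EDP then obtains this value through the plugin (``plugin.get_resource_manager_uri(cluster)``) and passes it to Oozie as the ``jobTracker`` job parameter.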
Object Model
============


@@ -38,8 +38,13 @@ def get_namenode(cluster):
return get_instance(cluster, "namenode")
#TODO(jmaron): name change?
def get_jobtracker(cluster):
return get_instance(cluster, "jobtracker")
instance = get_instance(cluster, "jobtracker")
if not instance:
instance = get_instance(cluster, "resourcemanager")
return instance
def get_oozie(cluster):


@@ -83,8 +83,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def get_node_processes(self, hadoop_version):
node_processes = {}
version_handler = \
self.version_factory.get_version_handler(hadoop_version)
version_handler = (
self.version_factory.get_version_handler(hadoop_version))
default_config = version_handler.get_default_cluster_configuration()
for service in default_config.services:
components = []
@@ -142,6 +142,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "oozie_server")
def get_resource_manager_uri(self, cluster):
version_handler = (
self.version_factory.get_version_handler(cluster.hadoop_version))
return version_handler.get_resource_manager_uri(cluster)
def update_infra(self, cluster):
pass
@@ -183,8 +188,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def _set_ambari_credentials(self, cluster_spec, ambari_info, version):
services = cluster_spec.services
ambari_client = self.version_factory.get_version_handler(version).\
get_ambari_client()
ambari_client = (self.version_factory.get_version_handler(version).
get_ambari_client())
for service in services:
if service.name == 'AMBARI':
is_admin_provided = False
@@ -259,9 +264,9 @@ class AmbariPlugin(p.ProvisioningPluginBase):
return 'Hortonworks Data Platform'
def get_description(self):
return 'The Hortonworks OpenStack plugin works with project ' \
'Savanna to automate the deployment of the Hortonworks data' \
' platform on OpenStack based public & private clouds'
return ('The Hortonworks OpenStack plugin works with project '
'Savanna to automate the deployment of the Hortonworks data'
' platform on OpenStack based public & private clouds')
def validate(self, cluster):
# creating operational config results in validation


@@ -134,7 +134,12 @@ class HadoopServer:
LOG.info(
'{0}: Starting Ambari Agent ...'.format(self.instance.hostname()))
r.execute_command('ambari-agent start', run_as_root=True)
# If the HDP 2 ambari agent is pre-installed on an image, the agent
# will start up during instance launch and therefore the agent
# registration will fail. It is therefore more appropriate to call
# restart since it will either start (if stopped) or restart (if
# running)
r.execute_command('ambari-agent restart', run_as_root=True)
def _log(self, buf):
LOG.debug(buf)


@@ -57,3 +57,7 @@ class AbstractVersionHandler():
@abc.abstractmethod
def get_services_processor(self):
return
@abc.abstractmethod
def get_resource_manager_uri(self, cluster):
return


@@ -242,8 +242,8 @@ class MapReduceService(Service):
if tt_node_groups:
global_config = cluster_spec.configurations['global']
common_paths = self._get_common_paths(tt_node_groups)
mapred_site_config['mapred.local.dir'] = \
self._generate_storage_path(common_paths, '/hadoop/mapred')
mapred_site_config['mapred.local.dir'] = (
self._generate_storage_path(common_paths, '/hadoop/mapred'))
global_config['mapred_local_dir'] = self._generate_storage_path(
common_paths, '/hadoop/mapred')


@@ -102,6 +102,9 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_services_processor(self):
return services
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']
class AmbariClient():
@@ -163,6 +166,8 @@ class AmbariClient():
existing_configs = json_result['Clusters']['desired_configs']
configs = cluster_spec.get_deployed_configurations()
if 'ambari' in configs:
configs.remove('ambari')
if len(configs) == len(existing_configs):
# nothing to do
return
@@ -175,9 +180,6 @@ class AmbariClient():
version = 1
body['Clusters'] = clusters
for config_name in configs:
if config_name == 'ambari':
# ambari configs are currently internal to the plugin
continue
if config_name in existing_configs:
if config_name == 'core-site' or config_name == 'global':
existing_version = existing_configs[config_name]['tag']\

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,20 @@
#!/bin/bash
HADOOP_CONF=/etc/hadoop/conf
while [ $# -gt 0 ] ; do
nodeArg=$1
exec< ${HADOOP_CONF}/topology.data
result=""
while read line ; do
ar=( $line )
if [ "${ar[0]}" = "$nodeArg" ] ; then
result="${ar[1]}"
fi
done
shift
if [ -z "$result" ] ; then
echo -n "/default/rack "
else
echo -n "$result "
fi
done


@@ -0,0 +1,880 @@
# Copyright (c) 2014 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import six
from oslo.config import cfg
from savanna import exceptions as e
from savanna.plugins.general import exceptions as ex
from savanna.plugins.general import utils
from savanna.swift import swift_helper as h
from savanna.topology import topology_helper as th
CONF = cfg.CONF
TOPOLOGY_CONFIG = {
"net.topology.node.switch.mapping.impl":
"org.apache.hadoop.net.ScriptBasedMapping",
"net.topology.script.file.name":
"/etc/hadoop/conf/topology.sh"
}
def create_service(name):
for cls in Service.__subclasses__():
if cls.get_service_id() == name:
return cls()
# no subclass found, return service base class
return Service(name)
class Service(object):
def __init__(self, name):
self.name = name
self.configurations = set(['global', 'core-site'])
self.components = []
self.users = []
self.deployed = False
def add_component(self, component):
self.components.append(component)
def add_user(self, user):
self.users.append(user)
def validate(self, cluster_spec, cluster):
pass
def finalize_configuration(self, cluster_spec):
pass
def register_user_input_handlers(self, ui_handlers):
pass
def register_service_urls(self, cluster_spec, url_info):
return url_info
def pre_service_start(self, cluster_spec, ambari_info, started_services):
pass
def finalize_ng_components(self, cluster_spec):
pass
def is_user_template_component(self, component):
return True
def is_mandatory(self):
return False
def _replace_config_token(self, cluster_spec, token, value, props):
for config_name, props in six.iteritems(props):
config = cluster_spec.configurations[config_name]
for prop in props:
config[prop] = config[prop].replace(token, value)
def _update_config_values(self, configurations, value, props):
for absolute_prop_name in props:
tokens = absolute_prop_name.split('/')
config_name = tokens[0]
prop_name = tokens[1]
config = configurations[config_name]
config[prop_name] = value
def _get_common_paths(self, node_groups):
if len(node_groups) == 1:
paths = node_groups[0].storage_paths()
else:
sets = [set(ng.storage_paths()) for ng in node_groups]
paths = list(set.intersection(*sets))
if len(paths) > 1 and '/mnt' in paths:
paths.remove('/mnt')
return paths
def _generate_storage_path(self, storage_paths, path):
return ",".join([p + path for p in storage_paths])
def _get_port_from_cluster_spec(self, cluster_spec, service, prop_name):
address = cluster_spec.configurations[service][prop_name]
return utils.get_port_from_address(address)
class HdfsService(Service):
def __init__(self):
super(HdfsService, self).__init__(HdfsService.get_service_id())
self.configurations.add('hdfs-site')
@classmethod
def get_service_id(cls):
return 'HDFS'
def validate(self, cluster_spec, cluster):
# check for a single NAMENODE
count = cluster_spec.get_deployed_node_group_count('NAMENODE')
if count != 1:
raise ex.InvalidComponentCountException('NAMENODE', 1, count)
def finalize_configuration(self, cluster_spec):
nn_hosts = cluster_spec.determine_component_hosts('NAMENODE')
if nn_hosts:
props = {'core-site': ['fs.defaultFS'],
'hdfs-site': ['dfs.namenode.http-address',
'dfs.namenode.https-address']}
self._replace_config_token(
cluster_spec, '%NN_HOST%', nn_hosts.pop().fqdn(), props)
snn_hosts = cluster_spec.determine_component_hosts(
'SECONDARY_NAMENODE')
if snn_hosts:
props = {'hdfs-site': ['dfs.namenode.secondary.http-address']}
self._replace_config_token(
cluster_spec, '%SNN_HOST%', snn_hosts.pop().fqdn(), props)
# add swift properties to configuration
core_site_config = cluster_spec.configurations['core-site']
for prop in self._get_swift_properties():
core_site_config[prop['name']] = prop['value']
# add topology properties to configuration, if enabled
if CONF.enable_data_locality:
for prop in th.vm_awareness_core_config():
core_site_config[prop['name']] = prop['value']
core_site_config.update(TOPOLOGY_CONFIG)
# process storage paths to accommodate ephemeral or cinder storage
nn_ng = cluster_spec.get_node_groups_containing_component(
'NAMENODE')[0]
dn_node_groups = cluster_spec.get_node_groups_containing_component(
'DATANODE')
common_paths = []
if dn_node_groups:
common_paths = self._get_common_paths(dn_node_groups)
hdfs_site_config = cluster_spec.configurations['hdfs-site']
hdfs_site_config['dfs.namenode.name.dir'] = (
self._generate_storage_path(
nn_ng.storage_paths(), '/hadoop/hdfs/namenode'))
if common_paths:
hdfs_site_config['dfs.datanode.data.dir'] = (
self._generate_storage_path(
common_paths, '/hadoop/hdfs/data'))
def register_service_urls(self, cluster_spec, url_info):
namenode_ip = cluster_spec.determine_component_hosts(
'NAMENODE').pop().management_ip
ui_port = self._get_port_from_cluster_spec(cluster_spec, 'hdfs-site',
'dfs.namenode.http-address')
nn_port = self._get_port_from_cluster_spec(cluster_spec, 'core-site',
'fs.defaultFS')
url_info['HDFS'] = {
'Web UI': 'http://%s:%s' % (namenode_ip, ui_port),
'NameNode': 'hdfs://%s:%s' % (namenode_ip, nn_port)
}
return url_info
def finalize_ng_components(self, cluster_spec):
hdfs_ng = cluster_spec.get_node_groups_containing_component(
'NAMENODE')[0]
components = hdfs_ng.components
if not cluster_spec.get_deployed_node_group_count('ZOOKEEPER_SERVER'):
zk_service = next(service for service in cluster_spec.services
if service.name == 'ZOOKEEPER')
zk_service.deployed = True
components.append('ZOOKEEPER_SERVER')
def is_mandatory(self):
return True
def _get_swift_properties(self):
return h.get_swift_configs()
class MapReduce2Service(Service):
def __init__(self):
super(MapReduce2Service, self).__init__(
MapReduce2Service.get_service_id())
self.configurations.add('mapred-site')
@classmethod
def get_service_id(cls):
return 'MAPREDUCE2'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('HISTORYSERVER')
if count != 1:
raise ex.InvalidComponentCountException('HISTORYSERVER', 1, count)
def finalize_configuration(self, cluster_spec):
hs_hosts = cluster_spec.determine_component_hosts('HISTORYSERVER')
if hs_hosts:
props = {'mapred-site': ['mapreduce.jobhistory.webapp.address',
'mapreduce.jobhistory.address']}
self._replace_config_token(
cluster_spec, '%HS_HOST%', hs_hosts.pop().fqdn(), props)
# data locality/rack awareness prop processing
mapred_site_config = cluster_spec.configurations['mapred-site']
if CONF.enable_data_locality:
for prop in th.vm_awareness_mapred_config():
mapred_site_config[prop['name']] = prop['value']
def register_service_urls(self, cluster_spec, url_info):
historyserver_ip = cluster_spec.determine_component_hosts(
'HISTORYSERVER').pop().management_ip
ui_port = self._get_port_from_cluster_spec(
cluster_spec, 'mapred-site', 'mapreduce.jobhistory.webapp.address')
hs_port = self._get_port_from_cluster_spec(
cluster_spec, 'mapred-site', 'mapreduce.jobhistory.address')
url_info['MapReduce2'] = {
'Web UI': 'http://%s:%s' % (historyserver_ip, ui_port),
'History Server': '%s:%s' % (historyserver_ip, hs_port)
}
return url_info
def finalize_ng_components(self, cluster_spec):
mr2_ng = cluster_spec.get_node_groups_containing_component(
'HISTORYSERVER')[0]
components = mr2_ng.components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
def is_mandatory(self):
return True
class YarnService(Service):
def __init__(self):
super(YarnService, self).__init__(
YarnService.get_service_id())
self.configurations.add('yarn-site')
self.configurations.add('capacity-scheduler')
@classmethod
def get_service_id(cls):
return 'YARN'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('RESOURCEMANAGER')
if count != 1:
raise ex.InvalidComponentCountException('RESOURCEMANAGER', 1,
count)
count = cluster_spec.get_deployed_node_group_count('NODEMANAGER')
if not count:
raise ex.InvalidComponentCountException(
'NODEMANAGER', '> 0', count)
def finalize_configuration(self, cluster_spec):
rm_hosts = cluster_spec.determine_component_hosts('RESOURCEMANAGER')
if rm_hosts:
props = {'yarn-site': ['yarn.resourcemanager.'
'resource-tracker.address',
'yarn.resourcemanager.hostname',
'yarn.resourcemanager.address',
'yarn.resourcemanager.scheduler.address',
'yarn.resourcemanager.webapp.address',
'yarn.log.server.url',
'yarn.resourcemanager.admin.address']}
self._replace_config_token(
cluster_spec, '%RM_HOST%', rm_hosts.pop().fqdn(), props)
# data locality/rack awareness prop processing
mapred_site_config = cluster_spec.configurations['mapred-site']
if CONF.enable_data_locality:
for prop in th.vm_awareness_mapred_config():
mapred_site_config[prop['name']] = prop['value']
# process storage paths to accommodate ephemeral or cinder storage
nm_node_groups = cluster_spec.get_node_groups_containing_component(
'NODEMANAGER')
if nm_node_groups:
common_paths = self._get_common_paths(nm_node_groups)
mapred_site_config['yarn.nodemanager.local-dirs'] = (
self._generate_storage_path(common_paths,
'/hadoop/yarn/local'))
def register_service_urls(self, cluster_spec, url_info):
resourcemgr_ip = cluster_spec.determine_component_hosts(
'RESOURCEMANAGER').pop().management_ip
ui_port = self._get_port_from_cluster_spec(
cluster_spec, 'yarn-site', 'yarn.resourcemanager.webapp.address')
rm_port = self._get_port_from_cluster_spec(
cluster_spec, 'yarn-site', 'yarn.resourcemanager.address')
url_info['Yarn'] = {
'Web UI': 'http://%s:%s' % (resourcemgr_ip, ui_port),
'ResourceManager': '%s:%s' % (resourcemgr_ip, rm_port)
}
return url_info
def is_mandatory(self):
return True
class HiveService(Service):
def __init__(self):
super(HiveService, self).__init__(HiveService.get_service_id())
self.configurations.add('hive-site')
@classmethod
def get_service_id(cls):
return 'HIVE'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('HIVE_SERVER')
if count != 1:
raise ex.InvalidComponentCountException('HIVE_SERVER', 1, count)
def finalize_configuration(self, cluster_spec):
hive_servers = cluster_spec.determine_component_hosts('HIVE_SERVER')
if hive_servers:
props = {'global': ['hive_hostname'],
'core-site': ['hadoop.proxyuser.hive.hosts']}
self._replace_config_token(
cluster_spec, '%HIVE_HOST%', hive_servers.pop().fqdn(), props)
hive_ms = cluster_spec.determine_component_hosts('HIVE_METASTORE')
if hive_ms:
self._replace_config_token(
cluster_spec, '%HIVE_METASTORE_HOST%', hive_ms.pop().fqdn(),
{'hive-site': ['hive.metastore.uris']})
hive_mysql = cluster_spec.determine_component_hosts('MYSQL_SERVER')
if hive_mysql:
self._replace_config_token(
cluster_spec, '%HIVE_MYSQL_HOST%', hive_mysql.pop().fqdn(),
{'hive-site': ['javax.jdo.option.ConnectionURL']})
def register_user_input_handlers(self, ui_handlers):
ui_handlers['hive-site/javax.jdo.option.ConnectionUserName'] = (
self._handle_user_property_metastore_user)
ui_handlers['hive-site/javax.jdo.option.ConnectionPassword'] = (
self._handle_user_property_metastore_pwd)
def _handle_user_property_metastore_user(self, user_input, configurations):
hive_site_config_map = configurations['hive-site']
hive_site_config_map['javax.jdo.option.ConnectionUserName'] = (
user_input.value)
def _handle_user_property_metastore_pwd(self, user_input, configurations):
hive_site_config_map = configurations['hive-site']
hive_site_config_map['javax.jdo.option.ConnectionPassword'] = (
user_input.value)
def finalize_ng_components(self, cluster_spec):
hive_ng = cluster_spec.get_node_groups_containing_component(
'HIVE_SERVER')[0]
components = hive_ng.components
if 'MAPREDUCE2_CLIENT' not in components:
components.append('MAPREDUCE2_CLIENT')
if not cluster_spec.get_deployed_node_group_count('HIVE_METASTORE'):
components.append('HIVE_METASTORE')
if not cluster_spec.get_deployed_node_group_count('MYSQL_SERVER'):
components.append('MYSQL_SERVER')
if not cluster_spec.get_deployed_node_group_count('ZOOKEEPER_SERVER'):
zk_service = next(service for service in cluster_spec.services
if service.name == 'ZOOKEEPER')
zk_service.deployed = True
components.append('ZOOKEEPER_SERVER')
def pre_service_start(self, cluster_spec, ambari_info, started_services):
# this code is needed because of a bug in Ambari where hdfs dir's
# are only created at NN initial startup. Remove this code when
# the bug is fixed in Ambari.
if 'HDFS' not in started_services:
return
# get any instance
with cluster_spec.servers[0].remote() as r:
r.execute_command('su -c "hadoop fs -mkdir /user/hive" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chown -R '
'hive:hdfs /user/hive" -s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -mkdir /apps/hive" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chmod -R 755 /apps/hive" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -mkdir /apps/hive/warehouse" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chown -R hive:hdfs '
'/apps/hive/warehouse" -s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chmod -R 777 '
'/apps/hive/warehouse" -s /bin/sh hdfs')
class WebHCatService(Service):
def __init__(self):
super(WebHCatService, self).__init__(WebHCatService.get_service_id())
self.configurations.add('webhcat-site')
@classmethod
def get_service_id(cls):
return 'WEBHCAT'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('WEBHCAT_SERVER')
if count != 1:
raise ex.InvalidComponentCountException('WEBHCAT_SERVER', 1, count)
def finalize_configuration(self, cluster_spec):
webhcat_servers = cluster_spec.determine_component_hosts(
'WEBHCAT_SERVER')
if webhcat_servers:
self._replace_config_token(
cluster_spec, '%WEBHCAT_HOST%', webhcat_servers.pop().fqdn(),
{'core-site': ['hadoop.proxyuser.hcat.hosts']})
hive_ms_servers = cluster_spec.determine_component_hosts(
'HIVE_METASTORE')
if hive_ms_servers:
self._replace_config_token(
cluster_spec, '%HIVE_METASTORE_HOST%',
hive_ms_servers.pop().fqdn(),
{'webhcat-site': ['templeton.hive.properties']})
zk_servers = cluster_spec.determine_component_hosts('ZOOKEEPER_SERVER')
if zk_servers:
self._replace_config_token(
cluster_spec, '%ZOOKEEPER_HOST%', zk_servers.pop().fqdn(),
{'webhcat-site': ['templeton.zookeeper.hosts']})
def finalize_ng_components(self, cluster_spec):
webhcat_ng = cluster_spec.get_node_groups_containing_component(
'WEBHCAT_SERVER')[0]
components = webhcat_ng.components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
if 'MAPREDUCE2_CLIENT' not in components:
components.append('MAPREDUCE2_CLIENT')
# per AMBARI-3483
if 'YARN_CLIENT' not in components:
components.append('YARN_CLIENT')
if 'ZOOKEEPER_CLIENT' not in components:
# if zk server isn't in cluster, add to ng
if not cluster_spec.get_deployed_node_group_count(
'ZOOKEEPER_SERVER'):
zk_service = next(service for service in cluster_spec.services
if service.name == 'ZOOKEEPER')
zk_service.deployed = True
components.append('ZOOKEEPER_SERVER')
components.append('ZOOKEEPER_CLIENT')
def pre_service_start(self, cluster_spec, ambari_info, started_services):
# this code is needed because of a bug in Ambari where hdfs dir's
# are only created at NN initial startup. Remove this code when
# the bug is fixed in Ambari.
if 'HDFS' not in started_services:
return
# get any instance
with cluster_spec.servers[0].remote() as r:
r.execute_command('su -c "hadoop fs -mkdir /user/hcat" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chown -R hcat:hdfs '
'/user/hcat" -s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chmod -R 755 /user/hcat" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -mkdir /apps/webhcat" '
'-s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chown -R hcat:hdfs '
'/apps/webhcat" -s /bin/sh hdfs')
r.execute_command('su -c "hadoop fs -chmod -R 755 /apps/webhcat" '
'-s /bin/sh hdfs')
class HBaseService(Service):
property_map = {
'hbase-site/hbase.tmp.dir': [
'hbase-site/hbase.tmp.dir', 'global/hbase_tmp_dir'],
'hbase-site/hbase.regionserver.global.memstore.upperLimit': [
'hbase-site/hbase.regionserver.global.memstore.upperLimit',
'global/regionserver_memstore_upperlimit'],
'hbase-site/hbase.hstore.blockingStoreFiles': [
'hbase-site/hbase.hstore.blockingStoreFiles',
'global/hstore_blockingstorefiles'],
'hbase-site/hbase.hstore.compactionThreshold': [
'hbase-site/hbase.hstore.compactionThreshold',
'global/hstore_compactionthreshold'],
'hbase-site/hfile.block.cache.size': [
'hbase-site/hfile.block.cache.size',
'global/hfile_blockcache_size'],
'hbase-site/hbase.hregion.max.filesize': [
'hbase-site/hbase.hregion.max.filesize',
'global/hstorefile_maxsize'],
'hbase-site/hbase.regionserver.handler.count': [
'hbase-site/hbase.regionserver.handler.count',
'global/regionserver_handlers'],
'hbase-site/hbase.hregion.majorcompaction': [
'hbase-site/hbase.hregion.majorcompaction',
'global/hregion_majorcompaction'],
'hbase-site/hbase.regionserver.global.memstore.lowerLimit': [
'hbase-site/hbase.regionserver.global.memstore.lowerLimit',
'global/regionserver_memstore_lowerlimit'],
'hbase-site/hbase.hregion.memstore.block.multiplier': [
'hbase-site/hbase.hregion.memstore.block.multiplier',
'global/hregion_blockmultiplier'],
'hbase-site/hbase.hregion.memstore.mslab.enabled': [
'hbase-site/hbase.hregion.memstore.mslab.enabled',
'global/regionserver_memstore_lab'],
'hbase-site/hbase.hregion.memstore.flush.size': [
'hbase-site/hbase.hregion.memstore.flush.size'],
'hbase-site/hbase.client.scanner.caching': [
'hbase-site/hbase.client.scanner.caching',
'global/client_scannercaching'],
'hbase-site/zookeeper.session.timeout': [
'hbase-site/zookeeper.session.timeout',
'global/zookeeper_sessiontimeout'],
'hbase-site/hbase.client.keyvalue.maxsize': [
'hbase-site/hbase.client.keyvalue.maxsize',
'global/hfile_max_keyvalue_size'],
'hdfs-site/dfs.support.append': [
'hdfs-site/dfs.support.append',
'hbase-site/dfs.support.append',
'global/hdfs_support_append'],
'hbase-site/dfs.client.read.shortcircuit': [
'hbase-site/dfs.client.read.shortcircuit',
'global/hdfs_enable_shortcircuit_read']
}
def __init__(self):
super(HBaseService, self).__init__(
HBaseService.get_service_id())
self.configurations.add('hbase-site')
#self.configurations.add('hbase-policy')
@classmethod
def get_service_id(cls):
return 'HBASE'
def validate(self, cluster_spec, cluster):
# check for a single HBASE_SERVER
count = cluster_spec.get_deployed_node_group_count('HBASE_MASTER')
if count != 1:
raise ex.InvalidComponentCountException('HBASE_MASTER', 1, count)
def register_service_urls(self, cluster_spec, url_info):
master_ip = cluster_spec.determine_component_hosts(
'HBASE_MASTER').pop().management_ip
hbase_config = cluster_spec.configurations['hbase-site']
info_port = hbase_config['hbase.master.info.port']
url_info['HBase'] = {
'Web UI': 'http://%s:%s/master-status' % (master_ip, info_port),
'Logs': 'http://%s:%s/logs' % (master_ip, info_port),
'Zookeeper Info': 'http://%s:%s/zk.jsp' % (master_ip, info_port),
'JMX': 'http://%s:%s/jmx' % (master_ip, info_port),
'Debug Dump': 'http://%s:%s/dump' % (master_ip, info_port),
'Thread Stacks': 'http://%s:%s/stacks' % (master_ip, info_port)
}
return url_info
def register_user_input_handlers(self, ui_handlers):
for prop_name in self.property_map:
ui_handlers[prop_name] = (
self._handle_config_property_update)
ui_handlers['hbase-site/hbase.rootdir'] = (
self._handle_user_property_root_dir)
def _handle_config_property_update(self, user_input, configurations):
self._update_config_values(configurations, user_input.value,
self.property_map[user_input.config.name])
def _handle_user_property_root_dir(self, user_input, configurations):
configurations['hbase-site']['hbase.rootdir'] = user_input.value
match = re.search('(^hdfs://)(.*?)(/.*)', user_input.value)
if match:
configurations['global']['hbase_hdfs_root_dir'] = match.group(3)
else:
raise e.InvalidDataException(
"Invalid value for property 'hbase-site/hbase.rootdir' : %s" %
user_input.value)
def finalize_configuration(self, cluster_spec):
nn_servers = cluster_spec.determine_component_hosts('NAMENODE')
if nn_servers:
self._replace_config_token(
cluster_spec, '%NN_HOST%', nn_servers.pop().fqdn(),
{'hbase-site': ['hbase.rootdir']})
zk_servers = cluster_spec.determine_component_hosts('ZOOKEEPER_SERVER')
if zk_servers:
self._replace_config_token(
cluster_spec, '%ZOOKEEPER_HOST%', zk_servers.pop().fqdn(),
{'hbase-site': ['hbase.zookeeper.quorum']})
def finalize_ng_components(self, cluster_spec):
hbase_ng = cluster_spec.get_node_groups_containing_component(
'HBASE_MASTER')
components = hbase_ng[0].components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
if not cluster_spec.get_deployed_node_group_count(
'HBASE_REGIONSERVER'):
components.append('HBASE_REGIONSERVER')
else:
hbase_ng = cluster_spec.get_node_groups_containing_component(
'HBASE_REGIONSERVER')
for ng in hbase_ng:
components = ng.components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
if not cluster_spec.get_deployed_node_group_count('ZOOKEEPER_SERVER'):
zk_service = next(service for service in cluster_spec.services
if service.name == 'ZOOKEEPER')
zk_service.deployed = True
components.append('ZOOKEEPER_SERVER')
class ZookeeperService(Service):
def __init__(self):
super(ZookeeperService, self).__init__(
ZookeeperService.get_service_id())
@classmethod
def get_service_id(cls):
return 'ZOOKEEPER'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('ZOOKEEPER_SERVER')
if count != 1:
raise ex.InvalidComponentCountException(
'ZOOKEEPER_SERVER', 1, count)
def is_mandatory(self):
return True
class OozieService(Service):
def __init__(self):
super(OozieService, self).__init__(OozieService.get_service_id())
self.configurations.add('oozie-site')
@classmethod
def get_service_id(cls):
return 'OOZIE'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('OOZIE_SERVER')
if count != 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', 1, count)
count = cluster_spec.get_deployed_node_group_count('OOZIE_CLIENT')
if not count:
raise ex.InvalidComponentCountException(
'OOZIE_CLIENT', '1+', count)
def finalize_configuration(self, cluster_spec):
oozie_servers = cluster_spec.determine_component_hosts('OOZIE_SERVER')
if oozie_servers:
oozie_server = oozie_servers.pop()
name_list = [oozie_server.fqdn(), oozie_server.internal_ip,
oozie_server.management_ip]
self._replace_config_token(
cluster_spec, '%OOZIE_HOST%', oozie_server.fqdn(),
{'global': ['oozie_hostname'],
'oozie-site': ['oozie.base.url']})
self._replace_config_token(
cluster_spec, '%OOZIE_HOST%', ",".join(name_list),
{'core-site': ['hadoop.proxyuser.oozie.hosts']})
def finalize_ng_components(self, cluster_spec):
oozie_ng = cluster_spec.get_node_groups_containing_component(
'OOZIE_SERVER')[0]
components = oozie_ng.components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
if 'MAPREDUCE2_CLIENT' not in components:
components.append('MAPREDUCE2_CLIENT')
# per AMBARI-3483
if 'YARN_CLIENT' not in components:
components.append('YARN_CLIENT')
# ensure that mr and hdfs clients are colocated with oozie client
client_ngs = cluster_spec.get_node_groups_containing_component(
'OOZIE_CLIENT')
for ng in client_ngs:
components = ng.components
if 'HDFS_CLIENT' not in components:
components.append('HDFS_CLIENT')
if 'MAPREDUCE2_CLIENT' not in components:
components.append('MAPREDUCE2_CLIENT')
def register_service_urls(self, cluster_spec, url_info):
oozie_ip = cluster_spec.determine_component_hosts(
'OOZIE_SERVER').pop().management_ip
port = self._get_port_from_cluster_spec(cluster_spec, 'oozie-site',
'oozie.base.url')
url_info['JobFlow'] = {
'Oozie': 'http://%s:%s' % (oozie_ip, port)
}
return url_info
def register_user_input_handlers(self, ui_handlers):
ui_handlers['oozie-site/oozie.service.JPAService.jdbc.username'] = (
self._handle_user_property_db_user)
ui_handlers['oozie.service.JPAService.jdbc.password'] = (
self._handle_user_property_db_pwd)
def _handle_user_property_db_user(self, user_input, configurations):
oozie_site_config_map = configurations['oozie-site']
oozie_site_config_map['oozie.service.JPAService.jdbc.username'] = (
user_input.value)
def _handle_user_property_db_pwd(self, user_input, configurations):
oozie_site_config_map = configurations['oozie-site']
oozie_site_config_map['oozie.service.JPAService.jdbc.password'] = (
user_input.value)
class GangliaService(Service):
def __init__(self):
super(GangliaService, self).__init__(GangliaService.get_service_id())
@classmethod
def get_service_id(cls):
return 'GANGLIA'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('GANGLIA_SERVER')
if count != 1:
raise ex.InvalidComponentCountException('GANGLIA_SERVER', 1, count)
def is_user_template_component(self, component):
return component.name != 'GANGLIA_MONITOR'
def finalize_ng_components(self, cluster_spec):
for ng in cluster_spec.node_groups.values():
if 'GANGLIA_MONITOR' not in ng.components:
ng.components.append('GANGLIA_MONITOR')
class AmbariService(Service):
def __init__(self):
super(AmbariService, self).__init__(AmbariService.get_service_id())
self.configurations.add('ambari')
#TODO(jspeidel): don't hard code default admin user
self.admin_user_name = 'admin'
@classmethod
def get_service_id(cls):
return 'AMBARI'
def validate(self, cluster_spec, cluster):
count = cluster_spec.get_deployed_node_group_count('AMBARI_SERVER')
if count != 1:
raise ex.InvalidComponentCountException('AMBARI_SERVER', 1, count)
def register_service_urls(self, cluster_spec, url_info):
ambari_ip = cluster_spec.determine_component_hosts(
'AMBARI_SERVER').pop().management_ip
port = cluster_spec.configurations['ambari'].get(
'server.port', '8080')
url_info['Ambari Console'] = {
'Web UI': 'http://{0}:{1}'.format(ambari_ip, port)
}
return url_info
def is_user_template_component(self, component):
return component.name != 'AMBARI_AGENT'
def register_user_input_handlers(self, ui_handlers):
ui_handlers['ambari-stack/ambari.admin.user'] = (
self._handle_user_property_admin_user)
ui_handlers['ambari-stack/ambari.admin.password'] = (
self._handle_user_property_admin_password)
def is_mandatory(self):
return True
def _handle_user_property_admin_user(self, user_input, configurations):
admin_user = next(user for user in self.users
if user.name == 'admin')
admin_user.name = user_input.value
self.admin_user_name = user_input.value
def _handle_user_property_admin_password(self, user_input, configurations):
admin_user = next(user for user in self.users
if user.name == self.admin_user_name)
admin_user.password = user_input.value
class SqoopService(Service):
def __init__(self):
super(SqoopService, self).__init__(SqoopService.get_service_id())
@classmethod
def get_service_id(cls):
return 'SQOOP'
def finalize_ng_components(self, cluster_spec):
sqoop_ngs = cluster_spec.get_node_groups_containing_component('SQOOP')
for ng in sqoop_ngs:
if 'HDFS_CLIENT' not in ng.components:
ng.components.append('HDFS_CLIENT')
if 'MAPREDUCE2_CLIENT' not in ng.components:
ng.components.append('MAPREDUCE2_CLIENT')
class NagiosService(Service):
def __init__(self):
super(NagiosService, self).__init__(NagiosService.get_service_id())
@classmethod
def get_service_id(cls):
return 'NAGIOS'
def finalize_ng_components(self, cluster_spec):
# per AMBARI-2946
nagios_ngs = (
cluster_spec.get_node_groups_containing_component('NAGIOS_SERVER'))
for ng in nagios_ngs:
if 'YARN_CLIENT' not in ng.components:
ng.components.append('YARN_CLIENT')
if 'MAPREDUCE2_CLIENT' not in ng.components:
ng.components.append('MAPREDUCE2_CLIENT')
if cluster_spec.get_deployed_node_group_count('OOZIE_SERVER'):
if 'OOZIE_CLIENT' not in ng.components:
ng.components.append('OOZIE_CLIENT')
if cluster_spec.get_deployed_node_group_count('HIVE_SERVER'):
if 'HIVE_CLIENT' not in ng.components:
ng.components.append('HIVE_CLIENT')
if 'HCAT' not in ng.components:
if not cluster_spec.get_deployed_node_group_count(
'HCATALOG'):
hcat_service = next(service for service in
cluster_spec.services if
service.name == 'HCATALOG')
hcat_service.deployed = True
ng.components.append('HCAT')


@@ -0,0 +1,588 @@
# Copyright (c) 2014 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import requests
from oslo.config import cfg
import pkg_resources as pkg
from savanna import context
from savanna.plugins.general import exceptions as ex
from savanna.plugins.hdp import clusterspec as cs
from savanna.plugins.hdp import configprovider as cfgprov
from savanna.plugins.hdp.versions import abstractversionhandler as avm
from savanna.plugins.hdp.versions.version_2_0_6 import services
from savanna import version
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class VersionHandler(avm.AbstractVersionHandler):
config_provider = None
version = None
client = None
def _set_version(self, version):
self.version = version
def _get_config_provider(self):
if self.config_provider is None:
self.config_provider = cfgprov.ConfigurationProvider(
json.load(pkg.resource_stream(version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')))
return self.config_provider
def get_version(self):
return self.version
def get_ambari_client(self):
if not self.client:
self.client = AmbariClient(self)
return self.client
def get_config_items(self):
return self._get_config_provider().get_config_items()
def get_applicable_target(self, name):
return self._get_config_provider().get_applicable_target(name)
def get_cluster_spec(self, cluster, user_inputs,
scaled_groups=None, cluster_template=None):
if cluster_template:
cluster_spec = cs.ClusterSpec(cluster_template, '2.0.6')
else:
cluster_spec = self.get_default_cluster_configuration()
cluster_spec.create_operational_config(
cluster, user_inputs, scaled_groups)
return cluster_spec
def get_default_cluster_configuration(self):
return cs.ClusterSpec(self._get_default_cluster_template(), '2.0.6')
def _get_default_cluster_template(self):
return pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
def get_node_processes(self):
node_processes = {}
for service in self.get_default_cluster_configuration().services:
components = []
for component in service.components:
components.append(component.name)
node_processes[service.name] = components
return node_processes
def install_swift_integration(self, servers):
for server in servers:
server.install_swift_integration()
def get_services_processor(self):
return services
def get_resource_manager_uri(self, cluster):
return cluster['info']['Yarn']['ResourceManager']
class AmbariClient():
def __init__(self, handler):
# add an argument for neutron discovery
self.handler = handler
def _get_http_session(self, host, port):
return host.remote().get_http_client(port)
def _get_standard_headers(self):
return {"X-Requested-By": "savanna"}
def _post(self, url, ambari_info, data=None):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.post(url, data=data,
auth=(ambari_info.user, ambari_info.password),
headers=self._get_standard_headers())
def _delete(self, url, ambari_info):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.delete(url,
auth=(ambari_info.user, ambari_info.password),
headers=self._get_standard_headers())
def _put(self, url, ambari_info, data=None):
session = self._get_http_session(ambari_info.host, ambari_info.port)
auth = (ambari_info.user, ambari_info.password)
return session.put(url, data=data, auth=auth,
headers=self._get_standard_headers())
def _get(self, url, ambari_info):
session = self._get_http_session(ambari_info.host, ambari_info.port)
return session.get(url, auth=(ambari_info.user, ambari_info.password),
headers=self._get_standard_headers())
def _add_cluster(self, ambari_info, name):
add_cluster_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
result = self._post(add_cluster_url, ambari_info,
data='{"Clusters": {"version" : "HDP-' +
self.handler.get_version() + '"}}')
if result.status_code != 201:
LOG.error('Create cluster command failed. %s' % result.text)
raise ex.HadoopProvisionError(
'Failed to add cluster: %s' % result.text)
def _add_configurations_to_cluster(
self, cluster_spec, ambari_info, name):
existing_config_url = ('http://{0}/api/v1/clusters/{1}?fields='
'Clusters/desired_configs'.format(
ambari_info.get_address(), name))
result = self._get(existing_config_url, ambari_info)
json_result = json.loads(result.text)
existing_configs = json_result['Clusters']['desired_configs']
configs = cluster_spec.get_deployed_configurations()
if 'ambari' in configs:
configs.remove('ambari')
if len(configs) == len(existing_configs):
# nothing to do
return
config_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
body = {}
clusters = {}
version = 1
body['Clusters'] = clusters
for config_name in configs:
if config_name in existing_configs:
if config_name == 'core-site' or config_name == 'global':
existing_version = (existing_configs[config_name]['tag']
.lstrip('v'))
version = int(existing_version) + 1
else:
continue
config_body = {}
clusters['desired_config'] = config_body
config_body['type'] = config_name
config_body['tag'] = 'v%s' % version
config_body['properties'] = (
cluster_spec.configurations[config_name])
result = self._put(config_url, ambari_info, data=json.dumps(body))
if result.status_code != 200:
LOG.error(
'Set configuration command failed. {0}'.format(
result.text))
raise ex.HadoopProvisionError(
'Failed to set configurations on cluster: %s'
% result.text)
def _add_services_to_cluster(self, cluster_spec, ambari_info, name):
services = cluster_spec.services
add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
for service in services:
if service.deployed and service.name != 'AMBARI':
result = self._post(add_service_url.format(
ambari_info.get_address(), name, service.name),
ambari_info)
if result.status_code not in [201, 409]:
LOG.error(
'Create service command failed. {0}'.format(
result.text))
raise ex.HadoopProvisionError(
'Failed to add services to cluster: %s' % result.text)
def _add_components_to_services(self, cluster_spec, ambari_info, name):
add_component_url = ('http://{0}/api/v1/clusters/{1}/services/{'
'2}/components/{3}')
for service in cluster_spec.services:
if service.deployed and service.name != 'AMBARI':
for component in service.components:
result = self._post(add_component_url.format(
ambari_info.get_address(), name, service.name,
component.name),
ambari_info)
if result.status_code not in [201, 409]:
LOG.error(
'Create component command failed. {0}'.format(
result.text))
raise ex.HadoopProvisionError(
'Failed to add components to services: %s'
% result.text)
def _add_hosts_and_components(
self, cluster_spec, servers, ambari_info, name):
add_host_url = 'http://{0}/api/v1/clusters/{1}/hosts/{2}'
add_host_component_url = ('http://{0}/api/v1/clusters/{1}'
'/hosts/{2}/host_components/{3}')
for host in servers:
hostname = host.instance.fqdn().lower()
result = self._post(
add_host_url.format(ambari_info.get_address(), name, hostname),
ambari_info)
if result.status_code != 201:
LOG.error(
'Create host command failed. {0}'.format(result.text))
raise ex.HadoopProvisionError(
'Failed to add host: %s' % result.text)
node_group_name = host.node_group.name
#TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
ambari_info)
if result.status_code != 201:
LOG.error(
'Create host_component command failed. %s' %
result.text)
raise ex.HadoopProvisionError(
'Failed to add host component: %s' % result.text)
def _install_services(self, cluster_name, ambari_info):
LOG.info('Installing required Hadoop services ...')
ambari_address = ambari_info.get_address()
install_url = ('http://{0}/api/v1/clusters/{'
'1}/services?ServiceInfo/state=INIT'.format(
ambari_address, cluster_name))
body = ('{"RequestInfo" : { "context" : "Install all services" },'
'"Body" : {"ServiceInfo": {"state" : "INSTALLED"}}}')
result = self._put(install_url, ambari_info, data=body)
if result.status_code == 202:
json_result = json.loads(result.text)
request_id = json_result['Requests']['id']
success = self._wait_for_async_request(self._get_async_request_uri(
ambari_info, cluster_name, request_id),
ambari_info)
if success:
LOG.info("Install of Hadoop stack successful.")
self._finalize_ambari_state(ambari_info)
else:
LOG.critical('Install command failed.')
raise ex.HadoopProvisionError(
'Installation of Hadoop stack failed.')
elif result.status_code != 200:
LOG.error(
'Install command failed. {0}'.format(result.text))
raise ex.HadoopProvisionError(
'Installation of Hadoop stack failed.')
def _get_async_request_uri(self, ambari_info, cluster_name, request_id):
return ('http://{0}/api/v1/clusters/{1}/requests/{'
'2}/tasks?fields=Tasks/status'.format(
ambari_info.get_address(), cluster_name,
request_id))
def _wait_for_async_request(self, request_url, ambari_info):
started = False
while not started:
result = self._get(request_url, ambari_info)
LOG.debug(
'async request ' + request_url + ' response:\n' + result.text)
json_result = json.loads(result.text)
started = True
for items in json_result['items']:
status = items['Tasks']['status']
if status == 'FAILED' or status == 'ABORTED':
return False
else:
if status != 'COMPLETED':
started = False
context.sleep(5)
return started
def _finalize_ambari_state(self, ambari_info):
LOG.info('Finalizing Ambari cluster state.')
persist_state_uri = 'http://{0}/api/v1/persist'.format(
ambari_info.get_address())
# this post data has non-standard format because persist
# resource doesn't comply with Ambari API standards
persist_data = ('{ "CLUSTER_CURRENT_STATUS":'
'"{\\"clusterState\\":\\"CLUSTER_STARTED_5\\"}" }')
result = self._post(persist_state_uri, ambari_info, data=persist_data)
if result.status_code != 201 and result.status_code != 202:
LOG.warning('Finalizing of Ambari cluster state failed. {0}'.
format(result.text))
raise ex.HadoopProvisionError('Unable to finalize Ambari state.')
def start_services(self, cluster_name, cluster_spec, ambari_info):
LOG.info('Starting Hadoop services ...')
LOG.info('Cluster name: {0}, Ambari server address: {1}'
.format(cluster_name, ambari_info.get_address()))
start_url = ('http://{0}/api/v1/clusters/{1}/services?ServiceInfo/'
'state=INSTALLED'.format(
ambari_info.get_address(), cluster_name))
body = ('{"RequestInfo" : { "context" : "Start all services" },'
'"Body" : {"ServiceInfo": {"state" : "STARTED"}}}')
self._fire_service_start_notifications(
cluster_name, cluster_spec, ambari_info)
result = self._put(start_url, ambari_info, data=body)
if result.status_code == 202:
json_result = json.loads(result.text)
request_id = json_result['Requests']['id']
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name,
request_id), ambari_info)
if success:
LOG.info(
"Successfully started Hadoop cluster '{0}'.".format(
cluster_name))
else:
LOG.critical('Failed to start Hadoop cluster.')
raise ex.HadoopProvisionError(
'Start of Hadoop services failed.')
elif result.status_code != 200:
LOG.error(
'Start command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise ex.HadoopProvisionError(
'Start of Hadoop services failed.')
def _exec_ambari_command(self, ambari_info, body, cmd_uri):
LOG.debug('PUT URI: {0}'.format(cmd_uri))
result = self._put(cmd_uri, ambari_info, data=body)
if result.status_code == 202:
LOG.debug(
'PUT response: {0}'.format(result.text))
json_result = json.loads(result.text)
href = json_result['href'] + '/tasks?fields=Tasks/status'
success = self._wait_for_async_request(href, ambari_info)
if success:
LOG.info(
"Successfully changed state of Hadoop components ")
else:
LOG.critical('Failed to change state of Hadoop '
'components')
raise RuntimeError('Failed to change state of Hadoop '
'components')
else:
LOG.error(
'Command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise RuntimeError('Hadoop/Ambari command failed.')
def _get_host_list(self, servers):
host_list = [server.instance.fqdn().lower() for server in servers]
return ",".join(host_list)
def _install_and_start_components(self, cluster_name, servers,
ambari_info, cluster_spec):
auth = (ambari_info.user, ambari_info.password)
self._install_components(ambari_info, auth, cluster_name, servers)
self.handler.install_swift_integration(servers)
self._start_components(ambari_info, auth, cluster_name,
servers, cluster_spec)
def _install_components(self, ambari_info, auth, cluster_name, servers):
LOG.info('Starting Hadoop components while scaling up')
LOG.info('Cluster name {0}, Ambari server ip {1}'
.format(cluster_name, ambari_info.get_address()))
# query for the host components on the given hosts that are in the
# INIT state
#TODO(jspeidel): provide request context
body = '{"HostRoles": {"state" : "INSTALLED"}}'
install_uri = ('http://{0}/api/v1/clusters/{'
'1}/host_components?HostRoles/state=INIT&'
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers)))
self._exec_ambari_command(ambari_info, body, install_uri)
def _start_components(self, ambari_info, auth, cluster_name, servers,
cluster_spec):
# query for all the host components in the INSTALLED state,
# then get a list of the client services in the list
installed_uri = ('http://{0}/api/v1/clusters/{'
'1}/host_components?HostRoles/state=INSTALLED&'
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers)))
result = self._get(installed_uri, ambari_info)
if result.status_code == 200:
LOG.debug(
'GET response: {0}'.format(result.text))
json_result = json.loads(result.text)
items = json_result['items']
client_set = cluster_spec.get_components_for_type('CLIENT')
inclusion_list = list(set([x['HostRoles']['component_name']
for x in items
if x['HostRoles']['component_name']
not in client_set]))
# query and start all non-client components on the given set of
# hosts
#TODO(jspeidel): Provide request context
body = '{"HostRoles": {"state" : "STARTED"}}'
start_uri = ('http://{0}/api/v1/clusters/{'
'1}/host_components?HostRoles/state=INSTALLED&'
'HostRoles/host_name.in({2})'
'&HostRoles/component_name.in({3})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers),
",".join(inclusion_list)))
self._exec_ambari_command(ambari_info, body, start_uri)
else:
raise ex.HadoopProvisionError(
'Unable to determine installed service '
'components in scaled instances. status'
' code returned = {0}'.format(result.status))
def wait_for_host_registrations(self, num_hosts, ambari_info):
LOG.info(
'Waiting for all Ambari agents to register with server ...')
url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
result = None
json_result = None
#TODO(jspeidel): timeout
while result is None or len(json_result['items']) < num_hosts:
context.sleep(5)
try:
result = self._get(url, ambari_info)
json_result = json.loads(result.text)
LOG.info('Registered Hosts: {0} of {1}'.format(
len(json_result['items']), num_hosts))
for hosts in json_result['items']:
LOG.debug('Registered Host: {0}'.format(
hosts['Hosts']['host_name']))
except requests.ConnectionError:
#TODO(jspeidel): max wait time
LOG.info('Waiting to connect to ambari server ...')
def update_ambari_admin_user(self, password, ambari_info):
old_pwd = ambari_info.password
user_url = 'http://{0}/api/v1/users/admin'.format(
ambari_info.get_address())
update_body = ('{{"Users":{{"roles":"admin","password":"{0}",'
'"old_password":"{1}"}} }}'.format(password, old_pwd))
result = self._put(user_url, ambari_info, data=update_body)
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to update Ambari admin user'
' credentials: {0}'.format(
result.text))
def add_ambari_user(self, user, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user.name)
create_body = ('{{"Users":{{"password":"{0}","roles":"{1}"}} }}'.
format(user.password, '%s' % ','.
join(map(str, user.groups))))
result = self._post(user_url, ambari_info, data=create_body)
if result.status_code != 201:
raise ex.HadoopProvisionError(
'Unable to create Ambari user: {0}'.format(result.text))
def delete_ambari_user(self, user_name, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user_name)
result = self._delete(user_url, ambari_info)
if result.status_code != 200:
raise ex.HadoopProvisionError(
'Unable to delete Ambari user: {0}'
' : {1}'.format(user_name, result.text))
def configure_scaled_cluster_instances(self, name, cluster_spec,
num_hosts, ambari_info):
self.wait_for_host_registrations(num_hosts, ambari_info)
self._add_configurations_to_cluster(
cluster_spec, ambari_info, name)
self._add_services_to_cluster(
cluster_spec, ambari_info, name)
self._add_components_to_services(
cluster_spec, ambari_info, name)
self._install_services(name, ambari_info)
def start_scaled_cluster_instances(self, name, cluster_spec, servers,
ambari_info):
self.start_services(name, cluster_spec, ambari_info)
self._add_hosts_and_components(
cluster_spec, servers, ambari_info, name)
self._install_and_start_components(
name, servers, ambari_info, cluster_spec)
def provision_cluster(self, cluster_spec, servers, ambari_info, name):
self._add_cluster(ambari_info, name)
self._add_configurations_to_cluster(cluster_spec, ambari_info, name)
self._add_services_to_cluster(cluster_spec, ambari_info, name)
self._add_components_to_services(cluster_spec, ambari_info, name)
self._add_hosts_and_components(
cluster_spec, servers, ambari_info, name)
self._install_services(name, ambari_info)
self.handler.install_swift_integration(servers)
def cleanup(self, ambari_info):
ambari_info.host.remote().close_http_sessions()
def _get_services_in_state(self, cluster_name, ambari_info, state):
services_url = ('http://{0}/api/v1/clusters/{1}/services?'
'ServiceInfo/state.in({2})'.format(
ambari_info.get_address(), cluster_name, state))
result = self._get(services_url, ambari_info)
json_result = json.loads(result.text)
services = []
for service in json_result['items']:
services.append(service['ServiceInfo']['service_name'])
return services
def _fire_service_start_notifications(self, cluster_name,
cluster_spec, ambari_info):
started_services = self._get_services_in_state(
cluster_name, ambari_info, 'STARTED')
for service in cluster_spec.services:
if service.deployed and not service.name in started_services:
service.pre_service_start(cluster_spec, ambari_info,
started_services)


@@ -183,3 +183,6 @@ class IDHProvider(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "oozie")
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']


@@ -70,6 +70,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_oozie_server(self, cluster):
pass
@plugins_base.optional
def get_resource_manager_uri(self, cluster):
pass
@plugins_base.required_with_default
def decommission_nodes(self, cluster, instances):
pass


@@ -59,6 +59,9 @@ class VanillaProvider(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return utils.get_instance(cluster, "oozie")
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']
def get_versions(self):
return ['1.2.1']


@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from six.moves.urllib import parse as urlparse
from savanna import conductor as c
@@ -36,13 +38,36 @@ def copy_from_local(r, source, target, hdfs_user):
def move_from_local(r, source, target, hdfs_user):
r.execute_command('sudo su - -c "hadoop dfs -moveFromLocal '
# using copyFromLocal followed by rm to address permission issues that
# arise when image user is not the same as hdfs user (permissions-wise).
# The moveFromLocal implementation actually is a copy and delete
# combination, so functionally the implementation is equivalent
r.execute_command('sudo su - -c "hadoop dfs -copyFromLocal '
'%s %s" %s' % (source, target, hdfs_user))
r.execute_command('sudo rm -f %s' % source)
def _dir_missing(path, hdfs_user, r):
ret_code, stdout = r.execute_command(
'sudo su - -c "hadoop dfs -test -e %s" %s' % (path, hdfs_user),
raise_when_error=False)
return ret_code == 1
def create_dir(r, dir_name, hdfs_user):
r.execute_command(
'sudo su - -c "hadoop dfs -mkdir %s" %s' % (dir_name, hdfs_user))
# there were significant differences between the 'mkdir' and 'mkdir -p'
# behaviors in Hadoop 1.2.0 vs. 2.2.0 forcing the creation of a
# manual implementation of 'mkdir -p'
comp_paths = dir_name.split(os.sep)
path = os.sep
for comp in comp_paths:
if len(comp) > 0:
path += comp + os.sep
if _dir_missing(path, hdfs_user, r):
r.execute_command(
'sudo su - -c "hadoop dfs -mkdir %s" %s' %
(path, hdfs_user))
def _get_cluster_hosts_information(host, cluster):


@@ -88,6 +88,11 @@ def _get_oozie_server(cluster):
return plugin.get_oozie_server(cluster)
def _get_resource_manager_path(cluster):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
return plugin.get_resource_manager_uri(cluster)
def cancel_job(job_execution_id):
ctx = context.ctx()
job_execution = conductor.job_execution_get(ctx, job_execution_id)
@@ -145,12 +150,12 @@ def run_job(job_execution):
path_to_workflow = upload_workflow_file(u.get_jobtracker(cluster),
wf_dir, wf_xml, hdfs_user)
jt_path = cluster['info']['MapReduce']['JobTracker']
rm_path = _get_resource_manager_path(cluster)
nn_path = cluster['info']['HDFS']['NameNode']
client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/",
_get_oozie_server(cluster))
job_parameters = {"jobTracker": jt_path,
job_parameters = {"jobTracker": rm_path,
"nameNode": nn_path,
"user.name": hdfs_user,
"oozie.wf.application.path":
@@ -182,6 +187,8 @@ def upload_job_files(where, job_dir, job, hdfs_user):
uploaded_paths.append(job_dir + '/' + main.name)
for lib in libs:
raw_data = dispatch.get_raw_binary(lib)
# HDFS 2.2.0 fails to put file if the lib dir does not exist
h.create_dir(r, job_dir + "/lib", hdfs_user)
h.put_file_to_hdfs(r, raw_data, lib.name, job_dir + "/lib",
hdfs_user)
uploaded_paths.append(job_dir + '/lib/' + lib.name)


@@ -41,13 +41,14 @@ def get_instance_info(*args, **kwargs):
return args[0].instance_info
def create_clusterspec():
def create_clusterspec(hdp_version='1.3.2'):
version_suffix = hdp_version.replace('.', '_')
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_1_3_2/resources/'
'default-cluster.template')
'plugins/hdp/versions/version_{0}/resources/'
'default-cluster.template'.format(version_suffix))
return cs.ClusterSpec(cluster_config_file)
return cs.ClusterSpec(cluster_config_file, version=hdp_version)
class InstanceInfo:

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -24,8 +24,9 @@ class VersionManagerFactoryTest(unittest2.TestCase):
factory = versionhandlerfactory.VersionHandlerFactory.get_instance()
versions = factory.get_versions()
self.assertEqual(1, len(versions))
self.assertEqual(2, len(versions))
self.assertIn('1.3.2', versions)
self.assertIn('2.0.6', versions)
def test_get_version_handlers(self):
factory = versionhandlerfactory.VersionHandlerFactory.get_instance()


@@ -53,12 +53,15 @@ class TestJobManager(base.SavannaWithDbTestCase):
@mock.patch('savanna.utils.remote.get_remote')
@mock.patch('savanna.service.edp.hdfs_helper.put_file_to_hdfs')
@mock.patch('savanna.service.edp.hdfs_helper._dir_missing')
@mock.patch('savanna.utils.ssh_remote.InstanceInteropHelper')
@mock.patch('savanna.conductor.API.job_binary_internal_get_raw_data')
def test_upload_job_files(self, conductor_raw_data, helper, remote):
remote_class = mock.MagicMock()
def test_upload_job_files(self, conductor_raw_data, remote_class,
dir_missing, helper, remote):
remote_class.__exit__.return_value = 'closed'
remote.return_value = remote_class
helper.return_value = 'ok'
dir_missing.return_value = False
conductor_raw_data.return_value = 'ok'
job, _ = _create_all_stack('Pig')