Merge "Add support for cinder to HDP plugin"
commit 813ebbd5b6
@@ -172,6 +172,7 @@ class ClusterSpec():
             node_group.count = ng.count
             node_group.id = ng.id
             node_group.components = ng.node_processes[:]
+            node_group.storage_paths = ng.storage_paths
             for instance in ng.instances:
                 node_group.instances.add(Instance(instance.fqdn,
                                                   instance.management_ip,
@@ -230,6 +231,7 @@ class NodeGroup():
         self.cardinality = None
         self.count = None
         self.instances = set()
+        self.storage_paths = []

     def add_component(self, component):
         self.components.append(component)

@@ -69,6 +69,21 @@ class Service(object):
             for prop in props:
                 config[prop] = config[prop].replace(token, value)

+    def _get_common_paths(self, node_groups):
+        if len(node_groups) == 1:
+            paths = node_groups[0].storage_paths
+        else:
+            sets = [set(ng.storage_paths) for ng in node_groups]
+            paths = list(set.intersection(*sets))
+
+        if len(paths) > 1 and '/mnt' in paths:
+            paths.remove('/mnt')
+
+        return paths
+
+    def _generate_storage_path(self, storage_paths, path):
+        return ",".join([p + path for p in storage_paths])
+

 class HdfsService(Service):
     def __init__(self):
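The two helpers added above drive the rest of this change: _get_common_paths intersects the storage paths of the given node groups and drops the default '/mnt' ephemeral mount when a more specific shared path exists, and _generate_storage_path appends a Hadoop subdirectory to each path and joins the results with commas. A minimal standalone sketch of that behavior (illustration only; the sample mount points are made up, not taken from the patch):

    # Standalone restatement of the helpers above, for illustration.
    def get_common_paths(path_lists):
        if len(path_lists) == 1:
            paths = list(path_lists[0])
        else:
            sets = [set(pl) for pl in path_lists]
            paths = list(set.intersection(*sets))
        # prefer attached (e.g. cinder) volumes over the '/mnt' ephemeral
        # default when both are common to every node group
        if len(paths) > 1 and '/mnt' in paths:
            paths.remove('/mnt')
        return paths

    def generate_storage_path(storage_paths, path):
        return ",".join([p + path for p in storage_paths])

    common = get_common_paths([['/mnt', '/volumes/disk1'],
                               ['/mnt', '/volumes/disk1', '/volumes/disk2']])
    assert common == ['/volumes/disk1']
    assert (generate_storage_path(['/volumes/disk1', '/volumes/disk2'],
                                  '/hadoop/hdfs/data') ==
            '/volumes/disk1/hadoop/hdfs/data,/volumes/disk2/hadoop/hdfs/data')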
@@ -105,6 +120,26 @@ class HdfsService(Service):
         for prop in self._get_swift_properties():
             core_site_config[prop['name']] = prop['value']

+        # process storage paths to accommodate ephemeral or cinder storage
+        nn_ng = cluster_spec.get_node_groups_containing_component(
+            'NAMENODE')[0]
+        dn_node_groups = cluster_spec.get_node_groups_containing_component(
+            'DATANODE')
+        common_paths = []
+        if dn_node_groups:
+            common_paths = self._get_common_paths(dn_node_groups)
+        hdfs_site_config = cluster_spec.configurations['hdfs-site']
+        global_config = cluster_spec.configurations['global']
+        hdfs_site_config['dfs.name.dir'] = self._generate_storage_path(
+            nn_ng.storage_paths, '/hadoop/hdfs/namenode')
+        global_config['dfs_name_dir'] = self._generate_storage_path(
+            nn_ng.storage_paths, '/hadoop/hdfs/namenode')
+        if common_paths:
+            hdfs_site_config['dfs.data.dir'] = self._generate_storage_path(
+                common_paths, '/hadoop/hdfs/data')
+            global_config['dfs_data_dir'] = self._generate_storage_path(
+                common_paths, '/hadoop/hdfs/data')
+
     def register_service_urls(self, cluster_spec, url_info):
         namenode_ip = cluster_spec.determine_component_hosts(
             'NAMENODE').pop().management_ip
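Net effect on the generated HDFS configuration, assuming the NAMENODE group exposes a single cinder-backed mount and the DATANODE groups share two (hypothetical paths, not from the patch):

    # Hypothetical mount points illustrating the assignments above.
    nn_storage_paths = ['/volumes/disk1']
    dn_common_paths = ['/volumes/disk1', '/volumes/disk2']

    dfs_name_dir = ",".join(p + '/hadoop/hdfs/namenode' for p in nn_storage_paths)
    dfs_data_dir = ",".join(p + '/hadoop/hdfs/data' for p in dn_common_paths)

    assert dfs_name_dir == '/volumes/disk1/hadoop/hdfs/namenode'
    assert dfs_data_dir == ('/volumes/disk1/hadoop/hdfs/data,'
                            '/volumes/disk2/hadoop/hdfs/data')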
@@ -152,6 +187,20 @@ class MapReduceService(Service):
             self._replace_config_token(
                 cluster_spec, '%JT_HOST%', jt_hosts.pop().fqdn, props)

+        # process storage paths to accommodate ephemeral or cinder storage
+        # NOTE: mapred.system.dir is an HDFS namespace path (not a filesystem
+        # path) so the default path should suffice
+        tt_node_groups = cluster_spec.get_node_groups_containing_component(
+            'TASKTRACKER')
+        if tt_node_groups:
+            mapred_site_config = cluster_spec.configurations['mapred-site']
+            global_config = cluster_spec.configurations['global']
+            common_paths = self._get_common_paths(tt_node_groups)
+            mapred_site_config['mapred.local.dir'] = \
+                self._generate_storage_path(common_paths, '/hadoop/mapred')
+            global_config['mapred_local_dir'] = self._generate_storage_path(
+                common_paths, '/hadoop/mapred')
+
     def register_service_urls(self, cluster_spec, url_info):
         jobtracker_ip = cluster_spec.determine_component_hosts(
             'JOBTRACKER').pop().management_ip

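The MapReduce block follows the same pattern for TASKTRACKER node groups; roughly (again with hypothetical mounts):

    # Hypothetical common paths for the TASKTRACKER node groups.
    common_paths = ['/volumes/disk1', '/volumes/disk2']
    mapred_local_dir = ",".join(p + '/hadoop/mapred' for p in common_paths)
    assert mapred_local_dir == ('/volumes/disk1/hadoop/mapred,'
                                '/volumes/disk2/hadoop/mapred')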
@@ -290,8 +290,8 @@
         {
             "applicable_target": "HDFS",
             "config_type": "string",
-            "default_value": "TODO-DFS-NAME-DIR",
-            "description": "Determines where on the local filesystem the DFS name node\n should store the name table. If this is a comma-delimited list\n of directories then the name table is replicated in all of the\n directories, for redundancy.",
+            "default_value": "/mnt/hadoop/hdfs/namenode",
+            "description": "Determines where on the local filesystem the DFS name node\n should store the name table. If this is a comma-delimited list\n of directories then the name table is replicated in all of the\n directories, for redundancy. Note that this value will be changed by the runtime if cinder volumes are specified for the node group configured with the NAMENODE node process",
             "is_optional": true,
             "name": "dfs.name.dir",
             "scope": "cluster"
@@ -335,8 +335,8 @@
         {
             "applicable_target": "HDFS",
             "config_type": "string",
-            "default_value": "TODO-DFS-DATA-DIR",
-            "description": "Determines where on the local filesystem an DFS data node\n should store its blocks. If this is a comma-delimited\n list of directories, then data will be stored in all named\n directories, typically on different devices.\n Directories that do not exist are ignored.",
+            "default_value": "/mnt/hadoop/hdfs/data",
+            "description": "Determines where on the local filesystem an DFS data node\n should store its blocks. If this is a comma-delimited\n list of directories, then data will be stored in all named\n directories, typically on different devices.\n Directories that do not exist are ignored. Note that this value will be changed by the runtime if cinder volumes are specified for node groups configured with the DATANODE node process",
             "is_optional": true,
             "name": "dfs.data.dir",
             "scope": "cluster"
@@ -844,8 +844,8 @@
         {
             "applicable_target": "MAPREDUCE",
             "config_type": "string",
-            "default_value": "/hadoop/mapred",
-            "description": " ",
+            "default_value": "/mnt/hadoop/mapred",
+            "description": "The local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored. Note that this value will be changed by the runtime if cinder volumes are specified for node groups configured with the TASKTRACKER node process",
             "is_optional": true,
             "name": "mapred.local.dir",
             "scope": "cluster"

@@ -437,7 +437,7 @@
         { "name" : "mapred.jobtracker.blacklist.fault-timeout-window", "value" : "180" },
         { "name" : "mapred.jobtracker.blacklist.fault-bucket-width", "value" : "15" },
         { "name" : "mapred.queue.names", "value" : "default" },
-        { "name" : "mapred.local.dir", "value" : "/hadoop/mapred" },
+        { "name" : "mapred.local.dir", "value" : "/mnt/hadoop/mapred" },
         { "name" : "mapred.jobtracker.taskScheduler", "value" : "org.apache.hadoop.mapred.CapacityTaskScheduler" },
         { "name" : "mapred.tasktracker.map.tasks.maximum", "value" : "4" },
         { "name" : "mapred.tasktracker.reduce.tasks.maximum", "value" : "2" },
@@ -495,11 +495,11 @@
         { "name" : "dfs.access.time.precision", "value" : "0" },
         { "name" : "dfs.cluster.administrators", "value" : " hdfs" },
         { "name" : "ipc.server.read.threadpool.size", "value" : "5" },
-        { "name" : "dfs.name.dir", "value" : "/hadoop/hdfs/namenode" },
+        { "name" : "dfs.name.dir", "value" : "/mnt/hadoop/hdfs/namenode" },
         { "name" : "dfs.webhdfs.enabled", "value" : "false" },
         { "name" : "dfs.datanode.failed.volumes.tolerated", "value" : "0" },
         { "name" : "dfs.block.local-path-access.user", "value" : "hbase" },
-        { "name" : "dfs.data.dir", "value" : "/hadoop/hdfs/data" },
+        { "name" : "dfs.data.dir", "value" : "/mnt/hadoop/hdfs/data" },
         { "name" : "dfs.hosts.exclude", "value" : "/etc/hadoop/dfs.exclude" },
         { "name" : "dfs.hosts", "value" : "/etc/hadoop/dfs.include" },
         { "name" : "dfs.replication", "value" : "3" },

@@ -65,11 +65,13 @@ class TestNodeGroup:
     def __init__(self, name, instances, node_processes, count=1):
         self.name = name
         self.instances = instances
-        for i in instances:
-            i.node_group = self
+        if instances:
+            for i in instances:
+                i.node_group = self
         self.node_processes = node_processes
         self.count = count
         self.id = name
+        self.storage_paths = []


 class TestUserInputConfig:

@@ -240,7 +240,7 @@ class AmbariPluginTest(unittest2.TestCase):
             '222.11.1111')

         node_group = base.TestNodeGroup(
-            'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE",
+            'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
                                  "JOBTRACKER", "TASKTRACKER"])
         cluster = base.TestCluster([node_group])
         cluster_config = cs.ClusterSpec(cluster_config_file)

@@ -47,7 +47,8 @@ class ClusterSpecTest(unittest2.TestCase):
                                   "SECONDARY_NAMENODE", "GANGLIA_SERVER",
                                   "GANGLIA_MONITOR", "NAGIOS_SERVER",
                                   "AMBARI_SERVER", "AMBARI_AGENT"])
-        node_group2 = TestNodeGroup('slave', [server2], ['TASKTRACKER'])
+        node_group2 = TestNodeGroup('slave', [server2], ['TASKTRACKER',
+                                                         'DATANODE'])
         cluster = base.TestCluster([node_group1, node_group2])

         cluster_config = cs.ClusterSpec(cluster_config_file)
@@ -221,15 +222,21 @@ class ClusterSpecTest(unittest2.TestCase):
                                  "MAPREDUCE_CLIENT", "OOZIE_CLIENT",
                                  "AMBARI_AGENT"])

+        user_input_config = TestUserInputConfig(
+            'core-site', 'cluster', 'fs.default.name')
+        user_input = ui(user_input_config, 'hdfs://nn_dif_host.novalocal:8020')
+
         cluster = base.TestCluster([master_ng, jt_ng, nn_ng, snn_ng, hive_ng,
                                     hive_ms_ng, hive_mysql_ng,
                                     hcat_ng, zk_ng, oozie_ng, slave_ng])
         cluster_config = cs.ClusterSpec(cluster_config_file)
-        cluster_config.create_operational_config(cluster, [])
+        cluster_config.create_operational_config(cluster, [user_input])
         config = cluster_config.configurations

+        # for this value, validating that user inputs override configured
+        # values, whether they are processed by runtime or not
         self.assertEqual(config['core-site']['fs.default.name'],
-                         'hdfs://nn_host.novalocal:8020')
+                         'hdfs://nn_dif_host.novalocal:8020')

         self.assertEqual(config['mapred-site']['mapred.job.tracker'],
                          'jt_host.novalocal:50300')
@@ -475,7 +482,7 @@ class ClusterSpecTest(unittest2.TestCase):
             'plugins/hdp/versions/1_3_2/resources/default-cluster.template')

         user_input_config = TestUserInputConfig(
-            'global', 'general', 'dfs_name_dir')
+            'global', 'general', 'fs_checkpoint_dir')
         user_input = ui(user_input_config, '/some/new/path')

         server1 = base.TestServer('host1', 'test-master', '11111', 3,
@@ -496,7 +503,7 @@ class ClusterSpecTest(unittest2.TestCase):
         cluster_config = cs.ClusterSpec(cluster_config_file)
         cluster_config.create_operational_config(cluster, [user_input])
         self.assertEqual('/some/new/path', cluster_config.configurations
-                         ['global']['dfs_name_dir'])
+                         ['global']['fs_checkpoint_dir'])

     def test_new_config_item_in_top_level_within_blueprint(self, patched):
         cluster_config_file = pkg.resource_string(
@@ -1355,6 +1362,7 @@ class TestNodeGroup:
         self.node_processes = node_processes
         self.count = count
         self.id = name
+        self.storage_paths = ['']


 class TestUserInputConfig:

@@ -14,6 +14,7 @@
 # limitations under the License.

 import savanna.plugins.hdp.services as s
+from savanna.tests.unit.plugins.hdp.hdp_test_base import TestNodeGroup
 import unittest2

@@ -84,3 +85,28 @@ class ServicesTest(unittest2.TestCase):
         self.assertEqual(expected_configs,
                          expected_configs & service.configurations)
         self.assertTrue(service.is_mandatory())
+
+    def test_get_storage_paths(self):
+        service = s.create_service('AMBARI')
+        ng1 = TestNodeGroup(None, None, None)
+        ng1.storage_paths = ['/mnt', '/volume/disk1']
+        ng2 = TestNodeGroup(None, None, None)
+        ng2.storage_paths = ['/mnt']
+
+        paths = service._get_common_paths([ng1, ng2])
+        self.assertEqual(['/mnt'], paths)
+
+        ng3 = TestNodeGroup(None, None, None)
+        ng1.storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
+        ng2.storage_paths = ['/mnt']
+        ng3.storage_paths = ['/mnt', '/volume/disk1']
+
+        paths = service._get_common_paths([ng1, ng2, ng3])
+        self.assertEqual(['/mnt'], paths)
+
+        ng1.storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
+        ng2.storage_paths = ['/mnt', '/volume/disk1']
+        ng3.storage_paths = ['/mnt', '/volume/disk1']
+
+        paths = service._get_common_paths([ng1, ng2, ng3])
+        self.assertEqual(['/volume/disk1'], paths)