Added rack topology configuration for Hadoop cluster

The configuration includes topology for compute nodes and Swift nodes.

Co-Authored-By: Alexander Kuznetsov <akuznetsov@mirantis.com>

Implements: blueprint rack-and-vms-location-should-pass-to-cluster

Change-Id: I0fdd6ef084507c3a1054bbd50e030f22534231b2
Andrew Lazarev 2013-10-04 16:22:32 +04:00
parent 21632317d6
commit 1dffdda08b
16 changed files with 482 additions and 19 deletions

.gitignore
View File

@@ -33,6 +33,7 @@ nosetests.xml
pylint-report.txt
etc/local.cfg
etc/savanna/*.conf
etc/savanna/*.topology
etc/savanna.conf
ChangeLog
savanna/tests/integration/configs/config.py

View File

@@ -10,6 +10,7 @@ include savanna/db/migration/alembic_migrations/versions/README
recursive-include savanna/locale *
include savanna/plugins/vanilla/resources/*.xml
include savanna/plugins/vanilla/resources/*.sh
include savanna/plugins/hdp/versions/1_3_2/resources/*.template
include savanna/plugins/hdp/versions/1_3_2/resources/*.json
include savanna/plugins/hdp/versions/2_0/resources/*.template
@@ -18,6 +19,7 @@ include savanna/service/edp/resources/*.xml
include savanna/swift/resources/*.xml
include savanna/tests/unit/resources/*.xml
include savanna/tests/unit/resources/*.txt
include savanna/topology/resources/*.xml
exclude .gitignore
exclude .gitreview

View File

@@ -0,0 +1,6 @@
edp-master-0001 /rack1
10.50.0.8 /rack1
edp-slave-0002 /rack1
10.50.0.5 /rack1
edp-slave-0001 /rack2
10.50.0.6 /rack2

View File

@@ -206,6 +206,38 @@
#node_domain=novalocal
#
# Options defined in savanna.topology
#
# Enables data locality for Hadoop clusters.
# Also enables data locality for Swift used by Hadoop.
# If enabled, the 'compute_topology_file' and 'swift_topology_file'
# configuration parameters should point to the OpenStack compute and
# Swift topology files respectively.
#enable_data_locality=False
# Enables four-level topology for data locality.
# Works only if the corresponding plugin supports this mode.
#enable_hypervisor_awareness=True
# File with nova compute topology.
# It should contain a mapping between nova compute nodes and racks.
# File format:
# compute1 /rack1
# compute2 /rack2
# compute3 /rack2
#compute_topology_file=etc/savanna/compute.topology
# File with swift topology.
# It should contain a mapping between Swift nodes and racks.
# File format:
# node1 /rack1
# node2 /rack2
# node3 /rack2
#swift_topology_file=etc/savanna/swift.topology
[database]
#
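Both topology files use the same whitespace-separated 'host /rack' format. A quick way to sanity-check a file before enabling 'enable_data_locality' is a few lines of Python along these lines (a standalone sketch, not part of this change; the path argument is an example):

import sys

def validate_topology_file(path):
    # Every non-empty line must be exactly "host /rack".
    with open(path) as f:
        for n, line in enumerate(f, 1):
            if not line.strip():
                continue
            parts = line.split()
            if len(parts) != 2 or not parts[1].startswith('/'):
                raise ValueError("line %d is not 'host /rack': %r" % (n, line))

if __name__ == '__main__':
    validate_topology_file(sys.argv[1])  # e.g. etc/savanna/compute.topology
    print("topology file looks valid")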

View File

@@ -0,0 +1,2 @@
10.10.1.86 /rack1
swift1 /rack1

View File

@@ -13,15 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from savanna.openstack.common import log as logging
from savanna.plugins import provisioning as p
from savanna.plugins.vanilla import mysql_helper as m_h
from savanna.plugins.vanilla import oozie_helper as o_h
from savanna.swift import swift_helper as swift
from savanna.topology import topology_helper as topology
from savanna.utils import xmlutils as x
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CORE_DEFAULT = x.load_hadoop_xml_defaults(
'plugins/vanilla/resources/core-default.xml')
@@ -64,6 +68,10 @@ ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
ENABLE_MYSQL = p.Config('Enable MySQL', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
@@ -129,6 +137,8 @@ def _initialise_configs():
configs.append(ENABLE_SWIFT)
configs.append(ENABLE_MYSQL)
if CONF.enable_data_locality:
configs.append(ENABLE_DATA_LOCALITY)
return configs
@@ -141,7 +151,7 @@ def get_plugin_configs():
def get_general_configs(hive_hostname, passwd_hive_mysql):
return {
config = {
ENABLE_SWIFT.name: {
'default_value': ENABLE_SWIFT.default_value,
'conf': extract_name_values(swift.get_swift_configs())
@@ -152,6 +162,14 @@ def get_general_configs(hive_hostname, passwd_hive_mysql):
hive_hostname, passwd_hive_mysql)
}
}
if CONF.enable_data_locality:
config.update({
ENABLE_DATA_LOCALITY.name: {
'default_value': ENABLE_DATA_LOCALITY.default_value,
'conf': extract_name_values(topology.vm_awareness_all_config())
}
})
return config
def generate_cfg_from_general(cfg, configs, general_config,
@@ -222,10 +240,21 @@ def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname,
# applying swift configs if user enabled it
swift_xml_confs = swift.get_swift_configs()
cfg = generate_cfg_from_general(cfg, configs, general_cfg)
# invoking applied configs to appropriate xml files
core_all = CORE_DEFAULT + swift_xml_confs
mapred_all = MAPRED_DEFAULT
if CONF.enable_data_locality:
cfg.update(topology.TOPOLOGY_CONFIG)
# applying vm awareness configs
core_all += topology.vm_awareness_core_config()
mapred_all += topology.vm_awareness_mapred_config()
xml_configs = {
'core-site': x.create_hadoop_xml(cfg, CORE_DEFAULT + swift_xml_confs),
'mapred-site': x.create_hadoop_xml(cfg, MAPRED_DEFAULT),
'core-site': x.create_hadoop_xml(cfg, core_all),
'mapred-site': x.create_hadoop_xml(cfg, mapred_all),
'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT)
}
@@ -331,9 +360,19 @@ def _set_config(cfg, gen_cfg, name=None):
return cfg
def is_mysql_enable(cluster):
def _is_general_option_enabled(cluster, option):
for ng in cluster.node_groups:
conf = ng.configuration
if 'general' in conf and ENABLE_MYSQL.name in conf['general']:
return conf['general'][ENABLE_MYSQL.name]
return ENABLE_MYSQL.default_value
if 'general' in conf and option.name in conf['general']:
return conf['general'][option.name]
return option.default_value
def is_mysql_enable(cluster):
return _is_general_option_enabled(cluster, ENABLE_MYSQL)
def is_data_locality_enabled(cluster):
if not CONF.enable_data_locality:
return False
return _is_general_option_enabled(cluster, ENABLE_DATA_LOCALITY)
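The refactoring above extracts the per-node-group lookup into _is_general_option_enabled, so the MySQL and data-locality checks share one resolution rule: the first node group whose 'general' section sets the option wins, otherwise the option's default value applies. A minimal illustration of that rule with hypothetical stand-ins for the plugin Config and conductor objects (not Savanna's real classes):

class FakeConfig(object):
    def __init__(self, name, default_value):
        self.name = name
        self.default_value = default_value

class FakeNodeGroup(object):
    def __init__(self, configuration):
        self.configuration = configuration

class FakeCluster(object):
    def __init__(self, node_groups):
        self.node_groups = node_groups

def _is_general_option_enabled(cluster, option):
    # Same logic as the helper added in this change.
    for ng in cluster.node_groups:
        conf = ng.configuration
        if 'general' in conf and option.name in conf['general']:
            return conf['general'][option.name]
    return option.default_value

locality = FakeConfig('Enable Data Locality', True)
cluster = FakeCluster([FakeNodeGroup({}),
                       FakeNodeGroup({'general': {'Enable Data Locality': False}})])
assert _is_general_option_enabled(cluster, locality) is False  # override wins
assert _is_general_option_enabled(FakeCluster([]), locality) is True  # default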

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from savanna import conductor
from savanna import context
from savanna.openstack.common import log as logging
@@ -23,12 +25,14 @@ from savanna.plugins import provisioning as p
from savanna.plugins.vanilla import config_helper as c_helper
from savanna.plugins.vanilla import run_scripts as run
from savanna.plugins.vanilla import scaling as sc
from savanna.topology import topology_helper as th
from savanna.utils import files as f
from savanna.utils import remote
conductor = conductor.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class VanillaProvider(p.ProvisioningPluginBase):
@@ -193,6 +197,13 @@ class VanillaProvider(p.ProvisioningPluginBase):
oozie is not None and oozie.node_group.id == ng.id)
)
}
if c_helper.is_data_locality_enabled(cluster):
topology_data = th.generate_topology_map(
cluster, CONF.enable_hypervisor_awareness)
extra['topology_data'] = "\n".join(
[k + " " + v for k, v in topology_data.items()]) + "\n"
return extra
def decommission_nodes(self, cluster, instances):
@@ -245,13 +256,25 @@
self._push_configs_to_nodes(cluster, extra, instances)
self._configure_master_nodes(cluster, extra, passwd_mysql)
def _push_configs_to_nodes(self, cluster, extra, instances):
def _push_configs_to_nodes(self, cluster, extra, new_instances):
all_instances = utils.get_instances(cluster)
with context.ThreadGroup() as tg:
for instance in instances:
tg.spawn('vanilla-configure-%s' % instance.instance_name,
self._push_configs_to_node, cluster, extra, instance)
for instance in all_instances:
if instance in new_instances:
tg.spawn('vanilla-configure-%s' % instance.instance_name,
self._push_configs_to_new_node, cluster,
extra, instance)
else:
tg.spawn('vanilla-reconfigure-%s' % instance.instance_name,
self._push_configs_to_existing_node, cluster,
extra, instance)
def _push_configs_to_node(self, cluster, extra, instance):
def _push_configs_to_existing_node(self, cluster, extra, instance):
if c_helper.is_data_locality_enabled(cluster):
with remote.get_remote(instance) as r:
self._write_topology_data(r, extra)
def _push_configs_to_new_node(self, cluster, extra, instance):
ng_extra = extra[instance.node_group.id]
files = {
@@ -286,6 +309,19 @@ class VanillaProvider(p.ProvisioningPluginBase):
r.execute_command(key_cmd)
if c_helper.is_data_locality_enabled(cluster):
r.write_file_to(
'/etc/hadoop/topology.sh',
f.get_file_text(
'plugins/vanilla/resources/topology.sh'))
r.execute_command(
'sudo chmod +x /etc/hadoop/topology.sh'
)
self._write_topology_data(r, extra)
def _write_topology_data(self, r, extra):
r.write_file_to('/etc/hadoop/topology.data', extra['topology_data'])
def _configure_master_nodes(self, cluster, extra, passwd_mysql):
nn = utils.get_namenode(cluster)
jt = utils.get_jobtracker(cluster)
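The topology_data string assembled in get_extra above is exactly what _write_topology_data later puts into /etc/hadoop/topology.data on every node: one 'key rack' pair per line, covering each instance's name and both of its IPs, plus the swift nodes. A short sketch of that serialization, using values from the sample topology files in this change:

# Serialize a topology map the way get_extra does above; the mapping
# values come from the sample etc/savanna/*.topology files.
topology_data = {
    'edp-master-0001': '/rack1',
    '10.50.0.8': '/rack1',
    'swift1': '/rack1',
}
serialized = "\n".join(k + " " + v for k, v in topology_data.items()) + "\n"
print(serialized)
# edp-master-0001 /rack1
# 10.50.0.8 /rack1
# swift1 /rack1
# (line order may vary with the Python version's dict ordering)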

View File

@@ -0,0 +1,20 @@
#!/bin/bash
HADOOP_CONF=/etc/hadoop
while [ $# -gt 0 ] ; do
nodeArg=$1
exec< ${HADOOP_CONF}/topology.data
result=""
while read line ; do
ar=( $line )
if [ "${ar[0]}" = "$nodeArg" ] ; then
result="${ar[1]}"
fi
done
shift
if [ -z "$result" ] ; then
echo -n "/default/rack "
else
echo -n "$result "
fi
done
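Hadoop invokes the script configured in topology.script.file.name with one or more host names or IPs as arguments and expects a whitespace-separated rack path back for each one; anything missing from topology.data falls back to /default/rack. The same lookup expressed in Python, for reference only (a sketch, not shipped with this change):

import sys

def lookup(args, data_file='/etc/hadoop/topology.data'):
    # Build the host -> rack mapping from topology.data, then resolve
    # each argument, defaulting to /default/rack like the shell script.
    mapping = {}
    with open(data_file) as f:
        for line in f:
            parts = line.split()
            if len(parts) == 2:
                mapping[parts[0]] = parts[1]
    return [mapping.get(a, '/default/rack') for a in args]

if __name__ == '__main__':
    print(' '.join(lookup(sys.argv[1:])))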

View File

@@ -30,20 +30,19 @@
<name>fs.swift.service.savanna.public</name>
<value>true</value>
</property>
<property>
<name>fs.swift.service.savanna.auth.endpoint.prefix</name>
<value>/endpoints/AUTH_</value>
</property>
<!--Non-mandatory savanna-provider-specific configs (fs.swift.service.savanna prefix)-->
<!--Location awareness is switched on by default -->
<property>
<name>fs.swift.service.savanna.region</name>
</property>
<property>
<name>fs.swift.service.savanna.apikey</name>
</property>
<property>
<name>fs.swift.service.savanna.location-aware</name>
<value>true</value>
</property>
<!--General configs, without "savanna" prefix.-->

View File

@@ -46,8 +46,6 @@ class SwiftIntegrationTestCase(unittest2.TestCase):
result = h.get_swift_configs()
self.assertEqual(7, len(result))
self.assertIn({'name': "fs.swift.service.savanna.location-aware",
'value': 'true', 'description': ''}, result)
self.assertIn({'name': "fs.swift.service.savanna.tenant",
'value': 'test_tenant', 'description': ''}, result)
self.assertIn({'name': "fs.swift.service.savanna.http.port",

View File

View File

@@ -0,0 +1,139 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest2
from savanna.conductor import objects as o
from savanna import context
import savanna.topology.topology_helper as th
class TopologyTestCase(unittest2.TestCase):
def setUp(self):
context.set_ctx(context.Context(None, None, None, None))
def test_core_config(self):
result = th.vm_awareness_core_config()
self.assertEqual(4, len(result))
for item in result:
del item['description']
className = 'org.apache.hadoop.net.NetworkTopologyWithNodeGroup'
self.assertIn({'name': "net.topology.impl",
'value': className},
result)
self.assertIn({'name': "net.topology.nodegroup.aware",
'value': 'true'},
result)
className = 'org.apache.hadoop.hdfs.server.namenode.' \
'BlockPlacementPolicyWithNodeGroup'
self.assertIn({'name': "dfs.block.replicator.classname",
'value': className},
result)
def test_map_red_config(self):
result = th.vm_awareness_mapred_config()
self.assertEqual(3, len(result))
for item in result:
del item['description']
self.assertIn({'name': "mapred.jobtracker.nodegroup.aware",
'value': 'true'},
result)
self.assertIn({'name': "mapred.task.cache.levels",
'value': '3'},
result)
className = 'org.apache.hadoop.mapred.JobSchedulableWithNodeGroup'
self.assertIn({'name': "mapred.jobtracker.jobSchedulable",
'value': className},
result)
@mock.patch('savanna.utils.openstack.nova.client')
@mock.patch('savanna.topology.topology_helper._read_compute_topology')
@mock.patch('savanna.topology.topology_helper._read_swift_topology')
def test_get_topology(self,
swift_topology,
compute_topology,
novaclient):
nova = mock.Mock()
novaclient.return_value = nova
r1 = mock.Mock()
r1.hostId = "o1"
r2 = mock.Mock()
r2.hostId = "o1"
r3 = mock.Mock()
r3.hostId = "o2"
nova.servers.get.side_effect = [r1, r2, r3, r1, r2, r3]
swift_topology.return_value = {"s1": "/r1"}
compute_topology.return_value = {"o1": "/r1", "o2": "/r2"}
i1 = o.Instance()
i1.instance_id = "i1"
i1.instance_name = "i1"
i1.internal_ip = "0.0.1.1"
i1.management_ip = "1.1.1.1"
i2 = o.Instance()
i2.instance_id = "i2"
i2.instance_name = "i2"
i2.management_ip = "1.1.1.2"
i2.internal_ip = "0.0.1.2"
i3 = o.Instance()
i3.instance_id = "i3"
i3.instance_name = "i3"
i3.internal_ip = "1.1.1.3"
i3.management_ip = "0.0.1.3"
ng1 = o.NodeGroup()
ng1.name = "1"
ng1.instances = [i1, i2]
ng2 = o.NodeGroup()
ng2.name = "2"
ng2.instances = [i3]
cluster = o.Cluster
cluster.node_groups = [ng1, ng2]
top = th.generate_topology_map(cluster, False)
self.assertEqual(top, {
"i1": "/r1",
"1.1.1.1": "/r1",
"0.0.1.1": "/r1",
"i2": "/r1",
"1.1.1.2": "/r1",
"0.0.1.2": "/r1",
"i3": "/r2",
"1.1.1.3": "/r2",
"0.0.1.3": "/r2",
"s1": "/r1"
})
top = th.generate_topology_map(cluster, True)
self.assertEqual(top, {
"i1": "/r1/o1",
"1.1.1.1": "/r1/o1",
"0.0.1.1": "/r1/o1",
"i2": "/r1/o1",
"1.1.1.2": "/r1/o1",
"0.0.1.2": "/r1/o1",
"i3": "/r2/o2",
"1.1.1.3": "/r2/o2",
"0.0.1.3": "/r2/o2",
"s1": "/r1"
})

View File

View File

@@ -0,0 +1,24 @@
<configuration>
<property>
<name>net.topology.impl</name>
<value>org.apache.hadoop.net.NetworkTopologyWithNodeGroup</value>
<description>The implementation of NetworkTopology, which is the classic three-layer one by default.</description>
</property>
<property>
<name>net.topology.nodegroup.aware</name>
<value>true</value>
<description>By default, the network topology is not aware of the nodegroup layer.</description>
</property>
<property>
<name>dfs.block.replicator.classname</name>
<value>org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup</value>
<description>The default implementation of ReplicationTargetChooser.</description>
</property>
<!--Location awareness for swift -->
<property>
<name>fs.swift.service.savanna.location-aware</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,17 @@
<configuration>
<property>
<name>mapred.jobtracker.nodegroup.aware</name>
<value>true</value>
<description>Identifies whether the jobtracker is aware of the nodegroup layer.</description>
</property>
<property>
<name>mapred.task.cache.levels</name>
<value>3</value>
<description>This is the max level of the task cache. For example, if the level is 2, the tasks cached are at the host level and at the rack level.</description>
</property>
<property>
<name>mapred.jobtracker.jobSchedulable</name>
<value>org.apache.hadoop.mapred.JobSchedulableWithNodeGroup</value>
<description>The class responsible for an entity in FairScheduler that can launch tasks.</description>
</property>
</configuration>

View File

@@ -0,0 +1,148 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
from oslo.config import cfg
from savanna import context
from savanna import exceptions as ex
from savanna.openstack.common import log
from savanna.utils.openstack import nova
from savanna.utils import xmlutils as x
TOPOLOGY_CONFIG = {
"topology.node.switch.mapping.impl":
"org.apache.hadoop.net.ScriptBasedMapping",
"topology.script.file.name":
"/etc/hadoop/topology.sh"
}
LOG = log.getLogger(__name__)
opts = [
cfg.BoolOpt('enable_data_locality',
default=False,
help="""Enables data locality for hadoop cluster.
Also enables data locality for swift used by hadoop.
If enabled, 'compute_topology' and 'swift_topology'
configuration parameters should point to openstack and swift
topology correspondingly."""),
cfg.BoolOpt('enable_hypervisor_awareness',
default=True,
help="""Enables four-level topology for data locality.
Works only if the corresponding plugin supports this mode."""),
cfg.StrOpt('compute_topology_file',
default='etc/savanna/compute.topology',
help="""File with nova compute topology.
It should contain a mapping between nova compute nodes and racks.
File format:
compute1 /rack1
compute2 /rack2
compute3 /rack2"""),
cfg.StrOpt('swift_topology_file',
default='etc/savanna/swift.topology',
help="""File with swift topology.
It should contain a mapping between Swift nodes and racks.
File format:
node1 /rack1
node2 /rack2
node3 /rack2""")
]
CONF = cfg.CONF
CONF.register_opts(opts)
def _read_swift_topology():
topology = {}
try:
with open(CONF.swift_topology_file) as f:
for line in f:
(host, path) = line.split()
topology[host] = path
except IOError:
raise ex.NotFoundException(
CONF.swift_topology_file,
"Unable to find file %s with swift topology")
return topology
def _read_compute_topology():
ctx = context.ctx()
tenant_id = str(ctx.tenant_id)
topology = {}
try:
with open(CONF.compute_topology_file) as f:
for line in f:
(host, path) = line.split()
#calculating host id based on tenant id and host
#using the same algorithm as in nova
#see nova/api/openstack/compute/views/servers.py
#def _get_host_id(instance):
sha_hash = hashlib.sha224(tenant_id + host)
topology[sha_hash.hexdigest()] = path
except IOError:
raise ex.NotFoundException(
CONF.compute_topology_file,
"Unable to find file %s with compute topology")
return topology
def generate_topology_map(cluster, is_node_awareness):
mapping = _read_compute_topology()
nova_client = nova.client()
topology_mapping = {}
for ng in cluster.node_groups:
for i in ng.instances:
#TODO(alazarev) get all servers info with one request
ni = nova_client.servers.get(i.instance_id)
hostId = ni.hostId
if hostId not in mapping:
raise ex.NotFoundException(
i.instance_id, "Was not able to find compute node "
"topology for VM %s")
rack = mapping[hostId]
if is_node_awareness:
rack += "/" + hostId
topology_mapping[i.instance_name] = rack
topology_mapping[i.management_ip] = rack
topology_mapping[i.internal_ip] = rack
topology_mapping.update(_read_swift_topology())
return topology_mapping
def vm_awareness_core_config():
c = x.load_hadoop_xml_defaults('topology/resources/core-template.xml')
result = [cfg for cfg in c if cfg['value']]
LOG.info("Vm awareness will add following configs in core-site "
"params: %s", result)
return result
def vm_awareness_mapred_config():
c = x.load_hadoop_xml_defaults('topology/resources/mapred-template.xml')
result = [cfg for cfg in c if cfg['value']]
LOG.info("Vm awareness will add following configs in map-red "
"params: %s", result)
return result
def vm_awareness_all_config():
return vm_awareness_core_config() + vm_awareness_mapred_config()
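One subtlety in _read_compute_topology above: nova hides real compute host names from non-admin users and exposes only an anonymized hostId, so the helper reproduces nova's hashing (sha224 over tenant id plus host name) to match topology entries against servers. A quick way to check which hostId a compute.topology entry will produce (a sketch; the tenant id is an example value, and the original Python 2 nova code hashes the plain string concatenation without the encode step):

import hashlib

tenant_id = 'b1a58a17ae5f4f0f9c9fdc3a7c5c7f0a'  # example tenant id
host = 'compute1'  # host name as listed in compute.topology

host_id = hashlib.sha224((tenant_id + host).encode('utf-8')).hexdigest()
# A server whose hostId equals host_id is running on 'compute1', so it
# inherits whatever rack compute.topology assigns to 'compute1'.
print(host_id)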