Merge "Enable swift integration"

This commit is contained in:
Jenkins 2013-09-21 04:08:06 +00:00 committed by Gerrit Code Review
commit 25237c9708
6 changed files with 206 additions and 8 deletions

View File

@ -65,6 +65,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
if provisioned:
installed = self._install_services(cluster.name, ambari_info)
if installed:
# install the swift integration on the servers
self._install_swift_integration(servers)
LOG.info("Install of Hadoop stack successful.")
# add service urls
self._set_cluster_info(cluster, cluster_spec, ambari_info)
@ -99,6 +101,10 @@ class AmbariPlugin(p.ProvisioningPluginBase):
return node_processes
def _install_swift_integration(self, servers):
for server in servers:
server.install_swift_integration()
def convert(self, config, plugin_name, version, template_name,
cluster_template_create):
normalized_config = clusterspec.ClusterSpec(config).normalize()
@ -469,6 +475,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
self._install_components(ambari_info, auth, cluster_name, servers)
self._install_swift_integration(servers)
self._start_components(ambari_info, auth, cluster_name, servers)
def _exec_ambari_command(self, auth, body, cmd_uri):

View File

@ -14,6 +14,8 @@
# limitations under the License.
from savanna.plugins.hdp import baseprocessor as b
from savanna.plugins import provisioning as p
from savanna.swift import swift_helper as h
class BlueprintProcessor(b.BaseProcessor):
@ -24,6 +26,7 @@ class BlueprintProcessor(b.BaseProcessor):
def process_user_inputs(self, user_inputs):
context = {}
self._append_swift_inputs(user_inputs)
for ui in user_inputs:
for handler in self.ui_handlers:
if handler.apply_user_input(ui, self.blueprint, context):
@ -61,6 +64,15 @@ class BlueprintProcessor(b.BaseProcessor):
self.blueprint['host_role_mappings'].append(
host_role_mapping)
def _append_swift_inputs(self, user_inputs):
swift_props = h.get_swift_configs()
for prop in swift_props:
config = p.Config(prop['name'], 'general', 'cluster')
setattr(config, 'tag', 'core-site')
ui = p.UserInput(config, prop['value'])
user_inputs.append(ui)
class StandardConfigHandler(b.BaseProcessor):
def apply_user_input(self, ui, blueprint, ctx):

View File

@ -20,6 +20,9 @@ from savanna.plugins.hdp import savannautils
AMBARI_RPM = 'http://s3.amazonaws.com/public-repo-1.hortonworks.com/' \
'ambari/centos6/1.x/updates/1.2.5.17/ambari.repo'
HADOOP_SWIFT_RPM = 'https://s3.amazonaws.com/public-repo-1.hortonworks.com/' \
'savanna/swift/hadoop-swift-1.0-1.x86_64.rpm'
LOG = logging.getLogger(__name__)
@ -50,6 +53,15 @@ class HadoopServer:
r.execute_command(rpm_cmd)
r.execute_command('yum -y install epel-release')
@savannautils.inject_remote('r')
def install_swift_integration(self, r):
LOG.info(
"{0}: Installing swift integration ..."
.format(self.instance.hostname))
rpm_cmd = 'rpm -Uvh ' + HADOOP_SWIFT_RPM
r.execute_command(rpm_cmd)
@savannautils.inject_remote('r')
def _setup_and_start_ambari_server(self, port, r):
LOG.info(

View File

@ -155,6 +155,105 @@
"is_optional": true,
"name": "hadoop.security.auth_to_local",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "string",
"default_value": "org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem",
"description": "Swift FileSystem implementation",
"is_optional": true,
"name": "fs.swift.impl",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "15000",
"description": "timeout for all connections",
"is_optional": true,
"name": "fs.swift.connect.timeout",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "60000",
"description": "how long the connection waits for responses from servers",
"is_optional": true,
"name": "fs.swift.socket.timeout",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "3",
"description": "connection retry count for all connections",
"is_optional": true,
"name": "fs.swift.connect.retry.count",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "0",
"description": "delay in millis between bulk (delete, rename, copy operations)",
"is_optional": true,
"name": "fs.swift.connect.throttle.delay",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "32768",
"description": "blocksize for filesystem (kb)",
"is_optional": true,
"name": "fs.swift.blocksize",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "4718592",
"description": "the partition size for uploads (kb)",
"is_optional": true,
"name": "fs.swift.partsize",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "64",
"description": "request size for reads in KB",
"is_optional": true,
"name": "fs.swift.requestsize",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "bool",
"default_value": true,
"description": "savanna provider public attribute",
"is_optional": true,
"name": "fs.swift.service.savanna.public",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "8080",
"description": " ",
"is_optional": true,
"name": "fs.swift.service.savanna.http.port",
"scope": "cluster"
},
{
"applicable_target": "general",
"config_type": "int",
"default_value": "443",
"description": " ",
"is_optional": true,
"name": "fs.swift.service.savanna.https.port",
"scope": "cluster"
}
]
},

View File

@ -242,7 +242,20 @@
{ "name" : "fs.checkpoint.dir", "value" : "/hadoop/hdfs/namesecondary" },
{ "name" : "fs.checkpoint.period", "value" : "21600" },
{ "name" : "fs.checkpoint.size", "value" : "0.5" },
{ "name" : "fs.checkpoint.edits.dir", "value" : "/hadoop/hdfs/namesecondary" }
{ "name" : "fs.checkpoint.edits.dir", "value" : "/hadoop/hdfs/namesecondary" },
{ "name" : "fs.swift.impl", "value" : "org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem" },
{ "name" : "fs.swift.connect.timeout", "value" : "15000" },
{ "name" : "fs.swift.socket.timeout", "value" : "60000" },
{ "name" : "fs.swift.connect.retry.count", "value" : "3" },
{ "name" : "fs.swift.connect.throttle.delay", "value" : "0" },
{ "name" : "fs.swift.blocksize", "value" : "32768" },
{ "name" : "fs.swift.partsize", "value" : "4718592" },
{ "name" : "fs.swift.requestsize", "value" : "64" },
{ "name" : "fs.swift.service.savanna.public", "value" : "true" },
{ "name" : "fs.swift.service.savanna.http.port", "value" : "8080" },
{ "name" : "fs.swift.service.savanna.https.port", "value" : "443" },
{ "name" : "fs.swift.service.savanna.auth.url", "value" : "None" },
{ "name" : "fs.swift.service.savanna.tenant", "value" : "None"}
]
},
{

View File

@ -15,6 +15,7 @@
import collections as c
import json
import mock
import os
import unittest2
@ -40,7 +41,9 @@ class BlueprintProcessorTest(unittest2.TestCase):
return elem
def test_existing_config_item_in_top_level_within_blueprint(self):
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_existing_config_item_in_top_level_within_blueprint(self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(
json.load(open(
os.path.join(os.path.realpath('./unit/plugins'), 'hdp',
@ -80,8 +83,10 @@ class BlueprintProcessorTest(unittest2.TestCase):
self.assertEqual('/some/new/path', prop_value,
'prop value is wrong post config processing')
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_insert_new_config_item_into_existing_top_level_configuration(
self):
self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(
json.load(open(
os.path.join(os.path.realpath('./unit/plugins'), 'hdp',
@ -116,8 +121,10 @@ class BlueprintProcessorTest(unittest2.TestCase):
'namenode_heapsize')
self.assertIsNotNone(prop_dict, 'no matching property should be found')
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_insert_new_config_item_into_newly_created_top_level_configuration(
self):
self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(
json.load(open(
os.path.join(os.path.realpath('./unit/plugins'), 'hdp',
@ -145,14 +152,16 @@ class BlueprintProcessorTest(unittest2.TestCase):
# process the input configuration
processor.process_user_inputs(configs_list)
configs = self._xpath_get(processor.blueprint, '/configurations')
self.assertEqual(2, len(configs), 'no config section added')
self.assertEqual(3, len(configs), 'no config section added')
self.assertEqual('mapred-site', configs[1]['name'],
'section should exist')
self.assertEqual('mapred.job.tracker.handler.count',
configs[1]['properties'][0]['name'],
'property not added')
def test_update_ambari_admin_user(self):
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_update_ambari_admin_user(self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(json.load(open(os.path.join(
os.path.realpath('./unit/plugins'), 'hdp', 'resources',
'sample-ambari-blueprint.json'), 'r')))
@ -176,7 +185,9 @@ class BlueprintProcessorTest(unittest2.TestCase):
self.assertEqual('new-user', services[2]['users'][0]['name'])
def test_update_ambari_admin_password(self):
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_update_ambari_admin_password(self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(json.load(open(os.path.join(
os.path.realpath('./unit/plugins'), 'hdp', 'resources',
'sample-ambari-blueprint.json'), 'r')))
@ -200,7 +211,9 @@ class BlueprintProcessorTest(unittest2.TestCase):
self.assertEqual('new-pwd', services[2]['users'][0]['password'])
def test_update_ambari_admin_user_and_password(self):
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_update_ambari_admin_user_and_password(self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(json.load(open(os.path.join(
os.path.realpath('./unit/plugins'), 'hdp', 'resources',
'sample-ambari-blueprint.json'), 'r')))
@ -518,6 +531,39 @@ class BlueprintProcessorTest(unittest2.TestCase):
self.assertDictEqual(expected_slave_dict, host_mappings[1],
'second mapping does not match')
@mock.patch('savanna.swift.swift_helper.get_swift_configs')
def test_swift_props_added(self, helper):
helper.side_effect = my_get_configs
processor = bp.BlueprintProcessor(json.load(open(os.path.join(
os.path.realpath('./unit/plugins'), 'hdp', 'resources',
'sample-ambari-blueprint.json'), 'r')))
config_items = [
{"value": "512m",
"config": {
"name": "namenode_heapsize",
"description": "heap size",
"default_value": "256m",
"config_type": "string",
"is_optional": "true",
"applicable_target": "general",
"tag": "global",
"scope": "cluster"}
}
]
configs_list = self.json2obj(json.dumps(config_items))
processor.process_user_inputs(configs_list)
configurations = self._xpath_get(processor.blueprint,
'/configurations')
core_site_section = processor._find_blueprint_section(configurations,
'name',
'core-site')
self.assertIsNotNone(core_site_section)
props = core_site_section['properties']
self.assertEqual(3, len(props))
def _json_object_hook(self, d):
return c.namedtuple('X', d.keys())(*d.values())
@ -535,3 +581,11 @@ def _create_ng(name, flavor, node_processes, count, image):
}
return r.NodeGroupResource(dct)
def my_get_configs(*args, **kwargs):
return [{"name": "fs.swift.service.savanna.auth.url",
"value": "someURI"},
{"name": "fs.swift.service.savanna.tenant",
"value": "admin"},
{"name": "fs.swift.service.savanna.http.port", "value": 8080}]