Merge "Adding IDH plugin basic implementation"

Jenkins 2014-01-10 21:07:39 +00:00 committed by Gerrit Code Review
commit 93e9d5bdc8
30 changed files with 7114 additions and 1 deletion


@@ -9,6 +9,7 @@ include savanna/db/migration/alembic_migrations/versions/README
recursive-include savanna/locale *
include savanna/plugins/intel/resources/*.xml
include savanna/plugins/vanilla/resources/*.xml
include savanna/plugins/vanilla/resources/*.sh
include savanna/plugins/vanilla/resources/*.sql


@@ -182,3 +182,10 @@ class ThreadException(SavannaException):
self.message = "An error occurred in thread '%s': %s" % (
thread_description, str(e))
self.code = "THREAD_EXCEPTION"
class NotImplementedException(SavannaException):
code = "NOT_IMPLEMENTED"
def __init__(self, feature):
self.message = "Feature '%s' is not implemented" % feature




@@ -0,0 +1,33 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.intel.client import cluster
from savanna.plugins.intel.client import nodes
from savanna.plugins.intel.client import params
from savanna.plugins.intel.client import rest as r
from savanna.plugins.intel.client import services
class IntelClient():
def __init__(self, manager_ip, cluster_name):
#TODO(alazarev) make credentials configurable (bug #1262881)
self.rest = r.RESTClient(manager_ip, 'admin', 'admin')
self.cluster_name = cluster_name
self._ctx = self
self.cluster = cluster.Cluster(self)
self.nodes = nodes.Nodes(self)
self.params = params.Params(self)
self.services = services.Services(self)
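
For illustration, a minimal usage sketch of the composed client (not part of the diff; the manager address and cluster name are hypothetical):

# Hypothetical example: an IDH manager at 10.0.0.5 managing cluster 'demo'.
from savanna.plugins.intel.client import client as c

client = c.IntelClient('10.0.0.5', 'demo')
client.cluster.create()                             # POST /cluster
client.nodes.add(['node-1'], '/Default', 'hadoop',  # register a host via SSH key
                 '/home/hadoop/.ssh/id_rsa')
client.services.add(['hdfs', 'mapred'])             # declare services before assigning roles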


@@ -0,0 +1,44 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.intel.client import context as c
from savanna.plugins.intel.client import session
class Cluster(c.IntelContext):
def create(self):
url = '/cluster'
data = {
'name': self.cluster_name,
'dnsresolution': True,
'acceptlicense': True
}
return self.rest.post(url, data)
def get(self):
url = '/cluster/%s' % self.cluster_name
return self.rest.get(url)
def install_software(self, nodes):
_nodes = [{'hostname': host} for host in nodes]
url = '/cluster/%s/nodes/commands/installsoftware' % self.cluster_name
session_id = self.rest.post(url, _nodes)['sessionID']
return session.wait(self, session_id)
def upload_authzkeyfile(self, authzkeyfile):
url = '/cluster/%s/upload/authzkey' % self.cluster_name
return self.rest.post(url,
files={'file': authzkeyfile})['upload result']


@@ -0,0 +1,21 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class IntelContext(object):
def __init__(self, ctx):
self._ctx = ctx._ctx
self.cluster_name = ctx.cluster_name
self.rest = ctx.rest


@@ -0,0 +1,65 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.intel.client import context as c
from savanna.plugins.intel.client import session
from savanna.plugins.intel import exceptions as iex
class Nodes(c.IntelContext):
def add(self, nodes, rack, username, path_to_key, keypass=''):
hosts = {
'method': 'useauthzkeyfile',
'nodeinfo': map(lambda host: {
'hostname': host,
'username': username,
'passphrase': keypass,
'authzkeyfile': path_to_key,
'rackName': rack
}, nodes)
}
url = '/cluster/%s/nodes' % self.cluster_name
resp = self.rest.post(url, hosts)['items']
for node_info in resp:
if node_info['info'] != 'Connected':
raise iex.IntelPluginException(
'Error adding nodes: %s' % node_info['iporhostname'])
def get(self):
url = '/cluster/%s/nodes' % self.cluster_name
return self.rest.get(url)
def get_status(self, node):
url = '/cluster/%s/nodes/%s' % (self.cluster_name, node)
return self.rest.get(url)['status']
def delete(self, node):
url = '/cluster/%s/nodes/%s' % (self.cluster_name, node)
return self.rest.delete(url)
def config(self, force=False):
url = ('/cluster/%s/nodes/commands/confignodes/%s'
% (self.cluster_name, 'force' if force else 'noforce'))
session_id = self.rest.post(url)['sessionID']
return session.wait(self, session_id)
def stop(self, nodes):
url = '/cluster/%s/nodes/commands/stopnodes' % self.cluster_name
data = [{'hostname': host} for host in nodes]
return self.rest.post(url, data)


@@ -0,0 +1,76 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import exceptions
from savanna.plugins.intel.client import context as c
class BaseParams(c.IntelContext):
def __init__(self, ctx, service):
super(BaseParams, self).__init__(ctx)
self.service = service
def add(self, item, value, desc=''):
data = {
'editdesc': desc,
'items': [
{
'type': self.service,
'item': item,
'value': value,
'desc': desc
}
]
}
url = ('/cluster/%s/configuration/%s'
% (self.cluster_name, self.service))
return self.rest.post(url, data)
def update(self, item, value, desc='', nodes=None):
data = {
'editdesc': desc,
'items': [
{
'type': self.service,
'item': item,
'value': value
}
]
}
if nodes:
data = {
'editdesc': desc,
'items': map(lambda node: {
'type': self.service,
'item': item,
'value': value,
'hostname': node
}, nodes)
}
url = ('/cluster/%s/configuration/%s'
% (self.cluster_name, self.service))
return self.rest.put(url, data)
def get(self, hosts, item):
raise exceptions.NotImplementedException("BaseParams.get")
class Params(c.IntelContext):
def __init__(self, ctx):
super(Params, self).__init__(ctx)
self.hadoop = BaseParams(self, 'hadoop')
self.hdfs = BaseParams(self, 'hdfs')
self.mapred = BaseParams(self, 'mapred')


@@ -0,0 +1,78 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
from requests import auth
from savanna.openstack.common import log as logging
from savanna.plugins.intel import exceptions as iex
LOG = logging.getLogger(__name__)
class RESTClient():
def __init__(self, manager_ip, auth_username, auth_password):
#TODO(alazarev) make port configurable (bug #1262895)
self.base_url = ('https://%s:9443/restapi/intelcloud/api/v1'
% manager_ip)
LOG.debug("Connecting to manager with URL of %s", self.base_url)
self.auth = auth.HTTPBasicAuth(auth_username, auth_password)
def get(self, url):
url = self.base_url + url
LOG.debug("Sending GET to URL of %s", url)
r = requests.get(url, verify=False, auth=self.auth)
return self._check_response(r)
def post(self, url, data=None, files=None):
url = self.base_url + url
LOG.debug("Sending POST to URL '%s' (%s files): %s", url,
len(files) if files else 0,
data if data else 'no data')
r = requests.post(url, data=json.dumps(data) if data else None,
verify=False, auth=self.auth, files=files)
return self._check_response(r)
def delete(self, url):
url = self.base_url + url
LOG.debug("Sending DELETE to URL of %s", url)
r = requests.delete(url, verify=False, auth=self.auth)
return self._check_response(r)
def put(self, url, data=None):
url = self.base_url + url
if data:
LOG.debug("Sending PUT to URL of %s: %s", url, data)
r = requests.put(url, data=json.dumps(data), verify=False,
auth=self.auth)
else:
LOG.debug("Sending PUT to URL of %s with no data", url)
r = requests.put(url, verify=False, auth=self.auth)
return self._check_response(r)
def _check_response(self, resp):
LOG.debug("Response with HTTP code %s, and content of %s",
resp.status_code, resp.text)
if not resp.ok:
raise iex.IntelPluginException(
"Request to manager returned with code '%s', reason '%s' "
"and message '%s'" % (resp.status_code, resp.reason,
json.loads(resp.text)['message']))
else:
return json.loads(resp.text)
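
A short sketch of driving the REST layer directly (illustrative only; the host and the admin/admin credentials are placeholders matching the defaults wired into IntelClient). Every verb funnels through _check_response, so a non-2xx reply surfaces as IntelPluginException carrying the manager's 'message' field:

from savanna.plugins.intel.client import rest

api = rest.RESTClient('10.0.0.5', 'admin', 'admin')
nodes = api.get('/cluster/demo/nodes')            # parsed JSON dict on success
api.put('/cluster/demo/configuration/hdfs',       # body is JSON-encoded before sending
        {'editdesc': '', 'items': []})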


@@ -0,0 +1,137 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import context
from savanna.openstack.common import log as logging
from savanna.plugins.intel.client import context as c
from savanna.plugins.intel.client import session
from savanna.plugins.intel import exceptions as iex
LOG = logging.getLogger(__name__)
class BaseService(c.IntelContext):
def __init__(self, ctx, service_name):
super(BaseService, self).__init__(ctx)
self.service = service_name
def start(self):
url = ('/cluster/%s/services/%s/commands/start'
% (self.cluster_name, self.service))
self.rest.post(url)
timeout = 120
cur_time = 0
while cur_time < timeout:
context.sleep(2)
if self.status() == 'running':
break
else:
cur_time += 2
else:
raise iex.IntelPluginException(
"Service '%s' has failed to start in %s seconds"
% (self.service, timeout))
def stop(self):
url = ('/cluster/%s/services/%s/commands/stop'
% (self.cluster_name, self.service))
return self.rest.post(url)
def status(self):
url = '/cluster/%s/services' % self.cluster_name
statuses = self.rest.get(url)['items']
for st in statuses:
if st['serviceName'] == self.service:
return st['status']
raise iex.IntelPluginException(
"Service '%s' is not installed on cluster '%s'"
% (self.service, self.cluster_name))
def get_nodes(self):
url = '/cluster/%s/services/%s' % (self.cluster_name, self.service)
return self.rest.get(url)
def add_nodes(self, role, nodes):
url = ('/cluster/%s/services/%s/roles'
% (self.cluster_name, self.service))
data = map(lambda host: {
'rolename': role,
'hostname': host
}, nodes)
return self.rest.post(url, data)
class HDFSService(BaseService):
def format(self, force=False):
url = ('/cluster/%s/services/hdfs/commands/hdfsformat/%s'
% (self.cluster_name, 'force' if force else 'noforce'))
session_id = self.rest.post(url)['sessionID']
return session.wait(self, session_id)
def decommission_nodes(self, nodes, force=False):
url = ('/cluster/%s/nodes/commands/decommissionnodes/%s'
% (self.cluster_name, 'force' if force else 'noforce'))
data = map(lambda host: {
'hostname': host
}, nodes)
return self.rest.post(url, data)
def get_datanodes_status(self):
url = '/cluster/%s/nodes/commands/datanodes/status' % self.cluster_name
return self.rest.get(url)['items']
def get_datanode_status(self, datanode):
stats = self.get_datanodes_status()
for stat in stats:
if stat['hostname'] == datanode:
return stat['status'].strip()
raise iex.IntelPluginException(
"Datanode service is is not installed on node '%s'" % datanode)
class Services(c.IntelContext):
def __init__(self, ctx):
super(Services, self).__init__(ctx)
self.hdfs = HDFSService(self, 'hdfs')
self.mapred = BaseService(self, 'mapred')
self.hive = BaseService(self, 'hive')
self.oozie = BaseService(self, 'oozie')
def add(self, services):
_services = map(lambda service: {
'serviceName': service,
'type': service
}, services)
url = '/cluster/%s/services' % self.cluster_name
return self.rest.post(url, _services)
def get_services(self):
url = '/cluster/%s/services' % self.cluster_name
return self.rest.get(url)
def delete_service(self, service):
url = '/cluster/%s/services/%s' % (self.cluster_name, service)
return self.rest.delete(url)


@@ -0,0 +1,49 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import context
from savanna.openstack.common import log as logging
from savanna.plugins.intel import exceptions as iex
LOG = logging.getLogger(__name__)
def get(ctx, session_id):
url = '/cluster/%s/session/%s' % (ctx.cluster_name, session_id)
return ctx.rest.get(url)
def wait(ctx, session_id):
#TODO(lazarev) add check on savanna cluster state (exit on delete)
#TODO(alazarev) make configurable (bug #1262897)
timeout = 4*60*60 # 4 hours
cur_time = 0
while cur_time < timeout:
info_items = get(ctx, session_id)['items']
for item in info_items:
progress = item['nodeprogress']
if progress['info'].strip() == '_ALLFINISH':
return
else:
context.sleep(10)
cur_time += 10
debug_msg = 'Hostname: %s\nInfo: %s'
debug_msg = debug_msg % (progress['hostname'], progress['info'])
LOG.debug(debug_msg)
else:
raise iex.IntelPluginException(
"Cluster '%s' has failed to start in %s minutes"
% (ctx.cluster_name, timeout / 60))
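
A sketch of how these helpers pair with the manager's asynchronous commands (illustrative; ctx stands for any IntelContext-derived object such as Nodes or HDFSService): a command POST returns a sessionID, and wait() polls that session every 10 seconds until '_ALLFINISH' or the 4-hour cap:

from savanna.plugins.intel.client import session

# ctx: any object exposing .rest and .cluster_name (an IntelContext subclass).
session_id = ctx.rest.post('/cluster/%s/nodes/commands/confignodes/noforce'
                           % ctx.cluster_name)['sessionID']
session.wait(ctx, session_id)   # blocks until the manager reports _ALLFINISH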


@@ -0,0 +1,128 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins import provisioning as p
from savanna.utils import xmlutils as x
CORE_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
'plugins/intel/resources/hadoop-default.xml')
HDFS_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
'plugins/intel/resources/hdfs-default.xml')
MAPRED_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
'plugins/intel/resources/mapred-default.xml')
XML_CONFS = {
"Hadoop": [CORE_DEFAULT],
"HDFS": [HDFS_DEFAULT],
"MapReduce": [MAPRED_DEFAULT]
}
IDH_TARBALL_URL = p.Config('IDH tarball URL', 'general', 'cluster', priority=1,
default_value='http://repo1.intelhadoop.com:3424/'
'setup/setup-intelhadoop-'
'2.5.1-en-evaluation.RHEL.tar.gz')
OS_REPO_URL = p.Config('OS repository URL', 'general', 'cluster', priority=1,
is_optional=True,
default_value='http://mirror.centos.org/'
'centos-6/6/os/x86_64')
IDH_REPO_URL = p.Config('IDH repository URL', 'general', 'cluster',
priority=1, is_optional=True,
default_value='http://repo1.intelhadoop.com:3424'
'/evaluation/en/RHEL/2.5.1/rpm')
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir',
'mapred.job.tracker', 'mapred.system.dir', 'mapred.local.dir']
CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication',
'dfs.replication.min', 'dfs.replication.max',
'io.file.buffer.size', 'mapreduce.job.counters.max',
'mapred.output.compress', 'io.compression.codecs',
'mapred.output.compression.codec',
'mapred.output.compression.type',
'mapred.compress.map.output',
'mapred.map.output.compression.codec']
PRIORITY_1_CONFS = ['dfs.datanode.du.reserved',
'dfs.datanode.failed.volumes.tolerated',
'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count',
'dfs.namenode.handler.count', 'mapred.child.java.opts',
'mapred.jobtracker.maxtasks.per.job',
'mapred.job.tracker.handler.count',
'mapred.map.child.java.opts',
'mapred.reduce.child.java.opts',
'io.sort.mb', 'mapred.tasktracker.map.tasks.maximum',
'mapred.tasktracker.reduce.tasks.maximum']
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
CFG_TYPE = {
"Boolean": "bool",
"String": "string",
"Integer": "int",
"Choose": "string",
"Class": "string",
"Directory": "string",
"Float": "string",
"Int_range": "string",
}
def _initialise_configs():
configs = []
for service, config_lists in XML_CONFS.iteritems():
for config_list in config_lists:
for config in config_list:
if config['name'] not in HIDDEN_CONFS:
cfg = p.Config(
config['name'], service, "cluster", is_optional=True,
config_type="string",
default_value=str(config['value']),
description=config['description'])
if config.get('type'):
cfg.config_type = CFG_TYPE[config['type']]
if cfg.config_type == 'bool':
cfg.default_value = cfg.default_value == 'true'
if cfg.config_type == 'int':
if cfg.default_value:
cfg.default_value = int(cfg.default_value)
else:
cfg.config_type = 'string'
if config['name'] in PRIORITY_1_CONFS:
cfg.priority = 1
configs.append(cfg)
configs.append(IDH_TARBALL_URL)
configs.append(IDH_REPO_URL)
configs.append(OS_REPO_URL)
configs.append(ENABLE_SWIFT)
return configs
PLUGIN_CONFIGS = _initialise_configs()
def get_plugin_configs():
return PLUGIN_CONFIGS
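
As a worked example of what _initialise_configs produces (a sketch; the parsed values shown are hypothetical), a Boolean entry read from hdfs-default.xml such as {'name': 'dfs.permissions', 'value': 'true', 'type': 'Boolean', ...} would be folded roughly into:

cfg = p.Config('dfs.permissions', 'HDFS', 'cluster', is_optional=True,
               config_type='string', default_value='true', description='...')
cfg.config_type = CFG_TYPE['Boolean']              # mapped to 'bool'
cfg.default_value = cfg.default_value == 'true'    # string 'true' -> True
cfg.priority = 1                                   # listed via CLUSTER_WIDE_CONFS/PRIORITY_1_CONFS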


@@ -0,0 +1,31 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.exceptions as e
class NotSingleManagerException(e.SavannaException):
message = "Intel hadoop cluster should contain only 1 Intel " \
"Manager instance. Actual manager count is %s"
def __init__(self, mng_count):
self.message = self.message % mng_count
self.code = "NOT_SINGLE_MANAGER"
class IntelPluginException(e.SavannaException):
def __init__(self, message):
self.message = message
self.code = "INTEL_PLUGIN_EXCEPTION"


@@ -0,0 +1,411 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
import telnetlib
from savanna import conductor
from savanna import context
from savanna.openstack.common import log as logging
from savanna.plugins.general import utils as u
from savanna.plugins.intel.client import client as c
from savanna.plugins.intel import config_helper as c_helper
from savanna.plugins.intel import exceptions as iex
from savanna.swift import swift_helper as swift
from savanna.utils import crypto
conductor = conductor.API
LOG = logging.getLogger(__name__)
_INST_CONF_TEMPLATE = """
network_interface=eth0
mode=silent
accept_jdk_license=accept
how_to_setup_os_repo=2
os_repo=%s
os_repo_username=
os_repo_password=
os_repo_proxy=
how_to_setup_idh_repo=1
idh_repo=%s
idh_repo_username=
idh_repo_password=
idh_repo_proxy=
firewall_selinux_setting=1"""
SWIFT_LIB_URL = \
'http://savanna-files.mirantis.com/hadoop-swift/hadoop-swift-latest.jar'
def install_manager(cluster):
LOG.info("Starting Install Manager Process")
mng_instance = u.get_instance(cluster, 'manager')
idh_tarball_path = \
cluster.cluster_configs['general'].get('IDH tarball URL')
if not idh_tarball_path:
idh_tarball_path = c_helper.IDH_TARBALL_URL.default_value
idh_tarball_filename = idh_tarball_path.rsplit('/', 1)[-1]
idh_dir = idh_tarball_filename[:idh_tarball_filename.find('.tar.gz')]
LOG.info("IDH tgz will be retrieved from: \'%s\'", idh_tarball_path)
idh_repo = cluster.cluster_configs['general'].get('IDH repository URL')
if not idh_repo:
idh_repo = c_helper.IDH_REPO_URL.default_value
os_repo = cluster.cluster_configs['general'].get('OS repository URL')
if not os_repo:
os_repo = c_helper.OS_REPO_URL.default_value
idh_install_cmd = 'sudo ./%s/install.sh --mode=silent 2>&1' % idh_dir
with mng_instance.remote() as r:
LOG.info("Download IDH manager ")
try:
r.execute_command('curl -O %s 2>&1' % idh_tarball_path)
except Exception as e:
raise RuntimeError("Unable to download IDH manager from %s",
idh_tarball_path, e)
# unpack archive
LOG.info("Unpack manager %s ", idh_tarball_filename)
try:
r.execute_command('tar xzf %s 2>&1' % idh_tarball_filename)
except Exception as e:
raise RuntimeError("Unable to unpack tgz %s",
idh_tarball_filename, e)
# install idh
LOG.debug("Install manager with %s : ", idh_install_cmd)
inst_conf = _INST_CONF_TEMPLATE % (os_repo, idh_repo)
r.write_file_to('%s/ui-installer/conf' % idh_dir, inst_conf)
#TODO(alazarev) make timeout configurable (bug #1262897)
r.execute_command(idh_install_cmd, timeout=3600)
# fix nginx permissions bug
r.execute_command('sudo chmod o+x /var/lib/nginx/ /var/lib/nginx/tmp '
'/var/lib/nginx/tmp/client_body')
# wait for the IDH manager to start
#TODO(alazarev) make timeout configurable (bug #1262897)
timeout = 600
LOG.debug("Waiting %s seconds for Manager to start : ", timeout)
while timeout:
try:
telnetlib.Telnet(mng_instance.management_ip, 9443)
break
except IOError:
timeout -= 2
context.sleep(2)
else:
message = ("IDH Manager failed to start in %s minutes on node '%s' "
"of cluster '%s'"
% (timeout / 60, mng_instance.management_ip, cluster.name))
LOG.error(message)
raise iex.IntelPluginException(message)
def configure_os(cluster):
instances = u.get_instances(cluster)
configure_os_from_instances(cluster, instances)
def create_hadoop_ssh_keys(cluster):
private_key, public_key = crypto.generate_key_pair()
extra = {
'hadoop_private_ssh_key': private_key,
'hadoop_public_ssh_key': public_key
}
return conductor.cluster_update(context.ctx(), cluster, {'extra': extra})
def configure_os_from_instances(cluster, instances):
for instance in instances:
with instance.remote() as remote:
LOG.debug("Configuring OS settings on %s : ", instance.hostname())
# configure hostname, RedHat/Centos specific
remote.replace_remote_string('/etc/sysconfig/network',
'HOSTNAME=.*',
'HOSTNAME=%s' % instance.hostname())
# disable selinux and iptables, because Intel distribution requires
# this to be off
remote.execute_command('sudo /usr/sbin/setenforce 0')
remote.replace_remote_string('/etc/selinux/config',
'SELINUX=.*', 'SELINUX=disabled')
# disable iptables
remote.execute_command('sudo /sbin/service iptables stop')
remote.execute_command('sudo /sbin/chkconfig iptables off')
# create 'hadoop' user
remote.write_files_to({
'id_rsa': cluster.extra.get('hadoop_private_ssh_key'),
'authorized_keys': cluster.extra.get('hadoop_public_ssh_key')
})
remote.execute_command(
'sudo useradd hadoop && '
'sudo sh -c \'echo "hadoop ALL=(ALL) NOPASSWD:ALL" '
'>> /etc/sudoers\' && '
'sudo mkdir -p /home/hadoop/.ssh/ && '
'sudo mv id_rsa authorized_keys /home/hadoop/.ssh && '
'sudo chown -R hadoop:hadoop /home/hadoop/.ssh && '
'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}')
swift_enable = \
cluster.cluster_configs['general'].get('Enable Swift')
if not swift_enable:
swift_enable = c_helper.ENABLE_SWIFT.default_value
if swift_enable:
swift_lib_path = '/usr/lib/hadoop/lib/hadoop-swift-latest.jar'
cmd = ('sudo curl \'%s\' -o %s --create-dirs'
% (SWIFT_LIB_URL, swift_lib_path))
remote.execute_command(cmd)
def _configure_services(client, cluster):
nn_host = u.get_namenode(cluster).hostname()
snn = u.get_secondarynamenodes(cluster)
snn_host = snn[0].hostname() if snn else None
jt_host = u.get_jobtracker(cluster).hostname()
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
tt_hosts = [tt.hostname() for tt in u.get_tasktrackers(cluster)]
oozie_host = u.get_oozie(cluster).hostname() if u.get_oozie(
cluster) else None
hive_host = u.get_hiveserver(cluster).hostname() if u.get_hiveserver(
cluster) else None
services = []
if u.get_namenode(cluster):
services += ['hdfs']
if u.get_jobtracker(cluster):
services += ['mapred']
if oozie_host:
services += ['oozie']
if hive_host:
services += ['hive']
LOG.debug("Add services: %s" % ', '.join(services))
client.services.add(services)
LOG.debug("Assign roles to hosts")
client.services.hdfs.add_nodes('PrimaryNameNode', [nn_host])
client.services.hdfs.add_nodes('DataNode', dn_hosts)
if snn:
client.services.hdfs.add_nodes('SecondaryNameNode', [snn_host])
if oozie_host:
client.services.oozie.add_nodes('Oozie', [oozie_host])
if hive_host:
client.services.hive.add_nodes('HiveServer', [hive_host])
client.services.mapred.add_nodes('JobTracker', [jt_host])
client.services.mapred.add_nodes('TaskTracker', tt_hosts)
def _configure_storage(client, cluster):
datanode_ng = u.get_node_groups(cluster, 'datanode')[0]
storage_paths = datanode_ng.storage_paths()
dn_hosts = [i.hostname() for i in u.get_datanodes(cluster)]
name_dir_param = ",".join(
[st_path + '/dfs/name' for st_path in storage_paths])
data_dir_param = ",".join(
[st_path + '/dfs/data' for st_path in storage_paths])
client.params.hdfs.update('dfs.name.dir', name_dir_param)
client.params.hdfs.update('dfs.data.dir', data_dir_param, nodes=dn_hosts)
def _configure_swift(client, cluster):
swift_enable = cluster.cluster_configs['general'].get('Enable Swift')
if swift_enable is None or swift_enable:
swift_configs = swift.get_swift_configs()
for conf in swift_configs:
client.params.hadoop.add(conf['name'], conf['value'])
def _add_user_params(client, cluster):
for p in six.iteritems(cluster.cluster_configs["Hadoop"]):
client.params.hadoop.update(p[0], p[1])
for p in six.iteritems(cluster.cluster_configs["HDFS"]):
client.params.hdfs.update(p[0], p[1])
for p in six.iteritems(cluster.cluster_configs["MapReduce"]):
client.params.mapred.update(p[0], p[1])
def install_cluster(cluster):
mng_instance = u.get_instance(cluster, 'manager')
mng_ip = mng_instance.management_ip
all_hosts = list(set([i.hostname() for i in u.get_instances(cluster)]))
client = c.IntelClient(mng_ip, cluster.name)
LOG.info("Create cluster")
client.cluster.create()
LOG.info("Add nodes to cluster")
rack = '/Default'
client.nodes.add(all_hosts, rack, 'hadoop',
'/home/hadoop/.ssh/id_rsa')
LOG.info("Install software")
client.cluster.install_software(all_hosts)
LOG.info("Configure services")
_configure_services(client, cluster)
LOG.info("Deploy cluster")
client.nodes.config(force=True)
LOG.info("Provisioning configs")
# cinder and ephemeral drive support
_configure_storage(client, cluster)
# swift support
_configure_swift(client, cluster)
# user configs
_add_user_params(client, cluster)
LOG.info("Format HDFS")
client.services.hdfs.format()
def start_cluster(cluster):
client = c.IntelClient(
u.get_instance(cluster, 'manager').management_ip, cluster.name)
LOG.debug("Starting hadoop services")
client.services.hdfs.start()
client.services.mapred.start()
if u.get_hiveserver(cluster):
client.services.hive.start()
if u.get_oozie(cluster):
client.services.oozie.start()
def scale_cluster(cluster, instances):
scale_ins_hosts = [i.hostname() for i in instances]
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
tt_hosts = [tt.hostname() for tt in u.get_tasktrackers(cluster)]
to_scale_dn = []
to_scale_tt = []
for i in scale_ins_hosts:
if i in dn_hosts:
to_scale_dn.append(i)
if i in tt_hosts:
to_scale_tt.append(i)
mng_ip = u.get_instance(cluster, 'manager').management_ip
client = c.IntelClient(mng_ip, cluster.name)
rack = '/Default'
client.nodes.add(scale_ins_hosts, rack, 'hadoop',
cluster.extra['manager_authzkeyfile_path'])
client.cluster.install_software(scale_ins_hosts)
if to_scale_tt:
client.services.mapred.add_nodes('TaskTracker', to_scale_tt)
if to_scale_dn:
client.services.hdfs.add_nodes('DataNode', to_scale_dn)
client.nodes.config()
if to_scale_dn:
client.services.hdfs.start()
if to_scale_tt:
client.services.mapred.start()
def decommission_nodes(cluster, instances):
dec_hosts = [i.hostname() for i in instances]
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
tt_hosts = [dn.hostname() for dn in u.get_tasktrackers(cluster)]
mng_ip = u.get_instances(cluster, 'manager')[0].management_ip
client = c.IntelClient(mng_ip, cluster.name)
dec_dn_hosts = []
for dec_host in dec_hosts:
if dec_host in dn_hosts:
dec_dn_hosts.append(dec_host)
if dec_dn_hosts:
client.services.hdfs.decommission_nodes(dec_dn_hosts)
#TODO(alazarev) make timeout configurable (bug #1262897)
timeout = 14400 # 4 hours
cur_time = 0
for host in dec_dn_hosts:
while cur_time < timeout:
if client.services.hdfs.get_datanode_status(
host) == 'Decomissioned':
break
context.sleep(5)
cur_time += 5
else:
LOG.warn("Failed to decomission node '%s' of cluster '%s' "
"in %s minutes" % (host, cluster.name, timeout/60))
client.nodes.stop(dec_hosts)
# wait for services to stop
#TODO(alazarev) make timeout configurable (bug #1262897)
timeout = 600 # 10 minutes
cur_time = 0
for instance in instances:
while cur_time < timeout:
stopped = True
if instance.hostname() in dn_hosts:
code, out = instance.remote().execute_command(
'sudo /sbin/service hadoop-datanode status',
raise_when_error=False)
if out.strip() != 'datanode is stopped':
stopped = False
if out.strip() == 'datanode dead but pid file exists':
instance.remote().execute_command(
'sudo rm -f '
'/var/run/hadoop/hadoop-hadoop-datanode.pid')
if instance.hostname() in tt_hosts:
code, out = instance.remote().execute_command(
'sudo /sbin/service hadoop-tasktracker status',
raise_when_error=False)
if out.strip() != 'tasktracker is stopped':
stopped = False
if stopped:
break
else:
context.sleep(5)
cur_time += 5
else:
LOG.warn("Failed to stop services on node '%s' of cluster '%s' "
"in %s minutes" % (instance, cluster.name, timeout/60))
for node in dec_hosts:
LOG.info("Deleting node '%s' on cluster '%s'" % (node, cluster.name))
client.nodes.delete(node)
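
Taken together, the functions above form the deployment sequence that the provisioning plugin (plugin.py, below) drives; a condensed sketch of that flow:

# Order of calls as used by IDHProvider.configure_cluster / start_cluster.
cluster = create_hadoop_ssh_keys(cluster)   # generate and persist the hadoop key pair
configure_os(cluster)                       # hostnames, selinux/iptables off, hadoop user
install_manager(cluster)                    # fetch tarball, silent install, wait on port 9443
install_cluster(cluster)                    # create cluster, add nodes, assign roles, format HDFS
start_cluster(cluster)                      # start hdfs/mapred (+ hive/oozie when present)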


@@ -0,0 +1,173 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import conductor
from savanna import context
from savanna.openstack.common import log as logging
from savanna.plugins.general import exceptions as ex
from savanna.plugins.general import utils as u
from savanna.plugins.intel import config_helper as c_helper
from savanna.plugins.intel import exceptions as i_ex
from savanna.plugins.intel import installer as ins
from savanna.plugins import provisioning as p
conductor = conductor.API
LOG = logging.getLogger(__name__)
class IDHProvider(p.ProvisioningPluginBase):
def __init__(self):
self.processes = {
"Manager": ["manager"],
"HDFS": ["namenode", "datanode", "secondarynamenode"],
"MapReduce": ["jobtracker", "tasktracker"],
"Hadoop": [] # for hadoop parameters in UI
}
def get_description(self):
return \
'The IDH OpenStack plugin works with project ' \
'Savanna to automate the deployment of the Intel Distribution ' \
'of Apache Hadoop on OpenStack based ' \
'public & private clouds'
def get_node_processes(self, hadoop_version):
return self.processes
def get_versions(self):
return ['2.5.1']
def get_title(self):
return "Intel(R) Distribution for Apache Hadoop* Software"
def get_configs(self, hadoop_version):
return c_helper.get_plugin_configs()
def get_hive_config_path(self):
return '/etc/hive/conf/hive-site.xml'
def configure_cluster(self, cluster):
LOG.info("Configure IDH cluster")
cluster = ins.create_hadoop_ssh_keys(cluster)
ins.configure_os(cluster)
ins.install_manager(cluster)
ins.install_cluster(cluster)
def start_cluster(self, cluster):
LOG.info("Start IDH cluster")
ins.start_cluster(cluster)
self._set_cluster_info(cluster)
def validate(self, cluster):
nn_count = sum([ng.count for ng
in u.get_node_groups(cluster, 'namenode')])
if nn_count != 1:
raise ex.NotSingleNameNodeException(nn_count)
jt_count = sum([ng.count for ng
in u.get_node_groups(cluster, 'jobtracker')])
if jt_count > 1:
raise ex.NotSingleJobTrackerException(jt_count)
tt_count = sum([ng.count for ng
in u.get_node_groups(cluster, 'tasktracker')])
if jt_count == 0 and tt_count > 0:
raise ex.TaskTrackersWithoutJobTracker()
mng_count = sum([ng.count for ng
in u.get_node_groups(cluster, 'manager')])
if mng_count != 1:
raise i_ex.NotSingleManagerException(mng_count)
def scale_cluster(self, cluster, instances):
ins.configure_os_from_instances(cluster, instances)
ins.scale_cluster(cluster, instances)
def decommission_nodes(self, cluster, instances):
ins.decommission_nodes(cluster, instances)
def validate_scaling(self, cluster, existing, additional):
self._validate_additional_ng_scaling(cluster, additional)
self._validate_existing_ng_scaling(cluster, existing)
def _get_scalable_processes(self):
return ["datanode", "tasktracker"]
def _get_by_id(self, lst, id):
for obj in lst:
if obj.id == id:
return obj
def _validate_additional_ng_scaling(self, cluster, additional):
jt = u.get_jobtracker(cluster)
scalable_processes = self._get_scalable_processes()
for ng_id in additional:
ng = self._get_by_id(cluster.node_groups, ng_id)
if not set(ng.node_processes).issubset(scalable_processes):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Intel plugin cannot scale nodegroup"
" with processes: " +
' '.join(ng.node_processes))
if not jt and 'tasktracker' in ng.node_processes:
raise ex.NodeGroupCannotBeScaled(
ng.name, "Intel plugin cannot scale node group with "
"processes which have no master-processes run "
"in cluster")
def _validate_existing_ng_scaling(self, cluster, existing):
scalable_processes = self._get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.id in existing:
if ng.count > existing[ng.id] and "datanode" in \
ng.node_processes:
dn_to_delete += ng.count - existing[ng.id]
if not set(ng.node_processes).issubset(scalable_processes):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Intel plugin cannot scale nodegroup"
" with processes: " +
' '.join(ng.node_processes))
def _set_cluster_info(self, cluster):
mng = u.get_instances(cluster, 'manager')[0]
nn = u.get_namenode(cluster)
jt = u.get_jobtracker(cluster)
oozie = u.get_oozie(cluster)
#TODO(alazarev) make port configurable (bug #1262895)
info = {'IDH Manager': {
'Web UI': 'https://%s:9443' % mng.management_ip
}}
if jt:
#TODO(alazarev) make port configurable (bug #1262895)
info['MapReduce'] = {
'Web UI': 'http://%s:50030' % jt.management_ip
}
if nn:
#TODO(alazarev) make port configurable (bug #1262895)
info['HDFS'] = {
'Web UI': 'http://%s:50070' % nn.management_ip
}
if oozie:
#TODO(alazarev) make port configurable (bug #1262895)
info['JobFlow'] = {
'Oozie': 'http://%s:11000' % oozie.management_ip
}
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {'info': info})


@@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:complexType name="display">
<xs:sequence>
<xs:element name="en">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:minLength value="1" />
</xs:restriction>
</xs:simpleType>
</xs:element>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="valueType">
<xs:restriction base="xs:string">
<xs:enumeration value="Boolean" />
<xs:enumeration value="Integer" />
<xs:enumeration value="Float" />
<xs:enumeration value="IP" />
<xs:enumeration value="Port" />
<xs:enumeration value="IPWithPort" />
<xs:enumeration value="IPWithMask" />
<xs:enumeration value="URL" />
<xs:enumeration value="String" />
<xs:enumeration value="MepRedCapacity" />
<xs:enumeration value="HBaseClientScannerCaching" />
<xs:enumeration value="Class" />
<xs:enumeration value="Choose" />
<xs:enumeration value="Directory" />
<xs:enumeration value="Int_range" />
</xs:restriction>
</xs:simpleType>
<xs:element name="configuration">
<xs:complexType mixed="true">
<xs:sequence>
<xs:element name="property" maxOccurs="unbounded">
<xs:complexType>
<xs:all>
<xs:element name="name" type="xs:string" />
<xs:element name="value" type="xs:string" />
<xs:element name="intel_default" type="xs:string" minOccurs="0" />
<xs:element name="recommendation" type="xs:string" minOccurs="0" />
<xs:element name="valuetype" type="valueType" />
<xs:element name="group" type="xs:string" />
<xs:element name="definition" type="display" />
<xs:element name="description" type="display" />
<xs:element name="global" type="xs:boolean" minOccurs="0" />
<xs:element name="allowempty" type="xs:boolean" minOccurs="0" />
<xs:element name="readonly" type="xs:boolean" minOccurs="0" />
<xs:element name="hide" type="xs:boolean" minOccurs="0" />
<xs:element name="automatic" type="xs:boolean" minOccurs="0" />
<xs:element name="enable" type="xs:boolean" minOccurs="0" />
<xs:element name="reserved" type="xs:boolean" minOccurs="0" />
<xs:element name="radios" type="xs:string" minOccurs="0" />
<xs:element name="script" type="xs:string" minOccurs="0" />
<xs:element name="type" type="xs:string" minOccurs="0" />
<xs:element name="form" type="xs:string" minOccurs="0" />
<xs:element name="chooselist" type="xs:string" minOccurs="0" />
<xs:element name="implementation" type="xs:string" minOccurs="0" />
<xs:element name="sectionname" type="xs:string" minOccurs="0" />
<xs:element name="refor" minOccurs="0">
<xs:complexType>
<xs:all>
<xs:element name="refand">
<xs:complexType>
<xs:all>
<xs:element name="value" type="xs:string" />
<xs:element name="valuetype" type="valueType" />
<xs:element name="index" type="xs:string" />
</xs:all>
</xs:complexType>
</xs:element>
</xs:all>
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="skipInDoc" type="xs:boolean" />
</xs:complexType>
</xs:element>
<xs:element name="briefsection" minOccurs="0" maxOccurs="unbounded">
<xs:complexType>
<xs:all>
<xs:element name="sectionname" type="xs:string" />
<xs:element name="name_en" type="xs:string" />
<xs:element name="description_en" type="xs:string" minOccurs="0" />
<xs:element name="autoexpand" type="xs:boolean" />
<xs:element name="showdescription" type="xs:boolean" />
</xs:all>
</xs:complexType>
</xs:element>
<xs:element name="group" minOccurs="1" maxOccurs="unbounded">
<xs:complexType>
<xs:all>
<xs:element name="id" type="xs:string" />
<xs:element name="name_en" type="xs:string" />
<xs:element name="description_en" type="xs:string" />
</xs:all>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,28 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
class Response:
def __init__(self, data=None, ok=True, status_code=200):
self.text = json.dumps(data)
self.ok = ok
self.status_code = status_code
self.reason = None
def make_resp(data=None, ok=True, status_code=200):
return Response(data, ok, status_code)


@@ -0,0 +1,310 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from savanna import exceptions as ex
from savanna.plugins.intel.client import client as c
from savanna.plugins.intel import exceptions as iex
from savanna.tests.unit import base
from savanna.tests.unit.plugins.intel.client import response as r
SESSION_POST_DATA = {'sessionID': '123'}
SESSION_GET_DATA = {"items": [
{
"nodeprogress": {
"hostname": 'host',
'info': '_ALLFINISH\n'
}
}
]}
class TestClient(base.DbTestCase):
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_cluster_op(self, get, post):
client = c.IntelClient('qwe', 'rty')
data = {'lelik': 'bolik'}
post.return_value = r.make_resp(data)
self.assertEqual(client.cluster.create(), data)
get.return_value = r.make_resp(data)
self.assertEqual(client.cluster.get(), data)
post.return_value = r.make_resp(SESSION_POST_DATA)
get.return_value = r.make_resp(SESSION_GET_DATA)
client.cluster.install_software(['bla-bla'])
self.assertEqual(post.call_count, 2)
self.assertEqual(get.call_count, 2)
@mock.patch('requests.delete')
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_nodes_op(self, get, post, delete):
client = c.IntelClient('qwe', 'rty')
# add
post.return_value = r.make_resp(data={
"items": [
{
"iporhostname": "n1",
"info": "Connected"
},
{
"iporhostname": "n2",
"info": "Connected"
}
]
})
client.nodes.add(['n1', 'n2'], 'hadoop', '/Def', '/tmp/key')
post.return_value = r.make_resp(data={
"items": [
{
"iporhostname": "n1",
"info": "bla-bla"
}
]
})
self.assertRaises(iex.IntelPluginException, client.nodes.add,
['n1'], 'hadoop', '/Def', '/tmp/key')
# config
post.return_value = r.make_resp(SESSION_POST_DATA)
get.return_value = r.make_resp(SESSION_GET_DATA)
client.nodes.config()
# delete
delete.return_value = r.make_resp()
client.nodes.delete(['n1'])
# get
get.return_value = r.make_resp()
client.nodes.get()
# get_status
get.return_value = r.make_resp(data={"status": "running"})
client.nodes.get_status(['n1'])
# stop_nodes
post.return_value = r.make_resp()
client.nodes.stop(['n1'])
self.assertEqual(delete.call_count, 1)
self.assertEqual(post.call_count, 4)
self.assertEqual(get.call_count, 3)
@mock.patch('requests.put')
@mock.patch('requests.post')
def test_params_op(self, post, put):
client = c.IntelClient('qwe', 'rty')
post.return_value = r.make_resp()
put.return_value = r.make_resp()
# add
client.params.hdfs.add('lelik', 'bolik')
client.params.hadoop.add('lelik', 'bolik')
client.params.mapred.add('lelik', 'bolik')
# get
self.assertRaises(ex.NotImplementedException, client.params.hdfs.get,
['n1'], 'lelik')
self.assertRaises(ex.NotImplementedException, client.params.hadoop.get,
['n1'], 'lelik')
self.assertRaises(ex.NotImplementedException, client.params.mapred.get,
['n1'], 'lelik')
# update
client.params.hdfs.update('lelik', 'bolik', nodes=['n1'])
client.params.hdfs.update('lelik', 'bolik')
client.params.hadoop.update('lelik', 'bolik', nodes=['n1'])
client.params.hadoop.update('lelik', 'bolik')
client.params.mapred.update('lelik', 'bolik', nodes=['n1'])
client.params.mapred.update('lelik', 'bolik')
self.assertEqual(post.call_count, 3)
self.assertEqual(put.call_count, 6)
@mock.patch('savanna.context.sleep', lambda x: None)
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_base_services_op(self, get, post):
client = c.IntelClient('qwe', 'rty')
# start
post.return_value = r.make_resp()
get.return_value = r.make_resp(data={
"items": [
{
"serviceName": "hdfs",
"status": "running"
},
{
"serviceName": "mapred",
"status": "running"
}
]})
client.services.hdfs.start()
client.services.mapred.start()
get.return_value = r.make_resp(data={
"items": [
{
"serviceName": "hdfs",
"status": "stopped"
},
{
"serviceName": "mapred",
"status": "stopped"
}
]
})
self.assertRaises(iex.IntelPluginException,
client.services.hdfs.start)
self.assertRaises(iex.IntelPluginException,
client.services.mapred.start)
# stop
post.return_value = r.make_resp()
client.services.hdfs.stop()
client.services.mapred.stop()
# service
get.return_value = r.make_resp(data={
"items": [
{
"serviceName": "bla-bla",
"status": "fail"
}
]
})
self.assertRaises(iex.IntelPluginException,
client.services.hdfs.status)
self.assertRaises(iex.IntelPluginException,
client.services.mapred.status)
# get_nodes
get.return_value = r.make_resp()
client.services.hdfs.get_nodes()
client.services.mapred.get_nodes()
# add_nodes
post.return_value = r.make_resp()
client.services.hdfs.add_nodes('DataNode', ['n1', 'n2'])
client.services.mapred.add_nodes('NameNode', ['n1', 'n2'])
self.assertEqual(get.call_count, 126)
self.assertEqual(post.call_count, 8)
@mock.patch('requests.delete')
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_services_op(self, get, post, delete):
client = c.IntelClient('qwe', 'rty')
# add
post.return_value = r.make_resp()
client.services.add(['hdfs', 'mapred'])
# get_services
get.return_value = r.make_resp()
client.services.get_services()
# delete_service
delete.return_value = r.make_resp()
client.services.delete_service('hdfs')
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_hdfs_services_op(self, get, post):
client = c.IntelClient('qwe', 'rty')
# format
get.return_value = r.make_resp(SESSION_GET_DATA)
post.return_value = r.make_resp(SESSION_POST_DATA)
client.services.hdfs.format()
# decommission
post.return_value = r.make_resp()
client.services.hdfs.decommission_nodes(['n1'])
# get status
get.return_value = r.make_resp(data={
"items": [
{
"hostname": "n1",
"status": "start"
}
]
})
client.services.hdfs.get_datanodes_status()
self.assertEqual(client.services.hdfs.get_datanode_status('n1'),
'start')
self.assertRaises(iex.IntelPluginException,
client.services.hdfs.get_datanode_status, 'n2')
self.assertEqual(get.call_count, 4)
self.assertEqual(post.call_count, 2)
@mock.patch('savanna.context.sleep', lambda x: None)
@mock.patch('requests.post')
@mock.patch('requests.get')
def test_session_op(self, get, post):
client = c.IntelClient('qwe', 'rty')
data1 = {
"items": [
{
"nodeprogress": {
"hostname": 'host',
'info': 'info\n'
}
}
]
}
data2 = {
"items": [
{
"nodeprogress": {
"hostname": 'host',
'info': '_ALLFINISH\n'
}
}
]
}
get.side_effect = (r.make_resp(data1), r.make_resp(data2))
post.return_value = r.make_resp(SESSION_POST_DATA)
client.services.hdfs.format()
self.assertEqual(get.call_count, 2)
self.assertEqual(post.call_count, 1)
@mock.patch('requests.get')
def test_rest_client(self, get):
client = c.IntelClient('qwe', 'rty')
get.return_value = r.make_resp(ok=False, status_code=500, data={
"message": "message"
})
self.assertRaises(iex.IntelPluginException,
client.services.get_services)


@@ -0,0 +1,61 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.general import exceptions as g_ex
from savanna.plugins.intel import config_helper as c_helper
from savanna.plugins.intel import exceptions as i_ex
from savanna.plugins.intel import plugin as p
from savanna.tests.unit import base
from savanna.tests.unit.plugins.intel import test_utils as tu
class TestIDHPlugin(base.DbTestCase):
def test_get_configs(self):
plugin = p.IDHProvider()
configs = plugin.get_configs('2.5.0')
self.assertIn(c_helper.IDH_REPO_URL, configs)
self.assertIn(c_helper.IDH_TARBALL_URL, configs)
self.assertIn(c_helper.OS_REPO_URL, configs)
def test_validate(self):
plugin = p.IDHProvider()
ng_mng = tu.make_ng_dict('mng', 'f1', ['manager'], 1)
ng_nn = tu.make_ng_dict('nn', 'f1', ['namenode'], 1)
ng_jt = tu.make_ng_dict('jt', 'f1', ['jobtracker'], 1)
ng_dn = tu.make_ng_dict('dn', 'f1', ['datanode'], 2)
ng_tt = tu.make_ng_dict('tt', 'f1', ['tasktracker'], 2)
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
[ng_nn] + [ng_dn])
self.assertRaises(i_ex.NotSingleManagerException, plugin.validate, cl)
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0', [ng_mng])
self.assertRaises(g_ex.NotSingleNameNodeException, plugin.validate, cl)
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
[ng_mng] + [ng_nn] * 2)
self.assertRaises(g_ex.NotSingleNameNodeException, plugin.validate, cl)
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
[ng_mng] + [ng_nn] + [ng_tt])
self.assertRaises(g_ex.TaskTrackersWithoutJobTracker,
plugin.validate, cl)
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
[ng_mng] + [ng_nn] + [ng_jt] * 2 + [ng_tt])
self.assertRaises(g_ex.NotSingleJobTrackerException,
plugin.validate, cl)


@@ -0,0 +1,32 @@
# Copyright (c) 2013 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.conductor import resource as r
def create_cluster(name, tenant, plugin, version, node_groups, **kwargs):
dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin,
'hadoop_version': version, 'node_groups': node_groups}
dct.update(kwargs)
return r.ClusterResource(dct)
def make_ng_dict(name, flavor, processes, count, instances=[]):
return {'name': name, 'flavor_id': flavor, 'node_processes': processes,
'count': count, 'instances': instances}
def make_inst_dict(inst_id, inst_name):
return {'instance_id': inst_id, 'instance_name': inst_name}


@@ -0,0 +1,43 @@
<?xml version="1.0"?>
<configuration>
<!--Common hadoop property-->
<property>
<name>name1</name>
<value>value1</value>
<valuetype>String</valuetype>
<description>
<en>descr1</en>
</description>
</property>
<!--Common hadoop property without 'valuetype' tag-->
<property>
<name>name2</name>
<value>value2</value>
<description>
<en>descr2</en>
</description>
</property>
<!--Common hadoop property 3 without text in 'value' tag-->
<property>
<name>name3</name>
<value></value>
<valuetype>String</valuetype>
<description>
<en>descr3</en>
</description>
</property>
<!--Common hadoop property 4 without 'value' tag-->
<property>
<name>name4</name>
<valuetype>String</valuetype>
<description>
<en>descr4</en>
</description>
</property>
<!--Common hadoop property 5 without description-->
<property>
<name>name5</name>
<value>value5</value>
<valuetype>String</valuetype>
</property>
</configuration>


@@ -36,6 +36,24 @@ class XMLUtilsTestCase(unittest2.TestCase):
x.load_hadoop_xml_defaults(
'tests/unit/resources/test-default.xml'))
def test_load_xml_defaults_with_type_and_locale(self):
expected = [
{'name': u'name1', 'value': u'value1', 'type': u'String',
'description': 'descr1'},
{'name': u'name2', 'value': u'value2', 'type': u'',
'description': 'descr2'},
{'name': u'name3', 'value': '', 'type': u'String',
'description': 'descr3'},
{'name': u'name4', 'value': '', 'type': u'String',
'description': 'descr4'},
{'name': u'name5', 'value': u'value5', 'type': u'String',
'description': ''}]
actual = x.load_hadoop_xml_defaults_with_type_and_locale(
'tests/unit/resources/test-default-with-type-and-locale.xml')
self.assertListEqual(
expected,
actual)
def test_adjust_description(self):
self.assertEqual(x._adjust_field("\n"), "")
self.assertEqual(x._adjust_field("\n "), "")


@@ -36,6 +36,27 @@ def load_hadoop_xml_defaults(file_name):
return configs
def load_hadoop_xml_defaults_with_type_and_locale(file_name):
doc = load_xml_document(file_name)
configs = []
prop = doc.getElementsByTagName('property')
for elements in prop:
configs.append({
'name': _get_text_from_node(elements, 'name'),
'value': _get_text_from_node(elements, 'value'),
'type': _get_text_from_node(elements, 'valuetype'),
'description': _adjust_field(
_get_text_from_node(
_get_node_element(elements, 'description'), 'en'))
})
return configs
def _get_node_element(element, name):
element = element.getElementsByTagName(name)
return element[0] if element and element[0].hasChildNodes() else None
def create_hadoop_xml(configs, config_filter=None):
doc = xml.Document()
@@ -74,7 +95,7 @@ def load_xml_document(file_name, strip=False):
def _get_text_from_node(element, name):
element = element.getElementsByTagName(name)
element = element.getElementsByTagName(name) if element else None
return element[0].firstChild.nodeValue if (
element and element[0].hasChildNodes()) else ''


@@ -36,6 +36,7 @@ console_scripts =
savanna.cluster.plugins =
vanilla = savanna.plugins.vanilla.plugin:VanillaProvider
hdp = savanna.plugins.hdp.ambariplugin:AmbariPlugin
idh = savanna.plugins.intel.plugin:IDHProvider
savanna.infrastructure.engine =
savanna = savanna.service.instances:SavannaInfrastructureEngine