Adding IDH plugin basic implementation
Functionality: * install and provision Intel Hadoop Manager and Intel Hadoop * configure Intel Hadoop to use Cinder volumes as HDFS backend * manual cluster scaling support * Swift integration * cluster configs - the same configs as in Vanilla Hadoop * configs validation - the same as in Vanilla plugin This is initial version. It doesn't cover bugs filed here: https://bugs.launchpad.net/savanna/+bugs?field.searchtext=%7BIDH%5D Co-Authored-By: Andrew Lazarev <alazarev@mirantis.com> Implements blueprint idh-savanna-plugin Change-Id: I70363869b61d73222d247b566d499c2fd6fb9201
This commit is contained in:
parent
ffa575bfcc
commit
e0ee408149
@ -9,6 +9,7 @@ include savanna/db/migration/alembic_migrations/versions/README
|
||||
|
||||
recursive-include savanna/locale *
|
||||
|
||||
include savanna/plugins/intel/resources/*.xml
|
||||
include savanna/plugins/vanilla/resources/*.xml
|
||||
include savanna/plugins/vanilla/resources/*.sh
|
||||
include savanna/plugins/vanilla/resources/*.sql
|
||||
|
@ -182,3 +182,10 @@ class ThreadException(SavannaException):
|
||||
self.message = "An error occurred in thread '%s': %s" % (
|
||||
thread_description, str(e))
|
||||
self.code = "THREAD_EXCEPTION"
|
||||
|
||||
|
||||
class NotImplementedException(SavannaException):
|
||||
code = "NOT_IMPLEMENTED"
|
||||
|
||||
def __init__(self, feature):
|
||||
self.message = "Feature '%s' is not implemented" % feature
|
||||
|
0
savanna/plugins/intel/__init__.py
Normal file
0
savanna/plugins/intel/__init__.py
Normal file
0
savanna/plugins/intel/client/__init__.py
Normal file
0
savanna/plugins/intel/client/__init__.py
Normal file
33
savanna/plugins/intel/client/client.py
Normal file
33
savanna/plugins/intel/client/client.py
Normal file
@ -0,0 +1,33 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.plugins.intel.client import cluster
|
||||
from savanna.plugins.intel.client import nodes
|
||||
from savanna.plugins.intel.client import params
|
||||
from savanna.plugins.intel.client import rest as r
|
||||
from savanna.plugins.intel.client import services
|
||||
|
||||
|
||||
class IntelClient():
|
||||
def __init__(self, manager_ip, cluster_name):
|
||||
#TODO(alazarev) make credentials configurable (bug #1262881)
|
||||
self.rest = r.RESTClient(manager_ip, 'admin', 'admin')
|
||||
self.cluster_name = cluster_name
|
||||
self._ctx = self
|
||||
|
||||
self.cluster = cluster.Cluster(self)
|
||||
self.nodes = nodes.Nodes(self)
|
||||
self.params = params.Params(self)
|
||||
self.services = services.Services(self)
|
44
savanna/plugins/intel/client/cluster.py
Normal file
44
savanna/plugins/intel/client/cluster.py
Normal file
@ -0,0 +1,44 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.plugins.intel.client import context as c
|
||||
from savanna.plugins.intel.client import session
|
||||
|
||||
|
||||
class Cluster(c.IntelContext):
|
||||
def create(self):
|
||||
url = '/cluster'
|
||||
data = {
|
||||
'name': self.cluster_name,
|
||||
'dnsresolution': True,
|
||||
'acceptlicense': True
|
||||
}
|
||||
|
||||
return self.rest.post(url, data)
|
||||
|
||||
def get(self):
|
||||
url = '/cluster/%s' % self.cluster_name
|
||||
return self.rest.get(url)
|
||||
|
||||
def install_software(self, nodes):
|
||||
_nodes = [{'hostname': host} for host in nodes]
|
||||
url = '/cluster/%s/nodes/commands/installsoftware' % self.cluster_name
|
||||
session_id = self.rest.post(url, _nodes)['sessionID']
|
||||
return session.wait(self, session_id)
|
||||
|
||||
def upload_authzkeyfile(self, authzkeyfile):
|
||||
url = '/cluster/%s/upload/authzkey' % self.cluster_name
|
||||
return self.rest.post(url,
|
||||
files={'file': authzkeyfile})['upload result']
|
21
savanna/plugins/intel/client/context.py
Normal file
21
savanna/plugins/intel/client/context.py
Normal file
@ -0,0 +1,21 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class IntelContext(object):
|
||||
def __init__(self, ctx):
|
||||
self._ctx = ctx._ctx
|
||||
self.cluster_name = ctx.cluster_name
|
||||
self.rest = ctx.rest
|
65
savanna/plugins/intel/client/nodes.py
Normal file
65
savanna/plugins/intel/client/nodes.py
Normal file
@ -0,0 +1,65 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.plugins.intel.client import context as c
|
||||
from savanna.plugins.intel.client import session
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
|
||||
|
||||
class Nodes(c.IntelContext):
|
||||
def add(self, nodes, rack, username, path_to_key, keypass=''):
|
||||
hosts = {
|
||||
'method': 'useauthzkeyfile',
|
||||
'nodeinfo': map(lambda host: {
|
||||
'hostname': host,
|
||||
'username': username,
|
||||
'passphrase': keypass,
|
||||
'authzkeyfile': path_to_key,
|
||||
'rackName': rack
|
||||
}, nodes)
|
||||
}
|
||||
|
||||
url = '/cluster/%s/nodes' % self.cluster_name
|
||||
resp = self.rest.post(url, hosts)['items']
|
||||
|
||||
for node_info in resp:
|
||||
if node_info['info'] != 'Connected':
|
||||
raise iex.IntelPluginException(
|
||||
'Error adding nodes: %s' % node_info['iporhostname'])
|
||||
|
||||
def get(self):
|
||||
url = '/cluster/%s/nodes' % self.cluster_name
|
||||
return self.rest.get(url)
|
||||
|
||||
def get_status(self, node):
|
||||
url = '/cluster/%s/nodes/%s' % (self.cluster_name, node)
|
||||
return self.rest.get(url)['status']
|
||||
|
||||
def delete(self, node):
|
||||
url = '/cluster/%s/nodes/%s' % (self.cluster_name, node)
|
||||
return self.rest.delete(url)
|
||||
|
||||
def config(self, force=False):
|
||||
url = ('/cluster/%s/nodes/commands/confignodes/%s'
|
||||
% (self.cluster_name, 'force' if force else 'noforce'))
|
||||
|
||||
session_id = self.rest.post(url)['sessionID']
|
||||
return session.wait(self, session_id)
|
||||
|
||||
def stop(self, nodes):
|
||||
url = '/cluster/%s/nodes/commands/stopnodes' % self.cluster_name
|
||||
data = [{'hostname': host} for host in nodes]
|
||||
|
||||
return self.rest.post(url, data)
|
76
savanna/plugins/intel/client/params.py
Normal file
76
savanna/plugins/intel/client/params.py
Normal file
@ -0,0 +1,76 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna import exceptions
|
||||
from savanna.plugins.intel.client import context as c
|
||||
|
||||
|
||||
class BaseParams(c.IntelContext):
|
||||
def __init__(self, ctx, service):
|
||||
super(BaseParams, self).__init__(ctx)
|
||||
self.service = service
|
||||
|
||||
def add(self, item, value, desc=''):
|
||||
data = {
|
||||
'editdesc': desc,
|
||||
'items': [
|
||||
{
|
||||
'type': self.service,
|
||||
'item': item,
|
||||
'value': value,
|
||||
'desc': desc
|
||||
}
|
||||
]
|
||||
}
|
||||
url = ('/cluster/%s/configuration/%s'
|
||||
% (self.cluster_name, self.service))
|
||||
return self.rest.post(url, data)
|
||||
|
||||
def update(self, item, value, desc='', nodes=None):
|
||||
data = {
|
||||
'editdesc': desc,
|
||||
'items': [
|
||||
{
|
||||
'type': self.service,
|
||||
'item': item,
|
||||
'value': value
|
||||
}
|
||||
]
|
||||
}
|
||||
if nodes:
|
||||
data = {
|
||||
'editdesc': desc,
|
||||
'items': map(lambda node: {
|
||||
'type': self.service,
|
||||
'item': item,
|
||||
'value': value,
|
||||
'hostname': node
|
||||
}, nodes)
|
||||
}
|
||||
|
||||
url = ('/cluster/%s/configuration/%s'
|
||||
% (self.cluster_name, self.service))
|
||||
return self.rest.put(url, data)
|
||||
|
||||
def get(self, hosts, item):
|
||||
raise exceptions.NotImplementedException("BaseParams.get")
|
||||
|
||||
|
||||
class Params(c.IntelContext):
|
||||
def __init__(self, ctx):
|
||||
super(Params, self).__init__(ctx)
|
||||
self.hadoop = BaseParams(self, 'hadoop')
|
||||
self.hdfs = BaseParams(self, 'hdfs')
|
||||
self.mapred = BaseParams(self, 'mapred')
|
78
savanna/plugins/intel/client/rest.py
Normal file
78
savanna/plugins/intel/client/rest.py
Normal file
@ -0,0 +1,78 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import requests
|
||||
from requests import auth
|
||||
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RESTClient():
|
||||
def __init__(self, manager_ip, auth_username, auth_password):
|
||||
#TODO(alazarev) make port configurable (bug #1262895)
|
||||
self.base_url = ('https://%s:9443/restapi/intelcloud/api/v1'
|
||||
% manager_ip)
|
||||
LOG.debug("Connecting to manager with URL of %s", self.base_url)
|
||||
|
||||
self.auth = auth.HTTPBasicAuth(auth_username, auth_password)
|
||||
|
||||
def get(self, url):
|
||||
url = self.base_url + url
|
||||
LOG.debug("Sending GET to URL of %s", url)
|
||||
r = requests.get(url, verify=False, auth=self.auth)
|
||||
return self._check_response(r)
|
||||
|
||||
def post(self, url, data=None, files=None):
|
||||
url = self.base_url + url
|
||||
LOG.debug("Sending POST to URL '%s' (%s files): %s", url,
|
||||
len(files) if files else 0,
|
||||
data if data else 'no data')
|
||||
r = requests.post(url, data=json.dumps(data) if data else None,
|
||||
verify=False, auth=self.auth, files=files)
|
||||
return self._check_response(r)
|
||||
|
||||
def delete(self, url):
|
||||
url = self.base_url + url
|
||||
LOG.debug("Sending DELETE to URL of %s", url)
|
||||
r = requests.delete(url, verify=False, auth=self.auth)
|
||||
return self._check_response(r)
|
||||
|
||||
def put(self, url, data=None):
|
||||
url = self.base_url + url
|
||||
if data:
|
||||
LOG.debug("Sending PUT to URL of %s: %s", url, data)
|
||||
r = requests.put(url, data=json.dumps(data), verify=False,
|
||||
auth=self.auth)
|
||||
else:
|
||||
LOG.debug("Sending PUT to URL of %s with no data", url)
|
||||
r = requests.put(url, verify=False, auth=self.auth)
|
||||
|
||||
return self._check_response(r)
|
||||
|
||||
def _check_response(self, resp):
|
||||
LOG.debug("Response with HTTP code %s, and content of %s",
|
||||
resp.status_code, resp.text)
|
||||
if not resp.ok:
|
||||
raise iex.IntelPluginException(
|
||||
"Request to manager returned with code '%s', reason '%s' "
|
||||
"and message '%s'" % (resp.status_code, resp.reason,
|
||||
json.loads(resp.text)['message']))
|
||||
else:
|
||||
return json.loads(resp.text)
|
137
savanna/plugins/intel/client/services.py
Normal file
137
savanna/plugins/intel/client/services.py
Normal file
@ -0,0 +1,137 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna import context
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.intel.client import context as c
|
||||
from savanna.plugins.intel.client import session
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseService(c.IntelContext):
|
||||
def __init__(self, ctx, service_name):
|
||||
super(BaseService, self).__init__(ctx)
|
||||
self.service = service_name
|
||||
|
||||
def start(self):
|
||||
url = ('/cluster/%s/services/%s/commands/start'
|
||||
% (self.cluster_name, self.service))
|
||||
|
||||
self.rest.post(url)
|
||||
|
||||
timeout = 120
|
||||
cur_time = 0
|
||||
while cur_time < timeout:
|
||||
context.sleep(2)
|
||||
if self.status() == 'running':
|
||||
break
|
||||
else:
|
||||
cur_time += 2
|
||||
else:
|
||||
raise iex.IntelPluginException(
|
||||
"Service '%s' has failed to start in %s seconds"
|
||||
% (self.service, timeout))
|
||||
|
||||
def stop(self):
|
||||
url = ('/cluster/%s/services/%s/commands/stop'
|
||||
% (self.cluster_name, self.service))
|
||||
|
||||
return self.rest.post(url)
|
||||
|
||||
def status(self):
|
||||
url = '/cluster/%s/services' % self.cluster_name
|
||||
statuses = self.rest.get(url)['items']
|
||||
for st in statuses:
|
||||
if st['serviceName'] == self.service:
|
||||
return st['status']
|
||||
|
||||
raise iex.IntelPluginException(
|
||||
"Service '%s' is not installed on cluster '%s'"
|
||||
% (self.service, self.cluster_name))
|
||||
|
||||
def get_nodes(self):
|
||||
url = '/cluster/%s/services/%s' % (self.cluster_name, self.service)
|
||||
return self.rest.get(url)
|
||||
|
||||
def add_nodes(self, role, nodes):
|
||||
url = ('/cluster/%s/services/%s/roles'
|
||||
% (self.cluster_name, self.service))
|
||||
|
||||
data = map(lambda host: {
|
||||
'rolename': role,
|
||||
'hostname': host
|
||||
}, nodes)
|
||||
|
||||
return self.rest.post(url, data)
|
||||
|
||||
|
||||
class HDFSService(BaseService):
|
||||
def format(self, force=False):
|
||||
url = ('/cluster/%s/services/hdfs/commands/hdfsformat/%s'
|
||||
% (self.cluster_name, 'force' if force else 'noforce'))
|
||||
|
||||
session_id = self.rest.post(url)['sessionID']
|
||||
return session.wait(self, session_id)
|
||||
|
||||
def decommission_nodes(self, nodes, force=False):
|
||||
url = ('/cluster/%s/nodes/commands/decommissionnodes/%s'
|
||||
% (self.cluster_name, 'force' if force else 'noforce'))
|
||||
data = map(lambda host: {
|
||||
'hostname': host
|
||||
}, nodes)
|
||||
|
||||
return self.rest.post(url, data)
|
||||
|
||||
def get_datanodes_status(self):
|
||||
url = '/cluster/%s/nodes/commands/datanodes/status' % self.cluster_name
|
||||
return self.rest.get(url)['items']
|
||||
|
||||
def get_datanode_status(self, datanode):
|
||||
stats = self.get_datanodes_status()
|
||||
for stat in stats:
|
||||
if stat['hostname'] == datanode:
|
||||
return stat['status'].strip()
|
||||
|
||||
raise iex.IntelPluginException(
|
||||
"Datanode service is is not installed on node '%s'" % datanode)
|
||||
|
||||
|
||||
class Services(c.IntelContext):
|
||||
def __init__(self, ctx):
|
||||
super(Services, self).__init__(ctx)
|
||||
self.hdfs = HDFSService(self, 'hdfs')
|
||||
self.mapred = BaseService(self, 'mapred')
|
||||
self.hive = BaseService(self, 'hive')
|
||||
self.oozie = BaseService(self, 'oozie')
|
||||
|
||||
def add(self, services):
|
||||
_services = map(lambda service: {
|
||||
'serviceName': service,
|
||||
'type': service
|
||||
}, services)
|
||||
url = '/cluster/%s/services' % self.cluster_name
|
||||
|
||||
return self.rest.post(url, _services)
|
||||
|
||||
def get_services(self):
|
||||
url = '/cluster/%s/services' % self.cluster_name
|
||||
|
||||
return self.rest.get(url)
|
||||
|
||||
def delete_service(self, service):
|
||||
url = '/cluster/%s/services/%s' % (self.cluster_name, service)
|
||||
return self.rest.delete(url)
|
49
savanna/plugins/intel/client/session.py
Normal file
49
savanna/plugins/intel/client/session.py
Normal file
@ -0,0 +1,49 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna import context
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get(ctx, session_id):
|
||||
url = '/cluster/%s/session/%s' % (ctx.cluster_name, session_id)
|
||||
return ctx.rest.get(url)
|
||||
|
||||
|
||||
def wait(ctx, session_id):
|
||||
#TODO(lazarev) add check on savanna cluster state (exit on delete)
|
||||
#TODO(alazarev) make configurable (bug #1262897)
|
||||
timeout = 4*60*60 # 4 hours
|
||||
cur_time = 0
|
||||
while cur_time < timeout:
|
||||
info_items = get(ctx, session_id)['items']
|
||||
for item in info_items:
|
||||
progress = item['nodeprogress']
|
||||
if progress['info'].strip() == '_ALLFINISH':
|
||||
return
|
||||
else:
|
||||
context.sleep(10)
|
||||
cur_time += 10
|
||||
|
||||
debug_msg = 'Hostname: %s\nInfo: %s'
|
||||
debug_msg = debug_msg % (progress['hostname'], progress['info'])
|
||||
LOG.debug(debug_msg)
|
||||
else:
|
||||
raise iex.IntelPluginException(
|
||||
"Cluster '%s' has failed to start in %s minutes"
|
||||
% (ctx.cluster_name, timeout / 60))
|
128
savanna/plugins/intel/config_helper.py
Normal file
128
savanna/plugins/intel/config_helper.py
Normal file
@ -0,0 +1,128 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.plugins import provisioning as p
|
||||
from savanna.utils import xmlutils as x
|
||||
|
||||
|
||||
CORE_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
|
||||
'plugins/intel/resources/hadoop-default.xml')
|
||||
|
||||
HDFS_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
|
||||
'plugins/intel/resources/hdfs-default.xml')
|
||||
|
||||
MAPRED_DEFAULT = x.load_hadoop_xml_defaults_with_type_and_locale(
|
||||
'plugins/intel/resources/mapred-default.xml')
|
||||
|
||||
XML_CONFS = {
|
||||
"Hadoop": [CORE_DEFAULT],
|
||||
"HDFS": [HDFS_DEFAULT],
|
||||
"MapReduce": [MAPRED_DEFAULT]
|
||||
}
|
||||
|
||||
IDH_TARBALL_URL = p.Config('IDH tarball URL', 'general', 'cluster', priority=1,
|
||||
default_value='http://repo1.intelhadoop.com:3424/'
|
||||
'setup/setup-intelhadoop-'
|
||||
'2.5.1-en-evaluation.RHEL.tar.gz')
|
||||
|
||||
OS_REPO_URL = p.Config('OS repository URL', 'general', 'cluster', priority=1,
|
||||
is_optional=True,
|
||||
default_value='http://mirror.centos.org/'
|
||||
'centos-6/6/os/x86_64')
|
||||
|
||||
IDH_REPO_URL = p.Config('IDH repository URL', 'general', 'cluster',
|
||||
priority=1, is_optional=True,
|
||||
default_value='http://repo1.intelhadoop.com:3424'
|
||||
'/evaluation/en/RHEL/2.5.1/rpm')
|
||||
|
||||
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
|
||||
config_type="bool", priority=1,
|
||||
default_value=True, is_optional=True)
|
||||
|
||||
HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir',
|
||||
'mapred.job.tracker', 'mapred.system.dir', 'mapred.local.dir']
|
||||
|
||||
CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication',
|
||||
'dfs.replication.min', 'dfs.replication.max',
|
||||
'io.file.buffer.size', 'mapreduce.job.counters.max',
|
||||
'mapred.output.compress', 'io.compression.codecs',
|
||||
'mapred.output.compression.codec',
|
||||
'mapred.output.compression.type',
|
||||
'mapred.compress.map.output',
|
||||
'mapred.map.output.compression.codec']
|
||||
|
||||
PRIORITY_1_CONFS = ['dfs.datanode.du.reserved',
|
||||
'dfs.datanode.failed.volumes.tolerated',
|
||||
'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count',
|
||||
'dfs.namenode.handler.count', 'mapred.child.java.opts',
|
||||
'mapred.jobtracker.maxtasks.per.job',
|
||||
'mapred.job.tracker.handler.count',
|
||||
'mapred.map.child.java.opts',
|
||||
'mapred.reduce.child.java.opts',
|
||||
'io.sort.mb', 'mapred.tasktracker.map.tasks.maximum',
|
||||
'mapred.tasktracker.reduce.tasks.maximum']
|
||||
|
||||
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
|
||||
|
||||
CFG_TYPE = {
|
||||
"Boolean": "bool",
|
||||
"String": "string",
|
||||
"Integer": "int",
|
||||
"Choose": "string",
|
||||
"Class": "string",
|
||||
"Directory": "string",
|
||||
"Float": "string",
|
||||
"Int_range": "string",
|
||||
}
|
||||
|
||||
|
||||
def _initialise_configs():
|
||||
configs = []
|
||||
|
||||
for service, config_lists in XML_CONFS.iteritems():
|
||||
for config_list in config_lists:
|
||||
for config in config_list:
|
||||
if config['name'] not in HIDDEN_CONFS:
|
||||
cfg = p.Config(
|
||||
config['name'], service, "cluster", is_optional=True,
|
||||
config_type="string",
|
||||
default_value=str(config['value']),
|
||||
description=config['description'])
|
||||
|
||||
if config.get('type'):
|
||||
cfg.config_type = CFG_TYPE[config['type']]
|
||||
if cfg.config_type == 'bool':
|
||||
cfg.default_value = cfg.default_value == 'true'
|
||||
if cfg.config_type == 'int':
|
||||
if cfg.default_value:
|
||||
cfg.default_value = int(cfg.default_value)
|
||||
else:
|
||||
cfg.config_type = 'string'
|
||||
if config['name'] in PRIORITY_1_CONFS:
|
||||
cfg.priority = 1
|
||||
configs.append(cfg)
|
||||
|
||||
configs.append(IDH_TARBALL_URL)
|
||||
configs.append(IDH_REPO_URL)
|
||||
configs.append(OS_REPO_URL)
|
||||
configs.append(ENABLE_SWIFT)
|
||||
return configs
|
||||
|
||||
|
||||
PLUGIN_CONFIGS = _initialise_configs()
|
||||
|
||||
|
||||
def get_plugin_configs():
|
||||
return PLUGIN_CONFIGS
|
31
savanna/plugins/intel/exceptions.py
Normal file
31
savanna/plugins/intel/exceptions.py
Normal file
@ -0,0 +1,31 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import savanna.exceptions as e
|
||||
|
||||
|
||||
class NotSingleManagerException(e.SavannaException):
|
||||
message = "Intel hadoop cluster should contain only 1 Intel " \
|
||||
"Manager instance. Actual manager count is %s"
|
||||
|
||||
def __init__(self, mng_count):
|
||||
self.message = self.message % mng_count
|
||||
self.code = "NOT_SINGLE_MANAGER"
|
||||
|
||||
|
||||
class IntelPluginException(e.SavannaException):
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
self.code = "INTEL_PLUGIN_EXCEPTION"
|
411
savanna/plugins/intel/installer.py
Normal file
411
savanna/plugins/intel/installer.py
Normal file
@ -0,0 +1,411 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
import telnetlib
|
||||
|
||||
from savanna import conductor
|
||||
from savanna import context
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.general import utils as u
|
||||
from savanna.plugins.intel.client import client as c
|
||||
from savanna.plugins.intel import config_helper as c_helper
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
from savanna.swift import swift_helper as swift
|
||||
from savanna.utils import crypto
|
||||
|
||||
|
||||
conductor = conductor.API
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_INST_CONF_TEMPLATE = """
|
||||
network_interface=eth0
|
||||
mode=silent
|
||||
accept_jdk_license=accept
|
||||
how_to_setup_os_repo=2
|
||||
os_repo=%s
|
||||
os_repo_username=
|
||||
os_repo_password=
|
||||
os_repo_proxy=
|
||||
how_to_setup_idh_repo=1
|
||||
idh_repo=%s
|
||||
idh_repo_username=
|
||||
idh_repo_password=
|
||||
idh_repo_proxy=
|
||||
firewall_selinux_setting=1"""
|
||||
|
||||
SWIFT_LIB_URL = \
|
||||
'http://savanna-files.mirantis.com/hadoop-swift/hadoop-swift-latest.jar'
|
||||
|
||||
|
||||
def install_manager(cluster):
|
||||
LOG.info("Starting Install Manager Process")
|
||||
mng_instance = u.get_instance(cluster, 'manager')
|
||||
|
||||
idh_tarball_path = \
|
||||
cluster.cluster_configs['general'].get('IDH tarball URL')
|
||||
if not idh_tarball_path:
|
||||
idh_tarball_path = c_helper.IDH_TARBALL_URL.default_value
|
||||
|
||||
idh_tarball_filename = idh_tarball_path.rsplit('/', 1)[-1]
|
||||
idh_dir = idh_tarball_filename[:idh_tarball_filename.find('.tar.gz')]
|
||||
LOG.info("IDH tgz will be retrieved from: \'%s\'", idh_tarball_path)
|
||||
|
||||
idh_repo = cluster.cluster_configs['general'].get('IDH repository URL')
|
||||
if not idh_repo:
|
||||
idh_repo = c_helper.IDH_REPO_URL.default_value
|
||||
|
||||
os_repo = cluster.cluster_configs['general'].get('OS repository URL')
|
||||
if not os_repo:
|
||||
os_repo = c_helper.OS_REPO_URL.default_value
|
||||
|
||||
idh_install_cmd = 'sudo ./%s/install.sh --mode=silent 2>&1' % idh_dir
|
||||
|
||||
with mng_instance.remote() as r:
|
||||
LOG.info("Download IDH manager ")
|
||||
try:
|
||||
r.execute_command('curl -O %s 2>&1' % idh_tarball_path)
|
||||
except Exception as e:
|
||||
raise RuntimeError("Unable to download IDH manager from %s",
|
||||
idh_tarball_path, e)
|
||||
|
||||
# unpack archive
|
||||
LOG.info("Unpack manager %s ", idh_tarball_filename)
|
||||
try:
|
||||
r.execute_command('tar xzf %s 2>&1' % idh_tarball_filename)
|
||||
except Exception as e:
|
||||
raise RuntimeError("Unable to unpack tgz %s",
|
||||
idh_tarball_filename, e)
|
||||
|
||||
# install idh
|
||||
LOG.debug("Install manager with %s : ", idh_install_cmd)
|
||||
inst_conf = _INST_CONF_TEMPLATE % (os_repo, idh_repo)
|
||||
r.write_file_to('%s/ui-installer/conf' % idh_dir, inst_conf)
|
||||
#TODO(alazarev) make timeout configurable (bug #1262897)
|
||||
r.execute_command(idh_install_cmd, timeout=3600)
|
||||
|
||||
# fix nginx persimmions bug
|
||||
r.execute_command('sudo chmod o+x /var/lib/nginx/ /var/lib/nginx/tmp '
|
||||
'/var/lib/nginx/tmp/client_body')
|
||||
|
||||
# waiting start idh manager
|
||||
#TODO(alazarev) make timeout configurable (bug #1262897)
|
||||
timeout = 600
|
||||
LOG.debug("Waiting %s seconds for Manager to start : ", timeout)
|
||||
while timeout:
|
||||
try:
|
||||
telnetlib.Telnet(mng_instance.management_ip, 9443)
|
||||
break
|
||||
except IOError:
|
||||
timeout -= 2
|
||||
context.sleep(2)
|
||||
else:
|
||||
message = ("IDH Manager failed to start in %s minutes on node '%s' "
|
||||
"of cluster '%s'"
|
||||
% (timeout / 60, mng_instance.management_ip, cluster.name))
|
||||
LOG.error(message)
|
||||
raise iex.IntelPluginException(message)
|
||||
|
||||
|
||||
def configure_os(cluster):
|
||||
instances = u.get_instances(cluster)
|
||||
configure_os_from_instances(cluster, instances)
|
||||
|
||||
|
||||
def create_hadoop_ssh_keys(cluster):
|
||||
private_key, public_key = crypto.generate_key_pair()
|
||||
extra = {
|
||||
'hadoop_private_ssh_key': private_key,
|
||||
'hadoop_public_ssh_key': public_key
|
||||
}
|
||||
return conductor.cluster_update(context.ctx(), cluster, {'extra': extra})
|
||||
|
||||
|
||||
def configure_os_from_instances(cluster, instances):
|
||||
for instance in instances:
|
||||
with instance.remote() as remote:
|
||||
LOG.debug("Configuring OS settings on %s : ", instance.hostname())
|
||||
|
||||
# configure hostname, RedHat/Centos specific
|
||||
remote.replace_remote_string('/etc/sysconfig/network',
|
||||
'HOSTNAME=.*',
|
||||
'HOSTNAME=%s' % instance.hostname())
|
||||
# disable selinux and iptables, because Intel distribution requires
|
||||
# this to be off
|
||||
remote.execute_command('sudo /usr/sbin/setenforce 0')
|
||||
remote.replace_remote_string('/etc/selinux/config',
|
||||
'SELINUX=.*', 'SELINUX=disabled')
|
||||
# disable iptables
|
||||
remote.execute_command('sudo /sbin/service iptables stop')
|
||||
remote.execute_command('sudo /sbin/chkconfig iptables off')
|
||||
|
||||
# create 'hadoop' user
|
||||
remote.write_files_to({
|
||||
'id_rsa': cluster.extra.get('hadoop_private_ssh_key'),
|
||||
'authorized_keys': cluster.extra.get('hadoop_public_ssh_key')
|
||||
})
|
||||
remote.execute_command(
|
||||
'sudo useradd hadoop && '
|
||||
'sudo sh -c \'echo "hadoop ALL=(ALL) NOPASSWD:ALL" '
|
||||
'>> /etc/sudoers\' && '
|
||||
'sudo mkdir -p /home/hadoop/.ssh/ && '
|
||||
'sudo mv id_rsa authorized_keys /home/hadoop/.ssh && '
|
||||
'sudo chown -R hadoop:hadoop /home/hadoop/.ssh && '
|
||||
'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}')
|
||||
|
||||
swift_enable = \
|
||||
cluster.cluster_configs['general'].get('Enable Swift')
|
||||
if not swift_enable:
|
||||
swift_enable = c_helper.ENABLE_SWIFT.default_value
|
||||
|
||||
if swift_enable:
|
||||
swift_lib_path = '/usr/lib/hadoop/lib/hadoop-swift-latest.jar'
|
||||
cmd = ('sudo curl \'%s\' -o %s --create-dirs'
|
||||
% (SWIFT_LIB_URL, swift_lib_path))
|
||||
remote.execute_command(cmd)
|
||||
|
||||
|
||||
def _configure_services(client, cluster):
|
||||
nn_host = u.get_namenode(cluster).hostname()
|
||||
snn = u.get_secondarynamenodes(cluster)
|
||||
snn_host = snn[0].hostname() if snn else None
|
||||
jt_host = u.get_jobtracker(cluster).hostname()
|
||||
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
|
||||
tt_hosts = [tt.hostname() for tt in u.get_tasktrackers(cluster)]
|
||||
|
||||
oozie_host = u.get_oozie(cluster).hostname() if u.get_oozie(
|
||||
cluster) else None
|
||||
hive_host = u.get_hiveserver(cluster).hostname() if u.get_hiveserver(
|
||||
cluster) else None
|
||||
|
||||
services = []
|
||||
if u.get_namenode(cluster):
|
||||
services += ['hdfs']
|
||||
|
||||
if u.get_jobtracker(cluster):
|
||||
services += ['mapred']
|
||||
|
||||
if oozie_host:
|
||||
services += ['oozie']
|
||||
|
||||
if hive_host:
|
||||
services += ['hive']
|
||||
|
||||
LOG.debug("Add services: %s" % ', '.join(services))
|
||||
client.services.add(services)
|
||||
|
||||
LOG.debug("Assign roles to hosts")
|
||||
client.services.hdfs.add_nodes('PrimaryNameNode', [nn_host])
|
||||
|
||||
client.services.hdfs.add_nodes('DataNode', dn_hosts)
|
||||
if snn:
|
||||
client.services.hdfs.add_nodes('SecondaryNameNode', [snn_host])
|
||||
|
||||
if oozie_host:
|
||||
client.services.oozie.add_nodes('Oozie', [oozie_host])
|
||||
|
||||
if hive_host:
|
||||
client.services.hive.add_nodes('HiveServer', [hive_host])
|
||||
|
||||
client.services.mapred.add_nodes('JobTracker', [jt_host])
|
||||
client.services.mapred.add_nodes('TaskTracker', tt_hosts)
|
||||
|
||||
|
||||
def _configure_storage(client, cluster):
|
||||
datanode_ng = u.get_node_groups(cluster, 'datanode')[0]
|
||||
storage_paths = datanode_ng.storage_paths()
|
||||
dn_hosts = [i.hostname() for i in u.get_datanodes(cluster)]
|
||||
|
||||
name_dir_param = ",".join(
|
||||
[st_path + '/dfs/name' for st_path in storage_paths])
|
||||
data_dir_param = ",".join(
|
||||
[st_path + '/dfs/data' for st_path in storage_paths])
|
||||
client.params.hdfs.update('dfs.name.dir', name_dir_param)
|
||||
client.params.hdfs.update('dfs.data.dir', data_dir_param, nodes=dn_hosts)
|
||||
|
||||
|
||||
def _configure_swift(client, cluster):
|
||||
swift_enable = cluster.cluster_configs['general'].get('Enable Swift')
|
||||
if swift_enable is None or swift_enable:
|
||||
swift_configs = swift.get_swift_configs()
|
||||
for conf in swift_configs:
|
||||
client.params.hadoop.add(conf['name'], conf['value'])
|
||||
|
||||
|
||||
def _add_user_params(client, cluster):
|
||||
for p in six.iteritems(cluster.cluster_configs["Hadoop"]):
|
||||
client.params.hadoop.update(p[0], p[1])
|
||||
|
||||
for p in six.iteritems(cluster.cluster_configs["HDFS"]):
|
||||
client.params.hdfs.update(p[0], p[1])
|
||||
|
||||
for p in six.iteritems(cluster.cluster_configs["MapReduce"]):
|
||||
client.params.mapred.update(p[0], p[1])
|
||||
|
||||
|
||||
def install_cluster(cluster):
|
||||
mng_instance = u.get_instance(cluster, 'manager')
|
||||
mng_ip = mng_instance.management_ip
|
||||
|
||||
all_hosts = list(set([i.hostname() for i in u.get_instances(cluster)]))
|
||||
|
||||
client = c.IntelClient(mng_ip, cluster.name)
|
||||
|
||||
LOG.info("Create cluster")
|
||||
client.cluster.create()
|
||||
|
||||
LOG.info("Add nodes to cluster")
|
||||
rack = '/Default'
|
||||
client.nodes.add(all_hosts, rack, 'hadoop',
|
||||
'/home/hadoop/.ssh/id_rsa')
|
||||
|
||||
LOG.info("Install software")
|
||||
client.cluster.install_software(all_hosts)
|
||||
|
||||
LOG.info("Configure services")
|
||||
_configure_services(client, cluster)
|
||||
|
||||
LOG.info("Deploy cluster")
|
||||
client.nodes.config(force=True)
|
||||
|
||||
LOG.info("Provisioning configs")
|
||||
# cinder and ephemeral drive support
|
||||
_configure_storage(client, cluster)
|
||||
# swift support
|
||||
_configure_swift(client, cluster)
|
||||
# user configs
|
||||
_add_user_params(client, cluster)
|
||||
|
||||
LOG.info("Format HDFS")
|
||||
client.services.hdfs.format()
|
||||
|
||||
|
||||
def start_cluster(cluster):
|
||||
client = c.IntelClient(
|
||||
u.get_instance(cluster, 'manager').management_ip, cluster.name)
|
||||
|
||||
LOG.debug("Starting hadoop services")
|
||||
client.services.hdfs.start()
|
||||
|
||||
client.services.mapred.start()
|
||||
if u.get_hiveserver(cluster):
|
||||
client.services.hive.start()
|
||||
|
||||
if u.get_oozie(cluster):
|
||||
client.services.oozie.start()
|
||||
|
||||
|
||||
def scale_cluster(cluster, instances):
|
||||
scale_ins_hosts = [i.hostname() for i in instances]
|
||||
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
|
||||
tt_hosts = [tt.hostname() for tt in u.get_tasktrackers(cluster)]
|
||||
to_scale_dn = []
|
||||
to_scale_tt = []
|
||||
for i in scale_ins_hosts:
|
||||
if i in dn_hosts:
|
||||
to_scale_dn.append(i)
|
||||
|
||||
if i in tt_hosts:
|
||||
to_scale_tt.append(i)
|
||||
|
||||
mng_ip = u.get_instance(cluster, 'manager').management_ip
|
||||
client = c.IntelClient(mng_ip, cluster.name)
|
||||
rack = '/Default'
|
||||
client.nodes.add(scale_ins_hosts, rack, 'hadoop',
|
||||
cluster.extra['manager_authzkeyfile_path'])
|
||||
client.cluster.install_software(scale_ins_hosts)
|
||||
|
||||
if to_scale_tt:
|
||||
client.services.mapred.add_nodes('TaskTracker', to_scale_tt)
|
||||
|
||||
if to_scale_dn:
|
||||
client.services.hdfs.add_nodes('DataNode', to_scale_dn)
|
||||
|
||||
client.nodes.config()
|
||||
|
||||
if to_scale_dn:
|
||||
client.services.hdfs.start()
|
||||
|
||||
if to_scale_tt:
|
||||
client.services.mapred.start()
|
||||
|
||||
|
||||
def decommission_nodes(cluster, instances):
|
||||
dec_hosts = [i.hostname() for i in instances]
|
||||
dn_hosts = [dn.hostname() for dn in u.get_datanodes(cluster)]
|
||||
tt_hosts = [dn.hostname() for dn in u.get_tasktrackers(cluster)]
|
||||
|
||||
mng_ip = u.get_instances(cluster, 'manager')[0].management_ip
|
||||
client = c.IntelClient(mng_ip, cluster.name)
|
||||
|
||||
dec_dn_hosts = []
|
||||
for dec_host in dec_hosts:
|
||||
if dec_host in dn_hosts:
|
||||
dec_dn_hosts.append(dec_host)
|
||||
|
||||
if dec_dn_hosts:
|
||||
client.services.hdfs.decommission_nodes(dec_dn_hosts)
|
||||
|
||||
#TODO(alazarev) make timeout configurable (bug #1262897)
|
||||
timeout = 14400 # 4 hours
|
||||
cur_time = 0
|
||||
for host in dec_dn_hosts:
|
||||
while cur_time < timeout:
|
||||
if client.services.hdfs.get_datanode_status(
|
||||
host) == 'Decomissioned':
|
||||
break
|
||||
context.sleep(5)
|
||||
cur_time += 5
|
||||
else:
|
||||
LOG.warn("Failed to decomission node '%s' of cluster '%s' "
|
||||
"in %s minutes" % (host, cluster.name, timeout/60))
|
||||
|
||||
client.nodes.stop(dec_hosts)
|
||||
|
||||
# wait stop services
|
||||
#TODO(alazarev) make timeout configurable (bug #1262897)
|
||||
timeout = 600 # 10 minutes
|
||||
cur_time = 0
|
||||
for instance in instances:
|
||||
while cur_time < timeout:
|
||||
stopped = True
|
||||
if instance.hostname() in dn_hosts:
|
||||
code, out = instance.remote().execute_command(
|
||||
'sudo /sbin/service hadoop-datanode status',
|
||||
raise_when_error=False)
|
||||
if out.strip() != 'datanode is stopped':
|
||||
stopped = False
|
||||
if out.strip() == 'datanode dead but pid file exists':
|
||||
instance.remote().execute_command(
|
||||
'sudo rm -f '
|
||||
'/var/run/hadoop/hadoop-hadoop-datanode.pid')
|
||||
if instance.hostname() in tt_hosts:
|
||||
code, out = instance.remote().execute_command(
|
||||
'sudo /sbin/service hadoop-tasktracker status',
|
||||
raise_when_error=False)
|
||||
if out.strip() != 'tasktracker is stopped':
|
||||
stopped = False
|
||||
if stopped:
|
||||
break
|
||||
else:
|
||||
context.sleep(5)
|
||||
cur_time += 5
|
||||
else:
|
||||
LOG.warn("Failed to stop services on node '%s' of cluster '%s' "
|
||||
"in %s minutes" % (instance, cluster.name, timeout/60))
|
||||
|
||||
for node in dec_hosts:
|
||||
LOG.info("Deleting node '%s' on cluster '%s'" % (node, cluster.name))
|
||||
client.nodes.delete(node)
|
173
savanna/plugins/intel/plugin.py
Normal file
173
savanna/plugins/intel/plugin.py
Normal file
@ -0,0 +1,173 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna import conductor
|
||||
from savanna import context
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.plugins.general import exceptions as ex
|
||||
from savanna.plugins.general import utils as u
|
||||
from savanna.plugins.intel import config_helper as c_helper
|
||||
from savanna.plugins.intel import exceptions as i_ex
|
||||
from savanna.plugins.intel import installer as ins
|
||||
from savanna.plugins import provisioning as p
|
||||
|
||||
conductor = conductor.API
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class IDHProvider(p.ProvisioningPluginBase):
|
||||
def __init__(self):
|
||||
self.processes = {
|
||||
"Manager": ["manager"],
|
||||
"HDFS": ["namenode", "datanode", "secondarynamenode"],
|
||||
"MapReduce": ["jobtracker", "tasktracker"],
|
||||
"Hadoop": [] # for hadoop parameters in UI
|
||||
}
|
||||
|
||||
def get_description(self):
|
||||
return \
|
||||
'The IDH OpenStack plugin works with project ' \
|
||||
'Savanna to automate the deployment of the Intel Distribution ' \
|
||||
'of Apache Hadoop on OpenStack based ' \
|
||||
'public & private clouds'
|
||||
|
||||
def get_node_processes(self, hadoop_version):
|
||||
return self.processes
|
||||
|
||||
def get_versions(self):
|
||||
return ['2.5.1']
|
||||
|
||||
def get_title(self):
|
||||
return "Intel(R) Distribution for Apache Hadoop* Software"
|
||||
|
||||
def get_configs(self, hadoop_version):
|
||||
return c_helper.get_plugin_configs()
|
||||
|
||||
def get_hive_config_path(self):
|
||||
return '/etc/hive/conf/hive-site.xml'
|
||||
|
||||
def configure_cluster(self, cluster):
|
||||
LOG.info("Configure IDH cluster")
|
||||
cluster = ins.create_hadoop_ssh_keys(cluster)
|
||||
ins.configure_os(cluster)
|
||||
ins.install_manager(cluster)
|
||||
ins.install_cluster(cluster)
|
||||
|
||||
def start_cluster(self, cluster):
|
||||
LOG.info("Start IDH cluster")
|
||||
ins.start_cluster(cluster)
|
||||
self._set_cluster_info(cluster)
|
||||
|
||||
def validate(self, cluster):
|
||||
nn_count = sum([ng.count for ng
|
||||
in u.get_node_groups(cluster, 'namenode')])
|
||||
if nn_count != 1:
|
||||
raise ex.NotSingleNameNodeException(nn_count)
|
||||
|
||||
jt_count = sum([ng.count for ng
|
||||
in u.get_node_groups(cluster, 'jobtracker')])
|
||||
if jt_count > 1:
|
||||
raise ex.NotSingleJobTrackerException(jt_count)
|
||||
|
||||
tt_count = sum([ng.count for ng
|
||||
in u.get_node_groups(cluster, 'tasktracker')])
|
||||
if jt_count == 0 and tt_count > 0:
|
||||
raise ex.TaskTrackersWithoutJobTracker()
|
||||
|
||||
mng_count = sum([ng.count for ng
|
||||
in u.get_node_groups(cluster, 'manager')])
|
||||
if mng_count != 1:
|
||||
raise i_ex.NotSingleManagerException(mng_count)
|
||||
|
||||
def scale_cluster(self, cluster, instances):
|
||||
ins.configure_os_from_instances(cluster, instances)
|
||||
ins.scale_cluster(cluster, instances)
|
||||
|
||||
def decommission_nodes(self, cluster, instances):
|
||||
ins.decommission_nodes(cluster, instances)
|
||||
|
||||
def validate_scaling(self, cluster, existing, additional):
|
||||
self._validate_additional_ng_scaling(cluster, additional)
|
||||
self._validate_existing_ng_scaling(cluster, existing)
|
||||
|
||||
def _get_scalable_processes(self):
|
||||
return ["datanode", "tasktracker"]
|
||||
|
||||
def _get_by_id(self, lst, id):
|
||||
for obj in lst:
|
||||
if obj.id == id:
|
||||
return obj
|
||||
|
||||
def _validate_additional_ng_scaling(self, cluster, additional):
|
||||
jt = u.get_jobtracker(cluster)
|
||||
scalable_processes = self._get_scalable_processes()
|
||||
|
||||
for ng_id in additional:
|
||||
ng = self._get_by_id(cluster.node_groups, ng_id)
|
||||
if not set(ng.node_processes).issubset(scalable_processes):
|
||||
raise ex.NodeGroupCannotBeScaled(
|
||||
ng.name, "Intel plugin cannot scale nodegroup"
|
||||
" with processes: " +
|
||||
' '.join(ng.node_processes))
|
||||
if not jt and 'tasktracker' in ng.node_processes:
|
||||
raise ex.NodeGroupCannotBeScaled(
|
||||
ng.name, "Intel plugin cannot scale node group with "
|
||||
"processes which have no master-processes run "
|
||||
"in cluster")
|
||||
|
||||
def _validate_existing_ng_scaling(self, cluster, existing):
|
||||
scalable_processes = self._get_scalable_processes()
|
||||
dn_to_delete = 0
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in existing:
|
||||
if ng.count > existing[ng.id] and "datanode" in \
|
||||
ng.node_processes:
|
||||
dn_to_delete += ng.count - existing[ng.id]
|
||||
if not set(ng.node_processes).issubset(scalable_processes):
|
||||
raise ex.NodeGroupCannotBeScaled(
|
||||
ng.name, "Intel plugin cannot scale nodegroup"
|
||||
" with processes: " +
|
||||
' '.join(ng.node_processes))
|
||||
|
||||
def _set_cluster_info(self, cluster):
|
||||
mng = u.get_instances(cluster, 'manager')[0]
|
||||
nn = u.get_namenode(cluster)
|
||||
jt = u.get_jobtracker(cluster)
|
||||
oozie = u.get_oozie(cluster)
|
||||
|
||||
#TODO(alazarev) make port configurable (bug #1262895)
|
||||
info = {'IDH Manager': {
|
||||
'Web UI': 'https://%s:9443' % mng.management_ip
|
||||
}}
|
||||
|
||||
if jt:
|
||||
#TODO(alazarev) make port configurable (bug #1262895)
|
||||
info['MapReduce'] = {
|
||||
'Web UI': 'http://%s:50030' % jt.management_ip
|
||||
}
|
||||
if nn:
|
||||
#TODO(alazarev) make port configurable (bug #1262895)
|
||||
info['HDFS'] = {
|
||||
'Web UI': 'http://%s:50070' % nn.management_ip
|
||||
}
|
||||
|
||||
if oozie:
|
||||
#TODO(alazarev) make port configurable (bug #1262895)
|
||||
info['JobFlow'] = {
|
||||
'Oozie': 'http://%s:11000' % oozie.management_ip
|
||||
}
|
||||
|
||||
ctx = context.ctx()
|
||||
conductor.cluster_update(ctx, cluster, {'info': info})
|
103
savanna/plugins/intel/resources/configuration.xsd
Normal file
103
savanna/plugins/intel/resources/configuration.xsd
Normal file
@ -0,0 +1,103 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
|
||||
<xs:complexType name="display">
|
||||
<xs:sequence>
|
||||
<xs:element name="en">
|
||||
<xs:simpleType>
|
||||
<xs:restriction base="xs:string">
|
||||
<xs:minLength value="1" />
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
</xs:sequence>
|
||||
</xs:complexType>
|
||||
<xs:simpleType name="valueType">
|
||||
<xs:restriction base="xs:string">
|
||||
<xs:enumeration value="Boolean" />
|
||||
<xs:enumeration value="Integer" />
|
||||
<xs:enumeration value="Float" />
|
||||
<xs:enumeration value="IP" />
|
||||
<xs:enumeration value="Port" />
|
||||
<xs:enumeration value="IPWithPort" />
|
||||
<xs:enumeration value="IPWithMask" />
|
||||
<xs:enumeration value="URL" />
|
||||
<xs:enumeration value="String" />
|
||||
<xs:enumeration value="MepRedCapacity" />
|
||||
<xs:enumeration value="HBaseClientScannerCaching" />
|
||||
<xs:enumeration value="Class" />
|
||||
<xs:enumeration value="Choose" />
|
||||
<xs:enumeration value="Directory" />
|
||||
<xs:enumeration value="Int_range" />
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
<xs:element name="configuration">
|
||||
<xs:complexType mixed="true">
|
||||
<xs:sequence>
|
||||
<xs:element name="property" maxOccurs="unbounded">
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element name="name" type="xs:string" />
|
||||
<xs:element name="value" type="xs:string" />
|
||||
<xs:element name="intel_default" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="recommendation" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="valuetype" type="valueType" />
|
||||
<xs:element name="group" type="xs:string" />
|
||||
<xs:element name="definition" type="display" />
|
||||
<xs:element name="description" type="display" />
|
||||
<xs:element name="global" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="allowempty" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="readonly" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="hide" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="automatic" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="enable" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="reserved" type="xs:boolean" minOccurs="0" />
|
||||
<xs:element name="radios" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="script" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="type" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="form" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="chooselist" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="implementation" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="sectionname" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="refor" minOccurs="0">
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element name="refand">
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element name="value" type="xs:string" />
|
||||
<xs:element name="valuetype" type="valueType" />
|
||||
<xs:element name="index" type="xs:string" />
|
||||
</xs:all>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:all>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:all>
|
||||
<xs:attribute name="skipInDoc" type="xs:boolean" />
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element name="briefsection" minOccurs="0" maxOccurs="unbounded">
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element name="sectionname" type="xs:string" />
|
||||
<xs:element name="name_en" type="xs:string" />
|
||||
<xs:element name="description_en" type="xs:string" minOccurs="0" />
|
||||
<xs:element name="autoexpand" type="xs:boolean" />
|
||||
<xs:element name="showdescription" type="xs:boolean" />
|
||||
</xs:all>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element name="group" minOccurs="1" maxOccurs="unbounded">
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element name="id" type="xs:string" />
|
||||
<xs:element name="name_en" type="xs:string" />
|
||||
<xs:element name="description_en" type="xs:string" />
|
||||
</xs:all>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:sequence>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:schema>
|
1371
savanna/plugins/intel/resources/hadoop-default.xml
Normal file
1371
savanna/plugins/intel/resources/hadoop-default.xml
Normal file
File diff suppressed because it is too large
Load Diff
1193
savanna/plugins/intel/resources/hdfs-default.xml
Normal file
1193
savanna/plugins/intel/resources/hdfs-default.xml
Normal file
File diff suppressed because it is too large
Load Diff
2678
savanna/plugins/intel/resources/mapred-default.xml
Normal file
2678
savanna/plugins/intel/resources/mapred-default.xml
Normal file
File diff suppressed because it is too large
Load Diff
0
savanna/tests/unit/plugins/intel/__init__.py
Normal file
0
savanna/tests/unit/plugins/intel/__init__.py
Normal file
0
savanna/tests/unit/plugins/intel/client/__init__.py
Normal file
0
savanna/tests/unit/plugins/intel/client/__init__.py
Normal file
28
savanna/tests/unit/plugins/intel/client/response.py
Normal file
28
savanna/tests/unit/plugins/intel/client/response.py
Normal file
@ -0,0 +1,28 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
|
||||
class Response:
|
||||
def __init__(self, data=None, ok=True, status_code=200):
|
||||
self.text = json.dumps(data)
|
||||
self.ok = ok
|
||||
self.status_code = status_code
|
||||
self.reason = None
|
||||
|
||||
|
||||
def make_resp(data=None, ok=True, status_code=200):
|
||||
return Response(data, ok, status_code)
|
310
savanna/tests/unit/plugins/intel/client/test_client.py
Normal file
310
savanna/tests/unit/plugins/intel/client/test_client.py
Normal file
@ -0,0 +1,310 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from savanna import exceptions as ex
|
||||
from savanna.plugins.intel.client import client as c
|
||||
from savanna.plugins.intel import exceptions as iex
|
||||
from savanna.tests.unit import base
|
||||
from savanna.tests.unit.plugins.intel.client import response as r
|
||||
|
||||
|
||||
SESSION_POST_DATA = {'sessionID': '123'}
|
||||
SESSION_GET_DATA = {"items": [
|
||||
{
|
||||
"nodeprogress": {
|
||||
"hostname": 'host',
|
||||
'info': '_ALLFINISH\n'
|
||||
}
|
||||
}
|
||||
]}
|
||||
|
||||
|
||||
class TestClient(base.DbTestCase):
|
||||
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_cluster_op(self, get, post):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
data = {'lelik': 'bolik'}
|
||||
|
||||
post.return_value = r.make_resp(data)
|
||||
self.assertEqual(client.cluster.create(), data)
|
||||
|
||||
get.return_value = r.make_resp(data)
|
||||
self.assertEqual(client.cluster.get(), data)
|
||||
|
||||
post.return_value = r.make_resp(SESSION_POST_DATA)
|
||||
get.return_value = r.make_resp(SESSION_GET_DATA)
|
||||
client.cluster.install_software(['bla-bla'])
|
||||
|
||||
self.assertEqual(post.call_count, 2)
|
||||
self.assertEqual(get.call_count, 2)
|
||||
|
||||
@mock.patch('requests.delete')
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_nodes_op(self, get, post, delete):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
# add
|
||||
post.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"iporhostname": "n1",
|
||||
"info": "Connected"
|
||||
},
|
||||
{
|
||||
"iporhostname": "n2",
|
||||
"info": "Connected"
|
||||
}
|
||||
]
|
||||
})
|
||||
client.nodes.add(['n1', 'n2'], 'hadoop', '/Def', '/tmp/key')
|
||||
post.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"iporhostname": "n1",
|
||||
"info": "bla-bla"
|
||||
}
|
||||
]
|
||||
})
|
||||
self.assertRaises(iex.IntelPluginException, client.nodes.add,
|
||||
['n1'], 'hadoop', '/Def', '/tmp/key')
|
||||
|
||||
# config
|
||||
post.return_value = r.make_resp(SESSION_POST_DATA)
|
||||
get.return_value = r.make_resp(SESSION_GET_DATA)
|
||||
client.nodes.config()
|
||||
|
||||
# delete
|
||||
delete.return_value = r.make_resp()
|
||||
client.nodes.delete(['n1'])
|
||||
|
||||
# get
|
||||
get.return_value = r.make_resp()
|
||||
client.nodes.get()
|
||||
|
||||
# get_status
|
||||
get.return_value = r.make_resp(data={"status": "running"})
|
||||
client.nodes.get_status(['n1'])
|
||||
|
||||
# stop_nodes
|
||||
post.return_value = r.make_resp()
|
||||
client.nodes.stop(['n1'])
|
||||
|
||||
self.assertEqual(delete.call_count, 1)
|
||||
self.assertEqual(post.call_count, 4)
|
||||
self.assertEqual(get.call_count, 3)
|
||||
|
||||
@mock.patch('requests.put')
|
||||
@mock.patch('requests.post')
|
||||
def test_params_op(self, post, put):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
post.return_value = r.make_resp()
|
||||
put.return_value = r.make_resp()
|
||||
|
||||
# add
|
||||
client.params.hdfs.add('lelik', 'bolik')
|
||||
client.params.hadoop.add('lelik', 'bolik')
|
||||
client.params.mapred.add('lelik', 'bolik')
|
||||
|
||||
# get
|
||||
self.assertRaises(ex.NotImplementedException, client.params.hdfs.get,
|
||||
['n1'], 'lelik')
|
||||
self.assertRaises(ex.NotImplementedException, client.params.hadoop.get,
|
||||
['n1'], 'lelik')
|
||||
self.assertRaises(ex.NotImplementedException, client.params.mapred.get,
|
||||
['n1'], 'lelik')
|
||||
|
||||
# update
|
||||
client.params.hdfs.update('lelik', 'bolik', nodes=['n1'])
|
||||
client.params.hdfs.update('lelik', 'bolik')
|
||||
client.params.hadoop.update('lelik', 'bolik', nodes=['n1'])
|
||||
client.params.hadoop.update('lelik', 'bolik')
|
||||
client.params.mapred.update('lelik', 'bolik', nodes=['n1'])
|
||||
client.params.mapred.update('lelik', 'bolik')
|
||||
|
||||
self.assertEqual(post.call_count, 3)
|
||||
self.assertEqual(put.call_count, 6)
|
||||
|
||||
@mock.patch('savanna.context.sleep', lambda x: None)
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_base_services_op(self, get, post):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
# start
|
||||
post.return_value = r.make_resp()
|
||||
get.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"serviceName": "hdfs",
|
||||
"status": "running"
|
||||
},
|
||||
{
|
||||
"serviceName": "mapred",
|
||||
"status": "running"
|
||||
}
|
||||
]})
|
||||
client.services.hdfs.start()
|
||||
client.services.mapred.start()
|
||||
|
||||
get.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"serviceName": "hdfs",
|
||||
"status": "stopped"
|
||||
},
|
||||
{
|
||||
"serviceName": "mapred",
|
||||
"status": "stopped"
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.hdfs.start)
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.mapred.start)
|
||||
|
||||
# stop
|
||||
post.return_value = r.make_resp()
|
||||
client.services.hdfs.stop()
|
||||
client.services.mapred.stop()
|
||||
|
||||
# service
|
||||
get.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"serviceName": "bla-bla",
|
||||
"status": "fail"
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.hdfs.status)
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.mapred.status)
|
||||
|
||||
# get_nodes
|
||||
get.return_value = r.make_resp()
|
||||
client.services.hdfs.get_nodes()
|
||||
client.services.mapred.get_nodes()
|
||||
|
||||
# add_nodes
|
||||
post.return_value = r.make_resp()
|
||||
client.services.hdfs.add_nodes('DataNode', ['n1', 'n2'])
|
||||
client.services.mapred.add_nodes('NameNode', ['n1', 'n2'])
|
||||
|
||||
self.assertEqual(get.call_count, 126)
|
||||
self.assertEqual(post.call_count, 8)
|
||||
|
||||
@mock.patch('requests.delete')
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_services_op(self, get, post, delete):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
# add
|
||||
post.return_value = r.make_resp()
|
||||
client.services.add(['hdfs', 'mapred'])
|
||||
|
||||
# get_services
|
||||
get.return_value = r.make_resp()
|
||||
client.services.get_services()
|
||||
|
||||
# delete_service
|
||||
delete.return_value = r.make_resp()
|
||||
client.services.delete_service('hdfs')
|
||||
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_hdfs_services_op(self, get, post):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
# format
|
||||
get.return_value = r.make_resp(SESSION_GET_DATA)
|
||||
post.return_value = r.make_resp(SESSION_POST_DATA)
|
||||
client.services.hdfs.format()
|
||||
|
||||
# decommission
|
||||
post.return_value = r.make_resp()
|
||||
client.services.hdfs.decommission_nodes(['n1'])
|
||||
|
||||
# get status
|
||||
get.return_value = r.make_resp(data={
|
||||
"items": [
|
||||
{
|
||||
"hostname": "n1",
|
||||
"status": "start"
|
||||
}
|
||||
]
|
||||
})
|
||||
client.services.hdfs.get_datanodes_status()
|
||||
self.assertEqual(client.services.hdfs.get_datanode_status('n1'),
|
||||
'start')
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.hdfs.get_datanode_status, 'n2')
|
||||
|
||||
self.assertEqual(get.call_count, 4)
|
||||
self.assertEqual(post.call_count, 2)
|
||||
|
||||
@mock.patch('savanna.context.sleep', lambda x: None)
|
||||
@mock.patch('requests.post')
|
||||
@mock.patch('requests.get')
|
||||
def test_session_op(self, get, post):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
|
||||
data1 = {
|
||||
"items": [
|
||||
{
|
||||
"nodeprogress": {
|
||||
"hostname": 'host',
|
||||
'info': 'info\n'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
data2 = {
|
||||
"items": [
|
||||
{
|
||||
"nodeprogress": {
|
||||
"hostname": 'host',
|
||||
'info': '_ALLFINISH\n'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
get.side_effect = (r.make_resp(data1), r.make_resp(data2))
|
||||
post.return_value = r.make_resp(SESSION_POST_DATA)
|
||||
|
||||
client.services.hdfs.format()
|
||||
|
||||
self.assertEqual(get.call_count, 2)
|
||||
self.assertEqual(post.call_count, 1)
|
||||
|
||||
@mock.patch('requests.get')
|
||||
def test_rest_client(self, get):
|
||||
client = c.IntelClient('qwe', 'rty')
|
||||
get.return_value = r.make_resp(ok=False, status_code=500, data={
|
||||
"message": "message"
|
||||
})
|
||||
self.assertRaises(iex.IntelPluginException,
|
||||
client.services.get_services)
|
61
savanna/tests/unit/plugins/intel/test_plugin.py
Normal file
61
savanna/tests/unit/plugins/intel/test_plugin.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.plugins.general import exceptions as g_ex
|
||||
from savanna.plugins.intel import config_helper as c_helper
|
||||
from savanna.plugins.intel import exceptions as i_ex
|
||||
from savanna.plugins.intel import plugin as p
|
||||
from savanna.tests.unit import base
|
||||
from savanna.tests.unit.plugins.intel import test_utils as tu
|
||||
|
||||
|
||||
class TestIDHPlugin(base.DbTestCase):
|
||||
def test_get_configs(self):
|
||||
plugin = p.IDHProvider()
|
||||
configs = plugin.get_configs('2.5.0')
|
||||
|
||||
self.assertIn(c_helper.IDH_REPO_URL, configs)
|
||||
self.assertIn(c_helper.IDH_TARBALL_URL, configs)
|
||||
self.assertIn(c_helper.OS_REPO_URL, configs)
|
||||
|
||||
def test_validate(self):
|
||||
plugin = p.IDHProvider()
|
||||
|
||||
ng_mng = tu.make_ng_dict('mng', 'f1', ['manager'], 1)
|
||||
ng_nn = tu.make_ng_dict('nn', 'f1', ['namenode'], 1)
|
||||
ng_jt = tu.make_ng_dict('jt', 'f1', ['jobtracker'], 1)
|
||||
ng_dn = tu.make_ng_dict('dn', 'f1', ['datanode'], 2)
|
||||
ng_tt = tu.make_ng_dict('tt', 'f1', ['tasktracker'], 2)
|
||||
|
||||
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
|
||||
[ng_nn] + [ng_dn])
|
||||
self.assertRaises(i_ex.NotSingleManagerException, plugin.validate, cl)
|
||||
|
||||
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0', [ng_mng])
|
||||
self.assertRaises(g_ex.NotSingleNameNodeException, plugin.validate, cl)
|
||||
|
||||
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
|
||||
[ng_mng] + [ng_nn] * 2)
|
||||
self.assertRaises(g_ex.NotSingleNameNodeException, plugin.validate, cl)
|
||||
|
||||
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
|
||||
[ng_mng] + [ng_nn] + [ng_tt])
|
||||
self.assertRaises(g_ex.TaskTrackersWithoutJobTracker,
|
||||
plugin.validate, cl)
|
||||
|
||||
cl = tu.create_cluster('cl1', 't1', 'intel', '2.5.0',
|
||||
[ng_mng] + [ng_nn] + [ng_jt] * 2 + [ng_tt])
|
||||
self.assertRaises(g_ex.NotSingleJobTrackerException,
|
||||
plugin.validate, cl)
|
32
savanna/tests/unit/plugins/intel/test_utils.py
Normal file
32
savanna/tests/unit/plugins/intel/test_utils.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from savanna.conductor import resource as r
|
||||
|
||||
|
||||
def create_cluster(name, tenant, plugin, version, node_groups, **kwargs):
|
||||
dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin,
|
||||
'hadoop_version': version, 'node_groups': node_groups}
|
||||
dct.update(kwargs)
|
||||
return r.ClusterResource(dct)
|
||||
|
||||
|
||||
def make_ng_dict(name, flavor, processes, count, instances=[]):
|
||||
return {'name': name, 'flavor_id': flavor, 'node_processes': processes,
|
||||
'count': count, 'instances': instances}
|
||||
|
||||
|
||||
def make_inst_dict(inst_id, inst_name):
|
||||
return {'instance_id': inst_id, 'instance_name': inst_name}
|
@ -0,0 +1,43 @@
|
||||
<?xml version="1.0"?>
|
||||
<configuration>
|
||||
<!--Common hadoop property-->
|
||||
<property>
|
||||
<name>name1</name>
|
||||
<value>value1</value>
|
||||
<valuetype>String</valuetype>
|
||||
<description>
|
||||
<en>descr1</en>
|
||||
</description>
|
||||
</property>
|
||||
<!--Common hadoop property without 'valuetype' tag-->
|
||||
<property>
|
||||
<name>name2</name>
|
||||
<value>value2</value>
|
||||
<description>
|
||||
<en>descr2</en>
|
||||
</description>
|
||||
</property>
|
||||
<!--Common hadoop property 3 without text in 'value' tag-->
|
||||
<property>
|
||||
<name>name3</name>
|
||||
<value></value>
|
||||
<valuetype>String</valuetype>
|
||||
<description>
|
||||
<en>descr3</en>
|
||||
</description>
|
||||
</property>
|
||||
<!--Common hadoop property 4 without 'value' tag-->
|
||||
<property>
|
||||
<name>name4</name>
|
||||
<valuetype>String</valuetype>
|
||||
<description>
|
||||
<en>descr4</en>
|
||||
</description>
|
||||
</property>
|
||||
<!--Common hadoop property 5 without description-->
|
||||
<property>
|
||||
<name>name5</name>
|
||||
<value>value5</value>
|
||||
<valuetype>String</valuetype>
|
||||
</property>
|
||||
</configuration>
|
@ -36,6 +36,24 @@ class XMLUtilsTestCase(unittest2.TestCase):
|
||||
x.load_hadoop_xml_defaults(
|
||||
'tests/unit/resources/test-default.xml'))
|
||||
|
||||
def test_load_xml_defaults_with_type_and_locale(self):
|
||||
expected = [
|
||||
{'name': u'name1', 'value': u'value1', 'type': u'String',
|
||||
'description': 'descr1'},
|
||||
{'name': u'name2', 'value': u'value2', 'type': u'',
|
||||
'description': 'descr2'},
|
||||
{'name': u'name3', 'value': '', 'type': u'String',
|
||||
'description': 'descr3'},
|
||||
{'name': u'name4', 'value': '', 'type': u'String',
|
||||
'description': 'descr4'},
|
||||
{'name': u'name5', 'value': u'value5', 'type': u'String',
|
||||
'description': ''}]
|
||||
actual = x.load_hadoop_xml_defaults_with_type_and_locale(
|
||||
'tests/unit/resources/test-default-with-type-and-locale.xml')
|
||||
self.assertListEqual(
|
||||
expected,
|
||||
actual)
|
||||
|
||||
def test_adjust_description(self):
|
||||
self.assertEqual(x._adjust_field("\n"), "")
|
||||
self.assertEqual(x._adjust_field("\n "), "")
|
||||
|
@ -36,6 +36,27 @@ def load_hadoop_xml_defaults(file_name):
|
||||
return configs
|
||||
|
||||
|
||||
def load_hadoop_xml_defaults_with_type_and_locale(file_name):
|
||||
doc = load_xml_document(file_name)
|
||||
configs = []
|
||||
prop = doc.getElementsByTagName('property')
|
||||
for elements in prop:
|
||||
configs.append({
|
||||
'name': _get_text_from_node(elements, 'name'),
|
||||
'value': _get_text_from_node(elements, 'value'),
|
||||
'type': _get_text_from_node(elements, 'valuetype'),
|
||||
'description': _adjust_field(
|
||||
_get_text_from_node(
|
||||
_get_node_element(elements, 'description'), 'en'))
|
||||
})
|
||||
return configs
|
||||
|
||||
|
||||
def _get_node_element(element, name):
|
||||
element = element.getElementsByTagName(name)
|
||||
return element[0] if element and element[0].hasChildNodes() else None
|
||||
|
||||
|
||||
def create_hadoop_xml(configs, config_filter=None):
|
||||
doc = xml.Document()
|
||||
|
||||
@ -68,7 +89,7 @@ def load_xml_document(file_name):
|
||||
|
||||
|
||||
def _get_text_from_node(element, name):
|
||||
element = element.getElementsByTagName(name)
|
||||
element = element.getElementsByTagName(name) if element else None
|
||||
return element[0].firstChild.nodeValue if (
|
||||
element and element[0].hasChildNodes()) else ''
|
||||
|
||||
|
@ -36,6 +36,7 @@ console_scripts =
|
||||
savanna.cluster.plugins =
|
||||
vanilla = savanna.plugins.vanilla.plugin:VanillaProvider
|
||||
hdp = savanna.plugins.hdp.ambariplugin:AmbariPlugin
|
||||
idh = savanna.plugins.intel.plugin:IDHProvider
|
||||
|
||||
savanna.infrastructure.engine =
|
||||
savanna = savanna.service.instances:SavannaInfrastructureEngine
|
||||
|
Loading…
Reference in New Issue
Block a user