Add base services support for HDP 2.2 / 2.3

This patch added support of base Hadoop services:

HDFS:
* NameNode
* DataNode
* SecondaryNameNode

YARN:
* ResourceManager
* NodeManager
* MapReduce History Server
* YARN Timeline Server

ZooKeeper:
* ZooKeeper Server

Supported features:
* Cinder
* Configs provisioning
* Swift integration

Also added scenario tests for plugin.

partially implements bp: hdp-22-support

Change-Id: I2e1188735132881cef1026624cca580ccc7bb2ee
This commit is contained in:
Sergey Reshetnyak 2015-05-22 15:02:53 +03:00
parent 3869ed88a0
commit 8d0eba8d0f
14 changed files with 3168 additions and 7 deletions

View File

@ -14,6 +14,7 @@ recursive-include sahara/locale *
recursive-include sahara/db/migration/alembic_migrations * recursive-include sahara/db/migration/alembic_migrations *
recursive-include sahara/plugins/default_templates *.json recursive-include sahara/plugins/default_templates *.json
include sahara/plugins/ambari/resources/*.json
include sahara/plugins/default_templates/template.conf include sahara/plugins/default_templates/template.conf
include sahara/plugins/cdh/v5/resources/cdh_config.py include sahara/plugins/cdh/v5/resources/cdh_config.py
include sahara/plugins/cdh/v5/resources/*.sh include sahara/plugins/cdh/v5/resources/*.sh

View File

@ -0,0 +1,36 @@
clusters:
- plugin_name: ambari
plugin_version: '2.3'
image: ${ambari_2_1_image}
node_group_templates:
- name: master
flavor: ${medium_flavor_id}
node_processes:
- Ambari
- MapReduce History Server
- NameNode
- ResourceManager
- SecondaryNameNode
- YARN Timeline Server
- ZooKeeper
auto_security_group: true
- name: worker
flavor: ${ci_flavor_id}
node_processes:
- DataNode
- NodeManager
volumes_per_node: 2
volumes_size: 2
auto_security_group: true
cluster_template:
name: ambari21
node_group_templates:
master: 1
worker: 3
cluster_configs:
HDFS:
dfs.datanode.du.reserved: 0
cluster:
name: ${cluster_name}
scenario:
- cinder

View File

@ -70,6 +70,12 @@ class AmbariClient(object):
data = self.check_response(resp) data = self.check_response(resp)
return data.get("items", []) return data.get("items", [])
def get_host_info(self, host):
url = self._base_url + "/hosts/%s" % host
resp = self.get(url)
data = self.check_response(resp)
return data.get("Hosts", {})
def update_user_password(self, user, old_password, new_password): def update_user_password(self, user, old_password, new_password):
url = self._base_url + "/users/%s" % user url = self._base_url + "/users/%s" % user
data = jsonutils.dumps({ data = jsonutils.dumps({
@ -80,3 +86,32 @@ class AmbariClient(object):
}) })
resp = self.put(url, data=data) resp = self.put(url, data=data)
self.check_response(resp) self.check_response(resp)
def create_blueprint(self, name, data):
url = self._base_url + "/blueprints/%s" % name
resp = self.post(url, data=jsonutils.dumps(data))
self.check_response(resp)
def create_cluster(self, name, data):
url = self._base_url + "/clusters/%s" % name
resp = self.post(url, data=jsonutils.dumps(data))
return self.check_response(resp).get("Requests")
def check_request_status(self, cluster_name, req_id):
url = self._base_url + "/clusters/%s/requests/%d" % (cluster_name,
req_id)
resp = self.get(url)
return self.check_response(resp).get("Requests")
def set_up_mirror(self, stack_version, os_type, repo_id, repo_url):
url = self._base_url + (
"/stacks/HDP/versions/%s/operating_systems/%s/repositories/%s") % (
stack_version, os_type, repo_id)
data = {
"Repositories": {
"base_url": repo_url,
"verify_base_url": True
}
}
resp = self.put(url, data=jsonutils.dumps(data))
self.check_response(resp)

View File

@ -16,4 +16,66 @@
# define service names # define service names
AMBARI_SERVICE = "Ambari"
HDFS_SERVICE = "HDFS"
RANGER_SERVICE = "Ranger"
YARN_SERVICE = "YARN"
ZOOKEEPER_SERVICE = "ZooKeeper"
# define process names
AMBARI_SERVER = "Ambari" AMBARI_SERVER = "Ambari"
APP_TIMELINE_SERVER = "YARN Timeline Server"
DATANODE = "DataNode"
HISTORYSERVER = "MapReduce History Server"
NAMENODE = "NameNode"
NODEMANAGER = "NodeManager"
RESOURCEMANAGER = "ResourceManager"
SECONDARY_NAMENODE = "SecondaryNameNode"
ZOOKEEPER_SERVER = "ZooKeeper"
PROC_MAP = {
AMBARI_SERVER: ["METRICS_COLLECTOR"],
APP_TIMELINE_SERVER: ["APP_TIMELINE_SERVER"],
DATANODE: ["DATANODE"],
HISTORYSERVER: ["HISTORYSERVER"],
NAMENODE: ["NAMENODE"],
NODEMANAGER: ["NODEMANAGER"],
RESOURCEMANAGER: ["RESOURCEMANAGER"],
SECONDARY_NAMENODE: ["SECONDARY_NAMENODE"],
ZOOKEEPER_SERVER: ["ZOOKEEPER_SERVER"]
}
CLIENT_MAP = {
APP_TIMELINE_SERVER: ["MAPREDUCE2_CLIENT", "YARN_CLIENT"],
DATANODE: ["HDFS_CLIENT"],
HISTORYSERVER: ["MAPREDUCE2_CLIENT", "YARN_CLIENT"],
NAMENODE: ["HDFS_CLIENT"],
NODEMANAGER: ["MAPREDUCE2_CLIENT", "YARN_CLIENT"],
RESOURCEMANAGER: ["MAPREDUCE2_CLIENT", "YARN_CLIENT"],
SECONDARY_NAMENODE: ["HDFS_CLIENT"],
ZOOKEEPER_SERVER: ["ZOOKEEPER_CLIENT"]
}
ALL_LIST = ["METRICS_MONITOR"]
def get_ambari_proc_list(node_group):
procs = []
for sp in node_group.node_processes:
procs.extend(PROC_MAP.get(sp, []))
return procs
def get_clients(cluster):
procs = []
for ng in cluster.node_groups:
procs.extend(ng.node_processes)
clients = []
for proc in procs:
clients.extend(CLIENT_MAP.get(proc, []))
clients = list(set(clients))
clients.extend(ALL_LIST)
return clients

View File

@ -0,0 +1,171 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
import six
from sahara.plugins.ambari import common
from sahara.plugins import provisioning
from sahara.swift import swift_helper
from sahara.utils import files
configs = {}
obj_configs = {}
cfg_process_map = {
"ams-env": common.AMBARI_SERVICE,
"ams-hbase-env": common.AMBARI_SERVICE,
"ams-hbase-policy": common.AMBARI_SERVICE,
"ams-hbase-security-site": common.AMBARI_SERVICE,
"ams-hbase-site": common.AMBARI_SERVICE,
"ams-site": common.AMBARI_SERVICE,
"capacity-scheduler": common.YARN_SERVICE,
"cluster-env": "general",
"core-site": common.HDFS_SERVICE,
"hadoop-env": common.HDFS_SERVICE,
"hadoop-policy": common.HDFS_SERVICE,
"hdfs-site": common.HDFS_SERVICE,
"mapred-env": common.YARN_SERVICE,
"mapred-site": common.YARN_SERVICE,
"ranger-hdfs-plugin-properties": common.RANGER_SERVICE,
"yarn-env": common.YARN_SERVICE,
"yarn-site": common.YARN_SERVICE,
"zoo.cfg": common.ZOOKEEPER_SERVICE,
"zookeeper-env": common.ZOOKEEPER_SERVICE
}
ng_confs = [
"dfs.datanode.data.dir",
"dtnode_heapsize",
"mapreduce.map.java.opts",
"mapreduce.map.memory.mb",
"mapreduce.reduce.java.opts",
"mapreduce.reduce.memory.mb",
"mapreduce.task.io.sort.mb",
"nodemanager_heapsize",
"yarn.app.mapreduce.am.command-opts",
"yarn.app.mapreduce.am.resource.mb",
"yarn.nodemanager.resource.cpu-vcores",
"yarn.nodemanager.resource.memory-mb",
"yarn.scheduler.maximum-allocation-mb",
"yarn.scheduler.minimum-allocation-mb"
]
hdp_repo_cfg = provisioning.Config(
"HDP repo URL", "general", "cluster", priority=1, default_value="")
hdp_utils_repo_cfg = provisioning.Config(
"HDP-UTILS repo URL", "general", "cluster", priority=1, default_value="")
def _get_service_name(service):
return cfg_process_map.get(service, service)
def _get_config_group(group, param, plugin_version):
for section, process in six.iteritems(cfg_process_map):
if process == group and param in configs[plugin_version][section]:
return section
def _get_param_scope(param):
if param in ng_confs:
return "node"
else:
return "cluster"
def load_configs(version):
if obj_configs.get(version):
return obj_configs[version]
cfg_path = "plugins/ambari/resources/configs-%s.json" % version
vanilla_cfg = jsonutils.loads(files.get_file_text(cfg_path))
configs[version] = vanilla_cfg
sahara_cfg = [hdp_repo_cfg, hdp_utils_repo_cfg]
for service, confs in vanilla_cfg.items():
for k, v in confs.items():
sahara_cfg.append(provisioning.Config(
k, _get_service_name(service), _get_param_scope(k),
default_value=v))
obj_configs[version] = sahara_cfg
return sahara_cfg
def _get_config_value(cluster, key):
return cluster.cluster_configs.get("general", {}).get(key.name,
key.default_value)
def get_hdp_repo_url(cluster):
return _get_config_value(cluster, hdp_repo_cfg)
def get_hdp_utils_repo_url(cluster):
return _get_config_value(cluster, hdp_utils_repo_cfg)
def _serialize_ambari_configs(configs):
return list(map(lambda x: {x: configs[x]}, configs))
def _create_ambari_configs(sahara_configs, plugin_version):
configs = {}
for service, params in six.iteritems(sahara_configs):
for k, v in six.iteritems(params):
group = _get_config_group(service, k, plugin_version)
configs.setdefault(group, {})
configs[group].update({k: v})
return configs
def _make_paths(dirs, suffix):
return ",".join([d + suffix for d in dirs])
def get_ng_params(node_group):
configs = _create_ambari_configs(node_group.node_configs,
node_group.cluster.hadoop_version)
storage_paths = node_group.storage_paths()
configs.setdefault("hdfs-site", {})
configs["hdfs-site"]["dfs.datanode.data.dir"] = _make_paths(
storage_paths, "/hdfs/data")
configs["hdfs-site"]["dfs.journalnode.edits.dir"] = _make_paths(
storage_paths, "/hdfs/journalnode")
configs["hdfs-site"]["dfs.namenode.checkpoint.dir"] = _make_paths(
storage_paths, "/hdfs/namesecondary")
configs["hdfs-site"]["dfs.namenode.name.dir"] = _make_paths(
storage_paths, "/hdfs/namenode")
configs.setdefault("yarn-site", {})
configs["yarn-site"]["yarn.nodemanager.local-dirs"] = _make_paths(
storage_paths, "/yarn/local")
configs["yarn-site"]["yarn.nodemanager.log-dirs"] = _make_paths(
storage_paths, "/yarn/log")
configs["yarn-site"][
"yarn.timeline-service.leveldb-timeline-store.path"] = _make_paths(
storage_paths, "/yarn/timeline")
return _serialize_ambari_configs(configs)
def get_cluster_params(cluster):
configs = _create_ambari_configs(cluster.cluster_configs,
cluster.hadoop_version)
swift_configs = {x["name"]: x["value"]
for x in swift_helper.get_swift_configs()}
configs.setdefault("core-site", {})
configs["core-site"].update(swift_configs)
return _serialize_ambari_configs(configs)

View File

@ -25,6 +25,7 @@ from sahara import context
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins.ambari import client as ambari_client from sahara.plugins.ambari import client as ambari_client
from sahara.plugins.ambari import common as p_common from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import configs
from sahara.plugins import exceptions as p_exc from sahara.plugins import exceptions as p_exc
from sahara.plugins import utils as plugin_utils from sahara.plugins import utils as plugin_utils
from sahara.utils import poll_utils from sahara.utils import poll_utils
@ -34,6 +35,23 @@ LOG = logging.getLogger(__name__)
conductor = conductor.API conductor = conductor.API
repo_id_map = {
"2.2": {
"HDP": "HDP-2.2",
"HDP-UTILS": "HDP-UTILS-1.1.0.20"
},
"2.3": {
"HDP": "HDP-2.3",
"HDP-UTILS": "HDP-UTILS-1.1.0.20"
}
}
os_type_map = {
"centos6": "redhat6",
"redhat6": "redhat6"
}
def setup_ambari(cluster): def setup_ambari(cluster):
LOG.debug("Set up Ambari management console") LOG.debug("Set up Ambari management console")
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
@ -111,3 +129,77 @@ def wait_host_registration(cluster):
def _check_host_registration(client, num_hosts): def _check_host_registration(client, num_hosts):
hosts = client.get_registered_hosts() hosts = client.get_registered_hosts()
return len(hosts) == num_hosts return len(hosts) == num_hosts
def set_up_hdp_repos(cluster):
hdp_repo = configs.get_hdp_repo_url(cluster)
hdp_utils_repo = configs.get_hdp_utils_repo_url(cluster)
if not hdp_repo and not hdp_utils_repo:
return
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
pv = cluster.hadoop_version
repos = repo_id_map[pv]
with ambari_client.AmbariClient(ambari, password=password) as client:
os_type = os_type_map[client.get_host_info(ambari.fqdn())["os_type"]]
if hdp_repo:
client.set_up_mirror(pv, os_type, repos["HDP"], hdp_repo)
if hdp_utils_repo:
client.set_up_mirror(pv, os_type, repos["HDP-UTILS"],
hdp_utils_repo)
def create_blueprint(cluster):
host_groups = []
for ng in cluster.node_groups:
hg = {
"name": ng.name,
"configurations": configs.get_ng_params(ng),
"components": []
}
procs = p_common.get_ambari_proc_list(ng)
procs.extend(p_common.get_clients(cluster))
for proc in procs:
hg["components"].append({"name": proc})
host_groups.append(hg)
bp = {
"Blueprints": {
"stack_name": "HDP",
"stack_version": cluster.hadoop_version
},
"host_groups": host_groups,
"configurations": configs.get_cluster_params(cluster)
}
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
client.create_blueprint(cluster.name, bp)
def start_cluster(cluster):
cl_tmpl = {
"blueprint": cluster.name,
"default_password": uuidutils.generate_uuid(),
"host_groups": []
}
for ng in cluster.node_groups:
cl_tmpl["host_groups"].append({
"name": ng.name,
"hosts": map(lambda x: {"fqdn": x.fqdn()}, ng.instances)
})
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
req_id = client.create_cluster(cluster.name, cl_tmpl)["id"]
while True:
status = client.check_request_status(cluster.name, req_id)
LOG.debug("Task %s in %s state. Completed %.1f%%" % (
status["request_context"], status["request_status"],
status["progress_percent"]))
if status["request_status"] == "COMPLETED":
return
if status["request_status"] in ["IN_PROGRESS", "PENDING"]:
context.sleep(5)
else:
raise p_exc.HadoopProvisionError(
_("Ambari request in %s state") % status["request_status"])

View File

@ -18,6 +18,7 @@ from sahara import conductor
from sahara import context from sahara import context
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins.ambari import common as p_common from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import configs
from sahara.plugins.ambari import deploy from sahara.plugins.ambari import deploy
from sahara.plugins.ambari import validation from sahara.plugins.ambari import validation
from sahara.plugins import provisioning as p from sahara.plugins import provisioning as p
@ -40,11 +41,17 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
def get_node_processes(self, hadoop_version): def get_node_processes(self, hadoop_version):
return { return {
"Ambari": [p_common.AMBARI_SERVER] p_common.AMBARI_SERVICE: [p_common.AMBARI_SERVER],
p_common.HDFS_SERVICE: [p_common.DATANODE, p_common.NAMENODE,
p_common.SECONDARY_NAMENODE],
p_common.YARN_SERVICE: [
p_common.APP_TIMELINE_SERVER, p_common.HISTORYSERVER,
p_common.NODEMANAGER, p_common.RESOURCEMANAGER],
p_common.ZOOKEEPER_SERVICE: [p_common.ZOOKEEPER_SERVER]
} }
def get_configs(self, hadoop_version): def get_configs(self, hadoop_version):
return [] return configs.load_configs(hadoop_version)
def configure_cluster(self, cluster): def configure_cluster(self, cluster):
deploy.setup_ambari(cluster) deploy.setup_ambari(cluster)
@ -53,9 +60,12 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
deploy.update_default_ambari_password(cluster) deploy.update_default_ambari_password(cluster)
cluster = conductor.cluster_get(context.ctx(), cluster.id) cluster = conductor.cluster_get(context.ctx(), cluster.id)
deploy.wait_host_registration(cluster) deploy.wait_host_registration(cluster)
deploy.set_up_hdp_repos(cluster)
deploy.create_blueprint(cluster)
def start_cluster(self, cluster): def start_cluster(self, cluster):
self._set_cluster_info(cluster) self._set_cluster_info(cluster)
deploy.start_cluster(cluster)
def _set_cluster_info(self, cluster): def _set_cluster_info(self, cluster):
ambari_ip = plugin_utils.get_instance( ambari_ip = plugin_utils.get_instance(
@ -69,6 +79,29 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
"Password": cluster.extra["ambari_password"] "Password": cluster.extra["ambari_password"]
} }
} }
namenode = plugin_utils.get_instance(cluster, p_common.NAMENODE)
if namenode:
info[p_common.NAMENODE] = {
"Web UI": "http://%s:50070" % namenode.management_ip
}
resourcemanager = plugin_utils.get_instance(cluster,
p_common.RESOURCEMANAGER)
if resourcemanager:
info[p_common.RESOURCEMANAGER] = {
"Web UI": "http://%s:8088" % resourcemanager.management_ip
}
historyserver = plugin_utils.get_instance(cluster,
p_common.HISTORYSERVER)
if historyserver:
info[p_common.HISTORYSERVER] = {
"Web UI": "http://%s:19888" % historyserver.management_ip
}
atlserver = plugin_utils.get_instance(cluster,
p_common.APP_TIMELINE_SERVER)
if atlserver:
info[p_common.APP_TIMELINE_SERVER] = {
"Web UI": "http://%s:8188" % atlserver.management_ip
}
info.update(cluster.info.to_dict()) info.update(cluster.info.to_dict())
ctx = context.ctx() ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {"info": info}) conductor.cluster_update(ctx, cluster, {"info": info})
@ -97,7 +130,14 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
def get_open_ports(self, node_group): def get_open_ports(self, node_group):
ports_map = { ports_map = {
p_common.AMBARI_SERVER: [8080] p_common.AMBARI_SERVER: [8080],
p_common.APP_TIMELINE_SERVER: [8188, 8190, 10200],
p_common.DATANODE: [50075, 50475],
p_common.HISTORYSERVER: [10020, 19888],
p_common.NAMENODE: [8020, 9000, 50070, 50470],
p_common.NODEMANAGER: [8042, 8044, 45454],
p_common.RESOURCEMANAGER: [8025, 8030, 8050, 8088, 8141],
p_common.SECONDARY_NAMENODE: [50090]
} }
ports = [] ports = []
for service in node_group.node_processes: for service in node_group.node_processes:

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import sys
from oslo_serialization import jsonutils
import requests
def get_blueprint(ambari_address, username, password, cluster_name):
url = "http://%s:8080/api/v1/clusters/%s?format=blueprint" % (
ambari_address, cluster_name)
resp = requests.get(url, auth=(username, password))
resp.raise_for_status()
if resp.text:
return jsonutils.loads(resp.text)
def generate_config(blueprint):
configs = {}
for entity in blueprint["configurations"]:
for cfg in entity:
p = entity[cfg]["properties"]
if not p:
continue
if "content" in p:
del p["content"]
for k, v in p.items():
p[k] = " ".join(v.split())
if p:
configs[cfg] = p
return configs
def write_config(cfg, version):
with open("sahara/plugins/ambari/resources/configs-%s.json" % version,
"w") as fp:
jsonutils.dump(cfg, fp, indent=4, sort_keys=True,
separators=(",", ": "))
def main():
parser = argparse.ArgumentParser(
description="Ambari sample config generator")
parser.add_argument("--address", help="Ambari address",
default="localhost")
parser.add_argument("--username", help="Ambari username",
default="admin")
parser.add_argument("--password", help="Ambari password",
default="admin")
parser.add_argument("--cluster-name", help="Name of cluster",
default="cluster")
ns = parser.parse_args(sys.argv[1:])
bp = get_blueprint(ns.address,
ns.username,
ns.password,
ns.cluster_name)
cfg = generate_config(bp)
write_config(cfg, bp["Blueprints"]["stack_version"])
if __name__ == "__main__":
main()

View File

@ -16,6 +16,7 @@
from sahara import conductor from sahara import conductor
from sahara import context from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import common from sahara.plugins.ambari import common
from sahara.plugins import exceptions as ex from sahara.plugins import exceptions as ex
from sahara.plugins import utils from sahara.plugins import utils
@ -28,9 +29,45 @@ def validate_creation(cluster_id):
ctx = context.ctx() ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id) cluster = conductor.cluster_get(ctx, cluster_id)
_check_ambari(cluster) _check_ambari(cluster)
_check_hdfs(cluster)
_check_yarn(cluster)
def _check_ambari(cluster): def _check_ambari(cluster):
count = utils.get_instances_count(cluster, common.AMBARI_SERVER) am_count = utils.get_instances_count(cluster, common.AMBARI_SERVER)
if count != 1: zk_count = utils.get_instances_count(cluster, common.ZOOKEEPER_SERVER)
raise ex.InvalidComponentCountException(common.AMBARI_SERVER, 1, count) if am_count != 1:
raise ex.InvalidComponentCountException(common.AMBARI_SERVER, 1,
am_count)
if zk_count == 0:
raise ex.InvalidComponentCountException(common.ZOOKEEPER_SERVER,
_("1 or more"), zk_count)
def _check_hdfs(cluster):
nn_count = utils.get_instances_count(cluster, common.NAMENODE)
dn_count = utils.get_instances_count(cluster, common.DATANODE)
if nn_count != 1:
raise ex.InvalidComponentCountException(common.NAMENODE, 1, nn_count)
if dn_count == 0:
raise ex.InvalidComponentCountException(
common.DATANODE, _("1 or more"), dn_count)
def _check_yarn(cluster):
rm_count = utils.get_instances_count(cluster, common.RESOURCEMANAGER)
nm_count = utils.get_instances_count(cluster, common.NODEMANAGER)
hs_count = utils.get_instances_count(cluster, common.HISTORYSERVER)
at_count = utils.get_instances_count(cluster, common.APP_TIMELINE_SERVER)
if rm_count != 1:
raise ex.InvalidComponentCountException(common.RESOURCEMANAGER, 1,
rm_count)
if hs_count != 1:
raise ex.InvalidComponentCountException(common.HISTORYSERVER, 1,
hs_count)
if at_count != 1:
raise ex.InvalidComponentCountException(common.APP_TIMELINE_SERVER, 1,
at_count)
if nm_count == 0:
raise ex.InvalidComponentCountException(common.NODEMANAGER,
_("1 or more"), nm_count)

View File

@ -143,3 +143,33 @@ class AmbariClientTestCase(base.SaharaTestCase):
self.http_client.put.assert_called_with( self.http_client.put.assert_called_with(
"http://1.2.3.4:8080/api/v1/users/bart", data=exp_req, "http://1.2.3.4:8080/api/v1/users/bart", data=exp_req,
verify=False, auth=client._auth, headers=self.headers) verify=False, auth=client._auth, headers=self.headers)
def test_create_blueprint(self):
client = ambari_client.AmbariClient(self.instance)
resp = mock.Mock()
resp.text = ""
resp.status_code = 200
self.http_client.post.return_value = resp
client.create_blueprint("cluster_name", {"some": "data"})
self.http_client.post.assert_called_with(
"http://1.2.3.4:8080/api/v1/blueprints/cluster_name",
data=jsonutils.dumps({"some": "data"}), verify=False,
auth=client._auth, headers=self.headers)
def test_create_cluster(self):
client = ambari_client.AmbariClient(self.instance)
resp = mock.Mock()
resp.text = """{
"Requests": {
"id": 1,
"status": "InProgress"
}
}"""
resp.status_code = 200
self.http_client.post.return_value = resp
req_info = client.create_cluster("cluster_name", {"some": "data"})
self.assertEqual(1, req_info["id"])
self.http_client.post.assert_called_with(
"http://1.2.3.4:8080/api/v1/clusters/cluster_name",
data=jsonutils.dumps({"some": "data"}), verify=False,
auth=client._auth, headers=self.headers)

View File

@ -0,0 +1,114 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import mock
from sahara.plugins.ambari import configs
from sahara.tests.unit import base
class AmbariConfigsTestCase(base.SaharaTestCase):
def setUp(self):
super(AmbariConfigsTestCase, self).setUp()
configs.load_configs("2.2")
self.ng = mock.Mock()
self.ng.node_configs = {}
self.ng.cluster = mock.Mock()
self.ng.cluster.hadoop_version = "2.2"
self.ng.storage_paths = mock.Mock()
self.ng.storage_paths.return_value = ["/data1", "/data2"]
def assertConfigEqual(self, expected, actual):
self.assertEqual(len(expected), len(actual))
cnt_ex = collections.Counter()
cnt_act = collections.Counter()
for i, ex in enumerate(expected):
for j, act in enumerate(actual):
if ex == act:
cnt_ex[i] += 1
cnt_act[j] += 1
self.assertEqual(len(expected), len(cnt_ex))
self.assertEqual(len(actual), len(cnt_act))
def test_get_ng_params_default(self):
ng_configs = configs.get_ng_params(self.ng)
expected = [
{
"hdfs-site": {
"dfs.datanode.data.dir":
"/data1/hdfs/data,/data2/hdfs/data",
"dfs.journalnode.edits.dir":
"/data1/hdfs/journalnode,/data2/hdfs/journalnode",
"dfs.namenode.checkpoint.dir":
"/data1/hdfs/namesecondary,/data2/hdfs/namesecondary",
"dfs.namenode.name.dir":
"/data1/hdfs/namenode,/data2/hdfs/namenode"
}
},
{
"yarn-site": {
"yarn.nodemanager.local-dirs":
"/data1/yarn/local,/data2/yarn/local",
"yarn.nodemanager.log-dirs":
"/data1/yarn/log,/data2/yarn/log",
"yarn.timeline-service.leveldb-timeline-store.path":
"/data1/yarn/timeline,/data2/yarn/timeline"
}
}
]
self.assertConfigEqual(expected, ng_configs)
def test_get_ng_params(self):
self.ng.node_configs = {
"YARN": {
"mapreduce.map.java.opts": "-Dk=v",
"yarn.scheduler.minimum-allocation-mb": "256"
}
}
ng_configs = configs.get_ng_params(self.ng)
expected = [
{
"hdfs-site": {
"dfs.datanode.data.dir":
"/data1/hdfs/data,/data2/hdfs/data",
"dfs.journalnode.edits.dir":
"/data1/hdfs/journalnode,/data2/hdfs/journalnode",
"dfs.namenode.checkpoint.dir":
"/data1/hdfs/namesecondary,/data2/hdfs/namesecondary",
"dfs.namenode.name.dir":
"/data1/hdfs/namenode,/data2/hdfs/namenode"
}
},
{
"mapred-site": {
"mapreduce.map.java.opts": "-Dk=v"
}
},
{
"yarn-site": {
"yarn.nodemanager.local-dirs":
"/data1/yarn/local,/data2/yarn/local",
"yarn.nodemanager.log-dirs":
"/data1/yarn/log,/data2/yarn/log",
"yarn.scheduler.minimum-allocation-mb": "256",
"yarn.timeline-service.leveldb-timeline-store.path":
"/data1/yarn/timeline,/data2/yarn/timeline"
}
}
]
self.assertConfigEqual(expected, ng_configs)

View File

@ -40,7 +40,14 @@ class AmbariValidationTestCase(base.SaharaTestCase):
self.plugin = plugin.AmbariPluginProvider() self.plugin = plugin.AmbariPluginProvider()
def test_cluster_with_ambari(self): def test_cluster_with_ambari(self):
cluster = make_cluster({1: [p_common.AMBARI_SERVER]}) cluster = make_cluster({1: [p_common.AMBARI_SERVER,
p_common.ZOOKEEPER_SERVER,
p_common.NAMENODE,
p_common.DATANODE,
p_common.RESOURCEMANAGER,
p_common.NODEMANAGER,
p_common.HISTORYSERVER,
p_common.APP_TIMELINE_SERVER]})
with mock.patch("sahara.plugins.ambari.validation.conductor") as p: with mock.patch("sahara.plugins.ambari.validation.conductor") as p:
p.cluster_get = mock.Mock() p.cluster_get = mock.Mock()
p.cluster_get.return_value = cluster p.cluster_get.return_value = cluster