Merge "Added scaling support for HDP 2.2 / 2.3"

This commit is contained in:
Jenkins 2016-02-25 16:37:05 +00:00 committed by Gerrit Code Review
commit e1d81ff3ec
9 changed files with 634 additions and 38 deletions

View File

@ -14,9 +14,18 @@
# limitations under the License.
from oslo_log import log as logging
from oslo_serialization import jsonutils
from requests import auth
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import decomission_helper as d_helper
from sahara.plugins import exceptions as p_exc
LOG = logging.getLogger(__name__)
class AmbariClient(object):
def __init__(self, instance, port="8080", **kwargs):
@ -64,6 +73,17 @@ class AmbariClient(object):
if resp.text:
return jsonutils.loads(resp.text)
@staticmethod
def req_id(response):
    """Extract the Ambari request id from *response*.

    :raises HadoopProvisionError: if the response carries no body or the
        body lacks a ``Requests/id`` field.
    """
    if not response.text:
        raise p_exc.HadoopProvisionError("Cannot find request id. "
                                         "No response body")
    body = jsonutils.loads(response.text)
    if "Requests" in body and "id" in body["Requests"]:
        return body["Requests"]["id"]
    raise p_exc.HadoopProvisionError("Cannot find request id. "
                                     "Unexpected response format")
def get_registered_hosts(self):
url = self._base_url + "/hosts"
resp = self.get(url)
@ -90,19 +110,106 @@ class AmbariClient(object):
def create_blueprint(self, name, data):
url = self._base_url + "/blueprints/%s" % name
resp = self.post(url, data=jsonutils.dumps(data))
self.check_response(resp)
return self.check_response(resp)
def create_cluster(self, name, data):
    """POST a new cluster named *name*; return its "Requests" section."""
    url = self._base_url + "/clusters/%s" % name
    payload = jsonutils.dumps(data)
    return self.check_response(self.post(url, data=payload)).get("Requests")
def add_host_to_cluster(self, instance):
    """Register the host backing *instance* with its Ambari cluster."""
    url = "{base}/clusters/{cluster}/hosts/{hostname}".format(
        base=self._base_url,
        cluster=instance.cluster.name,
        hostname=instance.fqdn())
    self.check_response(self.post(url))
def create_config_group(self, cluster, data):
    """Create an Ambari config group for *cluster* from *data*."""
    url = "%s/clusters/%s/config_groups" % (self._base_url, cluster.name)
    response = self.post(url, data=jsonutils.dumps(data))
    return self.check_response(response)
def add_service_to_host(self, inst, service):
    """Attach *service* as a host component of instance *inst*."""
    url = ("{pref}/clusters/{cluster}/hosts/{host}/"
           "host_components/{proc}").format(
        pref=self._base_url, cluster=inst.cluster.name,
        host=inst.fqdn(), proc=service)
    self.check_response(self.post(url))
def start_service_on_host(self, inst, service, final_state):
    """Move *service* on *inst* to *final_state*; return the request id.

    The returned id can later be polled (wait_ambari_request /
    wait_ambari_requests) to track the health of the transition.
    """
    url = ("{pref}/clusters/{cluster}/hosts/{host}/"
           "host_components/{proc}").format(
        pref=self._base_url, cluster=inst.cluster.name,
        host=inst.fqdn(), proc=service)
    body = {
        'HostRoles': {
            'state': final_state
        },
        'RequestInfo': {
            'context': "Starting service {service}, moving to state "
                       "{state}".format(service=service, state=final_state)
        }
    }
    response = self.put(url, data=jsonutils.dumps(body))
    self.check_response(response)
    # return req_id to check health of request
    return self.req_id(response)
def decommission_nodemanagers(self, cluster_name, instances):
    """Decommission NodeManagers of *instances*; block until done."""
    url = self._base_url + "/clusters/%s/requests" % cluster_name
    request_body = d_helper.build_nodemanager_decommission_request(
        cluster_name, instances)
    response = self.post(url, data=jsonutils.dumps(request_body))
    self.wait_ambari_request(self.req_id(response), cluster_name)
def decommission_datanodes(self, cluster_name, instances):
    """Decommission DataNodes of *instances*; block until done."""
    url = self._base_url + "/clusters/%s/requests" % cluster_name
    request_body = d_helper.build_datanode_decommission_request(
        cluster_name, instances)
    response = self.post(url, data=jsonutils.dumps(request_body))
    self.wait_ambari_request(self.req_id(response), cluster_name)
def remove_process_from_host(self, cluster_name, instance, process):
    """DELETE host component *process* from *instance*."""
    url = "%s/clusters/%s/hosts/%s/host_components/%s" % (
        self._base_url, cluster_name, instance.fqdn(), process)
    return self.check_response(self.delete(url))
def stop_process_on_host(self, cluster_name, instance, process):
    """Stop *process* on *instance* unless it is already INSTALLED."""
    url = "%s/clusters/%s/hosts/%s/host_components/%s" % (
        self._base_url, cluster_name, instance.fqdn(), process)
    current = self.check_response(self.get(url))
    # Ambari represents a stopped component by the INSTALLED state.
    if current["HostRoles"]["state"] != "INSTALLED":
        body = {"HostRoles": {"state": "INSTALLED"},
                "RequestInfo": {"context": "Stopping %s" % process}}
        response = self.put(url, data=jsonutils.dumps(body))
        self.wait_ambari_request(self.req_id(response), cluster_name)
def delete_host(self, cluster_name, instance):
    """Remove the Ambari host record backing *instance*."""
    url = "%s/clusters/%s/hosts/%s" % (
        self._base_url, cluster_name, instance.fqdn())
    return self.check_response(self.delete(url))
def check_request_status(self, cluster_name, req_id):
    """Return the "Requests" section describing request *req_id*."""
    url = self._base_url + "/clusters/%s/requests/%d" % (cluster_name,
                                                         req_id)
    return self.check_response(self.get(url)).get("Requests")
def list_host_processes(self, cluster_name, instance):
    """Return component names currently present on *instance*."""
    url = self._base_url + "/clusters/%s/hosts/%s" % (
        cluster_name, instance.fqdn())
    body = jsonutils.loads(self.get(url).text)
    return [component["HostRoles"]["component_name"]
            for component in body["host_components"]]
def set_up_mirror(self, stack_version, os_type, repo_id, repo_url):
url = self._base_url + (
"/stacks/HDP/versions/%s/operating_systems/%s/repositories/%s") % (
@ -115,3 +222,42 @@ class AmbariClient(object):
}
resp = self.put(url, data=jsonutils.dumps(data))
self.check_response(resp)
def get_request_status(self, cluster_name, request_id):
    """Return the status string of Ambari request *request_id*."""
    url = self._base_url + ("/clusters/%s/requests/%s" %
                            (cluster_name, request_id))
    info = self.check_response(self.get(url))
    return info.get('Requests').get("request_status")
def wait_ambari_requests(self, requests, cluster_name):
    """Block until every Ambari request id in *requests* is COMPLETED.

    :raises HadoopProvisionError: if any request leaves the
        PENDING / IN_PROGRESS / COMPLETED set (e.g. FAILED, ABORTED).
    """
    pending = set(requests)
    while pending:
        not_completed = set()
        for request in pending:
            status = self.get_request_status(cluster_name, request)
            if status == 'COMPLETED':
                continue
            if status in ['IN_PROGRESS', 'PENDING']:
                not_completed.add(request)
            else:
                raise p_exc.HadoopProvisionError(
                    _("Some Ambari request(s) not in COMPLETED state"))
        pending = not_completed
        # Bug fix: previously we slept 5s even after the last request
        # completed; only wait when something is still outstanding.
        if pending:
            context.sleep(5)
            LOG.debug("Waiting for ambari %d requests to be completed",
                      len(pending))
    LOG.debug("Waiting for ambari requests completed")
def wait_ambari_request(self, request_id, cluster_name):
    """Poll a single Ambari request every 5s until it completes.

    :raises HadoopProvisionError: if the request enters a terminal
        non-COMPLETED state (e.g. FAILED, ABORTED).
    """
    while True:
        status = self.check_request_status(cluster_name, request_id)
        # Lazy %-style args instead of eager "%" formatting: the message
        # is only rendered when DEBUG logging is actually enabled.
        LOG.debug("Task %s in %s state. Completed %.1f%%",
                  status["request_context"], status["request_status"],
                  status["progress_percent"])
        if status["request_status"] == "COMPLETED":
            return
        if status["request_status"] in ["IN_PROGRESS", "PENDING"]:
            context.sleep(5)
        else:
            raise p_exc.HadoopProvisionError(
                _("Ambari request in %s state") % status["request_status"])

View File

@ -133,3 +133,11 @@ def get_clients(cluster):
clients = list(set(clients))
clients.extend(ALL_LIST)
return clients
def instances_have_process(instances, process):
    """Return True if any instance's node group runs *process*."""
    return any(process in inst.node_group.node_processes
               for inst in instances)

View File

@ -79,6 +79,22 @@ CFG_PROCESS_MAP = {
}
SERVICES_TO_CONFIGS_MAP = None
def get_service_to_configs_map():
    """Return a (cached) mapping of service name -> config sections.

    Inverts CFG_PROCESS_MAP and memoizes the result in the module-level
    SERVICES_TO_CONFIGS_MAP on first use.
    """
    global SERVICES_TO_CONFIGS_MAP
    if SERVICES_TO_CONFIGS_MAP:
        return SERVICES_TO_CONFIGS_MAP
    mapping = {}
    for (cfg_section, service) in six.iteritems(CFG_PROCESS_MAP):
        mapping.setdefault(service, []).append(cfg_section)
    SERVICES_TO_CONFIGS_MAP = mapping
    return SERVICES_TO_CONFIGS_MAP
ng_confs = [
"dfs.datanode.data.dir",
"dtnode_heapsize",
@ -176,7 +192,7 @@ def _make_paths(dirs, suffix):
return ",".join([d + suffix for d in dirs])
def get_instance_params(inst):
def get_instance_params_mapping(inst):
configs = _create_ambari_configs(inst.node_group.node_configs,
inst.node_group.cluster.hadoop_version)
storage_paths = inst.storage_paths()
@ -200,8 +216,11 @@ def get_instance_params(inst):
configs.setdefault("oozie-site", {})
configs["oozie-site"][
"oozie.service.AuthorizationService.security.enabled"] = "false"
return configs
return _serialize_ambari_configs(configs)
def get_instance_params(inst):
return _serialize_ambari_configs(get_instance_params_mapping(inst))
def get_cluster_params(cluster):
@ -216,3 +235,37 @@ def get_cluster_params(cluster):
configs["admin-properties"]["db_root_password"] = (
cluster.extra["ranger_db_password"])
return _serialize_ambari_configs(configs)
def get_config_group(instance):
    """Build Ambari ConfigGroup bodies carrying *instance* overrides.

    One group is produced per service, but only for services where the
    instance actually overrides at least one config section.
    """
    params = get_instance_params_mapping(instance)
    groups = []
    for (service, sections) in six.iteritems(get_service_to_configs_map()):
        desired_configs = []
        for section in sections:
            overrides = params.get(section, {})
            if overrides:
                desired_configs.append({
                    'type': section,
                    'properties': overrides,
                    'tag': instance.instance_name
                })
        # Config Group without overridden data is not interesting
        if desired_configs:
            groups.append({'ConfigGroup': {
                'cluster_name': instance.cluster.name,
                'group_name': "%s:%s" % (
                    instance.cluster.name, instance.instance_name),
                'tag': service,
                'description': "Config group for scaled "
                               "node %s" % instance.instance_name,
                'hosts': [
                    {
                        'host_name': instance.fqdn()
                    }
                ],
                'desired_configs': desired_configs
            }})
    return groups

View File

@ -0,0 +1,118 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
# Request-body templates for Ambari's "Requests" REST resource. These are
# deep-copied by the builders below so module state is never mutated.
_COMMON_DECOMMISSION_TEMPLATE = {
    "RequestInfo": {
        "context": "",
        "command": "DECOMMISSION",
        "parameters": {
            "slave_type": "",
            "excluded_hosts": ""
        },
        "operation_level": {
            "level": "HOST_COMPONENT",
            "cluster_name": ""
        }
    },
    "Requests/resource_filters": [
        {
            "service_name": "",
            "component_name": ""
        }
    ]
}

_COMMON_RESTART_TEMPLATE = {
    "RequestInfo": {
        "context": "",
        "command": "RESTART",
        "operation_level": {
            "level": "HOST",
            "cluster_name": ""
        }
    },
    "Requests/resource_filters": [
        {
            "service_name": "",
            "component_name": "",
            "hosts": ""
        }
    ]
}


def _fill_decommission_request(request_context, cluster_name, instances,
                               slave_type, service_name, component_name):
    # Shared builder for DECOMMISSION requests; factors out the duplication
    # between the datanode and nodemanager variants.
    tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE)
    tmpl["RequestInfo"]["context"] = request_context
    tmpl["RequestInfo"]["parameters"]["slave_type"] = slave_type
    tmpl["RequestInfo"]["parameters"]["excluded_hosts"] = ",".join(
        i.fqdn() for i in instances)
    tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name
    tmpl["Requests/resource_filters"][0]["service_name"] = service_name
    tmpl["Requests/resource_filters"][0]["component_name"] = component_name
    return tmpl


def _fill_restart_request(request_context, cluster_name, instance,
                          service_name, component_name):
    # Shared builder for single-host RESTART requests.
    tmpl = copy.deepcopy(_COMMON_RESTART_TEMPLATE)
    tmpl["RequestInfo"]["context"] = request_context
    tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name
    tmpl["Requests/resource_filters"][0]["service_name"] = service_name
    tmpl["Requests/resource_filters"][0]["component_name"] = component_name
    tmpl["Requests/resource_filters"][0]["hosts"] = instance.fqdn()
    return tmpl


def build_datanode_decommission_request(cluster_name, instances):
    """Build the request body decommissioning DataNodes of *instances*."""
    return _fill_decommission_request(
        "Decommission DataNodes", cluster_name, instances,
        "DATANODE", "HDFS", "NAMENODE")


def build_nodemanager_decommission_request(cluster_name, instances):
    """Build the request body decommissioning NodeManagers of *instances*."""
    return _fill_decommission_request(
        "Decommission NodeManagers", cluster_name, instances,
        "NODEMANAGER", "YARN", "RESOURCEMANAGER")


def build_namenode_restart_request(cluster_name, nn_instance):
    """Build the request body restarting the NameNode on *nn_instance*."""
    return _fill_restart_request(
        "Restart NameNode", cluster_name, nn_instance, "HDFS", "NAMENODE")


def build_resourcemanager_restart_request(cluster_name, rm_instance):
    """Build the request body restarting the RM on *rm_instance*."""
    return _fill_restart_request(
        "Restart ResourceManager", cluster_name, rm_instance,
        "YARN", "RESOURCEMANAGER")

View File

@ -22,11 +22,9 @@ from oslo_utils import uuidutils
from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.plugins.ambari import client as ambari_client
from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import configs
from sahara.plugins import exceptions as p_exc
from sahara.plugins import utils as plugin_utils
from sahara.utils import poll_utils
@ -59,15 +57,21 @@ def setup_ambari(cluster):
LOG.debug("Ambari management console installed")
def setup_agents(cluster, instances=None):
    """Install and configure Ambari agents.

    :param instances: optional subset to set up (used when scaling);
        defaults to every instance of *cluster*.
    """
    # Bug fix: this span contained interleaved removed/added diff lines
    # (a duplicate ``def setup_agents(cluster):`` header and a stale loop
    # over all cluster instances); only the new implementation is kept.
    LOG.debug("Set up Ambari agents")
    manager_address = plugin_utils.get_instance(
        cluster, p_common.AMBARI_SERVER).fqdn()
    if not instances:
        instances = plugin_utils.get_instances(cluster)
    _setup_agents(instances, manager_address)


def _setup_agents(instances, manager_address):
    # Run the per-instance agent setup concurrently.
    with context.ThreadGroup() as tg:
        for inst in instances:
            tg.spawn("hwx-agent-setup-%s" % inst.id,
                     _setup_agent, inst, manager_address)
    LOG.debug("Ambari agents have been installed")
def _disable_repos_on_inst(instance):
@ -158,24 +162,21 @@ def update_default_ambari_password(cluster):
cluster = conductor.cluster_get(ctx, cluster.id)
def wait_host_registration(cluster, instances):
    """Wait (up to 600s) until *instances* are registered in Ambari.

    Delegates the per-iteration check to _check_host_registration via
    poll_utils.poll, which raises on timeout.
    """
    # Bug fix: this span contained interleaved removed/added diff lines
    # (the old ``def wait_host_registration(cluster):`` signature, a stale
    # ``num_hosts`` kwargs dict and a post-poll comparison of all cluster
    # hosts); only the instance-based implementation is kept.
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
        kwargs = {"client": client, "instances": instances}
        poll_utils.poll(_check_host_registration, kwargs=kwargs, timeout=600)
def _check_host_registration(client, num_hosts):
def _check_host_registration(client, instances):
hosts = client.get_registered_hosts()
return len(hosts) == num_hosts
registered_host_names = [h["Hosts"]["host_name"] for h in hosts]
for instance in instances:
if instance.fqdn() not in registered_host_names:
return False
return True
def set_up_hdp_repos(cluster):
@ -223,10 +224,10 @@ def create_blueprint(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
password = cluster.extra["ambari_password"]
with ambari_client.AmbariClient(ambari, password=password) as client:
client.create_blueprint(cluster.name, bp)
return client.create_blueprint(cluster.name, bp)
def start_cluster(cluster):
def _build_ambari_cluster_template(cluster):
cl_tmpl = {
"blueprint": cluster.name,
"default_password": uuidutils.generate_uuid(),
@ -238,19 +239,135 @@ def start_cluster(cluster):
"name": instance.instance_name,
"hosts": [{"fqdn": instance.fqdn()}]
})
return cl_tmpl
def start_cluster(cluster):
    """Create the Ambari cluster from its template and wait for start."""
    # Bug fix: a stale removed diff line referencing the undefined name
    # ``cl_tmpl`` was interleaved here; only the new template-based call
    # is kept.
    ambari_template = _build_ambari_cluster_template(cluster)
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
        req_id = client.create_cluster(cluster.name, ambari_template)["id"]
        client.wait_ambari_request(req_id, cluster.name)
def add_new_hosts(cluster, instances):
    """Register every instance of *instances* as a new cluster host."""
    password = cluster.extra["ambari_password"]
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    with ambari_client.AmbariClient(server, password=password) as client:
        for instance in instances:
            client.add_host_to_cluster(instance)
def manage_config_groups(cluster, instances):
    """Create Ambari config groups with per-instance overrides."""
    overrides = []
    for instance in instances:
        overrides.extend(configs.get_config_group(instance))
    password = cluster.extra["ambari_password"]
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    with ambari_client.AmbariClient(server, password=password) as client:
        client.create_config_group(cluster, overrides)
def manage_host_components(cluster, instances):
    """Install, then start, all required components on *instances*.

    First every service is added to its host and driven to INSTALLED;
    once all installs complete, the services are driven to STARTED.
    """
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(server, password=password) as client:
        client_procs = p_common.get_clients(cluster)
        install_ids = []
        for instance in instances:
            procs = p_common.get_ambari_proc_list(instance.node_group)
            procs.extend(client_procs)
            for proc in procs:
                client.add_service_to_host(instance, proc)
                install_ids.append(
                    client.start_service_on_host(instance, proc,
                                                 'INSTALLED'))
        client.wait_ambari_requests(install_ids, cluster.name)
        # all services added and installed, let's start them
        start_ids = []
        for instance in instances:
            procs = p_common.get_ambari_proc_list(instance.node_group)
            procs.extend(p_common.ALL_LIST)
            for proc in procs:
                start_ids.append(
                    client.start_service_on_host(instance, proc,
                                                 'STARTED'))
        client.wait_ambari_requests(start_ids, cluster.name)
def decommission_hosts(cluster, instances):
    """Decommission the NodeManagers and DataNodes among *instances*."""
    # Bug fix: the original used filter() and then len() on the result;
    # on Python 3 filter() returns an iterator, so len() raises TypeError.
    # List comprehensions work on both Python 2 and 3.
    nodemanager_instances = [
        i for i in instances
        if p_common.NODEMANAGER in i.node_group.node_processes]
    if nodemanager_instances:
        decommission_nodemanagers(cluster, nodemanager_instances)

    datanode_instances = [
        i for i in instances
        if p_common.DATANODE in i.node_group.node_processes]
    if datanode_instances:
        decommission_datanodes(cluster, datanode_instances)
def decommission_nodemanagers(cluster, instances):
    """Gracefully decommission the NodeManagers on *instances*."""
    password = cluster.extra["ambari_password"]
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    with ambari_client.AmbariClient(server, password=password) as client:
        client.decommission_nodemanagers(cluster.name, instances)
def decommission_datanodes(cluster, instances):
    """Gracefully decommission the DataNodes on *instances*."""
    password = cluster.extra["ambari_password"]
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    with ambari_client.AmbariClient(server, password=password) as client:
        client.decommission_datanodes(cluster.name, instances)
def remove_services_from_hosts(cluster, instances):
    """Stop and remove all processes from *instances*, then delete them."""
    for inst in instances:
        # Lazy %-style log args instead of eager "%" formatting: the
        # message is only rendered when DEBUG logging is enabled.
        LOG.debug("Stopping and removing processes from host %s",
                  inst.fqdn())
        _remove_services_from_host(cluster, inst)
        LOG.debug("Removing the host %s", inst.fqdn())
        _remove_host(cluster, inst)
def _remove_services_from_host(cluster, instance):
    """Stop then delete every HDP process present on *instance*."""
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
        hdp_processes = client.list_host_processes(cluster.name, instance)
        for proc in hdp_processes:
            # Lazy %-style log args instead of eager "%" formatting.
            LOG.debug("Stopping process %s on host %s ",
                      proc, instance.fqdn())
            client.stop_process_on_host(cluster.name, instance, proc)
            LOG.debug("Removing process %s from host %s ",
                      proc, instance.fqdn())
            client.remove_process_from_host(cluster.name, instance, proc)
    _wait_all_processes_removed(cluster, instance)
def _remove_host(cluster, inst):
    """Delete the Ambari host record for *inst*."""
    password = cluster.extra["ambari_password"]
    server = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    with ambari_client.AmbariClient(server, password=password) as client:
        client.delete_host(cluster.name, inst)
def _wait_all_processes_removed(cluster, instance):
    """Poll Ambari until *instance* reports no remaining host processes."""
    # Bug fix: the previous body polled client.check_request_status()
    # with an undefined name ``req_id`` (NameError at runtime) — code
    # copy-pasted from wait_ambari_request. Removal completion is
    # observable directly from the host's process list, so poll that.
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
    with ambari_client.AmbariClient(ambari, password=password) as client:
        while True:
            hdp_processes = client.list_host_processes(cluster.name,
                                                       instance)
            if not hdp_processes:
                return
            context.sleep(5)

View File

@ -79,7 +79,8 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
deploy.wait_ambari_accessible(cluster)
deploy.update_default_ambari_password(cluster)
cluster = conductor.cluster_get(context.ctx(), cluster.id)
deploy.wait_host_registration(cluster)
deploy.wait_host_registration(cluster,
plugin_utils.get_instances(cluster))
deploy.set_up_hdp_repos(cluster)
deploy.create_blueprint(cluster)
@ -164,16 +165,23 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
cluster = conductor.cluster_get(ctx, cluster.id)
def validate(self, cluster):
    """Validate the cluster configuration before provisioning."""
    # Bug fix: the stale removed diff line calling the renamed
    # ``validation.validate_creation`` was interleaved here; only the
    # current ``validation.validate`` call is kept.
    validation.validate(cluster.id)
def scale_cluster(self, cluster, instances):
    """Provision newly added *instances* into a running cluster."""
    # Bug fix: a stale removed ``pass`` diff line was interleaved with
    # the new body; only the implementation is kept.
    deploy.setup_agents(cluster, instances)
    cluster = conductor.cluster_get(context.ctx(), cluster.id)
    deploy.wait_host_registration(cluster, instances)
    deploy.add_new_hosts(cluster, instances)
    deploy.manage_config_groups(cluster, instances)
    deploy.manage_host_components(cluster, instances)
    swift_helper.install_ssl_certs(instances)
def decommission_nodes(self, cluster, instances):
    """Decommission *instances* and remove their services and hosts."""
    # Bug fix: a stale removed ``pass`` diff line was interleaved with
    # the new body; only the implementation is kept.
    deploy.decommission_hosts(cluster, instances)
    deploy.remove_services_from_hosts(cluster, instances)
def validate_scaling(self, cluster, existing, additional):
    """Re-run cluster validation before a scaling operation."""
    # Bug fix: a stale removed ``pass`` diff line was interleaved with
    # the new body; only the implementation is kept.
    validation.validate(cluster.id)
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EDPSparkEngine.get_supported_job_types():

View File

@ -25,7 +25,7 @@ from sahara.plugins import utils
conductor = conductor.API
def validate_creation(cluster_id):
def validate(cluster_id):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
_check_ambari(cluster)

View File

@ -18,6 +18,7 @@ import mock
from oslo_serialization import jsonutils
from sahara.plugins.ambari import client as ambari_client
from sahara.plugins import exceptions as p_exc
from sahara.tests.unit import base
@ -40,6 +41,11 @@ class AmbariClientTestCase(base.SaharaTestCase):
self.instance.remote.return_value = self.remote
self.instance.management_ip = "1.2.3.4"
self.good_pending_resp = mock.MagicMock()
self.good_pending_resp.status_code = 200
self.good_pending_resp.text = ('{"Requests": '
'{"id": 1, "status": "PENDING"}}')
def test_init_client_default(self):
client = ambari_client.AmbariClient(self.instance)
self.assertEqual(self.http_client, client._http_client)
@ -173,3 +179,75 @@ class AmbariClientTestCase(base.SaharaTestCase):
"http://1.2.3.4:8080/api/v1/clusters/cluster_name",
data=jsonutils.dumps({"some": "data"}), verify=False,
auth=client._auth, headers=self.headers)
def test_start_process_on_host(self):
    """start_service_on_host must PUT the target state with a context."""
    client = ambari_client.AmbariClient(self.instance)
    self.http_client.put.return_value = self.good_pending_resp
    client.wait_ambari_request = mock.MagicMock()
    instance = mock.MagicMock()
    instance.fqdn.return_value = "i1"
    instance.cluster.name = "cl"
    client.start_service_on_host(instance, "HDFS", 'STATE')
    self.http_client.put.assert_called_with(
        "http://1.2.3.4:8080/api/v1/clusters/"
        "cl/hosts/i1/host_components/HDFS",
        data=jsonutils.dumps(
            {
                "HostRoles": {"state": "STATE"},
                "RequestInfo": {
                    "context": "Starting service HDFS, "
                               "moving to state STATE"}
            }),
        verify=False, auth=client._auth, headers=self.headers)
def test_stop_process_on_host(self):
    """stop_process_on_host must PUT INSTALLED when the state differs."""
    client = ambari_client.AmbariClient(self.instance)
    state_resp = mock.MagicMock()
    state_resp.status_code = 200
    state_resp.text = '{"HostRoles": {"state": "SOME_STATE"}}'
    self.http_client.get.return_value = state_resp
    self.http_client.put.return_value = self.good_pending_resp
    client.wait_ambari_request = mock.MagicMock()
    instance = mock.MagicMock()
    instance.fqdn.return_value = "i1"
    client.stop_process_on_host("cluster_name", instance, "p1")
    self.http_client.put.assert_called_with(
        "http://1.2.3.4:8080/api/v1/clusters/"
        "cluster_name/hosts/i1/host_components/p1",
        data=jsonutils.dumps(
            {
                "HostRoles": {"state": "INSTALLED"},
                "RequestInfo": {"context": "Stopping p1"}
            }),
        verify=False, auth=client._auth, headers=self.headers)
@mock.patch("sahara.plugins.ambari.client.context")
def test_wait_ambari_request(self, mock_context):
    """Polling stops as soon as the request reports COMPLETED."""
    client = ambari_client.AmbariClient(self.instance)
    status_mock = mock.MagicMock()
    pending = {"request_context": "r1", "request_status": "PENDING",
               "progress_percent": 42}
    completed = {"request_context": "r1", "request_status": "COMPLETED",
                 "progress_percent": 100}
    status_mock.side_effect = [pending, completed]
    client.check_request_status = status_mock
    client.wait_ambari_request("id1", "c1")
    status_mock.assert_has_calls([mock.call("c1", "id1"),
                                  mock.call("c1", "id1")])
@mock.patch("sahara.plugins.ambari.client.context")
def test_wait_ambari_request_error(self, mock_context):
    """A terminal non-COMPLETED status raises HadoopProvisionError."""
    client = ambari_client.AmbariClient(self.instance)
    status_mock = mock.MagicMock()
    status_mock.return_value = {"request_context": "r1",
                                "request_status": "ERROR",
                                "progress_percent": 146}
    client.check_request_status = status_mock
    self.assertRaises(p_exc.HadoopProvisionError,
                      client.wait_ambari_request, "id1", "c1")

View File

@ -0,0 +1,68 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.ambari import decomission_helper
from sahara.tests.unit import base
class DecommissionHelperTestCase(base.SaharaTestCase):
    """Checks for the request-body builders in decomission_helper."""

    def setUp(self):
        super(DecommissionHelperTestCase, self).setUp()
        self.i1 = mock.MagicMock()
        self.i1.fqdn.return_value = "i1"
        self.i2 = mock.MagicMock()
        self.i2.fqdn.return_value = "i2"

    def test_build_datanode_decommission_request(self):
        res = decomission_helper.build_datanode_decommission_request(
            "c1", [self.i1, self.i2])
        self.assertEqual(
            "i1,i2", res["RequestInfo"]["parameters"]["excluded_hosts"])
        self.assertEqual(
            "c1", res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_nodemanager_decommission_request(self):
        res = decomission_helper.build_nodemanager_decommission_request(
            "c1", [self.i1, self.i2])
        self.assertEqual(
            "i1,i2", res["RequestInfo"]["parameters"]["excluded_hosts"])
        self.assertEqual(
            "c1", res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_namenode_restart_request(self):
        res = decomission_helper.build_namenode_restart_request(
            "c1", self.i1)
        self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
        self.assertEqual(
            "c1", res["RequestInfo"]["operation_level"]["cluster_name"])

    def test_build_resourcemanager_restart_request(self):
        res = decomission_helper.build_resourcemanager_restart_request(
            "c1", self.i1)
        self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"])
        self.assertEqual(
            "c1", res["RequestInfo"]["operation_level"]["cluster_name"])