diff --git a/sahara/plugins/ambari/client.py b/sahara/plugins/ambari/client.py index 436b4d65..0f31b5df 100644 --- a/sahara/plugins/ambari/client.py +++ b/sahara/plugins/ambari/client.py @@ -14,9 +14,18 @@ # limitations under the License. +from oslo_log import log as logging from oslo_serialization import jsonutils from requests import auth +from sahara import context +from sahara.i18n import _ +from sahara.plugins.ambari import decomission_helper as d_helper +from sahara.plugins import exceptions as p_exc + + +LOG = logging.getLogger(__name__) + class AmbariClient(object): def __init__(self, instance, port="8080", **kwargs): @@ -64,6 +73,17 @@ class AmbariClient(object): if resp.text: return jsonutils.loads(resp.text) + @staticmethod + def req_id(response): + if not response.text: + raise p_exc.HadoopProvisionError("Cannot find request id. " + "No response body") + body = jsonutils.loads(response.text) + if "Requests" not in body or "id" not in body["Requests"]: + raise p_exc.HadoopProvisionError("Cannot find request id. " + "Unexpected response format") + return body["Requests"]["id"] + def get_registered_hosts(self): url = self._base_url + "/hosts" resp = self.get(url) @@ -90,19 +110,106 @@ class AmbariClient(object): def create_blueprint(self, name, data): url = self._base_url + "/blueprints/%s" % name resp = self.post(url, data=jsonutils.dumps(data)) - self.check_response(resp) + return self.check_response(resp) def create_cluster(self, name, data): url = self._base_url + "/clusters/%s" % name resp = self.post(url, data=jsonutils.dumps(data)) return self.check_response(resp).get("Requests") + def add_host_to_cluster(self, instance): + cluster_name = instance.cluster.name + hostname = instance.fqdn() + url = self._base_url + "/clusters/{cluster}/hosts/{hostname}".format( + cluster=cluster_name, hostname=hostname) + resp = self.post(url) + self.check_response(resp) + + def create_config_group(self, cluster, data): + url = self._base_url + "/clusters/%s/config_groups" % cluster.name + resp = self.post(url, data=jsonutils.dumps(data)) + return self.check_response(resp) + + def add_service_to_host(self, inst, service): + url = "{pref}/clusters/{cluster}/hosts/{host}/host_components/{proc}" + url = url.format(pref=self._base_url, cluster=inst.cluster.name, + host=inst.fqdn(), proc=service) + self.check_response(self.post(url)) + + def start_service_on_host(self, inst, service, final_state): + url = "{pref}/clusters/{cluster}/hosts/{host}/host_components/{proc}" + url = url.format( + pref=self._base_url, cluster=inst.cluster.name, host=inst.fqdn(), + proc=service) + data = { + 'HostRoles': { + 'state': final_state + }, + 'RequestInfo': { + 'context': "Starting service {service}, moving to state " + "{state}".format(service=service, state=final_state) + } + } + resp = self.put(url, data=jsonutils.dumps(data)) + self.check_response(resp) + # return req_id to check health of request + return self.req_id(resp) + + def decommission_nodemanagers(self, cluster_name, instances): + url = self._base_url + "/clusters/%s/requests" % cluster_name + data = d_helper.build_nodemanager_decommission_request(cluster_name, + instances) + resp = self.post(url, data=jsonutils.dumps(data)) + self.wait_ambari_request(self.req_id(resp), cluster_name) + + def decommission_datanodes(self, cluster_name, instances): + url = self._base_url + "/clusters/%s/requests" % cluster_name + data = d_helper.build_datanode_decommission_request(cluster_name, + instances) + resp = self.post(url, data=jsonutils.dumps(data)) + self.wait_ambari_request(self.req_id(resp), cluster_name) + + def remove_process_from_host(self, cluster_name, instance, process): + url = self._base_url + "/clusters/%s/hosts/%s/host_components/%s" % ( + cluster_name, instance.fqdn(), process) + resp = self.delete(url) + + return self.check_response(resp) + + def stop_process_on_host(self, cluster_name, instance, process): + url = self._base_url + "/clusters/%s/hosts/%s/host_components/%s" % ( + cluster_name, instance.fqdn(), process) + check_installed_resp = self.check_response(self.get(url)) + + if check_installed_resp["HostRoles"]["state"] != "INSTALLED": + data = {"HostRoles": {"state": "INSTALLED"}, + "RequestInfo": {"context": "Stopping %s" % process}} + resp = self.put(url, data=jsonutils.dumps(data)) + + self.wait_ambari_request(self.req_id(resp), cluster_name) + + def delete_host(self, cluster_name, instance): + url = self._base_url + "/clusters/%s/hosts/%s" % (cluster_name, + instance.fqdn()) + resp = self.delete(url) + return self.check_response(resp) + def check_request_status(self, cluster_name, req_id): url = self._base_url + "/clusters/%s/requests/%d" % (cluster_name, req_id) resp = self.get(url) return self.check_response(resp).get("Requests") + def list_host_processes(self, cluster_name, instance): + url = self._base_url + "/clusters/%s/hosts/%s" % ( + cluster_name, instance.fqdn()) + resp = self.get(url) + body = jsonutils.loads(resp.text) + + procs = [p["HostRoles"]["component_name"] + for p in body["host_components"]] + return procs + def set_up_mirror(self, stack_version, os_type, repo_id, repo_url): url = self._base_url + ( "/stacks/HDP/versions/%s/operating_systems/%s/repositories/%s") % ( @@ -115,3 +222,42 @@ class AmbariClient(object): } resp = self.put(url, data=jsonutils.dumps(data)) self.check_response(resp) + + def get_request_status(self, cluster_name, request_id): + url = self._base_url + ("/clusters/%s/requests/%s" % + (cluster_name, request_id)) + resp = self.check_response(self.get(url)) + return resp.get('Requests').get("request_status") + + def wait_ambari_requests(self, requests, cluster_name): + requests = set(requests) + while len(requests) > 0: + completed, not_completed = set(), set() + for request in requests: + status = self.get_request_status(cluster_name, request) + if status == 'COMPLETED': + completed.add(request) + elif status in ['IN_PROGRESS', 'PENDING']: + not_completed.add(request) + else: + raise p_exc.HadoopProvisionError( + _("Some Ambari request(s) not in COMPLETED state")) + requests = not_completed + context.sleep(5) + LOG.debug("Waiting for ambari %d requests to be completed", + len(not_completed)) + LOG.debug("Waiting for ambari requests completed") + + def wait_ambari_request(self, request_id, cluster_name): + while True: + status = self.check_request_status(cluster_name, request_id) + LOG.debug("Task %s in %s state. Completed %.1f%%" % ( + status["request_context"], status["request_status"], + status["progress_percent"])) + if status["request_status"] == "COMPLETED": + return + if status["request_status"] in ["IN_PROGRESS", "PENDING"]: + context.sleep(5) + else: + raise p_exc.HadoopProvisionError( + _("Ambari request in %s state") % status["request_status"]) diff --git a/sahara/plugins/ambari/common.py b/sahara/plugins/ambari/common.py index a0c9eb3d..e4759d87 100644 --- a/sahara/plugins/ambari/common.py +++ b/sahara/plugins/ambari/common.py @@ -133,3 +133,11 @@ def get_clients(cluster): clients = list(set(clients)) clients.extend(ALL_LIST) return clients + + +def instances_have_process(instances, process): + for i in instances: + if process in i.node_group.node_processes: + return True + + return False diff --git a/sahara/plugins/ambari/configs.py b/sahara/plugins/ambari/configs.py index 75edb408..ad69a77a 100644 --- a/sahara/plugins/ambari/configs.py +++ b/sahara/plugins/ambari/configs.py @@ -79,6 +79,22 @@ CFG_PROCESS_MAP = { } +SERVICES_TO_CONFIGS_MAP = None + + +def get_service_to_configs_map(): + global SERVICES_TO_CONFIGS_MAP + if SERVICES_TO_CONFIGS_MAP: + return SERVICES_TO_CONFIGS_MAP + data = {} + for (key, item) in six.iteritems(CFG_PROCESS_MAP): + if item not in data: + data[item] = [] + data[item].append(key) + SERVICES_TO_CONFIGS_MAP = data + return SERVICES_TO_CONFIGS_MAP + + ng_confs = [ "dfs.datanode.data.dir", "dtnode_heapsize", @@ -176,7 +192,7 @@ def _make_paths(dirs, suffix): return ",".join([d + suffix for d in dirs]) -def get_instance_params(inst): +def get_instance_params_mapping(inst): configs = _create_ambari_configs(inst.node_group.node_configs, inst.node_group.cluster.hadoop_version) storage_paths = inst.storage_paths() @@ -200,8 +216,11 @@ def get_instance_params(inst): configs.setdefault("oozie-site", {}) configs["oozie-site"][ "oozie.service.AuthorizationService.security.enabled"] = "false" + return configs - return _serialize_ambari_configs(configs) + +def get_instance_params(inst): + return _serialize_ambari_configs(get_instance_params_mapping(inst)) def get_cluster_params(cluster): @@ -216,3 +235,37 @@ def get_cluster_params(cluster): configs["admin-properties"]["db_root_password"] = ( cluster.extra["ranger_db_password"]) return _serialize_ambari_configs(configs) + + +def get_config_group(instance): + params = get_instance_params_mapping(instance) + groups = [] + for (service, targets) in six.iteritems(get_service_to_configs_map()): + current_group = { + 'cluster_name': instance.cluster.name, + 'group_name': "%s:%s" % ( + instance.cluster.name, instance.instance_name), + 'tag': service, + 'description': "Config group for scaled " + "node %s" % instance.instance_name, + 'hosts': [ + { + 'host_name': instance.fqdn() + } + ], + 'desired_configs': [] + } + at_least_one_added = False + for target in targets: + configs = params.get(target, {}) + if configs: + current_group['desired_configs'].append({ + 'type': target, + 'properties': configs, + 'tag': instance.instance_name + }) + at_least_one_added = True + if at_least_one_added: + # Config Group without overridden data is not interesting + groups.append({'ConfigGroup': current_group}) + return groups diff --git a/sahara/plugins/ambari/decomission_helper.py b/sahara/plugins/ambari/decomission_helper.py new file mode 100644 index 00000000..1e8f28b1 --- /dev/null +++ b/sahara/plugins/ambari/decomission_helper.py @@ -0,0 +1,118 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + + +_COMMON_DECOMMISSION_TEMPLATE = { + "RequestInfo": { + "context": "", + "command": "DECOMMISSION", + "parameters": { + "slave_type": "", + "excluded_hosts": "" + }, + "operation_level": { + "level": "HOST_COMPONENT", + "cluster_name": "" + } + }, + "Requests/resource_filters": [ + { + "service_name": "", + "component_name": "" + } + ] +} + +_COMMON_RESTART_TEMPLATE = { + "RequestInfo": { + "context": "", + "command": "RESTART", + "operation_level": { + "level": "HOST", + "cluster_name": "" + } + }, + "Requests/resource_filters": [ + { + "service_name": "", + "component_name": "", + "hosts": "" + } + ] +} + + +def build_datanode_decommission_request(cluster_name, instances): + tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE) + + tmpl["RequestInfo"]["context"] = "Decommission DataNodes" + + tmpl["RequestInfo"]["parameters"]["slave_type"] = "DATANODE" + tmpl["RequestInfo"]["parameters"]["excluded_hosts"] = ",".join( + [i.fqdn() for i in instances]) + + tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name + + tmpl["Requests/resource_filters"][0]["service_name"] = "HDFS" + tmpl["Requests/resource_filters"][0]["component_name"] = "NAMENODE" + + return tmpl + + +def build_nodemanager_decommission_request(cluster_name, instances): + tmpl = copy.deepcopy(_COMMON_DECOMMISSION_TEMPLATE) + + tmpl["RequestInfo"]["context"] = "Decommission NodeManagers" + + tmpl["RequestInfo"]["parameters"]["slave_type"] = "NODEMANAGER" + tmpl["RequestInfo"]["parameters"]["excluded_hosts"] = ",".join( + [i.fqdn() for i in instances]) + + tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name + + tmpl["Requests/resource_filters"][0]["service_name"] = "YARN" + tmpl["Requests/resource_filters"][0]["component_name"] = "RESOURCEMANAGER" + + return tmpl + + +def build_namenode_restart_request(cluster_name, nn_instance): + tmpl = copy.deepcopy(_COMMON_RESTART_TEMPLATE) + + tmpl["RequestInfo"]["context"] = "Restart NameNode" + + tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name + + tmpl["Requests/resource_filters"][0]["service_name"] = "HDFS" + tmpl["Requests/resource_filters"][0]["component_name"] = "NAMENODE" + tmpl["Requests/resource_filters"][0]["hosts"] = nn_instance.fqdn() + + return tmpl + + +def build_resourcemanager_restart_request(cluster_name, rm_instance): + tmpl = copy.deepcopy(_COMMON_RESTART_TEMPLATE) + + tmpl["RequestInfo"]["context"] = "Restart ResourceManager" + + tmpl["RequestInfo"]["operation_level"]["cluster_name"] = cluster_name + + tmpl["Requests/resource_filters"][0]["service_name"] = "YARN" + tmpl["Requests/resource_filters"][0]["component_name"] = "RESOURCEMANAGER" + tmpl["Requests/resource_filters"][0]["hosts"] = rm_instance.fqdn() + + return tmpl diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py index b9880f88..67520f00 100644 --- a/sahara/plugins/ambari/deploy.py +++ b/sahara/plugins/ambari/deploy.py @@ -22,11 +22,9 @@ from oslo_utils import uuidutils from sahara import conductor from sahara import context -from sahara.i18n import _ from sahara.plugins.ambari import client as ambari_client from sahara.plugins.ambari import common as p_common from sahara.plugins.ambari import configs -from sahara.plugins import exceptions as p_exc from sahara.plugins import utils as plugin_utils from sahara.utils import poll_utils @@ -59,15 +57,21 @@ def setup_ambari(cluster): LOG.debug("Ambari management console installed") -def setup_agents(cluster): +def setup_agents(cluster, instances=None): LOG.debug("Set up Ambari agents") manager_address = plugin_utils.get_instance( cluster, p_common.AMBARI_SERVER).fqdn() + if not instances: + instances = plugin_utils.get_instances(cluster) + _setup_agents(instances, manager_address) + + +def _setup_agents(instances, manager_address): with context.ThreadGroup() as tg: - for inst in plugin_utils.get_instances(cluster): + for inst in instances: tg.spawn("hwx-agent-setup-%s" % inst.id, _setup_agent, inst, manager_address) - LOG.debug("Ambari agents has been installed") + LOG.debug("Ambari agents have been installed") def _disable_repos_on_inst(instance): @@ -158,24 +162,21 @@ def update_default_ambari_password(cluster): cluster = conductor.cluster_get(ctx, cluster.id) -def wait_host_registration(cluster): +def wait_host_registration(cluster, instances): ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) - hosts = plugin_utils.get_instances(cluster) password = cluster.extra["ambari_password"] with ambari_client.AmbariClient(ambari, password=password) as client: - kwargs = {"client": client, "num_hosts": len(hosts)} + kwargs = {"client": client, "instances": instances} poll_utils.poll(_check_host_registration, kwargs=kwargs, timeout=600) - registered_hosts = client.get_registered_hosts() - registered_host_names = [h["Hosts"]["host_name"] for h in registered_hosts] - actual_host_names = [h.fqdn() for h in hosts] - if sorted(registered_host_names) != sorted(actual_host_names): - raise p_exc.HadoopProvisionError( - _("Host registration fails in Ambari")) -def _check_host_registration(client, num_hosts): +def _check_host_registration(client, instances): hosts = client.get_registered_hosts() - return len(hosts) == num_hosts + registered_host_names = [h["Hosts"]["host_name"] for h in hosts] + for instance in instances: + if instance.fqdn() not in registered_host_names: + return False + return True def set_up_hdp_repos(cluster): @@ -223,10 +224,10 @@ def create_blueprint(cluster): ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) password = cluster.extra["ambari_password"] with ambari_client.AmbariClient(ambari, password=password) as client: - client.create_blueprint(cluster.name, bp) + return client.create_blueprint(cluster.name, bp) -def start_cluster(cluster): +def _build_ambari_cluster_template(cluster): cl_tmpl = { "blueprint": cluster.name, "default_password": uuidutils.generate_uuid(), @@ -238,19 +239,135 @@ def start_cluster(cluster): "name": instance.instance_name, "hosts": [{"fqdn": instance.fqdn()}] }) + return cl_tmpl + + +def start_cluster(cluster): + ambari_template = _build_ambari_cluster_template(cluster) + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) password = cluster.extra["ambari_password"] with ambari_client.AmbariClient(ambari, password=password) as client: - req_id = client.create_cluster(cluster.name, cl_tmpl)["id"] + req_id = client.create_cluster(cluster.name, ambari_template)["id"] + client.wait_ambari_request(req_id, cluster.name) + + +def add_new_hosts(cluster, instances): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + with ambari_client.AmbariClient(ambari, password=password) as client: + for inst in instances: + client.add_host_to_cluster(inst) + + +def manage_config_groups(cluster, instances): + groups = [] + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + for instance in instances: + groups.extend(configs.get_config_group(instance)) + with ambari_client.AmbariClient(ambari, password=password) as client: + client.create_config_group(cluster, groups) + + +def manage_host_components(cluster, instances): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + requests_ids = [] + with ambari_client.AmbariClient(ambari, password=password) as client: + clients = p_common.get_clients(cluster) + for instance in instances: + services = p_common.get_ambari_proc_list(instance.node_group) + services.extend(clients) + for service in services: + client.add_service_to_host(instance, service) + requests_ids.append( + client.start_service_on_host( + instance, service, 'INSTALLED')) + client.wait_ambari_requests(requests_ids, cluster.name) + # all services added and installed, let's start them + requests_ids = [] + for instance in instances: + services = p_common.get_ambari_proc_list(instance.node_group) + services.extend(p_common.ALL_LIST) + for service in services: + requests_ids.append( + client.start_service_on_host( + instance, service, 'STARTED')) + client.wait_ambari_requests(requests_ids, cluster.name) + + +def decommission_hosts(cluster, instances): + nodemanager_instances = filter( + lambda i: p_common.NODEMANAGER in i.node_group.node_processes, + instances) + if len(nodemanager_instances) > 0: + decommission_nodemanagers(cluster, nodemanager_instances) + + datanode_instances = filter( + lambda i: p_common.DATANODE in i.node_group.node_processes, + instances) + if len(datanode_instances) > 0: + decommission_datanodes(cluster, datanode_instances) + + +def decommission_nodemanagers(cluster, instances): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + + with ambari_client.AmbariClient(ambari, password=password) as client: + client.decommission_nodemanagers(cluster.name, instances) + + +def decommission_datanodes(cluster, instances): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + + with ambari_client.AmbariClient(ambari, password=password) as client: + client.decommission_datanodes(cluster.name, instances) + + +def remove_services_from_hosts(cluster, instances): + for inst in instances: + LOG.debug("Stopping and removing processes from host %s" % inst.fqdn()) + _remove_services_from_host(cluster, inst) + LOG.debug("Removing the host %s" % inst.fqdn()) + _remove_host(cluster, inst) + + +def _remove_services_from_host(cluster, instance): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + + with ambari_client.AmbariClient(ambari, password=password) as client: + hdp_processes = client.list_host_processes(cluster.name, instance) + for proc in hdp_processes: + LOG.debug("Stopping process %s on host %s " % + (proc, instance.fqdn())) + client.stop_process_on_host(cluster.name, instance, proc) + + LOG.debug("Removing process %s from host %s " % + (proc, instance.fqdn())) + client.remove_process_from_host(cluster.name, instance, proc) + + _wait_all_processes_removed(cluster, instance) + + +def _remove_host(cluster, inst): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + + with ambari_client.AmbariClient(ambari, password=password) as client: + client.delete_host(cluster.name, inst) + + +def _wait_all_processes_removed(cluster, instance): + ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) + password = cluster.extra["ambari_password"] + + with ambari_client.AmbariClient(ambari, password=password) as client: while True: - status = client.check_request_status(cluster.name, req_id) - LOG.debug("Task %s in %s state. Completed %.1f%%" % ( - status["request_context"], status["request_status"], - status["progress_percent"])) - if status["request_status"] == "COMPLETED": + hdp_processes = client.list_host_processes(cluster.name, instance) + if not hdp_processes: return - if status["request_status"] in ["IN_PROGRESS", "PENDING"]: - context.sleep(5) - else: - raise p_exc.HadoopProvisionError( - _("Ambari request in %s state") % status["request_status"]) + context.sleep(5) diff --git a/sahara/plugins/ambari/plugin.py b/sahara/plugins/ambari/plugin.py index 764a82e8..01ccb890 100644 --- a/sahara/plugins/ambari/plugin.py +++ b/sahara/plugins/ambari/plugin.py @@ -79,7 +79,8 @@ class AmbariPluginProvider(p.ProvisioningPluginBase): deploy.wait_ambari_accessible(cluster) deploy.update_default_ambari_password(cluster) cluster = conductor.cluster_get(context.ctx(), cluster.id) - deploy.wait_host_registration(cluster) + deploy.wait_host_registration(cluster, + plugin_utils.get_instances(cluster)) deploy.set_up_hdp_repos(cluster) deploy.create_blueprint(cluster) @@ -164,16 +165,23 @@ class AmbariPluginProvider(p.ProvisioningPluginBase): cluster = conductor.cluster_get(ctx, cluster.id) def validate(self, cluster): - validation.validate_creation(cluster.id) + validation.validate(cluster.id) def scale_cluster(self, cluster, instances): - pass + deploy.setup_agents(cluster, instances) + cluster = conductor.cluster_get(context.ctx(), cluster.id) + deploy.wait_host_registration(cluster, instances) + deploy.add_new_hosts(cluster, instances) + deploy.manage_config_groups(cluster, instances) + deploy.manage_host_components(cluster, instances) + swift_helper.install_ssl_certs(instances) def decommission_nodes(self, cluster, instances): - pass + deploy.decommission_hosts(cluster, instances) + deploy.remove_services_from_hosts(cluster, instances) def validate_scaling(self, cluster, existing, additional): - pass + validation.validate(cluster.id) def get_edp_engine(self, cluster, job_type): if job_type in edp_engine.EDPSparkEngine.get_supported_job_types(): diff --git a/sahara/plugins/ambari/validation.py b/sahara/plugins/ambari/validation.py index 73d244b0..db2a2e83 100644 --- a/sahara/plugins/ambari/validation.py +++ b/sahara/plugins/ambari/validation.py @@ -25,7 +25,7 @@ from sahara.plugins import utils conductor = conductor.API -def validate_creation(cluster_id): +def validate(cluster_id): ctx = context.ctx() cluster = conductor.cluster_get(ctx, cluster_id) _check_ambari(cluster) diff --git a/sahara/tests/unit/plugins/ambari/test_client.py b/sahara/tests/unit/plugins/ambari/test_client.py index 7f2cab25..2bd92a8b 100644 --- a/sahara/tests/unit/plugins/ambari/test_client.py +++ b/sahara/tests/unit/plugins/ambari/test_client.py @@ -18,6 +18,7 @@ import mock from oslo_serialization import jsonutils from sahara.plugins.ambari import client as ambari_client +from sahara.plugins import exceptions as p_exc from sahara.tests.unit import base @@ -40,6 +41,11 @@ class AmbariClientTestCase(base.SaharaTestCase): self.instance.remote.return_value = self.remote self.instance.management_ip = "1.2.3.4" + self.good_pending_resp = mock.MagicMock() + self.good_pending_resp.status_code = 200 + self.good_pending_resp.text = ('{"Requests": ' + '{"id": 1, "status": "PENDING"}}') + def test_init_client_default(self): client = ambari_client.AmbariClient(self.instance) self.assertEqual(self.http_client, client._http_client) @@ -173,3 +179,75 @@ class AmbariClientTestCase(base.SaharaTestCase): "http://1.2.3.4:8080/api/v1/clusters/cluster_name", data=jsonutils.dumps({"some": "data"}), verify=False, auth=client._auth, headers=self.headers) + + def test_start_process_on_host(self): + client = ambari_client.AmbariClient(self.instance) + self.http_client.put.return_value = self.good_pending_resp + client.wait_ambari_request = mock.MagicMock() + + instance = mock.MagicMock() + instance.fqdn.return_value = "i1" + instance.cluster.name = "cl" + + client.start_service_on_host(instance, "HDFS", 'STATE') + self.http_client.put.assert_called_with( + "http://1.2.3.4:8080/api/v1/clusters/" + "cl/hosts/i1/host_components/HDFS", + data=jsonutils.dumps( + { + "HostRoles": {"state": "STATE"}, + "RequestInfo": { + "context": "Starting service HDFS, " + "moving to state STATE"} + }), + verify=False, auth=client._auth, headers=self.headers) + + def test_stop_process_on_host(self): + client = ambari_client.AmbariClient(self.instance) + check_mock = mock.MagicMock() + check_mock.status_code = 200 + check_mock.text = '{"HostRoles": {"state": "SOME_STATE"}}' + self.http_client.get.return_value = check_mock + self.http_client.put.return_value = self.good_pending_resp + client.wait_ambari_request = mock.MagicMock() + instance = mock.MagicMock() + instance.fqdn.return_value = "i1" + + client.stop_process_on_host("cluster_name", instance, "p1") + self.http_client.put.assert_called_with( + "http://1.2.3.4:8080/api/v1/clusters/" + "cluster_name/hosts/i1/host_components/p1", + data=jsonutils.dumps( + { + "HostRoles": {"state": "INSTALLED"}, + "RequestInfo": {"context": "Stopping p1"} + }), + verify=False, auth=client._auth, headers=self.headers) + + @mock.patch("sahara.plugins.ambari.client.context") + def test_wait_ambari_request(self, mock_context): + client = ambari_client.AmbariClient(self.instance) + check_mock = mock.MagicMock() + d1 = {"request_context": "r1", "request_status": "PENDING", + "progress_percent": 42} + d2 = {"request_context": "r1", "request_status": "COMPLETED", + "progress_percent": 100} + check_mock.side_effect = [d1, d2] + client.check_request_status = check_mock + + client.wait_ambari_request("id1", "c1") + + check_mock.assert_has_calls([mock.call("c1", "id1"), + mock.call("c1", "id1")]) + + @mock.patch("sahara.plugins.ambari.client.context") + def test_wait_ambari_request_error(self, mock_context): + client = ambari_client.AmbariClient(self.instance) + check_mock = mock.MagicMock() + d1 = {"request_context": "r1", "request_status": "ERROR", + "progress_percent": 146} + check_mock.return_value = d1 + client.check_request_status = check_mock + + self.assertRaises(p_exc.HadoopProvisionError, + client.wait_ambari_request, "id1", "c1") diff --git a/sahara/tests/unit/plugins/ambari/test_decommission_helper.py b/sahara/tests/unit/plugins/ambari/test_decommission_helper.py new file mode 100644 index 00000000..4b685bb0 --- /dev/null +++ b/sahara/tests/unit/plugins/ambari/test_decommission_helper.py @@ -0,0 +1,68 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from sahara.plugins.ambari import decomission_helper +from sahara.tests.unit import base + + +class DecommissionHelperTestCase(base.SaharaTestCase): + + def setUp(self): + super(DecommissionHelperTestCase, self).setUp() + self.i1 = mock.MagicMock() + self.i1.fqdn.return_value = "i1" + + self.i2 = mock.MagicMock() + self.i2.fqdn.return_value = "i2" + + def test_build_datanode_decommission_request(self): + c_name = "c1" + instances = [self.i1, self.i2] + + res = decomission_helper.build_datanode_decommission_request(c_name, + instances) + self.assertEqual("i1,i2", + res["RequestInfo"]["parameters"]["excluded_hosts"]) + self.assertEqual("c1", + res["RequestInfo"]["operation_level"]["cluster_name"]) + + def test_build_nodemanager_decommission_request(self): + c_name = "c1" + instances = [self.i1, self.i2] + + res = decomission_helper.build_nodemanager_decommission_request( + c_name, instances) + + self.assertEqual("i1,i2", + res["RequestInfo"]["parameters"]["excluded_hosts"]) + self.assertEqual("c1", + res["RequestInfo"]["operation_level"]["cluster_name"]) + + def test_build_namenode_restart_request(self): + res = decomission_helper.build_namenode_restart_request("c1", self.i1) + + self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"]) + self.assertEqual("c1", + res["RequestInfo"]["operation_level"]["cluster_name"]) + + def test_build_resourcemanager_restart_request(self): + res = decomission_helper.build_resourcemanager_restart_request("c1", + self.i1) + + self.assertEqual("i1", res["Requests/resource_filters"][0]["hosts"]) + self.assertEqual("c1", + res["RequestInfo"]["operation_level"]["cluster_name"])