Add event log for HDP plugin

Closes-Bug: #1612206
Change-Id: Ifd9cb87261d4ae1d27b4b9ebbd15f84e0882ea22
Michael Ionkin 2016-09-06 15:52:42 +03:00
parent 518d7e3141
commit 2de626ad8f
3 changed files with 123 additions and 43 deletions


@@ -0,0 +1,3 @@
---
features:
- Added event log for HDP plugin
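
For context, the event log is driven by sahara.utils.cluster_progress_ops (imported as cpo in the diff below) in two ways: a cluster-wide step is declared directly on the @cpo.event_wrapper decorator via step= and param=, while per-instance steps are registered once with cpo.add_provisioning_step and each wrapped per-instance call then reports one event against that step. A minimal sketch of the pattern, with illustrative names (do_cluster_work and friends are examples, not part of this change):

from sahara.i18n import _
from sahara.utils import cluster_progress_ops as cpo


# Cluster-wide step: the decorator registers a single-event step itself.
# param=('cluster', 0) tells the wrapper which argument carries the cluster.
@cpo.event_wrapper(True, step=_("Do something on the cluster"),
                   param=('cluster', 0))
def do_cluster_work(cluster):
    pass


# Per-instance events: each wrapped call reports one event against the
# current provisioning step for the instance passed as the first argument.
@cpo.event_wrapper(True)
def _do_instance_work(instance):
    pass


def do_instance_work(cluster, instances):
    # Register a step sized by the number of instances, then emit one
    # event per instance as the per-instance helper runs.
    cpo.add_provisioning_step(
        cluster.id, _("Do something on instances"), len(instances))
    for inst in instances:
        _do_instance_work(inst)

The True passed to event_wrapper marks each event as successful when the wrapped call returns without raising.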


@@ -22,6 +22,7 @@ from oslo_utils import uuidutils
from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LW
from sahara.plugins.ambari import client as ambari_client
from sahara.plugins.ambari import common as p_common
@@ -30,6 +31,7 @@ from sahara.plugins.ambari import ha_helper
from sahara.plugins import kerberos
from sahara.plugins import utils as plugin_utils
from sahara.topology import topology_helper as t_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import poll_utils
@@ -56,6 +58,8 @@ os_type_map = {
}
@cpo.event_wrapper(True, step=_("Set up Ambari management console"),
param=('cluster', 0))
def setup_ambari(cluster):
LOG.debug("Set up Ambari management console")
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
@@ -79,6 +83,8 @@ def setup_agents(cluster, instances=None):
def _setup_agents(instances, manager_address):
cpo.add_provisioning_step(
instances[0].cluster.id, _("Set up Ambari agents"), len(instances))
with context.ThreadGroup() as tg:
for inst in instances:
tg.spawn("hwx-agent-setup-%s" % inst.id,
@@ -108,6 +114,7 @@ def disable_repos(cluster):
_disable_repos_on_inst, inst)
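# Reports one event per instance against the "Set up Ambari agents" step
# registered in _setup_agents() above.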
@cpo.event_wrapper(True)
def _setup_agent(instance, ambari_address):
with instance.remote() as r:
sudo = functools.partial(r.execute_command, run_as_root=True)
@@ -118,6 +125,8 @@ def _setup_agent(instance, ambari_address):
sudo("yum clean all")
@cpo.event_wrapper(True, step=_("Wait Ambari accessible"),
param=('cluster', 0))
def wait_ambari_accessible(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
kwargs = {"host": ambari.management_ip, "port": 8080}
@@ -162,6 +171,7 @@ def _prepare_ranger(cluster):
sudo("rm /tmp/init.sql")
@cpo.event_wrapper(True, step=_("Prepare Hive"), param=('cluster', 0))
def prepare_hive(cluster):
hive = plugin_utils.get_instance(cluster, p_common.HIVE_SERVER)
if not hive:
@@ -175,6 +185,8 @@ def prepare_hive(cluster):
'/user/oozie/conf/hive-site.xml" hdfs')
@cpo.event_wrapper(True, step=_("Update default Ambari password"),
param=('cluster', 0))
def update_default_ambari_password(cluster):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
new_password = uuidutils.generate_uuid()
@@ -187,6 +199,8 @@ def update_default_ambari_password(cluster):
cluster = conductor.cluster_get(ctx, cluster.id)
@cpo.event_wrapper(True, step=_("Wait registration of hosts"),
param=('cluster', 0))
def wait_host_registration(cluster, instances):
with _get_ambari_client(cluster) as client:
kwargs = {"client": client, "instances": instances}
@@ -202,11 +216,9 @@ def _check_host_registration(client, instances):
return True
def set_up_hdp_repos(cluster):
hdp_repo = configs.get_hdp_repo_url(cluster)
hdp_utils_repo = configs.get_hdp_utils_repo_url(cluster)
if not hdp_repo and not hdp_utils_repo:
return
@cpo.event_wrapper(True, step=_("Set up HDP repositories"),
param=('cluster', 0))
def _set_up_hdp_repos(cluster, hdp_repo, hdp_utils_repo):
ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
pv = cluster.hadoop_version
repos = repo_id_map[pv]
@@ -219,15 +231,20 @@ def set_up_hdp_repos(cluster):
hdp_utils_repo)
def set_up_hdp_repos(cluster):
hdp_repo = configs.get_hdp_repo_url(cluster)
hdp_utils_repo = configs.get_hdp_utils_repo_url(cluster)
if hdp_repo or hdp_utils_repo:
_set_up_hdp_repos(cluster, hdp_repo, hdp_utils_repo)
def get_kdc_server(cluster):
return plugin_utils.get_instance(
cluster, p_common.AMBARI_SERVER)
def prepare_kerberos(cluster, instances=None):
if not kerberos.is_kerberos_security_enabled(cluster):
return
@cpo.event_wrapper(True, step=_("Prepare Kerberos"), param=('cluster', 0))
def _prepare_kerberos(cluster, instances=None):
if instances is None:
kerberos.deploy_infrastructure(cluster, get_kdc_server(cluster))
kerberos.prepare_policy_files(cluster)
@@ -239,6 +256,11 @@ def prepare_kerberos(cluster, instances=None):
kerberos.prepare_policy_files(cluster)
def prepare_kerberos(cluster, instances=None):
if kerberos.is_kerberos_security_enabled(cluster):
_prepare_kerberos(cluster, instances)
def _serialize_mit_kdc_kerberos_env(cluster):
return {
'kerberos-env': {
@@ -281,6 +303,8 @@ def get_host_group_components(cluster, processes):
return result
@cpo.event_wrapper(True, step=_("Create Ambari blueprint"),
param=('cluster', 0))
def create_blueprint(cluster):
_prepare_ranger(cluster)
cluster = conductor.cluster_get(context.ctx(), cluster.id)
@@ -344,7 +368,7 @@ def _build_ambari_cluster_template(cluster):
if kerberos.is_kerberos_security_enabled(cluster):
cl_tmpl["credentials"] = _get_credentials(cluster)
cl_tmpl["security"] = {"type": "KERBEROS"}
topology = _configure_topology_data(cluster)
topology = _get_topology_data(cluster)
for ng in cluster.node_groups:
for instance in ng.instances:
host = {"fqdn": instance.fqdn()}
@@ -357,6 +381,7 @@ def _build_ambari_cluster_template(cluster):
return cl_tmpl
@cpo.event_wrapper(True, step=_("Start cluster"), param=('cluster', 0))
def start_cluster(cluster):
ambari_template = _build_ambari_cluster_template(cluster)
with _get_ambari_client(cluster) as client:
@@ -364,12 +389,21 @@ def start_cluster(cluster):
client.wait_ambari_request(req_id, cluster.name)
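# Reports one event per added host against the "Add new hosts" step
# registered in add_new_hosts() below.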
@cpo.event_wrapper(True)
def _add_host_to_cluster(instance, client):
client.add_host_to_cluster(instance)
def add_new_hosts(cluster, instances):
with _get_ambari_client(cluster) as client:
cpo.add_provisioning_step(
cluster.id, _("Add new hosts"), len(instances))
for inst in instances:
client.add_host_to_cluster(inst)
_add_host_to_cluster(inst, client)
@cpo.event_wrapper(True, step=_("Generate config groups"),
param=('cluster', 0))
def manage_config_groups(cluster, instances):
groups = []
for instance in instances:
@@ -378,6 +412,8 @@ def manage_config_groups(cluster, instances):
client.create_config_group(cluster, groups)
@cpo.event_wrapper(True, step=_("Cleanup config groups"),
param=('cluster', 0))
def cleanup_config_groups(cluster, instances):
to_remove = set()
for instance in instances:
@@ -394,10 +430,9 @@ def cleanup_config_groups(cluster, instances):
client.remove_config_group(cluster, cfg_id)
@cpo.event_wrapper(True, step=_("Regenerate keytabs for Kerberos"),
param=('cluster', 0))
def _regenerate_keytabs(cluster):
if not kerberos.is_kerberos_security_enabled(cluster):
return
with _get_ambari_client(cluster) as client:
alias = "kdc.admin.credential"
try:
@@ -418,7 +453,9 @@ def _regenerate_keytabs(cluster):
client.wait_ambari_request(req_id, cluster.name)
def manage_host_components(cluster, instances):
@cpo.event_wrapper(True, step=_("Install services on hosts"),
param=('cluster', 0))
def _install_services_to_hosts(cluster, instances):
requests_ids = []
with _get_ambari_client(cluster) as client:
clients = p_common.get_clients(cluster)
@@ -432,8 +469,10 @@ def manage_host_components(cluster, instances):
instance, service, 'INSTALLED'))
client.wait_ambari_requests(requests_ids, cluster.name)
_regenerate_keytabs(cluster)
@cpo.event_wrapper(True, step=_("Start services on hosts"),
param=('cluster', 0))
def _start_services_on_hosts(cluster, instances):
with _get_ambari_client(cluster) as client:
# all services added and installed, let's start them
requests_ids = []
@@ -447,6 +486,15 @@ def manage_host_components(cluster, instances):
client.wait_ambari_requests(requests_ids, cluster.name)
def manage_host_components(cluster, instances):
_install_services_to_hosts(cluster, instances)
if kerberos.is_kerberos_security_enabled(cluster):
_regenerate_keytabs(cluster)
_start_services_on_hosts(cluster, instances)
@cpo.event_wrapper(True, step=_("Decommission NodeManagers and DataNodes"),
param=('cluster', 0))
def decommission_hosts(cluster, instances):
nodemanager_instances = filter(
lambda i: p_common.NODEMANAGER in i.node_group.node_processes,
@@ -481,6 +529,8 @@ def restart_resourcemanager(cluster, instance):
client.restart_resourcemanager(cluster.name, instance)
@cpo.event_wrapper(True, step=_("Restart NameNodes and ResourceManagers"),
param=('cluster', 0))
def restart_nns_and_rms(cluster):
nns = plugin_utils.get_instances(cluster, p_common.NAMENODE)
for nn in nns:
@@ -496,6 +546,7 @@ def restart_service(cluster, service_name):
client.restart_service(cluster.name, service_name)
@cpo.event_wrapper(True, step=_("Remove hosts"), param=('cluster', 0))
def remove_services_from_hosts(cluster, instances):
for inst in instances:
LOG.debug("Stopping and removing processes from host %s" % inst.fqdn())
@@ -539,7 +590,7 @@ def _get_ambari_client(cluster):
return ambari_client.AmbariClient(ambari, password=password)
def _configure_topology_data(cluster):
def _get_topology_data(cluster):
if not t_helper.is_data_locality_enabled():
return {}
@@ -549,39 +600,60 @@ def _configure_topology_data(cluster):
return t_helper.generate_topology_map(cluster, is_node_awareness=False)
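# Reports one event per instance against the "Configure rack awareness"
# step registered in configure_rack_awareness() below.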
@cpo.event_wrapper(True)
def _configure_topology_data(cluster, inst, client):
topology = _get_topology_data(cluster)
client.set_rack_info_for_instance(
cluster.name, inst, topology[inst.instance_name])
@cpo.event_wrapper(True, step=_("Restart HDFS and MAPREDUCE2 services"),
param=('cluster', 0))
def _restart_hdfs_and_mapred_services(cluster, client):
client.restart_service(cluster.name, p_common.HDFS_SERVICE)
client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE)
def configure_rack_awareness(cluster, instances):
if not t_helper.is_data_locality_enabled():
return
topology = _configure_topology_data(cluster)
with _get_ambari_client(cluster) as client:
cpo.add_provisioning_step(
cluster.id, _("Configure rack awareness"), len(instances))
for inst in instances:
client.set_rack_info_for_instance(
cluster.name, inst, topology[inst.instance_name])
client.restart_service(cluster.name, p_common.HDFS_SERVICE)
client.restart_service(cluster.name, p_common.MAPREDUCE2_SERVICE)
_configure_topology_data(cluster, inst, client)
_restart_hdfs_and_mapred_services(cluster, client)
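# Reports one event per instance against the "Add Hadoop Swift jar to
# instances" step registered in add_hadoop_swift_jar() below.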
@cpo.event_wrapper(True)
def _add_hadoop_swift_jar(instance, new_jar):
with instance.remote() as r:
code, out = r.execute_command(
"test -f %s" % new_jar, raise_when_error=False)
if code == 0:
# get ambari hadoop version (e.g.: 2.7.1.2.3.4.0-3485)
code, amb_hadoop_version = r.execute_command(
"sudo hadoop version | grep 'Hadoop' | awk '{print $2}'")
amb_hadoop_version = amb_hadoop_version.strip()
# get special code of ambari hadoop version(e.g.:2.3.4.0-3485)
amb_code = '.'.join(amb_hadoop_version.split('.')[3:])
origin_jar = (
"/usr/hdp/{}/hadoop-mapreduce/hadoop-openstack-{}.jar".format(
amb_code, amb_hadoop_version))
r.execute_command("sudo cp {} {}".format(new_jar, origin_jar))
else:
LOG.warning(_LW("The {jar_file} file cannot be found "
"in the {dir} directory so Keystone API v3 "
"is not enabled for this cluster.")
.format(jar_file="hadoop-openstack.jar",
dir="/opt"))
def add_hadoop_swift_jar(instances):
new_jar = "/opt/hadoop-openstack.jar"
cpo.add_provisioning_step(instances[0].cluster.id,
_("Add Hadoop Swift jar to instances"),
len(instances))
for inst in instances:
with inst.remote() as r:
code, out = r.execute_command("test -f %s" % new_jar,
raise_when_error=False)
if code == 0:
# get ambari hadoop version (e.g.: 2.7.1.2.3.4.0-3485)
code, amb_hadoop_version = r.execute_command(
"sudo hadoop version | grep 'Hadoop' | awk '{print $2}'")
amb_hadoop_version = amb_hadoop_version.strip()
# get special code of ambari hadoop version(e.g.:2.3.4.0-3485)
amb_code = '.'.join(amb_hadoop_version.split('.')[3:])
origin_jar = (
"/usr/hdp/%s/hadoop-mapreduce/hadoop-openstack-%s.jar" % (
amb_code, amb_hadoop_version))
r.execute_command("sudo cp %s %s" % (new_jar, origin_jar))
else:
LOG.warning(_LW("The {jar_file} file cannot be found "
"in the {dir} directory so Keystone API v3 "
"is not enabled for this cluster.")
.format(jar_file="hadoop-openstack.jar",
dir="/opt"))
_add_hadoop_swift_jar(inst, new_jar)


@@ -21,11 +21,14 @@ from sahara.tests.unit import base
class TestDeploy(base.SaharaTestCase):
@mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
@mock.patch('sahara.utils.cluster.check_cluster_exists')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.plugins.ambari.client.AmbariClient.get')
@mock.patch('sahara.plugins.ambari.client.AmbariClient.delete')
def test_cleanup_config_groups(self, client_delete, client_get,
get_instance):
get_instance, check_cluster_exists,
add_provisioning_step):
def response(data):
fake = mock.Mock()
fake.text = jsonutils.dumps(data)
@@ -60,6 +63,8 @@ class TestDeploy(base.SaharaTestCase):
]
client_delete.side_effect = [response({})]
check_cluster_exists.return_value = True
deploy.cleanup_config_groups(cl, [inst1])
get_calls = [
mock.call(