# Copyright (c) 2014 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file only contains utils not related to cm_api, while in
# cloudera_utils the functions are cm_api involved.

import os
import telnetlib  # nosec

from oslo_log import log as logging

from sahara.conductor import resource as res
from sahara import context
from sahara import exceptions as exc
from sahara.i18n import _
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins import recommendations_utils as ru
from sahara.plugins import utils as u
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp as edp_u
from sahara.utils import poll_utils
from sahara.utils import types

PATH_TO_CORE_SITE_XML = '/etc/hadoop/conf/core-site.xml'
HADOOP_LIB_DIR = '/usr/lib/hadoop-mapreduce'
CM_API_PORT = 7180

LOG = logging.getLogger(__name__)

# Maps Sahara config names to (CM role/service type, CM config key) pairs
# consumed by the auto-configuration provider below.
AUTO_CONFIGURATION_SCHEMA = {
    'node_configs': {
        'yarn.scheduler.minimum-allocation-mb': (
            'RESOURCEMANAGER', 'yarn_scheduler_minimum_allocation_mb'),
        'mapreduce.reduce.memory.mb': (
            'YARN_GATEWAY', 'mapreduce_reduce_memory_mb'),
        'mapreduce.map.memory.mb': (
            'YARN_GATEWAY', 'mapreduce_map_memory_mb',),
        'yarn.scheduler.maximum-allocation-mb': (
            'RESOURCEMANAGER', 'yarn_scheduler_maximum_allocation_mb'),
        'yarn.app.mapreduce.am.command-opts': (
            'YARN_GATEWAY', 'yarn_app_mapreduce_am_command_opts'),
        'yarn.nodemanager.resource.memory-mb': (
            'NODEMANAGER', 'yarn_nodemanager_resource_memory_mb'),
        'mapreduce.task.io.sort.mb': (
            'YARN_GATEWAY', 'io_sort_mb'),
        'mapreduce.map.java.opts': (
            'YARN_GATEWAY', 'mapreduce_map_java_opts'),
        'mapreduce.reduce.java.opts': (
            'YARN_GATEWAY', 'mapreduce_reduce_java_opts'),
        'yarn.app.mapreduce.am.resource.mb': (
            'YARN_GATEWAY', 'yarn_app_mapreduce_am_resource_mb')
    },
    'cluster_configs': {
        'dfs.replication': ('HDFS', 'dfs_replication')
    }
}


class CDHPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
    """Auto-configs provider aware of the CDH datanode process name."""

    def get_datanode_name(self):
        return 'HDFS_DATANODE'


class AbstractPluginUtils(object):
    """Base class with CDH-plugin helpers that do not use cm_api."""

    def __init__(self):
        # c_helper will be defined in derived classes.
        self.c_helper = None

    def get_role_name(self, instance, service):
        """Build a CM role name for ``service`` running on ``instance``.

        NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}",
        so the instance hostname has its dashes replaced by underscores.
        """
        shortcuts = {
            'AGENT': 'A',
            'ALERTPUBLISHER': 'AP',
            'CATALOGSERVER': 'ICS',
            'DATANODE': 'DN',
            'EVENTSERVER': 'ES',
            'HBASE_INDEXER': 'LHBI',
            'HIVEMETASTORE': 'HVM',
            'HIVESERVER2': 'HVS',
            'HOSTMONITOR': 'HM',
            'IMPALAD': 'ID',
            'JOBHISTORY': 'JS',
            'JOURNALNODE': 'JN',
            'KAFKA_BROKER': 'KB',
            'KMS': 'KMS',
            'MASTER': 'M',
            'NAMENODE': 'NN',
            'NODEMANAGER': 'NM',
            'OOZIE_SERVER': 'OS',
            'REGIONSERVER': 'RS',
            'RESOURCEMANAGER': 'RM',
            'SECONDARYNAMENODE': 'SNN',
            'SENTRY_SERVER': 'SNT',
            'SERVER': 'S',
            'SERVICEMONITOR': 'SM',
            'SOLR_SERVER': 'SLR',
            'SPARK_YARN_HISTORY_SERVER': 'SHS',
            'SQOOP_SERVER': 'S2S',
            'STATESTORE': 'ISS',
            'WEBHCAT': 'WHC',
            'HDFS_GATEWAY': 'HG',
            'YARN_GATEWAY': 'YG'
        }
        # Unknown services fall back to their full name as the prefix.
        return '%s_%s' % (shortcuts.get(service, service),
                          instance.hostname().replace('-', '_'))

    # The get_* helpers below resolve the instance(s) running a given
    # Sahara process within the cluster.

    def get_manager(self, cluster):
        return u.get_instance(cluster, 'CLOUDERA_MANAGER')

    def get_namenode(self, cluster):
        return u.get_instance(cluster, "HDFS_NAMENODE")

    def get_datanodes(self, cluster):
        return u.get_instances(cluster, 'HDFS_DATANODE')

    def get_hdfs_nodes(self, cluster, instances=None):
        instances = instances if instances else u.get_instances(cluster)
        return u.instances_with_services(
            instances, ["HDFS_DATANODE", "HDFS_NAMENODE",
                        "HDFS_SECONDARYNAMENODE"])

    def get_secondarynamenode(self, cluster):
        return u.get_instance(cluster, 'HDFS_SECONDARYNAMENODE')

    def get_historyserver(self, cluster):
        return u.get_instance(cluster, 'YARN_JOBHISTORY')

    def get_resourcemanager(self, cluster):
        return u.get_instance(cluster, 'YARN_RESOURCEMANAGER')

    def get_nodemanagers(self, cluster):
        return u.get_instances(cluster, 'YARN_NODEMANAGER')

    def get_oozie(self, cluster):
        return u.get_instance(cluster, 'OOZIE_SERVER')

    def get_hive_metastore(self, cluster):
        return u.get_instance(cluster, 'HIVE_METASTORE')

    def get_hive_servers(self, cluster):
        return u.get_instances(cluster, 'HIVE_SERVER2')

    def get_hue(self, cluster):
        return u.get_instance(cluster, 'HUE_SERVER')

    def get_spark_historyserver(self, cluster):
        return u.get_instance(cluster, 'SPARK_YARN_HISTORY_SERVER')

    def get_zookeepers(self, cluster):
        return u.get_instances(cluster, 'ZOOKEEPER_SERVER')

    def get_hbase_master(self, cluster):
        return u.get_instance(cluster, 'HBASE_MASTER')

    def get_sentry(self, cluster):
        return u.get_instance(cluster, 'SENTRY_SERVER')

    def get_flumes(self, cluster):
        return u.get_instances(cluster, 'FLUME_AGENT')

    def get_solrs(self, cluster):
        return u.get_instances(cluster, 'SOLR_SERVER')

    def get_sqoop(self, cluster):
        return u.get_instance(cluster, 'SQOOP_SERVER')

    def get_hbase_indexers(self, cluster):
        return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')

    def get_catalogserver(self, cluster):
        return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')

    def get_statestore(self, cluster):
        return u.get_instance(cluster, 'IMPALA_STATESTORE')

    def get_impalads(self, cluster):
        return u.get_instances(cluster, 'IMPALAD')

    def get_kms(self, cluster):
        return u.get_instances(cluster, 'KMS')

    def get_jns(self, cluster):
        return u.get_instances(cluster, 'HDFS_JOURNALNODE')

    def get_stdb_rm(self, cluster):
        return u.get_instance(cluster, 'YARN_STANDBYRM')

    def get_kafka_brokers(self, cluster):
        return u.get_instances(cluster, 'KAFKA_BROKER')

    def convert_process_configs(self, configs):
        """Rename Sahara process-config groups to CM role-type names.

        :param configs: dict or Resource of per-process configs keyed by
                        Sahara group names.
        :returns: Resource with the recognized keys renamed.
        """
        p_dict = {
            "CLOUDERA": ['MANAGER'],
            "NAMENODE": ['NAMENODE'],
            "DATANODE": ['DATANODE'],
            "SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
            "RESOURCEMANAGER": ['RESOURCEMANAGER'],
            "NODEMANAGER": ['NODEMANAGER'],
            "JOBHISTORY": ['JOBHISTORY'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVESERVER": ['HIVESERVER2'],
            "HIVEMETASTORE": ['HIVEMETASTORE'],
            "WEBHCAT": ['WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['SERVER'],
            "MASTER": ['MASTER'],
            "REGIONSERVER": ['REGIONSERVER'],
            "FLUME": ['AGENT'],
            "CATALOGSERVER": ['CATALOGSERVER'],
            "STATESTORE": ['STATESTORE'],
            "IMPALAD": ['IMPALAD'],
            "KS_INDEXER": ['HBASE_INDEXER'],
            "SENTRY": ['SENTRY_SERVER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "KMS": ['KMS'],
            "YARN_GATEWAY": ['YARN_GATEWAY'],
            "HDFS_GATEWAY": ['HDFS_GATEWAY'],
            "JOURNALNODE": ['JOURNALNODE'],
            "KAFKA": ['KAFKA_BROKER']
        }
        if isinstance(configs, res.Resource):
            configs = configs.to_dict()
        # Iterate over a snapshot of the keys: deleting entries while
        # iterating configs.keys() directly raises RuntimeError on Python 3.
        for k in list(configs):
            if k in p_dict:
                configs[p_dict[k][0]] = configs.pop(k)
        return res.Resource(configs)

    def convert_role_showname(self, showname):
        """Map a Sahara process display name to the CM role type.

        Yarn ResourceManager and Standby ResourceManager will
        be converted to ResourceManager. Unknown names are returned
        unchanged.
        """
        name_dict = {
            'CLOUDERA_MANAGER': 'MANAGER',
            'HDFS_NAMENODE': 'NAMENODE',
            'HDFS_DATANODE': 'DATANODE',
            'HDFS_JOURNALNODE': 'JOURNALNODE',
            'HDFS_SECONDARYNAMENODE': 'SECONDARYNAMENODE',
            'YARN_RESOURCEMANAGER': 'RESOURCEMANAGER',
            'YARN_STANDBYRM': 'RESOURCEMANAGER',
            'YARN_NODEMANAGER': 'NODEMANAGER',
            'YARN_JOBHISTORY': 'JOBHISTORY',
            'OOZIE_SERVER': 'OOZIE_SERVER',
            'HIVE_SERVER2': 'HIVESERVER2',
            'HIVE_METASTORE': 'HIVEMETASTORE',
            'HIVE_WEBHCAT': 'WEBHCAT',
            'HUE_SERVER': 'HUE_SERVER',
            'SPARK_YARN_HISTORY_SERVER': 'SPARK_YARN_HISTORY_SERVER',
            'ZOOKEEPER_SERVER': 'SERVER',
            'HBASE_MASTER': 'MASTER',
            'HBASE_REGIONSERVER': 'REGIONSERVER',
            'FLUME_AGENT': 'AGENT',
            'IMPALA_CATALOGSERVER': 'CATALOGSERVER',
            'IMPALA_STATESTORE': 'STATESTORE',
            'IMPALAD': 'IMPALAD',
            'KEY_VALUE_STORE_INDEXER': 'HBASE_INDEXER',
            'SENTRY_SERVER': 'SENTRY_SERVER',
            # Fixed key typo: was 'SOL_SERVER', which could never match the
            # 'SOLR_SERVER' process name used elsewhere in this module.
            'SOLR_SERVER': 'SOLR_SERVER',
            'SQOOP_SERVER': 'SQOOP_SERVER',
        }
        return name_dict.get(showname, showname)

    def install_packages(self, instances, packages):
        """Install ``packages`` on every instance, in parallel."""
        # instances non-empty
        cpo.add_provisioning_step(
            instances[0].cluster_id, _("Install packages"), len(instances))

        with context.ThreadGroup() as tg:
            for i in instances:
                tg.spawn('cdh-inst-pkgs-%s' % i.instance_name,
                         self._install_pkgs, i, packages)

    @cpo.event_wrapper(True)
    def _install_pkgs(self, instance, packages):
        with instance.remote() as r:
            cmd.install_packages(r, packages)

    def start_cloudera_agents(self, instances):
        """Configure and start the Cloudera agent on every instance."""
        # instances non-empty
        cpo.add_provisioning_step(
            instances[0].cluster_id, _("Start Cloudera Agents"),
            len(instances))

        with context.ThreadGroup() as tg:
            for i in instances:
                tg.spawn('cdh-agent-start-%s' % i.instance_name,
                         self._start_cloudera_agent, i)

    @cpo.event_wrapper(True)
    def _start_cloudera_agent(self, instance):
        mng_hostname = self.get_manager(instance.cluster).hostname()
        with instance.remote() as r:
            cmd.configure_agent(r, mng_hostname)
            cmd.start_agent(r)

    def configure_swift(self, cluster, instances=None):
        """Install the Hadoop-Swift jar and SSL certs if Swift is enabled."""
        if self.c_helper.is_swift_enabled(cluster):
            if not instances:
                instances = u.get_instances(cluster)
            cpo.add_provisioning_step(
                cluster.id, _("Configure Swift"), len(instances))

            with context.ThreadGroup() as tg:
                for i in instances:
                    tg.spawn('cdh-swift-conf-%s' % i.instance_name,
                             self._configure_swift_to_inst, i)
            swift_helper.install_ssl_certs(instances)

    @cpo.event_wrapper(True)
    def _configure_swift_to_inst(self, instance):
        cluster = instance.cluster
        swift_lib_remote_url = self.c_helper.get_swift_lib_url(cluster)
        with instance.remote() as r:
            # Only download the jar when it is not already present.
            if r.execute_command('ls %s/hadoop-openstack.jar' % HADOOP_LIB_DIR,
                                 raise_when_error=False)[0] != 0:
                r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
                    swift_lib_remote_url, HADOOP_LIB_DIR))

    def configure_sentry(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_sentry_database(cluster, r)

    def put_hive_hdfs_xml(self, cluster):
        """Copy hive-site.xml into the shared HDFS config location."""
        servers = self.get_hive_servers(cluster)
        with servers[0].remote() as r:
            conf_path = edp_u.get_hive_shared_conf_path('hdfs')
            r.execute_command(
                'sudo su - -c "hadoop fs -mkdir -p %s" hdfs'
                % os.path.dirname(conf_path))
            r.execute_command(
                'sudo su - -c "hadoop fs -put /etc/hive/conf/hive-site.xml '
                '%s" hdfs' % conf_path)

    def configure_hive(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_hive_database(cluster, r)

    def install_extjs(self, cluster):
        """Download and unpack the ExtJS library required by Oozie's UI."""
        extjs_remote_location = self.c_helper.get_extjs_lib_url(cluster)
        extjs_vm_location_dir = '/var/lib/oozie'
        extjs_vm_location_path = extjs_vm_location_dir + '/extjs.zip'
        with self.get_oozie(cluster).remote() as r:
            # Skip the download if ExtJS was already unpacked.
            if r.execute_command('ls %s/ext-2.2' % extjs_vm_location_dir,
                                 raise_when_error=False)[0] != 0:
                r.execute_command('curl -L -o \'%s\' %s' % (
                    extjs_vm_location_path, extjs_remote_location),
                    run_as_root=True)
                r.execute_command('unzip %s -d %s' % (
                    extjs_vm_location_path, extjs_vm_location_dir),
                    run_as_root=True)

    def _check_cloudera_manager_started(self, manager):
        """Return True once CM answers on its API port, False otherwise."""
        try:
            conn = telnetlib.Telnet(manager.management_ip, CM_API_PORT)
            conn.close()
            return True
        except IOError:
            return False

    @cpo.event_wrapper(
        True, step=_("Start Cloudera Manager"), param=('cluster', 1))
    def _start_cloudera_manager(self, cluster, timeout_config):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            cmd.start_cloudera_db(r)
            cmd.start_manager(r)
        # Poll until the CM API port accepts connections.
        poll_utils.plugin_option_poll(
            cluster, self._check_cloudera_manager_started, timeout_config,
            _("Await starting Cloudera Manager"), 2, {'manager': manager})

    def configure_os(self, instances):
        """Set up CDH/CM package repositories on every instance."""
        # instances non-empty
        cpo.add_provisioning_step(
            instances[0].cluster_id, _("Configure OS"), len(instances))
        with context.ThreadGroup() as tg:
            for inst in instances:
                tg.spawn('cdh-repo-conf-%s' % inst.instance_name,
                         self._configure_repo_from_inst, inst)

    @cpo.event_wrapper(True)
    def _configure_repo_from_inst(self, instance):
        LOG.debug("Configure repos from instance {instance}".format(
            instance=instance.instance_name))
        cluster = instance.cluster

        with instance.remote() as r:
            if cmd.is_ubuntu_os(r):
                # Cluster-provided key URLs take precedence over defaults.
                cdh5_key = (
                    self.c_helper.get_cdh5_key_url(cluster) or
                    self.c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL)
                cm5_key = (
                    self.c_helper.get_cm5_key_url(cluster) or
                    self.c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL)
                if self.c_helper.is_keytrustee_available():
                    kms_key = (
                        self.c_helper.get_kms_key_url(cluster) or
                        self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
                    kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
                    cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
                    cmd.add_apt_key(r, kms_key)

                cdh5_repo_content = self.c_helper.CDH5_UBUNTU_REPO
                cm5_repo_content = self.c_helper.CM5_UBUNTU_REPO

                cmd.write_ubuntu_repository(r, cdh5_repo_content, 'cdh')
                cmd.add_apt_key(r, cdh5_key)
                cmd.write_ubuntu_repository(r, cm5_repo_content, 'cm')
                cmd.add_apt_key(r, cm5_key)
                cmd.update_repository(r)

            if cmd.is_centos_os(r):
                cdh5_repo_content = self.c_helper.CDH5_CENTOS_REPO
                cm5_repo_content = self.c_helper.CM5_CENTOS_REPO
                if self.c_helper.is_keytrustee_available():
                    kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
                    cmd.add_centos_repository(r, kms_repo_url, 'kms')

                cmd.write_centos_repository(r, cdh5_repo_content, 'cdh')
                cmd.write_centos_repository(r, cm5_repo_content, 'cm')
                cmd.update_repository(r)

    def _get_config_value(self, service, name, configs, cluster=None):
        """Look up a numeric config value.

        Search order: cluster-level configs, then each node group's
        configs, then the plugin defaults in ``configs``.

        :raises InvalidDataException: when no matching config exists.
        """
        if cluster:
            conf = cluster.cluster_configs
            if service in conf and name in conf[service]:
                return types.transform_to_num(conf[service][name])
            for node_group in cluster.node_groups:
                conf = node_group.node_configs
                if service in conf and name in conf[service]:
                    return types.transform_to_num(conf[service][name])
        for config in configs:
            if config.applicable_target == service and config.name == name:
                return types.transform_to_num(config.default_value)
        raise exc.InvalidDataException(
            _("Unable to find config: applicable_target: {target}, name: "
              "{name}").format(target=service, name=name))

    def recommend_configs(self, cluster, plugin_configs, scaling):
        """Apply automatically recommended configs to the cluster."""
        provider = CDHPluginAutoConfigsProvider(
            AUTO_CONFIGURATION_SCHEMA, plugin_configs, cluster, scaling)
        provider.apply_recommended_configs()

    def start_cloudera_manager(self, cluster):
        self._start_cloudera_manager(
            cluster, self.c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)

    def get_config_value(self, service, name, cluster=None):
        configs = self.c_helper.get_plugin_configs()
        return self._get_config_value(service, name, configs, cluster)