diff --git a/sahara/plugins/cdh/__init__.py b/sahara/plugins/cdh/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py
new file mode 100644
index 00000000..d052a883
--- /dev/null
+++ b/sahara/plugins/cdh/cloudera_utils.py
@@ -0,0 +1,195 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+
+# cm_api client is not present in OS requirements
+try:
+    from cm_api import api_client
+    from cm_api.endpoints import services
+except ImportError:
+    api_client = None
+    services = None
+
+from sahara.i18n import _
+from sahara.plugins.cdh import utils as pu
+from sahara.plugins.general import exceptions as ex
+
+CM_DEFAULT_USERNAME = 'admin'
+CM_DEFAULT_PASSWD = 'admin'
+
+HDFS_SERVICE_NAME = 'hdfs01'
+YARN_SERVICE_NAME = 'yarn01'
+OOZIE_SERVICE_NAME = 'oozie01'
+
+
+def have_cm_api_libs():
+    return api_client and services
+
+
+def cloudera_cmd(f):
+    # The decorated function yields cm_api command objects; each one is
+    # waited on and checked for success before the next is issued.
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        for cmd in f(*args, **kwargs):
+            result = cmd.wait()
+            if not result.success:
+                raise ex.HadoopProvisionError(result.resultMessage)
+
+    return wrapper
+
+
+def get_api_client(cluster):
+    manager_ip = pu.get_manager(cluster).management_ip
+    return api_client.ApiResource(manager_ip, username=CM_DEFAULT_USERNAME,
+                                  password=CM_DEFAULT_PASSWD)
+
+
+def get_cloudera_cluster(cluster):
+    api = get_api_client(cluster)
+    return api.get_cluster(cluster.name)
+
+
+@cloudera_cmd
+def start_instances(cluster):
+    cm_cluster = get_cloudera_cluster(cluster)
+    yield cm_cluster.start()
+
+
+def delete_instances(cluster, instances):
+    api = get_api_client(cluster)
+    hosts = api.get_all_hosts(view='full')
+    hostnames_to_delete = [i.fqdn() for i in instances]
+    for host in hosts:
+        if host.hostname in hostnames_to_delete:
+            api.delete_host(host.hostId)
+
+
+def get_service(process, cluster=None, instance=None):
+    cm_cluster = None
+    if cluster:
+        cm_cluster = get_cloudera_cluster(cluster)
+    elif instance:
+        cm_cluster = get_cloudera_cluster(instance.node_group.cluster)
+    else:
+        raise ValueError(
+            _("Either 'cluster' or 'instance' argument must be provided"))
+
+    if process in ['NAMENODE', 'DATANODE', 'SECONDARYNAMENODE']:
+        return cm_cluster.get_service(HDFS_SERVICE_NAME)
+    elif process in ['RESOURCEMANAGER', 'NODEMANAGER', 'JOBHISTORY']:
+        return cm_cluster.get_service(YARN_SERVICE_NAME)
+    elif process in ['OOZIE_SERVER']:
+        return cm_cluster.get_service(OOZIE_SERVICE_NAME)
+    else:
+        raise ValueError(
+            _("Process %(process)s is not supported by CDH plugin") %
+            {'process': process})
+
+
+def decommission_nodes(cluster, process, role_names):
+    service = get_service(process, cluster)
+    service.decommission(*role_names).wait()
+    for role_name in role_names:
+        service.delete_role(role_name)
+
+
+@cloudera_cmd
+def refresh_nodes(cluster, process, service_name):
+    cm_cluster = get_cloudera_cluster(cluster)
+    service = cm_cluster.get_service(service_name)
+
+    nds = [n.name for n in service.get_roles_by_type(process)]
+    for nd in nds:
+        for st in service.refresh(nd):
+            yield st
+
+
+@cloudera_cmd
+def deploy_configs(cluster):
+    cm_cluster = get_cloudera_cluster(cluster)
+    yield cm_cluster.deploy_client_config()
+
+
+@cloudera_cmd
+def update_configs(instance):
+    for process in instance.node_group.node_processes:
+        service = get_service(process, instance=instance)
+        yield service.deploy_client_config(get_role_name(instance, process))
+
+
+def get_role_name(instance, process):
+    # NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
+    shortcuts = {
+        'NAMENODE': 'NN',
+        'DATANODE': 'DN',
+        'SECONDARYNAMENODE': 'SNN',
+        'RESOURCEMANAGER': 'RM',
+        'NODEMANAGER': 'NM',
+        'JOBHISTORY': 'JS',
+        'OOZIE_SERVER': 'OS',
+        'SERVICEMONITOR': 'SM',
+        'HOSTMONITOR': 'HM',
+        'EVENTSERVER': 'ES',
+        'ALERTPUBLISHER': 'AP'
+    }
+    return '%s_%s' % (shortcuts.get(process, process),
+                      instance.hostname().replace('-', '_'))
+
+
+def create_mgmt_service(cluster):
+    api = get_api_client(cluster)
+    cm = api.get_cloudera_manager()
+
+    setup_info = services.ApiServiceSetupInfo()
+    manager = pu.get_manager(cluster)
+    hostname = manager.fqdn()
+    processes = ['SERVICEMONITOR', 'HOSTMONITOR',
+                 'EVENTSERVER', 'ALERTPUBLISHER']
+    for proc in processes:
+        setup_info.add_role_info(get_role_name(manager, proc), proc, hostname)
+
+    cm.create_mgmt_service(setup_info)
+    cm.hosts_start_roles([hostname])
+
+
+@cloudera_cmd
+def format_namenode(hdfs_service):
+    for nn in hdfs_service.get_roles_by_type('NAMENODE'):
+        yield hdfs_service.format_hdfs(nn.name)[0]
+
+
+@cloudera_cmd
+def start_service(service):
+    yield service.start()
+
+
+@cloudera_cmd
+def start_roles(service, *role_names):
+    for role in service.start_roles(*role_names):
+        yield role
+
+
+@cloudera_cmd
+def create_yarn_job_history_dir(yarn_service):
+    yield yarn_service.create_yarn_job_history_dir()
+
+
+@cloudera_cmd
+def create_oozie_db(oozie_service):
+    yield oozie_service.create_oozie_db()
+
+
+@cloudera_cmd
+def install_oozie_sharelib(oozie_service):
+    yield oozie_service.install_oozie_sharelib()
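+
+# Illustrative usage sketch (not executed here; assumes a provisioned
+# cluster/instance pair): the @cloudera_cmd helpers block until the
+# corresponding Cloudera Manager command finishes, so callers can chain
+# them synchronously:
+#
+#     hdfs = get_service('DATANODE', cluster=cluster)
+#     start_roles(hdfs, get_role_name(instance, 'DATANODE'))
+#     refresh_nodes(cluster, 'DATANODE', HDFS_SERVICE_NAME)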
diff --git a/sahara/plugins/cdh/commands.py b/sahara/plugins/cdh/commands.py
new file mode 100644
index 00000000..6752b810
--- /dev/null
+++ b/sahara/plugins/cdh/commands.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.i18n import _
+from sahara.plugins.general import exceptions as ex
+
+
+def _root(remote, cmd, **kwargs):
+    return remote.execute_command(cmd, run_as_root=True, **kwargs)
+
+
+def _get_os_distrib(remote):
+    return remote.execute_command('lsb_release -is')[1].strip().lower()
+
+
+def is_centos_os(remote):
+    return _get_os_distrib(remote) == 'centos'
+
+
+def is_ubuntu_os(remote):
+    return _get_os_distrib(remote) == 'ubuntu'
+
+
+def is_pre_installed_cdh(remote):
+    code, out = remote.execute_command('ls /etc/init.d/cloudera-scm-server',
+                                       raise_when_error=False)
+    return code == 0
+
+
+def stop_resourcemanager(remote):
+    _root(remote, 'service hadoop-yarn-resourcemanager stop')
+
+
+def stop_nodemanager(remote):
+    _root(remote, 'service hadoop-yarn-nodemanager stop')
+
+
+def stop_historyserver(remote):
+    _root(remote, 'service hadoop-mapreduce-historyserver stop')
+
+
+def start_cloudera_db(remote):
+    _root(remote, 'service cloudera-scm-server-db start')
+
+
+def start_manager(remote):
+    _root(remote, 'service cloudera-scm-server start')
+
+
+def configure_agent(remote, manager_address):
+    remote.replace_remote_string('/etc/cloudera-scm-agent/config.ini',
+                                 'server_host=.*',
+                                 'server_host=%s' % manager_address)
+
+
+def start_agent(remote):
+    _root(remote, 'service cloudera-scm-agent start')
+
+
+def install_packages(remote, packages, timeout=1800):
+    distrib = _get_os_distrib(remote)
+    if distrib == 'ubuntu':
+        cmd = 'apt-get install -y %s'
+    elif distrib == 'centos':
+        # -y keeps yum non-interactive; otherwise the remote command
+        # would hang waiting for confirmation
+        cmd = 'yum install -y %s'
+    else:
+        raise ex.HadoopProvisionError(
+            _("OS on image is not supported by CDH plugin"))
+
+    cmd = cmd % ' '.join(packages)
+    _root(remote, cmd, timeout=timeout)
+
+
+def update_repository(remote):
+    if is_ubuntu_os(remote):
+        _root(remote, 'apt-get update')
+
+
+def push_remote_file(remote, src, dst):
+    cmd = 'curl %s -o %s' % (src, dst)
+    _root(remote, cmd)
+
+
+def add_ubuntu_repository(r, repo_list_url, repo_name):
+    push_remote_file(r, repo_list_url,
+                     '/etc/apt/sources.list.d/%s.list' % repo_name)
+
+
+def add_apt_key(remote, key_url):
+    cmd = 'wget -qO - %s | apt-key add -' % key_url
+    _root(remote, cmd)
+
+
+def add_centos_repository(r, repo_list_url, repo_name):
+    push_remote_file(r, repo_list_url, '/etc/yum.repos.d/%s.repo' % repo_name)
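+
+# Typical call order, as driven by deploy.py below (sketch only;
+# 'repo_url' is a placeholder):
+#
+#     with instance.remote() as r:
+#         if is_ubuntu_os(r):
+#             add_ubuntu_repository(r, repo_url, 'cdh')
+#             update_repository(r)
+#         install_packages(r, ['cloudera-manager-agent'])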
diff --git a/sahara/plugins/cdh/config_helper.py b/sahara/plugins/cdh/config_helper.py
new file mode 100644
index 00000000..17976dbd
--- /dev/null
+++ b/sahara/plugins/cdh/config_helper.py
@@ -0,0 +1,164 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from sahara.plugins import provisioning as p
+from sahara.utils import files as f
+
+DEFAULT_CDH5_UBUNTU_REPO_LIST_URL = ('http://archive.cloudera.com/cdh5/ubuntu'
+                                     '/precise/amd64/cdh/cloudera.list')
+
+DEFAULT_CDH5_UBUNTU_REPO_KEY_URL = ('http://archive.cloudera.com/cdh5/ubuntu'
+                                    '/precise/amd64/cdh/archive.key')
+
+DEFAULT_CM5_UBUNTU_REPO_LIST_URL = ('http://archive.cloudera.com/cm5/ubuntu'
+                                    '/precise/amd64/cm/cloudera.list')
+
+DEFAULT_CM5_UBUNTU_REPO_KEY_URL = ('http://archive.cloudera.com/cm5/ubuntu'
+                                   '/precise/amd64/cm/archive.key')
+
+DEFAULT_CDH5_CENTOS_REPO_LIST_URL = ('http://archive.cloudera.com/cdh5/redhat'
+                                     '/6/x86_64/cdh/cloudera-cdh5.repo')
+
+DEFAULT_CM5_CENTOS_REPO_LIST_URL = ('http://archive.cloudera.com/cm5/redhat'
+                                    '/6/x86_64/cm/cloudera-manager.repo')
+
+DEFAULT_SWIFT_LIB_URL = ('https://repository.cloudera.com/artifactory/repo/org'
+                         '/apache/hadoop/hadoop-openstack/2.3.0-cdh5.1.0'
+                         '/hadoop-openstack-2.3.0-cdh5.1.0.jar')
+
+CDH5_REPO_URL = p.Config(
+    'CDH5 repo list URL', 'general', 'cluster', priority=1,
+    default_value="")
+
+CDH5_REPO_KEY_URL = p.Config(
+    'CDH5 repo key URL (for debian-based only)', 'general', 'cluster',
+    priority=1, default_value="")
+
+CM5_REPO_URL = p.Config(
+    'CM5 repo list URL', 'general', 'cluster', priority=1,
+    default_value="")
+
+CM5_REPO_KEY_URL = p.Config(
+    'CM5 repo key URL (for debian-based only)', 'general', 'cluster',
+    priority=1, default_value="")
+
+ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
+                        config_type='bool', priority=1,
+                        default_value=True)
+
+SWIFT_LIB_URL = p.Config(
+    'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
+    default_value=DEFAULT_SWIFT_LIB_URL,
+    description=("Library that adds Swift support to CDH. The file will be "
+                 "downloaded by the VMs."))
+
+
+def _get_cluster_plugin_configs():
+    return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
+            ENABLE_SWIFT, SWIFT_LIB_URL]
+
+
+# ng wide configs

+def _load_json(path_to_file):
+    data = f.get_file_text(path_to_file)
+    return json.loads(data)
+
+
+path_to_config = 'plugins/cdh/resources/'
+hdfs_confs = _load_json(path_to_config + 'hdfs-service.json')
+namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json')
+datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
+secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
+yarn_confs = _load_json(path_to_config + 'yarn-service.json')
+resourcemanager_confs = _load_json(
+    path_to_config + 'yarn-resourcemanager.json')
+nodemanager_confs = _load_json(path_to_config + 'yarn-nodemanager.json')
+jobhistory_confs = _load_json(path_to_config + 'yarn-jobhistory.json')
+oozie_service_confs = _load_json(path_to_config + 'oozie-service.json')
+oozie_role_confs = _load_json(path_to_config + 'oozie-oozie.json')
+
+priority_one_confs = _load_json(path_to_config + 'priority-one-confs.json')
+
+
+def _prepare_value(value):
+    if not value:
+        return ""
+
+    return value.replace('\n', ' ')
+
+
+def _init_configs(confs, app_target, scope):
+    cfgs = []
+    for cfg in confs:
+        priority = 1 if cfg['name'] in priority_one_confs else 2
+        c = p.Config(cfg['name'], app_target, scope, priority=priority,
+                     default_value=_prepare_value(cfg['value']),
+                     description=cfg['desc'], is_optional=True)
+        cfgs.append(c)
+
+    return cfgs
+
+
+def _get_ng_plugin_configs():
+    cfg = []
+    cfg += _init_configs(hdfs_confs, 'HDFS', 'cluster')
+    cfg += _init_configs(namenode_confs, 'NAMENODE', 'node')
+    cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
+    cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
+    cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
+    cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
+    cfg += _init_configs(nodemanager_confs, 'NODEMANAGER', 'node')
+    cfg += _init_configs(jobhistory_confs, 'JOBHISTORY', 'node')
+    cfg += _init_configs(oozie_service_confs, 'OOZIE', 'cluster')
+    cfg += _init_configs(oozie_role_confs, 'OOZIE', 'node')
+    return cfg
+
+
+def get_plugin_configs():
+    cluster_wide = _get_cluster_plugin_configs()
+    ng_wide = _get_ng_plugin_configs()
+    return cluster_wide + ng_wide
+
+
+def _get_config_value(cluster, key):
+    return cluster.cluster_configs.get(
+        'general', {}).get(key.name, key.default_value)
+
+
+def get_cdh5_repo_url(cluster):
+    return _get_config_value(cluster, CDH5_REPO_URL)
+
+
+def get_cdh5_key_url(cluster):
+    return _get_config_value(cluster, CDH5_REPO_KEY_URL)
+
+
+def get_cm5_repo_url(cluster):
+    return _get_config_value(cluster, CM5_REPO_URL)
+
+
+def get_cm5_key_url(cluster):
+    return _get_config_value(cluster, CM5_REPO_KEY_URL)
+
+
+def is_swift_enabled(cluster):
+    return _get_config_value(cluster, ENABLE_SWIFT)
+
+
+def get_swift_lib_url(cluster):
+    return _get_config_value(cluster, SWIFT_LIB_URL)
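+
+# Resolution sketch: user-supplied values in cluster_configs['general']
+# win over the defaults declared above, e.g.
+#
+#     get_cdh5_repo_url(cluster)   # '' unless overridden by the user
+#     get_swift_lib_url(cluster)   # DEFAULT_SWIFT_LIB_URL by default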
diff --git a/sahara/plugins/cdh/deploy.py b/sahara/plugins/cdh/deploy.py
new file mode 100644
index 00000000..6270c811
--- /dev/null
+++ b/sahara/plugins/cdh/deploy.py
@@ -0,0 +1,373 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import telnetlib
+
+import six
+
+from sahara import context
+from sahara.i18n import _
+from sahara.i18n import _LI
+from sahara.openstack.common import log as logging
+from sahara.openstack.common import timeutils
+from sahara.plugins.cdh import cloudera_utils as cu
+from sahara.plugins.cdh import commands as cmd
+from sahara.plugins.cdh import config_helper as c_helper
+from sahara.plugins.cdh import utils as pu
+from sahara.plugins.general import exceptions as ex
+from sahara.plugins.general import utils as gu
+from sahara.swift import swift_helper
+from sahara.utils import xmlutils
+
+CM_API_PORT = 7180
+
+CDH_VERSION = 'CDH5'
+
+HDFS_SERVICE_TYPE = 'HDFS'
+YARN_SERVICE_TYPE = 'YARN'
+OOZIE_SERVICE_TYPE = 'OOZIE'
+
+PATH_TO_CORE_SITE_XML = '/etc/hadoop/conf/core-site.xml'
+HADOOP_LIB_DIR = '/usr/lib/hadoop-mapreduce'
+
+PACKAGES = [
+    'cloudera-manager-agent',
+    'cloudera-manager-daemons',
+    'cloudera-manager-server',
+    'cloudera-manager-server-db',
+    'hadoop-hdfs-datanode',
+    'hadoop-hdfs-namenode',
+    'hadoop-hdfs-secondarynamenode',
+    'hadoop-mapreduce',
+    'hadoop-mapreduce-historyserver',
+    'hadoop-yarn-nodemanager',
+    'hadoop-yarn-resourcemanager',
+    'oozie',
+    'oracle-j2sdk1.7',
+]
+
+LOG = logging.getLogger(__name__)
+
+
+def _merge_dicts(a, b):
+    res = {}
+
+    def update(cfg):
+        for service, configs in six.iteritems(cfg):
+            if not res.get(service):
+                res[service] = {}
+
+            res[service].update(configs)
+
+    update(a)
+    update(b)
+    return res
+
+
+def _get_configs(service, cluster=None, node_group=None):
+    def get_hadoop_dirs(mount_points, suffix):
+        return ','.join([x + suffix for x in mount_points])
+
+    all_confs = {
+        'OOZIE': {
+            'mapreduce_yarn_service': cu.YARN_SERVICE_NAME
+        },
+        'YARN': {
+            'hdfs_service': cu.HDFS_SERVICE_NAME
+        }
+    }
+
+    if node_group:
+        paths = node_group.storage_paths()
+
+        ng_default_confs = {
+            'NAMENODE': {
+                'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
+            },
+            'SECONDARYNAMENODE': {
+                'fs_checkpoint_dir_list': get_hadoop_dirs(paths, '/fs/snn')
+            },
+            'DATANODE': {
+                'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn')
+            },
+            'NODEMANAGER': {
+                'yarn_nodemanager_local_dirs': get_hadoop_dirs(paths,
+                                                               '/yarn/local')
+            }
+        }
+
+        ng_user_confs = node_group.node_configs
+        all_confs = _merge_dicts(all_confs, ng_user_confs)
+        all_confs = _merge_dicts(all_confs, ng_default_confs)
+
+    if cluster:
+        all_confs = _merge_dicts(all_confs, cluster.cluster_configs)
+
+    return all_confs.get(service, {})
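+
+# For example (illustrative values): for a DATANODE node group whose
+# storage_paths() is ['/volumes/disk1'], _get_configs('DATANODE',
+# node_group=ng) resolves to roughly:
+#
+#     {'dfs_data_dir_list': '/volumes/disk1/fs/dn'}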
+
+
+def configure_cluster(cluster):
+    instances = gu.get_instances(cluster)
+
+    if not cmd.is_pre_installed_cdh(pu.get_manager(cluster).remote()):
+        _configure_os(instances)
+        _install_packages(instances, PACKAGES)
+        _post_install(instances)
+
+    _start_cloudera_agents(instances)
+    _start_cloudera_manager(cluster)
+    _configure_manager(cluster)
+    _create_services(cluster)
+    _configure_services(cluster)
+    _configure_instances(instances)
+    cu.deploy_configs(cluster)
+    if c_helper.is_swift_enabled(cluster):
+        _configure_swift(instances)
+
+
+def scale_cluster(cluster, instances):
+    if not cmd.is_pre_installed_cdh(pu.get_manager(cluster).remote()):
+        _configure_os(instances)
+        _install_packages(instances, PACKAGES)
+        _post_install(instances)
+
+    _start_cloudera_agents(instances)
+    for instance in instances:
+        _configure_instance(instance)
+        cu.update_configs(instance)
+
+        if 'DATANODE' in instance.node_group.node_processes:
+            cu.refresh_nodes(cluster, 'DATANODE', cu.HDFS_SERVICE_NAME)
+
+        _configure_swift_to_inst(instance)
+
+        if 'DATANODE' in instance.node_group.node_processes:
+            hdfs = cu.get_service('DATANODE', instance=instance)
+            cu.start_roles(hdfs, cu.get_role_name(instance, 'DATANODE'))
+
+        if 'NODEMANAGER' in instance.node_group.node_processes:
+            yarn = cu.get_service('NODEMANAGER', instance=instance)
+            cu.start_roles(yarn, cu.get_role_name(instance, 'NODEMANAGER'))
+
+
+def decommission_cluster(cluster, instances):
+    dns = []
+    nms = []
+    for i in instances:
+        if 'DATANODE' in i.node_group.node_processes:
+            dns.append(cu.get_role_name(i, 'DATANODE'))
+        if 'NODEMANAGER' in i.node_group.node_processes:
+            nms.append(cu.get_role_name(i, 'NODEMANAGER'))
+
+    if dns:
+        cu.decommission_nodes(cluster, 'DATANODE', dns)
+
+    if nms:
+        cu.decommission_nodes(cluster, 'NODEMANAGER', nms)
+
+    cu.delete_instances(cluster, instances)
+
+    cu.refresh_nodes(cluster, 'DATANODE', cu.HDFS_SERVICE_NAME)
+    cu.refresh_nodes(cluster, 'NODEMANAGER', cu.YARN_SERVICE_NAME)
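+
+# Sketch: for an instance whose hostname is, say, cluster-worker-001 and
+# which runs both DATANODE and NODEMANAGER, decommission_cluster() hands
+# CM the role names 'DN_cluster_worker_001' and 'NM_cluster_worker_001'
+# (built by cu.get_role_name) before the hosts are deleted.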
+
+
+def _configure_os(instances):
+    with context.ThreadGroup() as tg:
+        for inst in instances:
+            tg.spawn('cdh-repo-conf-%s' % inst.instance_name,
+                     _configure_repo_from_inst, inst)
+
+
+def _configure_repo_from_inst(instance):
+    LOG.debug("Configuring repos from instance '%(instance)s'" % {
+        'instance': instance.instance_name})
+    cluster = instance.node_group.cluster
+
+    cdh5_repo = c_helper.get_cdh5_repo_url(cluster)
+    cdh5_key = c_helper.get_cdh5_key_url(cluster)
+    cm5_repo = c_helper.get_cm5_repo_url(cluster)
+    cm5_key = c_helper.get_cm5_key_url(cluster)
+
+    with instance.remote() as r:
+        if cmd.is_ubuntu_os(r):
+            cdh5_repo = cdh5_repo or c_helper.DEFAULT_CDH5_UBUNTU_REPO_LIST_URL
+            cdh5_key = cdh5_key or c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL
+            cm5_repo = cm5_repo or c_helper.DEFAULT_CM5_UBUNTU_REPO_LIST_URL
+            cm5_key = cm5_key or c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL
+
+            cmd.add_ubuntu_repository(r, cdh5_repo, 'cdh')
+            cmd.add_apt_key(r, cdh5_key)
+            cmd.add_ubuntu_repository(r, cm5_repo, 'cm')
+            cmd.add_apt_key(r, cm5_key)
+            cmd.update_repository(r)
+
+        if cmd.is_centos_os(r):
+            cdh5_repo = cdh5_repo or c_helper.DEFAULT_CDH5_CENTOS_REPO_LIST_URL
+            cm5_repo = cm5_repo or c_helper.DEFAULT_CM5_CENTOS_REPO_LIST_URL
+
+            cmd.add_centos_repository(r, cdh5_repo, 'cdh')
+            cmd.add_centos_repository(r, cm5_repo, 'cm')
+
+
+def _install_packages(instances, packages):
+    with context.ThreadGroup() as tg:
+        for i in instances:
+            tg.spawn('cdh-inst-pkgs-%s' % i.instance_name,
+                     _install_pkgs, i, packages)
+
+
+def _install_pkgs(instance, packages):
+    with instance.remote() as r:
+        cmd.install_packages(r, packages)
+
+
+def _post_install(instances):
+    with context.ThreadGroup() as tg:
+        for i in instances:
+            tg.spawn('cdh-post-inst-%s' % i.instance_name,
+                     _stop_services, i)
+
+
+def _stop_services(instance):
+    with instance.remote() as r:
+        cmd.stop_resourcemanager(r)
+        cmd.stop_nodemanager(r)
+        cmd.stop_historyserver(r)
+
+
+def _start_cloudera_agents(instances):
+    with context.ThreadGroup() as tg:
+        for i in instances:
+            tg.spawn('cdh-agent-start-%s' % i.instance_name,
+                     _start_cloudera_agent, i)
+
+
+def _start_cloudera_agent(instance):
+    mng_hostname = pu.get_manager(instance.node_group.cluster).hostname()
+    with instance.remote() as r:
+        cmd.configure_agent(r, mng_hostname)
+        cmd.start_agent(r)
+
+
+def _start_cloudera_manager(cluster):
+    manager = pu.get_manager(cluster)
+    with manager.remote() as r:
+        cmd.start_cloudera_db(r)
+        cmd.start_manager(r)
+
+    timeout = 300
+    LOG.debug("Waiting %(timeout)s seconds for Manager to start" % {
+        'timeout': timeout})
+    s_time = timeutils.utcnow()
+    while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
+        try:
+            # CM is considered up once its API port accepts connections
+            conn = telnetlib.Telnet(manager.management_ip, CM_API_PORT)
+            conn.close()
+            break
+        except IOError:
+            context.sleep(2)
+    else:
+        message = _("Cloudera Manager failed to start in %(timeout)s minutes "
+                    "on node '%(node)s' of cluster '%(cluster)s'") % {
+                        'timeout': timeout / 60,
+                        'node': manager.management_ip,
+                        'cluster': cluster.name}
+        raise ex.HadoopProvisionError(message)
+
+    LOG.info(_LI("Cloudera Manager has been started"))
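+
+# Net effect on an Ubuntu image with no user overrides (sketch): the
+# instance ends up with /etc/apt/sources.list.d/cdh.list and cm.list
+# pointing at the DEFAULT_*_UBUNTU_REPO_LIST_URL values from
+# config_helper, after which the CDH packages above become installable.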
+
+
+def _create_services(cluster):
+    api = cu.get_api_client(cluster)
+
+    cm_cluster = api.create_cluster(cluster.name, CDH_VERSION)
+
+    cm_cluster.create_service(cu.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
+    cm_cluster.create_service(cu.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
+    cm_cluster.create_service(cu.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
+
+
+def _configure_services(cluster):
+    cm_cluster = cu.get_cloudera_cluster(cluster)
+
+    hdfs = cm_cluster.get_service(cu.HDFS_SERVICE_NAME)
+    hdfs.update_config(_get_configs(HDFS_SERVICE_TYPE, cluster=cluster))
+
+    yarn = cm_cluster.get_service(cu.YARN_SERVICE_NAME)
+    yarn.update_config(_get_configs(YARN_SERVICE_TYPE, cluster=cluster))
+
+    oozie = cm_cluster.get_service(cu.OOZIE_SERVICE_NAME)
+    oozie.update_config(_get_configs(OOZIE_SERVICE_TYPE, cluster=cluster))
+
+
+def _configure_instances(instances):
+    for inst in instances:
+        _configure_instance(inst)
+
+
+def _configure_instance(instance):
+    for process in instance.node_group.node_processes:
+        _add_role(instance, process)
+
+
+def _add_role(instance, process):
+    if process in ['MANAGER']:
+        return
+
+    service = cu.get_service(process, instance=instance)
+    role = service.create_role(cu.get_role_name(instance, process),
+                               process, instance.fqdn())
+    role.update_config(_get_configs(process, node_group=instance.node_group))
+
+
+def _configure_manager(cluster):
+    cu.create_mgmt_service(cluster)
+
+
+def _configure_swift(instances):
+    with context.ThreadGroup() as tg:
+        for i in instances:
+            tg.spawn('cdh-swift-conf-%s' % i.instance_name,
+                     _configure_swift_to_inst, i)
+
+
+def _configure_swift_to_inst(instance):
+    cluster = instance.node_group.cluster
+    with instance.remote() as r:
+        r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
+            c_helper.get_swift_lib_url(cluster), HADOOP_LIB_DIR))
+        core_site = r.read_file_from(PATH_TO_CORE_SITE_XML)
+        configs = xmlutils.parse_hadoop_xml_with_name_and_value(core_site)
+        configs.extend(swift_helper.get_swift_configs())
+        confs = dict((c['name'], c['value']) for c in configs)
+        new_core_site = xmlutils.create_hadoop_xml(confs)
+        r.write_file_to(PATH_TO_CORE_SITE_XML, new_core_site,
+                        run_as_root=True)
+
+
+def start_cluster(cluster):
+    cm_cluster = cu.get_cloudera_cluster(cluster)
+
+    hdfs = cm_cluster.get_service(cu.HDFS_SERVICE_NAME)
+    cu.format_namenode(hdfs)
+    cu.start_service(hdfs)
+
+    yarn = cm_cluster.get_service(cu.YARN_SERVICE_NAME)
+    cu.create_yarn_job_history_dir(yarn)
+    cu.start_service(yarn)
+
+    oozie = cm_cluster.get_service(cu.OOZIE_SERVICE_NAME)
+    cu.create_oozie_db(oozie)
+    cu.install_oozie_sharelib(oozie)
+    cu.start_service(oozie)
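+
+# Overall provisioning order, as driven by the plugin entry points in
+# plugin.py below (summary): configure_cluster() then start_cluster()
+# on creation; scale_cluster() / decommission_cluster() on resize.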
diff --git a/sahara/plugins/cdh/plugin.py b/sahara/plugins/cdh/plugin.py
new file mode 100644
index 00000000..bed5bd48
--- /dev/null
+++ b/sahara/plugins/cdh/plugin.py
@@ -0,0 +1,106 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara import conductor
+from sahara import context
+from sahara.plugins.cdh import config_helper as c_helper
+from sahara.plugins.cdh import deploy as dp
+from sahara.plugins.cdh import utils as cu
+from sahara.plugins.cdh import validation as vl
+from sahara.plugins import provisioning as p
+
+conductor = conductor.API
+
+
+class CDHPluginProvider(p.ProvisioningPluginBase):
+
+    def get_title(self):
+        return "Cloudera Plugin"
+
+    def get_description(self):
+        return ("This plugin provides the ability to launch CDH clusters "
+                "with the Cloudera Manager management console.")
+
+    def get_versions(self):
+        return ['5']
+
+    def get_node_processes(self, hadoop_version):
+        return {
+            "CLOUDERA": ['MANAGER'],
+            "HDFS": [],
+            "NAMENODE": ['NAMENODE'],
+            "DATANODE": ['DATANODE'],
+            "SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
+            "YARN": [],
+            "RESOURCEMANAGER": ['RESOURCEMANAGER'],
+            "NODEMANAGER": ['NODEMANAGER'],
+            "JOBHISTORY": ['JOBHISTORY'],
+            "OOZIE": ['OOZIE_SERVER']
+        }
+
+    def get_configs(self, hadoop_version):
+        return c_helper.get_plugin_configs()
+
+    def configure_cluster(self, cluster):
+        dp.configure_cluster(cluster)
+
+    def start_cluster(self, cluster):
+        dp.start_cluster(cluster)
+
+        self._set_cluster_info(cluster)
+
+    def validate(self, cluster):
+        vl.validate_cluster_creating(cluster)
+
+    def scale_cluster(self, cluster, instances):
+        dp.scale_cluster(cluster, instances)
+
+    def decommission_nodes(self, cluster, instances):
+        dp.decommission_cluster(cluster, instances)
+
+    def validate_scaling(self, cluster, existing, additional):
+        vl.validate_existing_ng_scaling(cluster, existing)
+        vl.validate_additional_ng_scaling(cluster, additional)
+
+    def get_hdfs_user(self):
+        return 'hdfs'
+
+    def get_oozie_server(self, cluster):
+        return cu.get_oozie(cluster)
+
+    def get_oozie_server_uri(self, cluster):
+        oozie_ip = cu.get_oozie(cluster).management_ip
+        return 'http://%s:11000/oozie' % oozie_ip
+
+    def get_name_node_uri(self, cluster):
+        namenode_ip = cu.get_namenode(cluster).fqdn()
+        return 'hdfs://%s:8020' % namenode_ip
+
+    def get_resource_manager_uri(self, cluster):
+        resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn()
+        return '%s:8032' % resourcemanager_ip
+
+    def _set_cluster_info(self, cluster):
+        mng = cu.get_manager(cluster)
+        info = {
+            'Cloudera Manager': {
+                'Web UI': 'http://%s:7180' % mng.management_ip,
+                'Username': 'admin',
+                'Password': 'admin'
+            }
+        }
+
+        ctx = context.ctx()
+        conductor.cluster_update(ctx, cluster, {'info': info})
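+
+# To be discoverable by Sahara, the provider is expected to be
+# registered as a 'sahara.cluster.plugins' entry point, e.g. in
+# setup.cfg (assumed, not part of this patch):
+#
+#     cdh = sahara.plugins.cdh.plugin:CDHPluginProvider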
diff --git a/sahara/plugins/cdh/resources/cdh_config.py b/sahara/plugins/cdh/resources/cdh_config.py
new file mode 100644
index 00000000..59f84ca3
--- /dev/null
+++ b/sahara/plugins/cdh/resources/cdh_config.py
@@ -0,0 +1,90 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from cm_api import api_client
+
+
+# -- cm config --
+
+cm_address = 'localhost'
+cm_port = 7180
+cm_username = 'admin'
+cm_password = 'admin'
+
+hdfs_service_name = 'hdfs01'
+yarn_service_name = 'yarn01'
+oozie_service_name = 'oozie01'
+
+
+def get_cm_api():
+    return api_client.ApiResource(cm_address, server_port=cm_port,
+                                  username=cm_username, password=cm_password)
+
+
+def get_cluster(api):
+    return api.get_all_clusters()[0]
+
+
+def process_service(service, service_name):
+    for role_cfgs in service.get_all_role_config_groups():
+        role_cm_cfg = role_cfgs.get_config(view='full')
+        role_cfg = parse_config(role_cm_cfg)
+        role_name = role_cfgs.displayName.split(' ')[0].lower()
+        write_cfg(role_cfg, '%s-%s.json' % (service_name, role_name))
+
+    service_cm_cfg = service.get_config(view='full')[0]
+    service_cfg = parse_config(service_cm_cfg)
+    write_cfg(service_cfg, '%s-service.json' % service_name)
+
+
+def parse_config(config):
+    cfg = []
+    for name, value in config.iteritems():
+        p = {
+            'name': value.name,
+            'value': value.default,
+            'display_name': value.displayName,
+            'desc': value.description
+        }
+        cfg.append(p)
+
+    return cfg
+
+
+def write_cfg(cfg, file_name):
+    to_write = json.dumps(cfg, sort_keys=True, indent=4,
+                          separators=(',', ': '))
+
+    with open(file_name, 'w') as f:
+        f.write(to_write)
+
+
+def main():
+    client = get_cm_api()
+    cluster = get_cluster(client)
+
+    hdfs = cluster.get_service(hdfs_service_name)
+    process_service(hdfs, 'hdfs')
+
+    yarn = cluster.get_service(yarn_service_name)
+    process_service(yarn, 'yarn')
+
+    oozie = cluster.get_service(oozie_service_name)
+    process_service(oozie, 'oozie')
+
+
+if __name__ == '__main__':
+    main()
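+
+# Usage sketch: run against a live Cloudera Manager (the defaults above
+# assume a local CM with admin/admin credentials) to regenerate the
+# *.json resource files in this directory; cdh_config.sh below wraps
+# this invocation in tox:
+#
+#     $ ./cdh_config.sh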
diff --git a/sahara/plugins/cdh/resources/cdh_config.sh b/sahara/plugins/cdh/resources/cdh_config.sh
new file mode 100755
index 00000000..4cfb7b7a
--- /dev/null
+++ b/sahara/plugins/cdh/resources/cdh_config.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+tox -evenv -- python $(dirname $0)/cdh_config.py $*
diff --git a/sahara/plugins/cdh/resources/hdfs-balancer.json b/sahara/plugins/cdh/resources/hdfs-balancer.json
new file mode 100644
index 00000000..f2f777d7
--- /dev/null
+++ b/sahara/plugins/cdh/resources/hdfs-balancer.json
@@ -0,0 +1,44 @@
+[
+    {
+        "desc": "These arguments will be passed as part of the Java command line. Commonly, garbage collection flags or extra debugging flags would be passed here.",
+        "display_name": "Java Configuration Options for Balancer",
+        "name": "balancer_java_opts",
+        "value": ""
+    },
+    {
+        "desc": "
This file contains the rules which govern how log messages are turned into events by the custom log4j appender that this role loads. It is in JSON format, and is composed of a list of rules. Every log message is evaluated against each of these rules in turn to decide whether or not to send an event for that message. Each rule has some or all of the following fields. Example: {\"alert\": false, \"rate\": 10, \"exceptiontype\": \"java.lang.StringIndexOutOfBoundsException\"} This rule will send events to Cloudera Manager for every StringIndexOutOfBoundsException, up to a maximum of 10 every minute.
", + "display_name": "Rules to Extract Events from Log Files", + "name": "log_event_whitelist", + "value": "{\n \"version\": \"0\",\n \"rules\": [\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 1, \"threshold\":\"FATAL\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\": \".* is deprecated. Instead, use .*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\": \".* is deprecated. Use .* instead\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.io.IOException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.net.SocketException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.net.SocketClosedException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.io.EOFException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.nio.channels.CancelledKeyException\"},\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 2, \"exceptiontype\": \".*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"Unknown job [^ ]+ being deleted.*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"Error executing shell command .+ No such process.+\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\".*attempt to override final parameter.+\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"[^ ]+ is a deprecated filesystem name. Use.*\"},\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 1, \"threshold\":\"WARN\"}\n ]\n}\n" + }, + { + "desc": "For advanced use only, a string to be inserted into hdfs-site.xml for this role only.", + "display_name": "Balancer Advanced Configuration Snippet (Safety Valve) for hdfs-site.xml", + "name": "balancer_config_safety_valve", + "value": null + }, + { + "desc": "The policy that should be used to rebalance HDFS storage. The default DataNode policy balances the storage at the DataNode level. This is similar to the balancing policy from prior releases. The BlockPool policy balances the storage at the block pool level as well as at the Datanode level. The BlockPool policy is relevant only to a Federated HDFS service.", + "display_name": "Rebalancing Policy", + "name": "rebalancing_policy", + "value": "DataNode" + }, + { + "desc": "The percentage deviation from average utilization, after which a node will be rebalanced. (for example, '10.0' for 10%)", + "display_name": "Rebalancing Threshold", + "name": "rebalancer_threshold", + "value": "10.0" + }, + { + "desc": "When set, Cloudera Manager will send alerts when this entity's configuration changes.", + "display_name": "Enable Configuration Change Alerts", + "name": "enable_config_alerts", + "value": "false" + }, + { + "desc": "Maximum size for the Java Process heap memory. Passed to Java -Xmx. Measured in bytes.", + "display_name": "Java Heap Size of Balancer in Bytes", + "name": "balancer_java_heapsize", + "value": "1073741824" + } +] \ No newline at end of file diff --git a/sahara/plugins/cdh/resources/hdfs-datanode.json b/sahara/plugins/cdh/resources/hdfs-datanode.json new file mode 100644 index 00000000..46c9fb2c --- /dev/null +++ b/sahara/plugins/cdh/resources/hdfs-datanode.json @@ -0,0 +1,380 @@ +[ + { + "desc": "The health test thresholds for monitoring of free space on the filesystem that contains this role's log directory. Specified as a percentage of the capacity on that filesystem. 
This setting is not used if a Log Directory Free Space Monitoring Absolute Thresholds setting is configured.", + "display_name": "Log Directory Free Space Monitoring Percentage Thresholds", + "name": "log_directory_free_space_percentage_thresholds", + "value": "{\"critical\":\"never\",\"warning\":\"never\"}" + }, + { + "desc": "
This file contains the rules which govern how log messages are turned into events by the custom log4j appender that this role loads. It is in JSON format, and is composed of a list of rules. Every log message is evaluated against each of these rules in turn to decide whether or not to send an event for that message. Each rule has some or all of the following fields. Example: {\"alert\": false, \"rate\": 10, \"exceptiontype\": \"java.lang.StringIndexOutOfBoundsException\"} This rule will send events to Cloudera Manager for every StringIndexOutOfBoundsException, up to a maximum of 10 every minute.
", + "display_name": "Rules to Extract Events from Log Files", + "name": "log_event_whitelist", + "value": "{\n \"version\": \"0\",\n \"rules\": [\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 1, \"threshold\":\"FATAL\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\": \".* is deprecated. Instead, use .*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\": \".* is deprecated. Use .* instead\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.io.IOException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.net.SocketException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.net.SocketClosedException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.io.EOFException\"},\n {\"alert\": false, \"rate\": 0, \"exceptiontype\": \"java.nio.channels.CancelledKeyException\"},\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 5, \"content\":\"Datanode registration failed\"},\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 2, \"exceptiontype\": \".*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"Got a command from standby NN - ignoring command:.*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"Unknown job [^ ]+ being deleted.*\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"Error executing shell command .+ No such process.+\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\".*attempt to override final parameter.+\"},\n {\"alert\": false, \"rate\": 0, \"threshold\":\"WARN\", \"content\":\"[^ ]+ is a deprecated filesystem name. Use.*\"},\n {\"alert\": false, \"rate\": 1, \"periodminutes\": 1, \"threshold\":\"WARN\"}\n ]\n}\n" + }, + { + "desc": "Specifies the maximum number of threads to use for transferring data in and out of the DataNode.", + "display_name": "Maximum Number of Transfer Threads", + "name": "dfs_datanode_max_xcievers", + "value": "4096" + }, + { + "desc": "Comma-separated list of DataNode plug-ins to be activated. If one plug-in cannot be loaded, all the plug-ins are ignored.", + "display_name": "DateNode Plugins", + "name": "dfs_datanode_plugins_list", + "value": "" + }, + { + "desc": "Weight for the read I/O requests issued by this role. The greater the weight, the higher the priority of the requests when the host experiences I/O contention. Must be between 100 and 1000. Defaults to 1000 for processes not managed by Cloudera Manager.", + "display_name": "Cgroup I/O Weight", + "name": "rm_io_weight", + "value": "500" + }, + { + "desc": "In some workloads, the data read from HDFS is known to be significantly large enough that it is unlikely to be useful to cache it in the operating system buffer cache. In this case, the DataNode may be configured to automatically purge all data from the buffer cache after it is delivered to the client. This may improve performance for some workloads by freeing buffer cache spare usage for more cacheable data. This behavior will always be disabled for workloads that read only short sections of a block (e.g HBase random-IO workloads). This property is supported in CDH3u3 or later deployments.", + "display_name": "Enable purging cache after reads", + "name": "dfs_datanode_drop_cache_behind_reads", + "value": "false" + }, + { + "desc": "Number of CPU shares to assign to this role. The greater the number of shares, the larger the share of the host's CPUs that will be given to this role when the host experiences CPU contention. 
Must be between 2 and 262144. Defaults to 1024 for processes not managed by Cloudera Manager.", + "display_name": "Cgroup CPU Shares", + "name": "rm_cpu_shares", + "value": "1024" + }, + { + "desc": "Comma-delimited list of directories on the local file system where the DataNode stores HDFS block data. Typical values are /data/N/dfs/dn for N = 1, 2, 3... These directories should be mounted using the noatime option and the disks should be configured using JBOD. RAID is not recommended.", + "display_name": "DataNode Data Directory", + "name": "dfs_data_dir_list", + "value": null + }, + { + "desc": "The number of volumes that are allowed to fail before a DataNode stops offering service. By default, any volume failure will cause a DataNode to shutdown.", + "display_name": "DataNode Failed Volumes Tolerated", + "name": "dfs_datanode_failed_volumes_tolerated", + "value": "0" + }, + { + "desc": "In some workloads, the data written to HDFS is known to be significantly large enough that it is unlikely to be useful to cache it in the operating system buffer cache. In this case, the DataNode may be configured to automatically purge all data from the buffer cache after it is written to disk. This may improve performance for some workloads by freeing buffer cache spare usage for more cacheable data. This property is supported in CDH3u3 or later deployments.", + "display_name": "Enable purging cache after writes", + "name": "dfs_datanode_drop_cache_behind_writes", + "value": "false" + }, + { + "desc": "If enabled, the DataNode binds to the wildcard address (\"0.0.0.0\") on all of its ports.", + "display_name": "Bind DataNode to Wildcard Address", + "name": "dfs_datanode_bind_wildcard", + "value": "false" + }, + { + "desc": "The number of server threads for the DataNode.", + "display_name": "Handler Count", + "name": "dfs_datanode_handler_count", + "value": "3" + }, + { + "desc": "When computing the overall DataNode health, consider the host's health.", + "display_name": "DataNode Host Health Test", + "name": "datanode_host_health_enabled", + "value": "true" + }, + { + "desc": "When set, this role's process is automatically (and transparently) restarted in the event of an unexpected failure.", + "display_name": "Automatically Restart Process", + "name": "process_auto_restart", + "value": "true" + }, + { + "desc": "The maximum number of rolled log files to keep for DataNode logs. Typically used by log4j.", + "display_name": "DataNode Maximum Log File Backups", + "name": "max_log_backup_index", + "value": "10" + }, + { + "desc": "
The configured triggers for this role. This is a JSON formatted list of triggers. These triggers are evaluated as part of the health system. Every trigger expression is parsed, and if the trigger condition is met, the list of actions provided in the trigger expression is executed. Each trigger has all of the following fields: