From 53eaa64263e2b1cd3ed404a274c26595e04552e4 Mon Sep 17 00:00:00 2001 From: Andrey Pavlov Date: Fri, 17 Jul 2015 13:19:14 +0300 Subject: [PATCH] Formatting and mounting methods changed for ironic Currently only cinder volumes can be formatted and mounted. This patch adds the ability to format and mount any attached storage device. The storage_paths method was also moved from the node group objects to the instance object, and corresponding fixes were made in the plugins. Closes-bug: #1478899 Closes-bug: #1478904 Change-Id: If92d6fdea25e374d6d5c404be5ac3677cb60c057 --- sahara/conductor/manager.py | 3 +- sahara/conductor/objects.py | 23 +++-- .../versions/028_storage_devices_number.py | 35 +++++++ sahara/db/sqlalchemy/models.py | 1 + sahara/plugins/ambari/configs.py | 9 +- sahara/plugins/ambari/deploy.py | 26 ++--- sahara/plugins/cdh/cloudera_utils.py | 4 +- sahara/plugins/cdh/v5/cloudera_utils.py | 12 +-- sahara/plugins/cdh/v5_3_0/cloudera_utils.py | 12 +-- sahara/plugins/cdh/v5_4_0/cloudera_utils.py | 12 +-- sahara/plugins/hdp/clusterspec.py | 5 - .../hdp/versions/version_2_0_6/services.py | 16 ++- sahara/plugins/mapr/services/maprfs/maprfs.py | 2 +- sahara/plugins/spark/plugin.py | 59 ++++++----- sahara/plugins/vanilla/hadoop2/config.py | 22 ++--- sahara/service/volumes.py | 97 +++++++++++++------ .../unit/db/migration/test_migrations.py | 3 + .../tests/unit/plugins/ambari/test_configs.py | 18 ++-- .../tests/unit/plugins/hdp/hdp_test_base.py | 9 +- .../tests/unit/plugins/hdp/test_services.py | 33 ++++--- sahara/tests/unit/service/test_volumes.py | 40 +++++++- 21 files changed, 276 insertions(+), 165 deletions(-) create mode 100644 sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index e3c35080..16e4a667 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -55,7 +55,8 @@ NODE_GROUP_TEMPLATE_DEFAULTS.update({"is_public": False, "is_protected": False}) INSTANCE_DEFAULTS = { - "volumes": [] + "volumes": [], + "storage_devices_number": 0 } DATA_SOURCE_DEFAULTS = { diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 1ab88de4..6c8566eb 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -131,18 +131,6 @@ class NodeGroup(object): return configs.merge_configs(self.cluster.cluster_configs, self.node_configs) - def storage_paths(self): - mp = [] - for idx in range(1, self.volumes_per_node + 1): - mp.append(self.volume_mount_prefix + str(idx)) - - # Here we assume that NG's instances use ephemeral - # drives for storage if volumes_per_node == 0 - if not mp: - mp = ['/mnt'] - - return mp - def get_image_id(self): return self.image_id or self.cluster.default_image_id @@ -158,6 +146,7 @@ class Instance(object): internal_ip management_ip volumes + storage_devices_number """ def hostname(self): @@ -169,6 +158,16 @@ class Instance(object): def remote(self): return remote.get_remote(self) + def storage_paths(self): + mp = [] + for idx in range(1, self.storage_devices_number + 1): + mp.append(self.node_group.volume_mount_prefix + str(idx)) + + if not mp: + mp = ['/mnt'] + + return mp + class ClusterTemplate(object): """An object representing Cluster Template.
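The objects.py hunk above is the heart of the change: mount points are now derived per instance from storage_devices_number (populated while devices are formatted and mounted) rather than per node group from volumes_per_node. A minimal runnable sketch of the new lookup, using stand-in classes rather than the real conductor objects; the '/volumes/disk' prefix is illustrative:

    class NodeGroup(object):
        def __init__(self, volume_mount_prefix):
            self.volume_mount_prefix = volume_mount_prefix


    class Instance(object):
        def __init__(self, node_group, storage_devices_number):
            self.node_group = node_group
            self.storage_devices_number = storage_devices_number

        def storage_paths(self):
            # One mount point per formatted device; fall back to the
            # ephemeral drive at /mnt when no devices were formatted.
            mp = [self.node_group.volume_mount_prefix + str(idx)
                  for idx in range(1, self.storage_devices_number + 1)]
            return mp or ['/mnt']


    ng = NodeGroup(volume_mount_prefix='/volumes/disk')
    print(Instance(ng, 2).storage_paths())  # ['/volumes/disk1', '/volumes/disk2']
    print(Instance(ng, 0).storage_paths())  # ['/mnt']
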
diff --git a/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py b/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py new file mode 100644 index 00000000..7cc521b6 --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py @@ -0,0 +1,35 @@ +# Copyright 2015 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""add_storage_devices_number + +Revision ID: 028 +Revises: 027 +Create Date: 2015-07-20 16:56:23.562710 + +""" + +# revision identifiers, used by Alembic. +revision = '028' +down_revision = '027' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('instances', + sa.Column('storage_devices_number', sa.Integer(), + nullable=True)) diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index 5a40c8ac..d1289fdf 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -157,6 +157,7 @@ class Instance(mb.SaharaBase): internal_ip = sa.Column(sa.String(45)) management_ip = sa.Column(sa.String(45)) volumes = sa.Column(st.JsonListType()) + storage_devices_number = sa.Column(sa.Integer) # Template objects: ClusterTemplate, NodeGroupTemplate, TemplatesRelation diff --git a/sahara/plugins/ambari/configs.py b/sahara/plugins/ambari/configs.py index 49f3991a..685cad7d 100644 --- a/sahara/plugins/ambari/configs.py +++ b/sahara/plugins/ambari/configs.py @@ -167,10 +167,10 @@ def _make_paths(dirs, suffix): return ",".join([d + suffix for d in dirs]) -def get_ng_params(node_group): - configs = _create_ambari_configs(node_group.node_configs, - node_group.cluster.hadoop_version) - storage_paths = node_group.storage_paths() +def get_instance_params(inst): + configs = _create_ambari_configs(inst.node_group.node_configs, + inst.node_group.cluster.hadoop_version) + storage_paths = inst.storage_paths() configs.setdefault("hdfs-site", {}) configs["hdfs-site"]["dfs.datanode.data.dir"] = _make_paths( storage_paths, "/hdfs/data") @@ -188,7 +188,6 @@ def get_ng_params(node_group): configs["yarn-site"][ "yarn.timeline-service.leveldb-timeline-store.path"] = _make_paths( storage_paths, "/yarn/timeline") - return _serialize_ambari_configs(configs) diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py index d244fa03..ae68e856 100644 --- a/sahara/plugins/ambari/deploy.py +++ b/sahara/plugins/ambari/deploy.py @@ -183,16 +183,17 @@ def create_blueprint(cluster): cluster = conductor.cluster_get(context.ctx(), cluster.id) host_groups = [] for ng in cluster.node_groups: - hg = { - "name": ng.name, - "configurations": configs.get_ng_params(ng), - "components": [] - } procs = p_common.get_ambari_proc_list(ng) procs.extend(p_common.get_clients(cluster)) - for proc in procs: - hg["components"].append({"name": proc}) - host_groups.append(hg) + for instance in ng.instances: + hg = { + "name": instance.instance_name, + "configurations": configs.get_instance_params(instance), + "components": [] + } + for proc in 
procs: + hg["components"].append({"name": proc}) + host_groups.append(hg) bp = { "Blueprints": { "stack_name": "HDP", @@ -214,10 +215,11 @@ def start_cluster(cluster): "host_groups": [] } for ng in cluster.node_groups: - cl_tmpl["host_groups"].append({ - "name": ng.name, - "hosts": map(lambda x: {"fqdn": x.fqdn()}, ng.instances) - }) + for instance in ng.instances: + cl_tmpl["host_groups"].append({ + "name": instance.instance_name, + "hosts": [{"fqdn": instance.fqdn()}] + }) ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER) password = cluster.extra["ambari_password"] with ambari_client.AmbariClient(ambari, password=password) as client: diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py index 364700cf..ea184d34 100644 --- a/sahara/plugins/cdh/cloudera_utils.py +++ b/sahara/plugins/cdh/cloudera_utils.py @@ -284,7 +284,7 @@ class ClouderaUtils(object): role = service.create_role(self.pu.get_role_name(instance, process), role_type, instance.fqdn()) role.update_config(self._get_configs(process, cluster, - node_group=instance.node_group)) + instance=instance)) def get_cloudera_manager_info(self, cluster): mng = self.pu.get_manager(cluster) @@ -297,6 +297,6 @@ class ClouderaUtils(object): } return info - def _get_configs(self, service, cluster=None, node_group=None): + def _get_configs(self, service, cluster=None, instance=None): # Defined in derived class. return diff --git a/sahara/plugins/cdh/v5/cloudera_utils.py b/sahara/plugins/cdh/v5/cloudera_utils.py index 34bfc113..b14f646a 100644 --- a/sahara/plugins/cdh/v5/cloudera_utils.py +++ b/sahara/plugins/cdh/v5/cloudera_utils.py @@ -145,7 +145,7 @@ class ClouderaUtilsV5(cu.ClouderaUtils): hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE, cluster=cluster)) - def _get_configs(self, service, cluster=None, node_group=None): + def _get_configs(self, service, cluster=None, instance=None): def get_hadoop_dirs(mount_points, suffix): return ','.join([x + suffix for x in mount_points]) @@ -214,10 +214,10 @@ class ClouderaUtilsV5(cu.ClouderaUtils): all_confs = s_cfg.merge_configs(all_confs, hive_confs) all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs) - if node_group: - paths = node_group.storage_paths() + if instance: + paths = instance.storage_paths() - ng_default_confs = { + instance_default_confs = { 'NAMENODE': { 'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn') }, @@ -240,8 +240,8 @@ class ClouderaUtilsV5(cu.ClouderaUtils): } ng_user_confs = self.pu.convert_process_configs( - node_group.node_configs) + instance.node_group.node_configs) all_confs = s_cfg.merge_configs(all_confs, ng_user_confs) - all_confs = s_cfg.merge_configs(all_confs, ng_default_confs) + all_confs = s_cfg.merge_configs(all_confs, instance_default_confs) return all_confs.get(service, {}) diff --git a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py index 509b87ca..1b8b399d 100644 --- a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py +++ b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py @@ -205,7 +205,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils): impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE, cluster=cluster)) - def _get_configs(self, service, cluster=None, node_group=None): + def _get_configs(self, service, cluster=None, instance=None): def get_hadoop_dirs(mount_points, suffix): return ','.join([x + suffix for x in mount_points]) @@ -344,10 +344,10 @@ class ClouderaUtilsV530(cu.ClouderaUtils): all_confs = s_cfg.merge_configs(all_confs, sentry_confs) 
all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs) - if node_group: - paths = node_group.storage_paths() + if instance: + paths = instance.storage_paths() - ng_default_confs = { + instance_default_confs = { 'NAMENODE': { 'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn') }, @@ -382,8 +382,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils): } ng_user_confs = self.pu.convert_process_configs( - node_group.node_configs) + instance.node_group.node_configs) all_confs = s_cfg.merge_configs(all_confs, ng_user_confs) - all_confs = s_cfg.merge_configs(all_confs, ng_default_confs) + all_confs = s_cfg.merge_configs(all_confs, instance_default_confs) return all_confs.get(service, {}) diff --git a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py index 54ec974b..42b01565 100644 --- a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py +++ b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py @@ -222,7 +222,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils): kms.update_config(self._get_configs(KMS_SERVICE_TYPE, cluster=cluster)) - def _get_configs(self, service, cluster=None, node_group=None): + def _get_configs(self, service, cluster=None, instance=None): def get_hadoop_dirs(mount_points, suffix): return ','.join([x + suffix for x in mount_points]) @@ -363,10 +363,10 @@ class ClouderaUtilsV540(cu.ClouderaUtils): all_confs = s_cfg.merge_configs(all_confs, sentry_confs) all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs) - if node_group: - paths = node_group.storage_paths() + if instance: + paths = instance.storage_paths() - ng_default_confs = { + instance_default_confs = { 'NAMENODE': { 'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn') }, @@ -401,9 +401,9 @@ class ClouderaUtilsV540(cu.ClouderaUtils): } ng_user_confs = self.pu.convert_process_configs( - node_group.node_configs) + instance.node_group.node_configs) all_confs = s_cfg.merge_configs(all_confs, ng_user_confs) - all_confs = s_cfg.merge_configs(all_confs, ng_default_confs) + all_confs = s_cfg.merge_configs(all_confs, instance_default_confs) return all_confs.get(service, {}) diff --git a/sahara/plugins/hdp/clusterspec.py b/sahara/plugins/hdp/clusterspec.py index cd346fca..cb8fc8ef 100644 --- a/sahara/plugins/hdp/clusterspec.py +++ b/sahara/plugins/hdp/clusterspec.py @@ -210,7 +210,6 @@ class ClusterSpec(object): node_group.count = ng.count node_group.id = ng.id node_group.components = ng.node_processes[:] - node_group.ng_storage_paths = ng.storage_paths() for instance in ng.instances: node_group.instances.add(Instance(instance)) self.node_groups[node_group.name] = node_group @@ -266,14 +265,10 @@ class NodeGroup(object): self.cardinality = None self.count = None self.instances = set() - self.ng_storage_paths = [] def add_component(self, component): self.components.append(component) - def storage_paths(self): - return self.ng_storage_paths - class User(object): def __init__(self, name, password, groups): diff --git a/sahara/plugins/hdp/versions/version_2_0_6/services.py b/sahara/plugins/hdp/versions/version_2_0_6/services.py index a88a7a59..16c81447 100644 --- a/sahara/plugins/hdp/versions/version_2_0_6/services.py +++ b/sahara/plugins/hdp/versions/version_2_0_6/services.py @@ -102,16 +102,12 @@ class Service(object): config[prop_name] = value def _get_common_paths(self, node_groups): - if len(node_groups) == 1: - paths = node_groups[0].storage_paths() - else: - sets = [set(ng.storage_paths()) for ng in node_groups] - paths = list(set.intersection(*sets)) + sets = [] + for node_group in 
node_groups: + for instance in node_group.instances: + sets.append(set(instance.sahara_instance.storage_paths())) - if len(paths) > 1 and '/mnt' in paths: - paths.remove('/mnt') - - return paths + return list(set.intersection(*sets)) if sets else [] def _generate_storage_path(self, storage_paths, path): return ",".join([p + path for p in storage_paths]) @@ -201,7 +197,7 @@ class HdfsService(Service): hdfs_site_config = cluster_spec.configurations['hdfs-site'] hdfs_site_config['dfs.namenode.name.dir'] = ( self._generate_storage_path( - nn_ng.storage_paths(), '/hadoop/hdfs/namenode')) + self._get_common_paths([nn_ng]), '/hadoop/hdfs/namenode')) if common_paths: hdfs_site_config['dfs.datanode.data.dir'] = ( self._generate_storage_path( diff --git a/sahara/plugins/mapr/services/maprfs/maprfs.py b/sahara/plugins/mapr/services/maprfs/maprfs.py index 6cd211b6..bea23fe3 100644 --- a/sahara/plugins/mapr/services/maprfs/maprfs.py +++ b/sahara/plugins/mapr/services/maprfs/maprfs.py @@ -107,7 +107,7 @@ class MapRFS(s.Service): def _generate_disk_list_file(self, instance, path_to_disk_setup_script): LOG.debug('Creating disk list file') g.run_script(instance, path_to_disk_setup_script, 'root', - *instance.node_group.storage_paths()) + *instance.storage_paths()) def _execute_disksetup(self, instance): with instance.remote() as rmt: diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py index af86f732..c143bafb 100644 --- a/sahara/plugins/spark/plugin.py +++ b/sahara/plugins/spark/plugin.py @@ -163,7 +163,6 @@ class SparkProvider(p.ProvisioningPluginBase): cluster) def _extract_configs_to_extra(self, cluster): - nn = utils.get_instance(cluster, "namenode") sp_master = utils.get_instance(cluster, "master") sp_slaves = utils.get_instances(cluster, "slave") @@ -186,22 +185,10 @@ class SparkProvider(p.ProvisioningPluginBase): config_defaults = c_helper.generate_spark_executor_classpath(cluster) extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster) - for ng in cluster.node_groups: - extra[ng.id] = { - 'xml': c_helper.generate_xml_configs( - ng.configuration(), - ng.storage_paths(), - nn.hostname(), None - ), - 'setup_script': c_helper.generate_hadoop_setup_script( - ng.storage_paths(), - c_helper.extract_hadoop_environment_confs( - ng.configuration()) - ), - 'sp_master': config_master, - 'sp_slaves': config_slaves, - 'sp_defaults': config_defaults - } + + extra['sp_master'] = config_master + extra['sp_slaves'] = config_slaves + extra['sp_defaults'] = config_defaults if c_helper.is_data_locality_enabled(cluster): topology_data = th.generate_topology_map( @@ -211,6 +198,19 @@ class SparkProvider(p.ProvisioningPluginBase): return extra + def _add_instance_ng_related_to_extra(self, cluster, instance, extra): + extra = extra.copy() + ng = instance.node_group + nn = utils.get_instance(cluster, "namenode") + + extra['xml'] = c_helper.generate_xml_configs( + ng.configuration(), instance.storage_paths(), nn.hostname(), None) + extra['setup_script'] = c_helper.generate_hadoop_setup_script( + instance.storage_paths(), + c_helper.extract_hadoop_environment_confs(ng.configuration())) + + return extra + def _start_datanode_processes(self, dn_instances): if len(dn_instances) == 0: return @@ -243,6 +243,8 @@ class SparkProvider(p.ProvisioningPluginBase): cluster.id, _("Push configs to nodes"), len(all_instances)) with context.ThreadGroup() as tg: for instance in all_instances: + extra = self._add_instance_ng_related_to_extra( + cluster, instance, extra) if instance in new_instances: 
tg.spawn('spark-configure-%s' % instance.instance_name, self._push_configs_to_new_node, cluster, @@ -254,25 +256,23 @@ class SparkProvider(p.ProvisioningPluginBase): @cpo.event_wrapper(mark_successful_on_exit=True) def _push_configs_to_new_node(self, cluster, extra, instance): - ng_extra = extra[instance.node_group.id] - files_hadoop = { os.path.join(c_helper.HADOOP_CONF_DIR, - "core-site.xml"): ng_extra['xml']['core-site'], + "core-site.xml"): extra['xml']['core-site'], os.path.join(c_helper.HADOOP_CONF_DIR, - "hdfs-site.xml"): ng_extra['xml']['hdfs-site'], + "hdfs-site.xml"): extra['xml']['hdfs-site'], } sp_home = self._spark_home(cluster) files_spark = { - os.path.join(sp_home, 'conf/spark-env.sh'): ng_extra['sp_master'], - os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'], + os.path.join(sp_home, 'conf/spark-env.sh'): extra['sp_master'], + os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'], os.path.join(sp_home, - 'conf/spark-defaults.conf'): ng_extra['sp_defaults'] + 'conf/spark-defaults.conf'): extra['sp_defaults'] } files_init = { - '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'], + '/tmp/sahara-hadoop-init.sh': extra['setup_script'], 'id_rsa': cluster.management_private_key, 'authorized_keys': cluster.management_public_key } @@ -283,7 +283,7 @@ class SparkProvider(p.ProvisioningPluginBase): 'sudo chown $USER $HOME/.ssh/id_rsa; ' 'sudo chmod 600 $HOME/.ssh/id_rsa') - storage_paths = instance.node_group.storage_paths() + storage_paths = instance.storage_paths() dn_path = ' '.join(c_helper.make_hadoop_path(storage_paths, '/dfs/dn')) nn_path = ' '.join(c_helper.make_hadoop_path(storage_paths, @@ -336,15 +336,14 @@ class SparkProvider(p.ProvisioningPluginBase): 'slave' in node_processes) if need_update_spark: - ng_extra = extra[instance.node_group.id] sp_home = self._spark_home(cluster) files = { os.path.join(sp_home, - 'conf/spark-env.sh'): ng_extra['sp_master'], - os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'], + 'conf/spark-env.sh'): extra['sp_master'], + os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'], os.path.join( sp_home, - 'conf/spark-defaults.conf'): ng_extra['sp_defaults'] + 'conf/spark-defaults.conf'): extra['sp_defaults'] } r = remote.get_remote(instance) r.write_files_to(files) diff --git a/sahara/plugins/vanilla/hadoop2/config.py b/sahara/plugins/vanilla/hadoop2/config.py index ed6ce8d8..78c482f4 100644 --- a/sahara/plugins/vanilla/hadoop2/config.py +++ b/sahara/plugins/vanilla/hadoop2/config.py @@ -73,24 +73,25 @@ def _configure_instance(pctx, instance): def _provisioning_configs(pctx, instance): - xmls, env = _generate_configs(pctx, instance.node_group) + xmls, env = _generate_configs(pctx, instance) _push_xml_configs(instance, xmls) _push_env_configs(instance, env) -def _generate_configs(pctx, node_group): - hadoop_xml_confs = _get_hadoop_configs(pctx, node_group) - user_xml_confs, user_env_confs = _get_user_configs(pctx, node_group) +def _generate_configs(pctx, instance): + hadoop_xml_confs = _get_hadoop_configs(pctx, instance) + user_xml_confs, user_env_confs = _get_user_configs( + pctx, instance.node_group) xml_confs = s_cfg.merge_configs(user_xml_confs, hadoop_xml_confs) env_confs = s_cfg.merge_configs(pctx['env_confs'], user_env_confs) return xml_confs, env_confs -def _get_hadoop_configs(pctx, node_group): - cluster = node_group.cluster +def _get_hadoop_configs(pctx, instance): + cluster = instance.node_group.cluster nn_hostname = vu.get_instance_hostname(vu.get_namenode(cluster)) - dirs = _get_hadoop_dirs(node_group) + 
dirs = _get_hadoop_dirs(instance) confs = { 'Hadoop': { 'fs.defaultFS': 'hdfs://%s:9000' % nn_hostname @@ -282,8 +283,7 @@ def _push_configs_to_instance(instance, configs): def _post_configuration(pctx, instance): - node_group = instance.node_group - dirs = _get_hadoop_dirs(node_group) + dirs = _get_hadoop_dirs(instance) args = { 'hadoop_user': HADOOP_USER, 'hadoop_group': HADOOP_GROUP, @@ -313,9 +313,9 @@ def _post_configuration(pctx, instance): r.execute_command('chmod +x ' + t_script, run_as_root=True) -def _get_hadoop_dirs(node_group): +def _get_hadoop_dirs(instance): dirs = {} - storage_paths = node_group.storage_paths() + storage_paths = instance.storage_paths() dirs['hadoop_name_dirs'] = _make_hadoop_paths( storage_paths, '/hdfs/namenode') dirs['hadoop_data_dirs'] = _make_hadoop_paths( diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py index e97584ce..cb0a4eb5 100644 --- a/sahara/service/volumes.py +++ b/sahara/service/volumes.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re +import threading + from oslo_config import cfg from oslo_log import log as logging @@ -21,6 +24,7 @@ from sahara import context from sahara import exceptions as ex from sahara.i18n import _ from sahara.i18n import _LE +from sahara.i18n import _LW from sahara.utils import cluster_progress_ops as cpo from sahara.utils.openstack import base as b from sahara.utils.openstack import cinder @@ -51,6 +55,7 @@ def _count_volumes_to_mount(instances): def attach_to_instances(instances): instances_to_attach = _count_instances_to_attach(instances) if instances_to_attach == 0: + mount_to_instances(instances) return cpo.add_provisioning_step( @@ -66,6 +71,8 @@ def attach_to_instances(instances): % instance.instance_name, _attach_volumes_to_node, instance.node_group, instance) + mount_to_instances(instances) + @poll_utils.poll_status( 'await_attach_volumes', _("Await for attaching volumes to instances"), @@ -91,13 +98,6 @@ def _attach_volumes_to_node(node_group, instance): _await_attach_volumes(instance, devices) - paths = instance.node_group.storage_paths() - for idx in range(0, instance.node_group.volumes_per_node): - LOG.debug("Mounting volume {volume} to instance" - .format(volume=devices[idx])) - _mount_volume(instance, devices[idx], paths[idx]) - LOG.debug("Mounted volume to instance") - @poll_utils.poll_status( 'volume_available_timeout', _("Await for volume become available"), @@ -155,51 +155,90 @@ def mount_to_instances(instances): instances[0].cluster_id, _("Mount volumes to instances"), _count_volumes_to_mount(instances)) - with context.ThreadGroup() as tg: - for instance in instances: - with context.set_current_instance_id(instance.instance_id): - devices = _find_instance_volume_devices(instance) + for instance in instances: + with context.set_current_instance_id(instance.instance_id): + devices = _find_instance_devices(instance) + formatted_devices = [] + lock = threading.Lock() + with context.ThreadGroup() as tg: # Since formating can take several minutes (for large disks) # and can be done in parallel, launch one thread per disk. 
- for idx in range(0, instance.node_group.volumes_per_node): - tg.spawn( - 'mount-volume-%d-to-node-%s' % - (idx, instance.instance_name), - _mount_volume_to_node, instance, idx, devices[idx]) + for device in devices: + tg.spawn('format-device-%s' % device, _format_device, + instance, device, formatted_devices, lock) + + conductor.instance_update( + context.current(), instance, + {"storage_devices_number": len(formatted_devices)}) + for idx, dev in enumerate(formatted_devices): + _mount_volume_to_node(instance, idx+1, dev) -def _find_instance_volume_devices(instance): - volumes = b.execute_with_retries(nova.client().volumes.get_server_volumes, - instance.instance_id) - devices = [volume.device for volume in volumes] - return devices +def _find_instance_devices(instance): + with instance.remote() as r: + code, attached_info = r.execute_command( + "lsblk -r | awk '$6 ~ /disk/ || /part/ {print \"/dev/\" $1}'") + attached_dev = attached_info.split() + code, mounted_info = r.execute_command( + "mount | awk '$1 ~ /^\/dev/ {print $1}'") + mounted_dev = mounted_info.split() + + # filter out attached devices that should not be mounted + for dev in attached_dev[:]: + idx = re.sub("\D", "", dev) + if idx: + if dev in mounted_dev: + attached_dev.remove(re.sub("\d", "", dev)) + attached_dev.remove(dev) + + for dev in attached_dev[:]: + if re.sub("\D", "", dev): + if re.sub("\d", "", dev) in attached_dev: + attached_dev.remove(dev) + + return attached_dev @cpo.event_wrapper(mark_successful_on_exit=True) -def _mount_volume_to_node(instance, idx, device): +def _mount_volume_to_node(instance, index, device): LOG.debug("Mounting volume {device} to instance".format(device=device)) - mount_point = instance.node_group.storage_paths()[idx] + mount_point = instance.node_group.volume_mount_prefix + str(index) _mount_volume(instance, device, mount_point) LOG.debug("Mounted volume to instance") +def _format_device(instance, device, formatted_devices=None, lock=None): + with instance.remote() as r: + try: + # Format devices with better performance options: + # - reduce number of blocks reserved for root to 1% + # - use 'dir_index' for faster directory listings + # - use 'extents' to work faster with large files + # - disable journaling + fs_opts = '-F -m 1 -O dir_index,extents,^has_journal' + r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device)) + if lock: + with lock: + formatted_devices.append(device) + except Exception: + LOG.warning( + _LW("Device {dev} cannot be formatted").format(dev=device)) + + def _mount_volume(instance, device_path, mount_point): with instance.remote() as r: try: # Mount volumes with better performance options: - # - reduce number of blocks reserved for root to 1% - # - use 'dir_index' for faster directory listings - # - use 'extents' to work faster with large files - # - disable journaling # - enable write-back # - do not store access time - fs_opts = '-m 1 -O dir_index,extents,^has_journal' mount_opts = '-o data=writeback,noatime,nodiratime' r.execute_command('sudo mkdir -p %s' % mount_point) - r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device_path)) r.execute_command('sudo mount %s %s %s' % (mount_opts, device_path, mount_point)) + r.execute_command( + 'sudo sh -c "grep %s /etc/mtab >> /etc/fstab"' % device_path) + except Exception: LOG.error(_LE("Error mounting volume to instance")) raise diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index 0ccd3cb4..70582caf 100644 ---
a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -540,6 +540,9 @@ class SaharaMigrationsCheckers(object): self.assertColumnExists(engine, 'job_executions', 'engine_job_id') + def _check_028(self, engine, data): + self.assertColumnExists(engine, 'instances', 'storage_devices_number') + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/plugins/ambari/test_configs.py b/sahara/tests/unit/plugins/ambari/test_configs.py index 0a464314..07912307 100644 --- a/sahara/tests/unit/plugins/ambari/test_configs.py +++ b/sahara/tests/unit/plugins/ambari/test_configs.py @@ -30,8 +30,10 @@ class AmbariConfigsTestCase(base.SaharaTestCase): self.ng.node_configs = {} self.ng.cluster = mock.Mock() self.ng.cluster.hadoop_version = "2.2" - self.ng.storage_paths = mock.Mock() - self.ng.storage_paths.return_value = ["/data1", "/data2"] + self.instance = mock.Mock() + self.instance.node_group = self.ng + self.instance.storage_paths = mock.Mock() + self.instance.storage_paths.return_value = ["/data1", "/data2"] def assertConfigEqual(self, expected, actual): self.assertEqual(len(expected), len(actual)) @@ -45,8 +47,8 @@ class AmbariConfigsTestCase(base.SaharaTestCase): self.assertEqual(len(expected), len(cnt_ex)) self.assertEqual(len(actual), len(cnt_act)) - def test_get_ng_params_default(self): - ng_configs = configs.get_ng_params(self.ng) + def test_get_instance_params_default(self): + instance_configs = configs.get_instance_params(self.instance) expected = [ { "hdfs-site": { @@ -71,16 +73,16 @@ class AmbariConfigsTestCase(base.SaharaTestCase): } } ] - self.assertConfigEqual(expected, ng_configs) + self.assertConfigEqual(expected, instance_configs) - def test_get_ng_params(self): + def test_get_instance_params(self): self.ng.node_configs = { "YARN": { "mapreduce.map.java.opts": "-Dk=v", "yarn.scheduler.minimum-allocation-mb": "256" } } - ng_configs = configs.get_ng_params(self.ng) + instance_configs = configs.get_instance_params(self.instance) expected = [ { "hdfs-site": { @@ -111,4 +113,4 @@ class AmbariConfigsTestCase(base.SaharaTestCase): } } ] - self.assertConfigEqual(expected, ng_configs) + self.assertConfigEqual(expected, instance_configs) diff --git a/sahara/tests/unit/plugins/hdp/hdp_test_base.py b/sahara/tests/unit/plugins/hdp/hdp_test_base.py index 7541a786..aabc5e0f 100644 --- a/sahara/tests/unit/plugins/hdp/hdp_test_base.py +++ b/sahara/tests/unit/plugins/hdp/hdp_test_base.py @@ -29,6 +29,11 @@ class TestServer(object): self.public_ip = public_ip self.internal_ip = private_ip self.node_group = None + self.sahara_instance = self + self.storage_path = ['/mnt'] + + def storage_paths(self): + return self.storage_path def fqdn(self): return self.inst_fqdn @@ -81,10 +86,6 @@ class TestNodeGroup(object): self.node_processes = node_processes self.count = count self.id = name - self.ng_storage_paths = [] - - def storage_paths(self): - return self.ng_storage_paths class TestUserInputConfig(object): diff --git a/sahara/tests/unit/plugins/hdp/test_services.py b/sahara/tests/unit/plugins/hdp/test_services.py index 01a28700..6ddaac5b 100644 --- a/sahara/tests/unit/plugins/hdp/test_services.py +++ b/sahara/tests/unit/plugins/hdp/test_services.py @@ -763,25 +763,32 @@ class ServicesTest(base.SaharaTestCase): for version in versions: s = self.get_services_processor(version=version) service = s.create_service('AMBARI') - ng1 = hdp_test_base.TestNodeGroup(None, None, None) - ng1.ng_storage_paths = 
['/mnt', '/volume/disk1'] - ng2 = hdp_test_base.TestNodeGroup(None, None, None) - ng2.ng_storage_paths = ['/mnt'] + server1 = hdp_test_base.TestServer( + 'host1', 'test-master', '11111', 3, '1.1.1.1', '2.2.2.2') + server2 = hdp_test_base.TestServer( + 'host2', 'test-slave', '11111', 3, '3.3.3.3', '4.4.4.4') + server3 = hdp_test_base.TestServer( + 'host3', 'another-test', '11111', 3, '6.6.6.6', '5.5.5.5') + ng1 = hdp_test_base.TestNodeGroup('ng1', [server1], None) + ng2 = hdp_test_base.TestNodeGroup('ng2', [server2], None) + ng3 = hdp_test_base.TestNodeGroup('ng3', [server3], None) + + server1.storage_path = ['/volume/disk1'] + server2.storage_path = ['/mnt'] paths = service._get_common_paths([ng1, ng2]) - self.assertEqual(['/mnt'], paths) + self.assertEqual([], paths) - ng3 = hdp_test_base.TestNodeGroup(None, None, None) - ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2'] - ng2.ng_storage_paths = ['/mnt'] - ng3.ng_storage_paths = ['/mnt', '/volume/disk1'] + server1.storage_path = ['/volume/disk1', '/volume/disk2'] + server2.storage_path = ['/mnt'] + server3.storage_path = ['/volume/disk1'] paths = service._get_common_paths([ng1, ng2, ng3]) - self.assertEqual(['/mnt'], paths) + self.assertEqual([], paths) - ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2'] - ng2.ng_storage_paths = ['/mnt', '/volume/disk1'] - ng3.ng_storage_paths = ['/mnt', '/volume/disk1'] + server1.storage_path = ['/volume/disk1', '/volume/disk2'] + server2.storage_path = ['/volume/disk1'] + server3.storage_path = ['/volume/disk1'] paths = service._get_common_paths([ng1, ng2, ng3]) self.assertEqual(['/volume/disk1'], paths) diff --git a/sahara/tests/unit/service/test_volumes.py b/sahara/tests/unit/service/test_volumes.py index 5878ee13..3708312e 100644 --- a/sahara/tests/unit/service/test_volumes.py +++ b/sahara/tests/unit/service/test_volumes.py @@ -79,13 +79,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase): volumes.detach_from_instance(instance)) @base.mock_thread_group - @mock.patch('sahara.service.volumes._mount_volume') + @mock.patch('sahara.service.volumes.mount_to_instances') @mock.patch('sahara.service.volumes._await_attach_volumes') @mock.patch('sahara.service.volumes._create_attach_volume') @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event') @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step') - def test_attach(self, add_step, add_event, - p_create_attach_vol, p_await, p_mount): + def test_attach(self, add_step, add_event, p_create_attach_vol, p_await, + p_mount): p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2 p_await.return_value = None p_mount.return_value = None @@ -115,7 +115,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase): volumes.attach_to_instances(cluster_utils.get_instances(cluster)) self.assertEqual(4, p_create_attach_vol.call_count) self.assertEqual(2, p_await.call_count) - self.assertEqual(4, p_mount.call_count) + self.assertEqual(1, p_mount.call_count) @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0) @mock.patch('sahara.context.sleep') @@ -157,3 +157,35 @@ class TestAttachVolume(base.SaharaWithDbTestCase): inst.remote.return_value = inst_remote return inst + + def test_find_instance_devices(self): + instance = self._get_instance() + ex_cmd = instance.remote().execute_command + + attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdc' + mounted_info = '/dev/vda1' + ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)] + + diff = volumes._find_instance_devices(instance) +
self.assertEqual(['/dev/vdb', '/dev/vdc'], diff) + + attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2' + mounted_info = '/dev/vda1' + ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)] + + diff = volumes._find_instance_devices(instance) + self.assertEqual(['/dev/vdb'], diff) + + attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2' + mounted_info = '/dev/vda1 /dev/vdb1' + ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)] + + diff = volumes._find_instance_devices(instance) + self.assertEqual(['/dev/vdb2'], diff) + + attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2' + mounted_info = '/dev/vda1 /dev/vdb2' + ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)] + + diff = volumes._find_instance_devices(instance) + self.assertEqual(['/dev/vdb1'], diff)
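
Taken together, the four cases above pin down the pruning rules in _find_instance_devices. Below is a self-contained sketch of that filtering step, with the device lists passed in directly rather than parsed from lsblk/mount output; like the patch, it assumes the parent disk of any mounted partition appears in the attached list:

    import re


    def _filter_devices(attached_dev, mounted_dev):
        attached_dev = list(attached_dev)
        # A mounted partition means the whole disk is in use:
        # drop both the partition and its parent disk.
        for dev in attached_dev[:]:
            if re.sub(r"\D", "", dev):  # name contains a digit -> partition
                if dev in mounted_dev:
                    attached_dev.remove(re.sub(r"\d", "", dev))
                    attached_dev.remove(dev)
        # If the bare disk survived (none of its partitions are mounted),
        # drop its partitions: the disk will be reformatted whole.
        for dev in attached_dev[:]:
            if re.sub(r"\D", "", dev):
                if re.sub(r"\d", "", dev) in attached_dev:
                    attached_dev.remove(dev)
        return attached_dev


    # Mirrors the expectations in the unit tests above.
    assert _filter_devices(
        ['/dev/vda', '/dev/vda1', '/dev/vdb', '/dev/vdc'],
        ['/dev/vda1']) == ['/dev/vdb', '/dev/vdc']
    assert _filter_devices(
        ['/dev/vda', '/dev/vda1', '/dev/vdb', '/dev/vdb1', '/dev/vdb2'],
        ['/dev/vda1', '/dev/vdb1']) == ['/dev/vdb2']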