diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py
index e3c3508058..16e4a6673b 100644
--- a/sahara/conductor/manager.py
+++ b/sahara/conductor/manager.py
@@ -55,7 +55,8 @@ NODE_GROUP_TEMPLATE_DEFAULTS.update({"is_public": False,
                                      "is_protected": False})
 
 INSTANCE_DEFAULTS = {
-    "volumes": []
+    "volumes": [],
+    "storage_devices_number": 0
 }
 
 DATA_SOURCE_DEFAULTS = {
diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py
index 1ab88de4ce..6c8566ebda 100644
--- a/sahara/conductor/objects.py
+++ b/sahara/conductor/objects.py
@@ -131,18 +131,6 @@ class NodeGroup(object):
         return configs.merge_configs(self.cluster.cluster_configs,
                                      self.node_configs)
 
-    def storage_paths(self):
-        mp = []
-        for idx in range(1, self.volumes_per_node + 1):
-            mp.append(self.volume_mount_prefix + str(idx))
-
-        # Here we assume that NG's instances use ephemeral
-        # drives for storage if volumes_per_node == 0
-        if not mp:
-            mp = ['/mnt']
-
-        return mp
-
     def get_image_id(self):
         return self.image_id or self.cluster.default_image_id
 
@@ -158,6 +146,7 @@ class Instance(object):
     internal_ip
     management_ip
     volumes
+    storage_devices_number
     """
 
     def hostname(self):
@@ -169,6 +158,16 @@ class Instance(object):
     def remote(self):
         return remote.get_remote(self)
 
+    def storage_paths(self):
+        mp = []
+        for idx in range(1, self.storage_devices_number + 1):
+            mp.append(self.node_group.volume_mount_prefix + str(idx))
+
+        if not mp:
+            mp = ['/mnt']
+
+        return mp
+
 
 class ClusterTemplate(object):
     """An object representing Cluster Template.
diff --git a/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py b/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py
new file mode 100644
index 0000000000..7cc521b632
--- /dev/null
+++ b/sahara/db/migration/alembic_migrations/versions/028_storage_devices_number.py
@@ -0,0 +1,35 @@
+# Copyright 2015 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add_storage_devices_number
+
+Revision ID: 028
+Revises: 027
+Create Date: 2015-07-20 16:56:23.562710
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '028'
+down_revision = '027'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('instances',
+                  sa.Column('storage_devices_number', sa.Integer(),
+                            nullable=True))
diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py
index 5a40c8ac0f..d1289fdfcf 100644
--- a/sahara/db/sqlalchemy/models.py
+++ b/sahara/db/sqlalchemy/models.py
@@ -157,6 +157,7 @@ class Instance(mb.SaharaBase):
     internal_ip = sa.Column(sa.String(45))
     management_ip = sa.Column(sa.String(45))
     volumes = sa.Column(st.JsonListType())
+    storage_devices_number = sa.Column(sa.Integer)
 
 
 # Template objects: ClusterTemplate, NodeGroupTemplate, TemplatesRelation
diff --git a/sahara/plugins/ambari/configs.py b/sahara/plugins/ambari/configs.py
index 49f3991a69..685cad7dee 100644
--- a/sahara/plugins/ambari/configs.py
+++ b/sahara/plugins/ambari/configs.py
@@ -167,10 +167,10 @@ def _make_paths(dirs, suffix):
     return ",".join([d + suffix for d in dirs])
 
 
-def get_ng_params(node_group):
-    configs = _create_ambari_configs(node_group.node_configs,
-                                     node_group.cluster.hadoop_version)
-    storage_paths = node_group.storage_paths()
+def get_instance_params(inst):
+    configs = _create_ambari_configs(inst.node_group.node_configs,
+                                     inst.node_group.cluster.hadoop_version)
+    storage_paths = inst.storage_paths()
     configs.setdefault("hdfs-site", {})
     configs["hdfs-site"]["dfs.datanode.data.dir"] = _make_paths(
         storage_paths, "/hdfs/data")
@@ -188,7 +188,6 @@ def get_ng_params(node_group):
     configs["yarn-site"][
         "yarn.timeline-service.leveldb-timeline-store.path"] = _make_paths(
             storage_paths, "/yarn/timeline")
-
     return _serialize_ambari_configs(configs)
 
 
diff --git a/sahara/plugins/ambari/deploy.py b/sahara/plugins/ambari/deploy.py
index d244fa0322..ae68e8565e 100644
--- a/sahara/plugins/ambari/deploy.py
+++ b/sahara/plugins/ambari/deploy.py
@@ -183,16 +183,17 @@ def create_blueprint(cluster):
     cluster = conductor.cluster_get(context.ctx(), cluster.id)
     host_groups = []
     for ng in cluster.node_groups:
-        hg = {
-            "name": ng.name,
-            "configurations": configs.get_ng_params(ng),
-            "components": []
-        }
         procs = p_common.get_ambari_proc_list(ng)
         procs.extend(p_common.get_clients(cluster))
-        for proc in procs:
-            hg["components"].append({"name": proc})
-        host_groups.append(hg)
+        for instance in ng.instances:
+            hg = {
+                "name": instance.instance_name,
+                "configurations": configs.get_instance_params(instance),
+                "components": []
+            }
+            for proc in procs:
+                hg["components"].append({"name": proc})
+            host_groups.append(hg)
     bp = {
         "Blueprints": {
             "stack_name": "HDP",
@@ -214,10 +215,11 @@ def start_cluster(cluster):
         "host_groups": []
     }
     for ng in cluster.node_groups:
-        cl_tmpl["host_groups"].append({
-            "name": ng.name,
-            "hosts": map(lambda x: {"fqdn": x.fqdn()}, ng.instances)
-        })
+        for instance in ng.instances:
+            cl_tmpl["host_groups"].append({
+                "name": instance.instance_name,
+                "hosts": [{"fqdn": instance.fqdn()}]
+            })
     ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
     password = cluster.extra["ambari_password"]
     with ambari_client.AmbariClient(ambari, password=password) as client:
diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py
index 0a6e3c375c..7ce41ae2ba 100644
--- a/sahara/plugins/cdh/cloudera_utils.py
+++ b/sahara/plugins/cdh/cloudera_utils.py
@@ -284,7 +284,7 @@ class ClouderaUtils(object):
         role = service.create_role(self.pu.get_role_name(instance, process),
                                    role_type, instance.fqdn())
         role.update_config(self._get_configs(process, cluster,
-                                             node_group=instance.node_group))
+                                             instance=instance))
 
     def get_cloudera_manager_info(self, cluster):
         mng = self.pu.get_manager(cluster)
@@ -297,6 +297,6 @@ class ClouderaUtils(object):
         }
         return info
 
-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         # Defined in derived class.
         return
diff --git a/sahara/plugins/cdh/v5/cloudera_utils.py b/sahara/plugins/cdh/v5/cloudera_utils.py
index 34bfc113fa..b14f646ab6 100644
--- a/sahara/plugins/cdh/v5/cloudera_utils.py
+++ b/sahara/plugins/cdh/v5/cloudera_utils.py
@@ -145,7 +145,7 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
         hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                               cluster=cluster))
 
-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])
 
@@ -214,10 +214,10 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
            all_confs = s_cfg.merge_configs(all_confs, hive_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
 
-        if node_group:
-            paths = node_group.storage_paths()
+        if instance:
+            paths = instance.storage_paths()
 
-            ng_default_confs = {
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -240,8 +240,8 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
             }
 
             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
 
         return all_confs.get(service, {})
diff --git a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
index 509b87ca9c..1b8b399d87 100644
--- a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
+++ b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
@@ -205,7 +205,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
         impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                cluster=cluster))
 
-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])
 
@@ -344,10 +344,10 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
 
-        if node_group:
-            paths = node_group.storage_paths()
+        if instance:
+            paths = instance.storage_paths()
 
-            ng_default_confs = {
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -382,8 +382,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
             }
 
             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
 
         return all_confs.get(service, {})
diff --git a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py
index 54ec974be8..42b015654c 100644
--- a/sahara/plugins/cdh/v5_4_0/cloudera_utils.py
+++ b/sahara/plugins/cdh/v5_4_0/cloudera_utils.py
@@ -222,7 +222,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
         kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                             cluster=cluster))
 
-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])
 
@@ -363,10 +363,10 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
 
-        if node_group:
-            paths = node_group.storage_paths()
+        if instance:
+            paths = instance.storage_paths()
 
-            ng_default_confs = {
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -401,9 +401,9 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
             }
 
             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
 
         return all_confs.get(service, {})
 
diff --git a/sahara/plugins/hdp/clusterspec.py b/sahara/plugins/hdp/clusterspec.py
index 57b681ad41..1efc212808 100644
--- a/sahara/plugins/hdp/clusterspec.py
+++ b/sahara/plugins/hdp/clusterspec.py
@@ -210,7 +210,6 @@ class ClusterSpec(object):
             node_group.count = ng.count
             node_group.id = ng.id
             node_group.components = ng.node_processes[:]
-            node_group.ng_storage_paths = ng.storage_paths()
             for instance in ng.instances:
                 node_group.instances.add(Instance(instance))
             self.node_groups[node_group.name] = node_group
@@ -270,14 +269,10 @@ class NodeGroup(object):
         self.cardinality = None
         self.count = None
        self.instances = set()
-        self.ng_storage_paths = []
 
     def add_component(self, component):
         self.components.append(component)
 
-    def storage_paths(self):
-        return self.ng_storage_paths
-
 
 class User(object):
     def __init__(self, name, password, groups):
diff --git a/sahara/plugins/hdp/versions/version_2_0_6/services.py b/sahara/plugins/hdp/versions/version_2_0_6/services.py
index a88a7a59ac..16c814479c 100644
--- a/sahara/plugins/hdp/versions/version_2_0_6/services.py
+++ b/sahara/plugins/hdp/versions/version_2_0_6/services.py
@@ -102,16 +102,12 @@ class Service(object):
             config[prop_name] = value
 
     def _get_common_paths(self, node_groups):
-        if len(node_groups) == 1:
-            paths = node_groups[0].storage_paths()
-        else:
-            sets = [set(ng.storage_paths()) for ng in node_groups]
-            paths = list(set.intersection(*sets))
+        sets = []
+        for node_group in node_groups:
+            for instance in node_group.instances:
+                sets.append(set(instance.sahara_instance.storage_paths()))
 
-        if len(paths) > 1 and '/mnt' in paths:
-            paths.remove('/mnt')
-
-        return paths
+        return list(set.intersection(*sets)) if sets else []
 
     def _generate_storage_path(self, storage_paths, path):
         return ",".join([p + path for p in storage_paths])
@@ -201,7 +197,7 @@ class HdfsService(Service):
         hdfs_site_config = cluster_spec.configurations['hdfs-site']
         hdfs_site_config['dfs.namenode.name.dir'] = (
             self._generate_storage_path(
-                nn_ng.storage_paths(), '/hadoop/hdfs/namenode'))
+                self._get_common_paths([nn_ng]), '/hadoop/hdfs/namenode'))
         if common_paths:
             hdfs_site_config['dfs.datanode.data.dir'] = (
                 self._generate_storage_path(
diff --git a/sahara/plugins/mapr/services/maprfs/maprfs.py b/sahara/plugins/mapr/services/maprfs/maprfs.py
index 6cd211b65c..bea23fe39d 100644
--- a/sahara/plugins/mapr/services/maprfs/maprfs.py
+++ b/sahara/plugins/mapr/services/maprfs/maprfs.py
@@ -107,7 +107,7 @@ class MapRFS(s.Service):
     def _generate_disk_list_file(self, instance, path_to_disk_setup_script):
         LOG.debug('Creating disk list file')
         g.run_script(instance, path_to_disk_setup_script, 'root',
-                     *instance.node_group.storage_paths())
+                     *instance.storage_paths())
 
     def _execute_disksetup(self, instance):
         with instance.remote() as rmt:
diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py
index af86f732d3..c143bafbe9 100644
--- a/sahara/plugins/spark/plugin.py
+++ b/sahara/plugins/spark/plugin.py
@@ -163,7 +163,6 @@ class SparkProvider(p.ProvisioningPluginBase):
             cluster)
 
     def _extract_configs_to_extra(self, cluster):
-        nn = utils.get_instance(cluster, "namenode")
         sp_master = utils.get_instance(cluster, "master")
         sp_slaves = utils.get_instances(cluster, "slave")
 
@@ -186,22 +185,10 @@ class SparkProvider(p.ProvisioningPluginBase):
         config_defaults = c_helper.generate_spark_executor_classpath(cluster)
 
         extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
-        for ng in cluster.node_groups:
-            extra[ng.id] = {
-                'xml': c_helper.generate_xml_configs(
-                    ng.configuration(),
-                    ng.storage_paths(),
-                    nn.hostname(), None
-                ),
-                'setup_script': c_helper.generate_hadoop_setup_script(
-                    ng.storage_paths(),
-                    c_helper.extract_hadoop_environment_confs(
-                        ng.configuration())
-                ),
-                'sp_master': config_master,
-                'sp_slaves': config_slaves,
-                'sp_defaults': config_defaults
-            }
+
+        extra['sp_master'] = config_master
+        extra['sp_slaves'] = config_slaves
+        extra['sp_defaults'] = config_defaults
 
         if c_helper.is_data_locality_enabled(cluster):
             topology_data = th.generate_topology_map(
@@ -211,6 +198,19 @@ class SparkProvider(p.ProvisioningPluginBase):
 
         return extra
 
+    def _add_instance_ng_related_to_extra(self, cluster, instance, extra):
+        extra = extra.copy()
+        ng = instance.node_group
+        nn = utils.get_instance(cluster, "namenode")
+
+        extra['xml'] = c_helper.generate_xml_configs(
+            ng.configuration(), instance.storage_paths(), nn.hostname(), None)
+        extra['setup_script'] = c_helper.generate_hadoop_setup_script(
+            instance.storage_paths(),
+            c_helper.extract_hadoop_environment_confs(ng.configuration()))
+
+        return extra
+
     def _start_datanode_processes(self, dn_instances):
         if len(dn_instances) == 0:
             return
@@ -243,6 +243,8 @@ class SparkProvider(p.ProvisioningPluginBase):
             cluster.id, _("Push configs to nodes"), len(all_instances))
         with context.ThreadGroup() as tg:
             for instance in all_instances:
+                extra = self._add_instance_ng_related_to_extra(
+                    cluster, instance, extra)
                 if instance in new_instances:
                     tg.spawn('spark-configure-%s' % instance.instance_name,
                              self._push_configs_to_new_node, cluster,
@@ -254,25 +256,23 @@ class SparkProvider(p.ProvisioningPluginBase):
 
     @cpo.event_wrapper(mark_successful_on_exit=True)
     def _push_configs_to_new_node(self, cluster, extra, instance):
-        ng_extra = extra[instance.node_group.id]
-
         files_hadoop = {
             os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "core-site.xml"): ng_extra['xml']['core-site'],
+                         "core-site.xml"): extra['xml']['core-site'],
             os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "hdfs-site.xml"): ng_extra['xml']['hdfs-site'],
+                         "hdfs-site.xml"): extra['xml']['hdfs-site'],
         }
 
         sp_home = self._spark_home(cluster)
         files_spark = {
-            os.path.join(sp_home, 'conf/spark-env.sh'): ng_extra['sp_master'],
-            os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+            os.path.join(sp_home, 'conf/spark-env.sh'): extra['sp_master'],
+            os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
             os.path.join(sp_home,
-                         'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                         'conf/spark-defaults.conf'): extra['sp_defaults']
         }
 
         files_init = {
-            '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'],
+            '/tmp/sahara-hadoop-init.sh': extra['setup_script'],
             'id_rsa': cluster.management_private_key,
             'authorized_keys': cluster.management_public_key
         }
@@ -283,7 +283,7 @@ class SparkProvider(p.ProvisioningPluginBase):
             'sudo chown $USER $HOME/.ssh/id_rsa; '
             'sudo chmod 600 $HOME/.ssh/id_rsa')
 
-        storage_paths = instance.node_group.storage_paths()
+        storage_paths = instance.storage_paths()
         dn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,
                                                      '/dfs/dn'))
         nn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,
@@ -336,15 +336,14 @@ class SparkProvider(p.ProvisioningPluginBase):
                              'slave' in node_processes)
 
         if need_update_spark:
-            ng_extra = extra[instance.node_group.id]
             sp_home = self._spark_home(cluster)
             files = {
                 os.path.join(sp_home,
-                             'conf/spark-env.sh'): ng_extra['sp_master'],
-                os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+                             'conf/spark-env.sh'): extra['sp_master'],
+                os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
                 os.path.join(
                     sp_home,
-                    'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                    'conf/spark-defaults.conf'): extra['sp_defaults']
             }
             r = remote.get_remote(instance)
             r.write_files_to(files)
diff --git a/sahara/plugins/vanilla/hadoop2/config.py b/sahara/plugins/vanilla/hadoop2/config.py
index ed6ce8d83d..78c482f44a 100644
--- a/sahara/plugins/vanilla/hadoop2/config.py
+++ b/sahara/plugins/vanilla/hadoop2/config.py
@@ -73,24 +73,25 @@ def _configure_instance(pctx, instance):
 
 
 def _provisioning_configs(pctx, instance):
-    xmls, env = _generate_configs(pctx, instance.node_group)
+    xmls, env = _generate_configs(pctx, instance)
     _push_xml_configs(instance, xmls)
     _push_env_configs(instance, env)
 
 
-def _generate_configs(pctx, node_group):
-    hadoop_xml_confs = _get_hadoop_configs(pctx, node_group)
-    user_xml_confs, user_env_confs = _get_user_configs(pctx, node_group)
+def _generate_configs(pctx, instance):
+    hadoop_xml_confs = _get_hadoop_configs(pctx, instance)
+    user_xml_confs, user_env_confs = _get_user_configs(
+        pctx, instance.node_group)
     xml_confs = s_cfg.merge_configs(user_xml_confs, hadoop_xml_confs)
     env_confs = s_cfg.merge_configs(pctx['env_confs'], user_env_confs)
 
     return xml_confs, env_confs
 
 
-def _get_hadoop_configs(pctx, node_group):
-    cluster = node_group.cluster
+def _get_hadoop_configs(pctx, instance):
+    cluster = instance.node_group.cluster
     nn_hostname = vu.get_instance_hostname(vu.get_namenode(cluster))
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
     confs = {
         'Hadoop': {
             'fs.defaultFS': 'hdfs://%s:9000' % nn_hostname
@@ -282,8 +283,7 @@ def _push_configs_to_instance(instance, configs):
 
 
 def _post_configuration(pctx, instance):
-    node_group = instance.node_group
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
     args = {
         'hadoop_user': HADOOP_USER,
         'hadoop_group': HADOOP_GROUP,
@@ -313,9 +313,9 @@ def _post_configuration(pctx, instance):
         r.execute_command('chmod +x ' + t_script, run_as_root=True)
 
 
-def _get_hadoop_dirs(node_group):
+def _get_hadoop_dirs(instance):
     dirs = {}
-    storage_paths = node_group.storage_paths()
+    storage_paths = instance.storage_paths()
     dirs['hadoop_name_dirs'] = _make_hadoop_paths(
         storage_paths, '/hdfs/namenode')
     dirs['hadoop_data_dirs'] = _make_hadoop_paths(
diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py
index e97584ced3..cb0a4eb520 100644
--- a/sahara/service/volumes.py
+++ b/sahara/service/volumes.py
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import re
+import threading
+
 from oslo_config import cfg
 from oslo_log import log as logging
 
@@ -21,6 +24,7 @@ from sahara import context
 from sahara import exceptions as ex
 from sahara.i18n import _
 from sahara.i18n import _LE
+from sahara.i18n import _LW
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils.openstack import base as b
 from sahara.utils.openstack import cinder
@@ -51,6 +55,7 @@ def _count_volumes_to_mount(instances):
 def attach_to_instances(instances):
     instances_to_attach = _count_instances_to_attach(instances)
     if instances_to_attach == 0:
+        mount_to_instances(instances)
         return
 
     cpo.add_provisioning_step(
@@ -66,6 +71,8 @@ def attach_to_instances(instances):
                          % instance.instance_name,
                          _attach_volumes_to_node, instance.node_group,
                          instance)
 
+    mount_to_instances(instances)
+
 
 @poll_utils.poll_status(
     'await_attach_volumes', _("Await for attaching volumes to instances"),
@@ -91,13 +98,6 @@ def _attach_volumes_to_node(node_group, instance):
 
     _await_attach_volumes(instance, devices)
 
-    paths = instance.node_group.storage_paths()
-    for idx in range(0, instance.node_group.volumes_per_node):
-        LOG.debug("Mounting volume {volume} to instance"
-                  .format(volume=devices[idx]))
-        _mount_volume(instance, devices[idx], paths[idx])
-        LOG.debug("Mounted volume to instance")
-
 
 @poll_utils.poll_status(
     'volume_available_timeout', _("Await for volume become available"),
@@ -155,51 +155,90 @@ def mount_to_instances(instances):
         instances[0].cluster_id,
         _("Mount volumes to instances"), _count_volumes_to_mount(instances))
 
-    with context.ThreadGroup() as tg:
-        for instance in instances:
-            with context.set_current_instance_id(instance.instance_id):
-                devices = _find_instance_volume_devices(instance)
+    for instance in instances:
+        with context.set_current_instance_id(instance.instance_id):
+            devices = _find_instance_devices(instance)
+            formatted_devices = []
+            lock = threading.Lock()
+            with context.ThreadGroup() as tg:
                 # Since formating can take several minutes (for large disks)
                 # and can be done in parallel, launch one thread per disk.
-                for idx in range(0, instance.node_group.volumes_per_node):
-                    tg.spawn(
-                        'mount-volume-%d-to-node-%s' %
-                        (idx, instance.instance_name),
-                        _mount_volume_to_node, instance, idx, devices[idx])
+                for device in devices:
+                    tg.spawn('format-device-%s' % device, _format_device,
+                             instance, device, formatted_devices, lock)
+
+            conductor.instance_update(
+                context.current(), instance,
+                {"storage_devices_number": len(formatted_devices)})
+            for idx, dev in enumerate(formatted_devices):
+                _mount_volume_to_node(instance, idx+1, dev)
 
 
-def _find_instance_volume_devices(instance):
-    volumes = b.execute_with_retries(nova.client().volumes.get_server_volumes,
-                                     instance.instance_id)
-    devices = [volume.device for volume in volumes]
-    return devices
+def _find_instance_devices(instance):
+    with instance.remote() as r:
+        code, attached_info = r.execute_command(
+            "lsblk -r | awk '$6 ~ /disk/ || /part/ {print \"/dev/\" $1}'")
+        attached_dev = attached_info.split()
+        code, mounted_info = r.execute_command(
+            "mount | awk '$1 ~ /^\/dev/ {print $1}'")
+        mounted_dev = mounted_info.split()
+
+    # filtering attached devices, that should not be mounted
+    for dev in attached_dev[:]:
+        idx = re.sub("\D", "", dev)
+        if idx:
+            if dev in mounted_dev:
+                attached_dev.remove(re.sub("\d", "", dev))
+                attached_dev.remove(dev)
+
+    for dev in attached_dev[:]:
+        if re.sub("\D", "", dev):
+            if re.sub("\d", "", dev) in attached_dev:
+                attached_dev.remove(dev)
+
+    return attached_dev
 
 
 @cpo.event_wrapper(mark_successful_on_exit=True)
-def _mount_volume_to_node(instance, idx, device):
+def _mount_volume_to_node(instance, index, device):
     LOG.debug("Mounting volume {device} to instance".format(device=device))
-    mount_point = instance.node_group.storage_paths()[idx]
+    mount_point = instance.node_group.volume_mount_prefix + str(index)
     _mount_volume(instance, device, mount_point)
     LOG.debug("Mounted volume to instance")
 
 
+def _format_device(instance, device, formatted_devices=None, lock=None):
+    with instance.remote() as r:
+        try:
+            # Format devices with better performance options:
+            # - reduce number of blocks reserved for root to 1%
+            # - use 'dir_index' for faster directory listings
+            # - use 'extents' to work faster with large files
+            # - disable journaling
+            fs_opts = '-F -m 1 -O dir_index,extents,^has_journal'
+            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device))
+            if lock:
+                with lock:
+                    formatted_devices.append(device)
+        except Exception:
+            LOG.warning(
+                _LW("Device {dev} cannot be formatted").format(dev=device))
+
+
 def _mount_volume(instance, device_path, mount_point):
     with instance.remote() as r:
         try:
             # Mount volumes with better performance options:
-            # - reduce number of blocks reserved for root to 1%
-            # - use 'dir_index' for faster directory listings
-            # - use 'extents' to work faster with large files
-            # - disable journaling
             # - enable write-back
             # - do not store access time
-            fs_opts = '-m 1 -O dir_index,extents,^has_journal'
             mount_opts = '-o data=writeback,noatime,nodiratime'
             r.execute_command('sudo mkdir -p %s' % mount_point)
-            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device_path))
             r.execute_command('sudo mount %s %s %s' %
                               (mount_opts, device_path, mount_point))
+            r.execute_command(
+                'sudo sh -c "grep %s /etc/mtab >> /etc/fstab"' % device_path)
+
         except Exception:
             LOG.error(_LE("Error mounting volume to instance"))
             raise
diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py
index 0ccd3cb43b..70582caf19 100644
--- a/sahara/tests/unit/db/migration/test_migrations.py
+++ b/sahara/tests/unit/db/migration/test_migrations.py
@@ -540,6 +540,9 @@ class SaharaMigrationsCheckers(object):
         self.assertColumnExists(engine, 'job_executions',
                                 'engine_job_id')
 
+    def _check_028(self, engine, data):
+        self.assertColumnExists(engine, 'instances', 'storage_devices_number')
+
 
 class TestMigrationsMySQL(SaharaMigrationsCheckers,
                           base.BaseWalkMigrationTestCase,
diff --git a/sahara/tests/unit/plugins/ambari/test_configs.py b/sahara/tests/unit/plugins/ambari/test_configs.py
index 0a46431427..0791230762 100644
--- a/sahara/tests/unit/plugins/ambari/test_configs.py
+++ b/sahara/tests/unit/plugins/ambari/test_configs.py
@@ -30,8 +30,10 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
         self.ng.node_configs = {}
         self.ng.cluster = mock.Mock()
         self.ng.cluster.hadoop_version = "2.2"
-        self.ng.storage_paths = mock.Mock()
-        self.ng.storage_paths.return_value = ["/data1", "/data2"]
+        self.instance = mock.Mock()
+        self.instance.node_group = self.ng
+        self.instance.storage_paths = mock.Mock()
+        self.instance.storage_paths.return_value = ["/data1", "/data2"]
 
     def assertConfigEqual(self, expected, actual):
         self.assertEqual(len(expected), len(actual))
@@ -45,8 +47,8 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
         self.assertEqual(len(expected), len(cnt_ex))
         self.assertEqual(len(actual), len(cnt_act))
 
-    def test_get_ng_params_default(self):
-        ng_configs = configs.get_ng_params(self.ng)
+    def test_get_instance_params_default(self):
+        instance_configs = configs.get_instance_params(self.instance)
         expected = [
             {
                 "hdfs-site": {
@@ -71,16 +73,16 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                 }
             }
         ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)
 
-    def test_get_ng_params(self):
+    def test_get_instance_params(self):
         self.ng.node_configs = {
             "YARN": {
                 "mapreduce.map.java.opts": "-Dk=v",
                 "yarn.scheduler.minimum-allocation-mb": "256"
             }
         }
-        ng_configs = configs.get_ng_params(self.ng)
+        instance_configs = configs.get_instance_params(self.instance)
         expected = [
             {
                 "hdfs-site": {
@@ -111,4 +113,4 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                 }
             }
         ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)
diff --git a/sahara/tests/unit/plugins/hdp/hdp_test_base.py b/sahara/tests/unit/plugins/hdp/hdp_test_base.py
index 7541a78691..aabc5e0f71 100644
--- a/sahara/tests/unit/plugins/hdp/hdp_test_base.py
+++ b/sahara/tests/unit/plugins/hdp/hdp_test_base.py
@@ -29,6 +29,11 @@ class TestServer(object):
         self.public_ip = public_ip
         self.internal_ip = private_ip
         self.node_group = None
+        self.sahara_instance = self
+        self.storage_path = ['/mnt']
+
+    def storage_paths(self):
+        return self.storage_path
 
     def fqdn(self):
         return self.inst_fqdn
@@ -81,10 +86,6 @@ class TestNodeGroup(object):
         self.node_processes = node_processes
         self.count = count
         self.id = name
-        self.ng_storage_paths = []
-
-    def storage_paths(self):
-        return self.ng_storage_paths
 
 
 class TestUserInputConfig(object):
diff --git a/sahara/tests/unit/plugins/hdp/test_services.py b/sahara/tests/unit/plugins/hdp/test_services.py
index 01a28700d4..6ddaac5b01 100644
--- a/sahara/tests/unit/plugins/hdp/test_services.py
+++ b/sahara/tests/unit/plugins/hdp/test_services.py
@@ -763,25 +763,32 @@ class ServicesTest(base.SaharaTestCase):
         for version in versions:
             s = self.get_services_processor(version=version)
             service = s.create_service('AMBARI')
-            ng1 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng2 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng2.ng_storage_paths = ['/mnt']
+            server1 = hdp_test_base.TestServer(
+                'host1', 'test-master', '11111', 3, '1.1.1.1', '2.2.2.2')
+            server2 = hdp_test_base.TestServer(
+                'host2', 'test-slave', '11111', 3, '3.3.3.3', '4.4.4.4')
+            server3 = hdp_test_base.TestServer(
+                'host3', 'another-test', '11111', 3, '6.6.6.6', '5.5.5.5')
+            ng1 = hdp_test_base.TestNodeGroup('ng1', [server1], None)
+            ng2 = hdp_test_base.TestNodeGroup('ng2', [server2], None)
+            ng3 = hdp_test_base.TestNodeGroup('ng3', [server3], None)
+
+            server1.storage_path = ['/volume/disk1']
+            server2.storage_path = ['/mnt']
 
             paths = service._get_common_paths([ng1, ng2])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)
 
-            ng3 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/mnt']
+            server3.storage_path = ['/volume/disk1']
 
             paths = service._get_common_paths([ng1, ng2, ng3])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)
 
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/volume/disk1']
+            server3.storage_path = ['/volume/disk1']
 
             paths = service._get_common_paths([ng1, ng2, ng3])
             self.assertEqual(['/volume/disk1'], paths)
diff --git a/sahara/tests/unit/service/test_volumes.py b/sahara/tests/unit/service/test_volumes.py
index 5878ee13ee..3708312ed1 100644
--- a/sahara/tests/unit/service/test_volumes.py
+++ b/sahara/tests/unit/service/test_volumes.py
@@ -79,13 +79,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
             volumes.detach_from_instance(instance))
 
     @base.mock_thread_group
-    @mock.patch('sahara.service.volumes._mount_volume')
+    @mock.patch('sahara.service.volumes.mount_to_instances')
     @mock.patch('sahara.service.volumes._await_attach_volumes')
     @mock.patch('sahara.service.volumes._create_attach_volume')
     @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
     @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
-    def test_attach(self, add_step, add_event,
-                    p_create_attach_vol, p_await, p_mount):
+    def test_attach(self, add_step, add_event, p_create_attach_vol, p_await,
+                    p_mount):
         p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
         p_await.return_value = None
         p_mount.return_value = None
@@ -115,7 +115,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
         volumes.attach_to_instances(cluster_utils.get_instances(cluster))
         self.assertEqual(4, p_create_attach_vol.call_count)
         self.assertEqual(2, p_await.call_count)
-        self.assertEqual(4, p_mount.call_count)
+        self.assertEqual(1, p_mount.call_count)
 
     @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
     @mock.patch('sahara.context.sleep')
@@ -157,3 +157,35 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
         inst.remote.return_value = inst_remote
 
         return inst
+
+    def test_find_instance_volume_devices(self):
+        instance = self._get_instance()
+        ex_cmd = instance.remote().execute_command
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdc'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb', '/dev/vdc'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb2'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb2'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb1'], diff)