Formatting and mounting methods changed for ironic

Currently only Cinder volumes can be formatted and mounted. This patch
adds the ability to format and mount any attached storage device. The
storage_paths() method was also moved from node group objects to
instances, and the plugins were updated accordingly.

Closes-bug: #1478899
Closes-bug: #1478904

Change-Id: If92d6fdea25e374d6d5c404be5ac3677cb60c057
Andrey Pavlov 2015-07-17 13:19:14 +03:00
parent 86caac15e5
commit 53eaa64263
21 changed files with 276 additions and 165 deletions

View File

@@ -55,7 +55,8 @@ NODE_GROUP_TEMPLATE_DEFAULTS.update({"is_public": False,
                                     "is_protected": False})

INSTANCE_DEFAULTS = {
-    "volumes": []
+    "volumes": [],
+    "storage_devices_number": 0
}

DATA_SOURCE_DEFAULTS = {

View File

@@ -131,18 +131,6 @@ class NodeGroup(object):
        return configs.merge_configs(self.cluster.cluster_configs,
                                     self.node_configs)

-    def storage_paths(self):
-        mp = []
-        for idx in range(1, self.volumes_per_node + 1):
-            mp.append(self.volume_mount_prefix + str(idx))
-        # Here we assume that NG's instances use ephemeral
-        # drives for storage if volumes_per_node == 0
-        if not mp:
-            mp = ['/mnt']
-        return mp
-
    def get_image_id(self):
        return self.image_id or self.cluster.default_image_id

@@ -158,6 +146,7 @@ class Instance(object):
    internal_ip
    management_ip
    volumes
+    storage_devices_number
    """

    def hostname(self):

@@ -169,6 +158,16 @@ class Instance(object):
    def remote(self):
        return remote.get_remote(self)

+    def storage_paths(self):
+        mp = []
+        for idx in range(1, self.storage_devices_number + 1):
+            mp.append(self.node_group.volume_mount_prefix + str(idx))
+        if not mp:
+            mp = ['/mnt']
+        return mp
+

class ClusterTemplate(object):
    """An object representing Cluster Template.

View File

@@ -0,0 +1,35 @@
+# Copyright 2015 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add_storage_devices_number
+
+Revision ID: 028
+Revises: 027
+Create Date: 2015-07-20 16:56:23.562710
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '028'
+down_revision = '027'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('instances',
+                  sa.Column('storage_devices_number', sa.Integer(),
+                            nullable=True))

View File

@@ -157,6 +157,7 @@ class Instance(mb.SaharaBase):
    internal_ip = sa.Column(sa.String(45))
    management_ip = sa.Column(sa.String(45))
    volumes = sa.Column(st.JsonListType())
+    storage_devices_number = sa.Column(sa.Integer)

# Template objects: ClusterTemplate, NodeGroupTemplate, TemplatesRelation

View File

@@ -167,10 +167,10 @@ def _make_paths(dirs, suffix):
    return ",".join([d + suffix for d in dirs])


-def get_ng_params(node_group):
-    configs = _create_ambari_configs(node_group.node_configs,
-                                     node_group.cluster.hadoop_version)
-    storage_paths = node_group.storage_paths()
+def get_instance_params(inst):
+    configs = _create_ambari_configs(inst.node_group.node_configs,
+                                     inst.node_group.cluster.hadoop_version)
+    storage_paths = inst.storage_paths()
    configs.setdefault("hdfs-site", {})
    configs["hdfs-site"]["dfs.datanode.data.dir"] = _make_paths(
        storage_paths, "/hdfs/data")

@@ -188,7 +188,6 @@ def get_ng_params(node_group):
    configs["yarn-site"][
        "yarn.timeline-service.leveldb-timeline-store.path"] = _make_paths(
            storage_paths, "/yarn/timeline")
-
    return _serialize_ambari_configs(configs)
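As an aside, the _make_paths helper shown above simply appends a suffix to every storage path and comma-joins the results, which is the list format HDFS and YARN expect for multi-directory settings. A quick illustrative run (paths are made up):

def _make_paths(dirs, suffix):
    return ",".join([d + suffix for d in dirs])

print(_make_paths(['/volumes/disk1', '/volumes/disk2'], '/hdfs/data'))
# /volumes/disk1/hdfs/data,/volumes/disk2/hdfs/data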

View File

@@ -183,13 +183,14 @@ def create_blueprint(cluster):
    cluster = conductor.cluster_get(context.ctx(), cluster.id)
    host_groups = []
    for ng in cluster.node_groups:
-        hg = {
-            "name": ng.name,
-            "configurations": configs.get_ng_params(ng),
-            "components": []
-        }
        procs = p_common.get_ambari_proc_list(ng)
        procs.extend(p_common.get_clients(cluster))
-        for proc in procs:
-            hg["components"].append({"name": proc})
-        host_groups.append(hg)
+        for instance in ng.instances:
+            hg = {
+                "name": instance.instance_name,
+                "configurations": configs.get_instance_params(instance),
+                "components": []
+            }
+            for proc in procs:
+                hg["components"].append({"name": proc})
+            host_groups.append(hg)

@@ -214,9 +215,10 @@ def start_cluster(cluster):
        "host_groups": []
    }
    for ng in cluster.node_groups:
-        cl_tmpl["host_groups"].append({
-            "name": ng.name,
-            "hosts": map(lambda x: {"fqdn": x.fqdn()}, ng.instances)
-        })
+        for instance in ng.instances:
+            cl_tmpl["host_groups"].append({
+                "name": instance.instance_name,
+                "hosts": [{"fqdn": instance.fqdn()}]
+            })
    ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
    password = cluster.extra["ambari_password"]
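Since the loops now iterate over instances, every instance ends up with its own host group in the blueprint and in the cluster template. A rough, hypothetical shape of the result (names and values are illustrative only, not taken from the patch):

host_groups = [
    {
        "name": "cluster-worker-001",
        "configurations": [...],  # per-instance configs from get_instance_params
        "components": [{"name": "DATANODE"}, {"name": "NODEMANAGER"}],
    },
]
cl_tmpl = {
    "host_groups": [
        {"name": "cluster-worker-001",
         "hosts": [{"fqdn": "cluster-worker-001.example.org"}]},
    ],
}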

View File

@@ -284,7 +284,7 @@ class ClouderaUtils(object):
            role = service.create_role(self.pu.get_role_name(instance, process),
                                       role_type, instance.fqdn())
            role.update_config(self._get_configs(process, cluster,
-                                                node_group=instance.node_group))
+                                                instance=instance))

    def get_cloudera_manager_info(self, cluster):
        mng = self.pu.get_manager(cluster)

@@ -297,6 +297,6 @@ class ClouderaUtils(object):
        }
        return info

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
        # Defined in derived class.
        return

View File

@@ -145,7 +145,7 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
        hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                              cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

@@ -214,10 +214,10 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
        all_confs = s_cfg.merge_configs(all_confs, hive_confs)
        all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },

@@ -240,8 +240,8 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
            }

            ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

View File

@@ -205,7 +205,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
        impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                               cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

@@ -344,10 +344,10 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
        all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
        all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },

@@ -382,8 +382,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
            }

            ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

View File

@@ -222,7 +222,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
        kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                            cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

@@ -363,10 +363,10 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
        all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
        all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },

@@ -401,9 +401,9 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
            }

            ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

View File

@@ -210,7 +210,6 @@ class ClusterSpec(object):
            node_group.count = ng.count
            node_group.id = ng.id
            node_group.components = ng.node_processes[:]
-            node_group.ng_storage_paths = ng.storage_paths()
            for instance in ng.instances:
                node_group.instances.add(Instance(instance))
            self.node_groups[node_group.name] = node_group

@@ -266,14 +265,10 @@ class NodeGroup(object):
        self.cardinality = None
        self.count = None
        self.instances = set()
-        self.ng_storage_paths = []

    def add_component(self, component):
        self.components.append(component)

-    def storage_paths(self):
-        return self.ng_storage_paths
-

class User(object):
    def __init__(self, name, password, groups):

View File

@@ -102,16 +102,12 @@ class Service(object):
            config[prop_name] = value

    def _get_common_paths(self, node_groups):
-        if len(node_groups) == 1:
-            paths = node_groups[0].storage_paths()
-        else:
-            sets = [set(ng.storage_paths()) for ng in node_groups]
-            paths = list(set.intersection(*sets))
-
-        if len(paths) > 1 and '/mnt' in paths:
-            paths.remove('/mnt')
-
-        return paths
+        sets = []
+        for node_group in node_groups:
+            for instance in node_group.instances:
+                sets.append(set(instance.sahara_instance.storage_paths()))
+
+        return list(set.intersection(*sets)) if sets else []

    def _generate_storage_path(self, storage_paths, path):
        return ",".join([p + path for p in storage_paths])

@@ -201,7 +197,7 @@ class HdfsService(Service):
        hdfs_site_config = cluster_spec.configurations['hdfs-site']
        hdfs_site_config['dfs.namenode.name.dir'] = (
            self._generate_storage_path(
-                nn_ng.storage_paths(), '/hadoop/hdfs/namenode'))
+                self._get_common_paths([nn_ng]), '/hadoop/hdfs/namenode'))
        if common_paths:
            hdfs_site_config['dfs.datanode.data.dir'] = (
                self._generate_storage_path(
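The reworked _get_common_paths intersects the storage paths of every instance in the given node groups, instead of comparing per-node-group lists and special-casing '/mnt'. A small standalone sketch of that intersection, with made-up paths:

instance_paths = [
    {'/volumes/disk1', '/volumes/disk2'},  # instance from ng1
    {'/volumes/disk1'},                    # instance from ng2
]
common = list(set.intersection(*instance_paths)) if instance_paths else []
print(common)  # ['/volumes/disk1']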

View File

@@ -107,7 +107,7 @@ class MapRFS(s.Service):
    def _generate_disk_list_file(self, instance, path_to_disk_setup_script):
        LOG.debug('Creating disk list file')
        g.run_script(instance, path_to_disk_setup_script, 'root',
-                     *instance.node_group.storage_paths())
+                     *instance.storage_paths())

    def _execute_disksetup(self, instance):
        with instance.remote() as rmt:

View File

@@ -163,7 +163,6 @@ class SparkProvider(p.ProvisioningPluginBase):
                                   cluster)

    def _extract_configs_to_extra(self, cluster):
-        nn = utils.get_instance(cluster, "namenode")
        sp_master = utils.get_instance(cluster, "master")
        sp_slaves = utils.get_instances(cluster, "slave")

@@ -186,22 +185,10 @@ class SparkProvider(p.ProvisioningPluginBase):
        config_defaults = c_helper.generate_spark_executor_classpath(cluster)

        extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
-        for ng in cluster.node_groups:
-            extra[ng.id] = {
-                'xml': c_helper.generate_xml_configs(
-                    ng.configuration(),
-                    ng.storage_paths(),
-                    nn.hostname(), None
-                ),
-                'setup_script': c_helper.generate_hadoop_setup_script(
-                    ng.storage_paths(),
-                    c_helper.extract_hadoop_environment_confs(
-                        ng.configuration())
-                ),
-                'sp_master': config_master,
-                'sp_slaves': config_slaves,
-                'sp_defaults': config_defaults
-            }
+        extra['sp_master'] = config_master
+        extra['sp_slaves'] = config_slaves
+        extra['sp_defaults'] = config_defaults

        if c_helper.is_data_locality_enabled(cluster):
            topology_data = th.generate_topology_map(

@@ -211,6 +198,19 @@ class SparkProvider(p.ProvisioningPluginBase):

        return extra

+    def _add_instance_ng_related_to_extra(self, cluster, instance, extra):
+        extra = extra.copy()
+        ng = instance.node_group
+        nn = utils.get_instance(cluster, "namenode")
+
+        extra['xml'] = c_helper.generate_xml_configs(
+            ng.configuration(), instance.storage_paths(), nn.hostname(), None)
+        extra['setup_script'] = c_helper.generate_hadoop_setup_script(
+            instance.storage_paths(),
+            c_helper.extract_hadoop_environment_confs(ng.configuration()))
+
+        return extra
+
    def _start_datanode_processes(self, dn_instances):
        if len(dn_instances) == 0:
            return

@@ -243,6 +243,8 @@ class SparkProvider(p.ProvisioningPluginBase):
            cluster.id, _("Push configs to nodes"), len(all_instances))
        with context.ThreadGroup() as tg:
            for instance in all_instances:
+                extra = self._add_instance_ng_related_to_extra(
+                    cluster, instance, extra)
                if instance in new_instances:
                    tg.spawn('spark-configure-%s' % instance.instance_name,
                             self._push_configs_to_new_node, cluster,

@@ -254,25 +256,23 @@ class SparkProvider(p.ProvisioningPluginBase):
    @cpo.event_wrapper(mark_successful_on_exit=True)
    def _push_configs_to_new_node(self, cluster, extra, instance):
-        ng_extra = extra[instance.node_group.id]
-
        files_hadoop = {
            os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "core-site.xml"): ng_extra['xml']['core-site'],
+                         "core-site.xml"): extra['xml']['core-site'],
            os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "hdfs-site.xml"): ng_extra['xml']['hdfs-site'],
+                         "hdfs-site.xml"): extra['xml']['hdfs-site'],
        }

        sp_home = self._spark_home(cluster)
        files_spark = {
-            os.path.join(sp_home, 'conf/spark-env.sh'): ng_extra['sp_master'],
-            os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+            os.path.join(sp_home, 'conf/spark-env.sh'): extra['sp_master'],
+            os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
            os.path.join(sp_home,
-                         'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                         'conf/spark-defaults.conf'): extra['sp_defaults']
        }

        files_init = {
-            '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'],
+            '/tmp/sahara-hadoop-init.sh': extra['setup_script'],
            'id_rsa': cluster.management_private_key,
            'authorized_keys': cluster.management_public_key
        }

@@ -283,7 +283,7 @@ class SparkProvider(p.ProvisioningPluginBase):
            'sudo chown $USER $HOME/.ssh/id_rsa; '
            'sudo chmod 600 $HOME/.ssh/id_rsa')

-        storage_paths = instance.node_group.storage_paths()
+        storage_paths = instance.storage_paths()
        dn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,
                                                     '/dfs/dn'))
        nn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,

@@ -336,15 +336,14 @@ class SparkProvider(p.ProvisioningPluginBase):
                             'slave' in node_processes)

        if need_update_spark:
-            ng_extra = extra[instance.node_group.id]
-
            sp_home = self._spark_home(cluster)
            files = {
                os.path.join(sp_home,
-                             'conf/spark-env.sh'): ng_extra['sp_master'],
-                os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+                             'conf/spark-env.sh'): extra['sp_master'],
+                os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
                os.path.join(
                    sp_home,
-                    'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                    'conf/spark-defaults.conf'): extra['sp_defaults']
            }
            r = remote.get_remote(instance)
            r.write_files_to(files)

View File

@@ -73,24 +73,25 @@ def _configure_instance(pctx, instance):

def _provisioning_configs(pctx, instance):
-    xmls, env = _generate_configs(pctx, instance.node_group)
+    xmls, env = _generate_configs(pctx, instance)
    _push_xml_configs(instance, xmls)
    _push_env_configs(instance, env)


-def _generate_configs(pctx, node_group):
-    hadoop_xml_confs = _get_hadoop_configs(pctx, node_group)
-    user_xml_confs, user_env_confs = _get_user_configs(pctx, node_group)
+def _generate_configs(pctx, instance):
+    hadoop_xml_confs = _get_hadoop_configs(pctx, instance)
+    user_xml_confs, user_env_confs = _get_user_configs(
+        pctx, instance.node_group)
    xml_confs = s_cfg.merge_configs(user_xml_confs, hadoop_xml_confs)
    env_confs = s_cfg.merge_configs(pctx['env_confs'], user_env_confs)

    return xml_confs, env_confs


-def _get_hadoop_configs(pctx, node_group):
-    cluster = node_group.cluster
+def _get_hadoop_configs(pctx, instance):
+    cluster = instance.node_group.cluster
    nn_hostname = vu.get_instance_hostname(vu.get_namenode(cluster))
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
    confs = {
        'Hadoop': {
            'fs.defaultFS': 'hdfs://%s:9000' % nn_hostname

@@ -282,8 +283,7 @@ def _push_configs_to_instance(instance, configs):

def _post_configuration(pctx, instance):
-    node_group = instance.node_group
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
    args = {
        'hadoop_user': HADOOP_USER,
        'hadoop_group': HADOOP_GROUP,

@@ -313,9 +313,9 @@ def _post_configuration(pctx, instance):
        r.execute_command('chmod +x ' + t_script, run_as_root=True)


-def _get_hadoop_dirs(node_group):
+def _get_hadoop_dirs(instance):
    dirs = {}
-    storage_paths = node_group.storage_paths()
+    storage_paths = instance.storage_paths()
    dirs['hadoop_name_dirs'] = _make_hadoop_paths(
        storage_paths, '/hdfs/namenode')
    dirs['hadoop_data_dirs'] = _make_hadoop_paths(

View File

@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+import re
+import threading
+
from oslo_config import cfg
from oslo_log import log as logging

@@ -21,6 +24,7 @@ from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.i18n import _LE
+from sahara.i18n import _LW
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils.openstack import base as b
from sahara.utils.openstack import cinder

@@ -51,6 +55,7 @@ def _count_volumes_to_mount(instances):
def attach_to_instances(instances):
    instances_to_attach = _count_instances_to_attach(instances)
    if instances_to_attach == 0:
+        mount_to_instances(instances)
        return

    cpo.add_provisioning_step(

@@ -66,6 +71,8 @@
                         % instance.instance_name, _attach_volumes_to_node,
                         instance.node_group, instance)

+    mount_to_instances(instances)
+

@poll_utils.poll_status(
    'await_attach_volumes', _("Await for attaching volumes to instances"),

@@ -91,13 +98,6 @@ def _attach_volumes_to_node(node_group, instance):
    _await_attach_volumes(instance, devices)

-    paths = instance.node_group.storage_paths()
-    for idx in range(0, instance.node_group.volumes_per_node):
-        LOG.debug("Mounting volume {volume} to instance"
-                  .format(volume=devices[idx]))
-        _mount_volume(instance, devices[idx], paths[idx])
-    LOG.debug("Mounted volume to instance")
-

@poll_utils.poll_status(
    'volume_available_timeout', _("Await for volume become available"),

@@ -155,51 +155,90 @@ def mount_to_instances(instances):
        instances[0].cluster_id,
        _("Mount volumes to instances"), _count_volumes_to_mount(instances))

-    with context.ThreadGroup() as tg:
-        for instance in instances:
-            with context.set_current_instance_id(instance.instance_id):
-                devices = _find_instance_volume_devices(instance)
-
-                # Since formating can take several minutes (for large disks)
-                # and can be done in parallel, launch one thread per disk.
-                for idx in range(0, instance.node_group.volumes_per_node):
-                    tg.spawn(
-                        'mount-volume-%d-to-node-%s' %
-                        (idx, instance.instance_name),
-                        _mount_volume_to_node, instance, idx, devices[idx])
+    for instance in instances:
+        with context.set_current_instance_id(instance.instance_id):
+            devices = _find_instance_devices(instance)
+            formatted_devices = []
+            lock = threading.Lock()
+            with context.ThreadGroup() as tg:
+                # Since formating can take several minutes (for large disks)
+                # and can be done in parallel, launch one thread per disk.
+                for device in devices:
+                    tg.spawn('format-device-%s' % device, _format_device,
+                             instance, device, formatted_devices, lock)
+
+            conductor.instance_update(
+                context.current(), instance,
+                {"storage_devices_number": len(formatted_devices)})
+            for idx, dev in enumerate(formatted_devices):
+                _mount_volume_to_node(instance, idx+1, dev)


-def _find_instance_volume_devices(instance):
-    volumes = b.execute_with_retries(nova.client().volumes.get_server_volumes,
-                                     instance.instance_id)
-    devices = [volume.device for volume in volumes]
-    return devices
+def _find_instance_devices(instance):
+    with instance.remote() as r:
+        code, attached_info = r.execute_command(
+            "lsblk -r | awk '$6 ~ /disk/ || /part/ {print \"/dev/\" $1}'")
+        attached_dev = attached_info.split()
+        code, mounted_info = r.execute_command(
+            "mount | awk '$1 ~ /^\/dev/ {print $1}'")
+        mounted_dev = mounted_info.split()
+
+    # filtering attached devices, that should not be mounted
+    for dev in attached_dev[:]:
+        idx = re.sub("\D", "", dev)
+        if idx:
+            if dev in mounted_dev:
+                attached_dev.remove(re.sub("\d", "", dev))
+                attached_dev.remove(dev)
+
+    for dev in attached_dev[:]:
+        if re.sub("\D", "", dev):
+            if re.sub("\d", "", dev) in attached_dev:
+                attached_dev.remove(dev)
+
+    return attached_dev


@cpo.event_wrapper(mark_successful_on_exit=True)
-def _mount_volume_to_node(instance, idx, device):
+def _mount_volume_to_node(instance, index, device):
    LOG.debug("Mounting volume {device} to instance".format(device=device))
-    mount_point = instance.node_group.storage_paths()[idx]
+    mount_point = instance.node_group.volume_mount_prefix + str(index)
    _mount_volume(instance, device, mount_point)
    LOG.debug("Mounted volume to instance")


+def _format_device(instance, device, formatted_devices=None, lock=None):
+    with instance.remote() as r:
+        try:
+            # Format devices with better performance options:
+            # - reduce number of blocks reserved for root to 1%
+            # - use 'dir_index' for faster directory listings
+            # - use 'extents' to work faster with large files
+            # - disable journaling
+            fs_opts = '-F -m 1 -O dir_index,extents,^has_journal'
+            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device))
+            if lock:
+                with lock:
+                    formatted_devices.append(device)
+        except Exception:
+            LOG.warning(
+                _LW("Device {dev} cannot be formatted").format(dev=device))
+
+
def _mount_volume(instance, device_path, mount_point):
    with instance.remote() as r:
        try:
            # Mount volumes with better performance options:
-            # - reduce number of blocks reserved for root to 1%
-            # - use 'dir_index' for faster directory listings
-            # - use 'extents' to work faster with large files
-            # - disable journaling
            # - enable write-back
            # - do not store access time
-            fs_opts = '-m 1 -O dir_index,extents,^has_journal'
            mount_opts = '-o data=writeback,noatime,nodiratime'
            r.execute_command('sudo mkdir -p %s' % mount_point)
-            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device_path))
            r.execute_command('sudo mount %s %s %s' %
                              (mount_opts, device_path, mount_point))
+            r.execute_command(
+                'sudo sh -c "grep %s /etc/mtab >> /etc/fstab"' % device_path)
        except Exception:
            LOG.error(_LE("Error mounting volume to instance"))
            raise
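To make the device filtering in _find_instance_devices easier to follow, here is a standalone sketch of the same two-pass logic with the remote lsblk/mount calls replaced by plain lists (device names are illustrative; this is not the patched function itself):

import re

def find_unmounted_devices(attached_dev, mounted_dev):
    attached_dev = list(attached_dev)
    # Pass 1: a mounted partition disqualifies both itself and its parent disk
    # (e.g. a mounted /dev/vda1 removes /dev/vda and /dev/vda1).
    for dev in attached_dev[:]:
        if re.sub(r"\D", "", dev) and dev in mounted_dev:
            attached_dev.remove(re.sub(r"\d", "", dev))
            attached_dev.remove(dev)
    # Pass 2: if a whole disk is still a candidate, drop its partitions and
    # keep the disk itself for formatting.
    for dev in attached_dev[:]:
        if re.sub(r"\D", "", dev) and re.sub(r"\d", "", dev) in attached_dev:
            attached_dev.remove(dev)
    return attached_dev

print(find_unmounted_devices(
    ['/dev/vda', '/dev/vda1', '/dev/vdb', '/dev/vdc'], ['/dev/vda1']))
# ['/dev/vdb', '/dev/vdc']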

View File

@@ -540,6 +540,9 @@ class SaharaMigrationsCheckers(object):
        self.assertColumnExists(engine, 'job_executions',
                                'engine_job_id')

+    def _check_028(self, engine, data):
+        self.assertColumnExists(engine, 'instances', 'storage_devices_number')
+

class TestMigrationsMySQL(SaharaMigrationsCheckers,
                          base.BaseWalkMigrationTestCase,

View File

@@ -30,8 +30,10 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
        self.ng.node_configs = {}
        self.ng.cluster = mock.Mock()
        self.ng.cluster.hadoop_version = "2.2"
-        self.ng.storage_paths = mock.Mock()
-        self.ng.storage_paths.return_value = ["/data1", "/data2"]
+        self.instance = mock.Mock()
+        self.instance.node_group = self.ng
+        self.instance.storage_paths = mock.Mock()
+        self.instance.storage_paths.return_value = ["/data1", "/data2"]

    def assertConfigEqual(self, expected, actual):
        self.assertEqual(len(expected), len(actual))

@@ -45,8 +47,8 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
        self.assertEqual(len(expected), len(cnt_ex))
        self.assertEqual(len(actual), len(cnt_act))

-    def test_get_ng_params_default(self):
-        ng_configs = configs.get_ng_params(self.ng)
+    def test_get_instance_params_default(self):
+        instance_configs = configs.get_instance_params(self.instance)
        expected = [
            {
                "hdfs-site": {

@@ -71,16 +73,16 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                }
            }
        ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)

-    def test_get_ng_params(self):
+    def test_get_instance_params(self):
        self.ng.node_configs = {
            "YARN": {
                "mapreduce.map.java.opts": "-Dk=v",
                "yarn.scheduler.minimum-allocation-mb": "256"
            }
        }
-        ng_configs = configs.get_ng_params(self.ng)
+        instance_configs = configs.get_instance_params(self.instance)
        expected = [
            {
                "hdfs-site": {

@@ -111,4 +113,4 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                }
            }
        ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)

View File

@@ -29,6 +29,11 @@ class TestServer(object):
        self.public_ip = public_ip
        self.internal_ip = private_ip
        self.node_group = None
+        self.sahara_instance = self
+        self.storage_path = ['/mnt']
+
+    def storage_paths(self):
+        return self.storage_path

    def fqdn(self):
        return self.inst_fqdn

@@ -81,10 +86,6 @@ class TestNodeGroup(object):
        self.node_processes = node_processes
        self.count = count
        self.id = name
-        self.ng_storage_paths = []
-
-    def storage_paths(self):
-        return self.ng_storage_paths


class TestUserInputConfig(object):

View File

@@ -763,25 +763,32 @@ class ServicesTest(base.SaharaTestCase):
        for version in versions:
            s = self.get_services_processor(version=version)
            service = s.create_service('AMBARI')
-            ng1 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng2 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng2.ng_storage_paths = ['/mnt']
+            server1 = hdp_test_base.TestServer(
+                'host1', 'test-master', '11111', 3, '1.1.1.1', '2.2.2.2')
+            server2 = hdp_test_base.TestServer(
+                'host2', 'test-slave', '11111', 3, '3.3.3.3', '4.4.4.4')
+            server3 = hdp_test_base.TestServer(
+                'host3', 'another-test', '11111', 3, '6.6.6.6', '5.5.5.5')
+            ng1 = hdp_test_base.TestNodeGroup('ng1', [server1], None)
+            ng2 = hdp_test_base.TestNodeGroup('ng2', [server2], None)
+            ng3 = hdp_test_base.TestNodeGroup('ng3', [server3], None)
+            server1.storage_path = ['/volume/disk1']
+            server2.storage_path = ['/mnt']

            paths = service._get_common_paths([ng1, ng2])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)

-            ng3 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/mnt']
+            server3.storage_path = ['/volume/disk1']

            paths = service._get_common_paths([ng1, ng2, ng3])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)

-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/volume/disk1']
+            server3.storage_path = ['/volume/disk1']

            paths = service._get_common_paths([ng1, ng2, ng3])
            self.assertEqual(['/volume/disk1'], paths)

View File

@@ -79,13 +79,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
                         volumes.detach_from_instance(instance))

    @base.mock_thread_group
-    @mock.patch('sahara.service.volumes._mount_volume')
+    @mock.patch('sahara.service.volumes.mount_to_instances')
    @mock.patch('sahara.service.volumes._await_attach_volumes')
    @mock.patch('sahara.service.volumes._create_attach_volume')
    @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
    @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
-    def test_attach(self, add_step, add_event,
-                    p_create_attach_vol, p_await, p_mount):
+    def test_attach(self, add_step, add_event, p_create_attach_vol, p_await,
+                    p_mount):
        p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
        p_await.return_value = None
        p_mount.return_value = None

@@ -115,7 +115,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
        volumes.attach_to_instances(cluster_utils.get_instances(cluster))
        self.assertEqual(4, p_create_attach_vol.call_count)
        self.assertEqual(2, p_await.call_count)
-        self.assertEqual(4, p_mount.call_count)
+        self.assertEqual(1, p_mount.call_count)

    @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
    @mock.patch('sahara.context.sleep')

@@ -157,3 +157,35 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
            inst.remote.return_value = inst_remote

        return inst
+
+    def test_find_instance_volume_devices(self):
+        instance = self._get_instance()
+        ex_cmd = instance.remote().execute_command
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdc'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb', '/dev/vdc'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb2'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb2'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb1'], diff)