Merge "Formatting and mounting methods changed for ironic"

Jenkins 2015-09-21 19:44:18 +00:00 committed by Gerrit Code Review
commit 9f76fe01b3
21 changed files with 276 additions and 165 deletions
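In short: mount points are no longer derived from the node group's volumes_per_node. Each instance now discovers its attached-but-unmounted block devices, formats them in parallel, records how many were formatted in the new storage_devices_number column, and builds its mount points from that count. A minimal, self-contained sketch of the new per-instance path derivation (it mirrors the Instance.storage_paths() added below and assumes the default volume_mount_prefix of '/volumes/disk'; adjust if configured differently):

    def storage_paths(volume_mount_prefix, storage_devices_number):
        # One mount point per formatted device; fall back to the ephemeral
        # drive at /mnt when the instance has no formatted devices.
        paths = [volume_mount_prefix + str(idx)
                 for idx in range(1, storage_devices_number + 1)]
        return paths or ['/mnt']

    storage_paths('/volumes/disk', 2)  # ['/volumes/disk1', '/volumes/disk2']
    storage_paths('/volumes/disk', 0)  # ['/mnt']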

View File

@@ -55,7 +55,8 @@ NODE_GROUP_TEMPLATE_DEFAULTS.update({"is_public": False,
                                      "is_protected": False})

 INSTANCE_DEFAULTS = {
-    "volumes": []
+    "volumes": [],
+    "storage_devices_number": 0
 }

 DATA_SOURCE_DEFAULTS = {

View File

@@ -131,18 +131,6 @@ class NodeGroup(object):
         return configs.merge_configs(self.cluster.cluster_configs,
                                      self.node_configs)

-    def storage_paths(self):
-        mp = []
-        for idx in range(1, self.volumes_per_node + 1):
-            mp.append(self.volume_mount_prefix + str(idx))
-
-        # Here we assume that NG's instances use ephemeral
-        # drives for storage if volumes_per_node == 0
-        if not mp:
-            mp = ['/mnt']
-
-        return mp
-
     def get_image_id(self):
         return self.image_id or self.cluster.default_image_id

@@ -158,6 +146,7 @@ class Instance(object):
         internal_ip
         management_ip
         volumes
+        storage_devices_number
     """

     def hostname(self):
@@ -169,6 +158,16 @@ class Instance(object):
     def remote(self):
         return remote.get_remote(self)

+    def storage_paths(self):
+        mp = []
+        for idx in range(1, self.storage_devices_number + 1):
+            mp.append(self.node_group.volume_mount_prefix + str(idx))
+
+        if not mp:
+            mp = ['/mnt']
+
+        return mp
+

 class ClusterTemplate(object):
     """An object representing Cluster Template.

View File

@@ -0,0 +1,35 @@
+# Copyright 2015 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add_storage_devices_number
+
+Revision ID: 028
+Revises: 027
+Create Date: 2015-07-20 16:56:23.562710
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '028'
+down_revision = '027'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('instances',
+                  sa.Column('storage_devices_number', sa.Integer(),
+                            nullable=True))

View File

@@ -157,6 +157,7 @@ class Instance(mb.SaharaBase):
     internal_ip = sa.Column(sa.String(45))
     management_ip = sa.Column(sa.String(45))
     volumes = sa.Column(st.JsonListType())
+    storage_devices_number = sa.Column(sa.Integer)


 # Template objects: ClusterTemplate, NodeGroupTemplate, TemplatesRelation

View File

@@ -167,10 +167,10 @@ def _make_paths(dirs, suffix):
     return ",".join([d + suffix for d in dirs])


-def get_ng_params(node_group):
-    configs = _create_ambari_configs(node_group.node_configs,
-                                     node_group.cluster.hadoop_version)
-    storage_paths = node_group.storage_paths()
+def get_instance_params(inst):
+    configs = _create_ambari_configs(inst.node_group.node_configs,
+                                     inst.node_group.cluster.hadoop_version)
+    storage_paths = inst.storage_paths()
     configs.setdefault("hdfs-site", {})
     configs["hdfs-site"]["dfs.datanode.data.dir"] = _make_paths(
         storage_paths, "/hdfs/data")
@@ -188,7 +188,6 @@ def get_ng_params(node_group):
     configs["yarn-site"][
         "yarn.timeline-service.leveldb-timeline-store.path"] = _make_paths(
             storage_paths, "/yarn/timeline")
-
     return _serialize_ambari_configs(configs)
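For a concrete sense of what get_instance_params now emits: _make_paths (unchanged above) just joins each storage path with a suffix. A quick, self-contained check using the two-disk layout from the unit tests further down:

    def _make_paths(dirs, suffix):
        return ",".join([d + suffix for d in dirs])

    # With instance.storage_paths() == ["/data1", "/data2"] (as in the test
    # case below), dfs.datanode.data.dir becomes:
    _make_paths(["/data1", "/data2"], "/hdfs/data")
    # -> '/data1/hdfs/data,/data2/hdfs/data'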

View File

@@ -183,16 +183,17 @@ def create_blueprint(cluster):
     cluster = conductor.cluster_get(context.ctx(), cluster.id)
     host_groups = []
     for ng in cluster.node_groups:
-        hg = {
-            "name": ng.name,
-            "configurations": configs.get_ng_params(ng),
-            "components": []
-        }
         procs = p_common.get_ambari_proc_list(ng)
         procs.extend(p_common.get_clients(cluster))
-        for proc in procs:
-            hg["components"].append({"name": proc})
-        host_groups.append(hg)
+        for instance in ng.instances:
+            hg = {
+                "name": instance.instance_name,
+                "configurations": configs.get_instance_params(instance),
+                "components": []
+            }
+            for proc in procs:
+                hg["components"].append({"name": proc})
+            host_groups.append(hg)
     bp = {
         "Blueprints": {
             "stack_name": "HDP",
@@ -214,10 +215,11 @@ def start_cluster(cluster):
         "host_groups": []
     }
     for ng in cluster.node_groups:
-        cl_tmpl["host_groups"].append({
-            "name": ng.name,
-            "hosts": map(lambda x: {"fqdn": x.fqdn()}, ng.instances)
-        })
+        for instance in ng.instances:
+            cl_tmpl["host_groups"].append({
+                "name": instance.instance_name,
+                "hosts": [{"fqdn": instance.fqdn()}]
+            })
     ambari = plugin_utils.get_instance(cluster, p_common.AMBARI_SERVER)
     password = cluster.extra["ambari_password"]
     with ambari_client.AmbariClient(ambari, password=password) as client:
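The blueprint therefore gets one host group per instance rather than one per node group. A rough, self-contained illustration of the resulting structure for a two-instance node group (process and instance names here are made up; "configurations" would come from configs.get_instance_params):

    procs = ["DATANODE", "NODEMANAGER"]            # hypothetical process list
    instance_names = ["worker-001", "worker-002"]  # hypothetical instance names

    host_groups = [{"name": name,
                    "configurations": {},          # per-instance configs go here
                    "components": [{"name": p} for p in procs]}
                   for name in instance_names]

    # And the cluster template maps each host group to exactly one host:
    host_mapping = [{"name": name, "hosts": [{"fqdn": name + ".example.org"}]}
                    for name in instance_names]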

View File

@@ -284,7 +284,7 @@ class ClouderaUtils(object):
         role = service.create_role(self.pu.get_role_name(instance, process),
                                    role_type, instance.fqdn())
         role.update_config(self._get_configs(process, cluster,
-                                             node_group=instance.node_group))
+                                             instance=instance))

     def get_cloudera_manager_info(self, cluster):
         mng = self.pu.get_manager(cluster)
@@ -297,6 +297,6 @@ class ClouderaUtils(object):
         }
         return info

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         # Defined in derived class.
         return

View File

@@ -145,7 +145,7 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
         hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                               cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])

@@ -214,10 +214,10 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
             all_confs = s_cfg.merge_configs(all_confs, hive_confs)
             all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -240,8 +240,8 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
             }

             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

         return all_confs.get(service, {})

View File

@@ -205,7 +205,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
         impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])

@@ -344,10 +344,10 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
             all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
             all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -382,8 +382,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
             }

             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

         return all_confs.get(service, {})

View File

@@ -222,7 +222,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
         kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                             cluster=cluster))

-    def _get_configs(self, service, cluster=None, node_group=None):
+    def _get_configs(self, service, cluster=None, instance=None):
         def get_hadoop_dirs(mount_points, suffix):
             return ','.join([x + suffix for x in mount_points])

@@ -363,10 +363,10 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
             all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
             all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

-        if node_group:
-            paths = node_group.storage_paths()
-            ng_default_confs = {
+        if instance:
+            paths = instance.storage_paths()
+            instance_default_confs = {
                 'NAMENODE': {
                     'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                 },
@@ -401,9 +401,9 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
             }

             ng_user_confs = self.pu.convert_process_configs(
-                node_group.node_configs)
+                instance.node_group.node_configs)
             all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
-            all_confs = s_cfg.merge_configs(all_confs, ng_default_confs)
+            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

         return all_confs.get(service, {})

View File

@@ -210,7 +210,6 @@ class ClusterSpec(object):
             node_group.count = ng.count
             node_group.id = ng.id
             node_group.components = ng.node_processes[:]
-            node_group.ng_storage_paths = ng.storage_paths()
             for instance in ng.instances:
                 node_group.instances.add(Instance(instance))
             self.node_groups[node_group.name] = node_group
@@ -270,14 +269,10 @@ class NodeGroup(object):
         self.cardinality = None
         self.count = None
         self.instances = set()
-        self.ng_storage_paths = []

     def add_component(self, component):
         self.components.append(component)

-    def storage_paths(self):
-        return self.ng_storage_paths
-

 class User(object):
     def __init__(self, name, password, groups):

View File

@@ -102,16 +102,12 @@ class Service(object):
             config[prop_name] = value

     def _get_common_paths(self, node_groups):
-        if len(node_groups) == 1:
-            paths = node_groups[0].storage_paths()
-        else:
-            sets = [set(ng.storage_paths()) for ng in node_groups]
-            paths = list(set.intersection(*sets))
-
-        if len(paths) > 1 and '/mnt' in paths:
-            paths.remove('/mnt')
-
-        return paths
+        sets = []
+        for node_group in node_groups:
+            for instance in node_group.instances:
+                sets.append(set(instance.sahara_instance.storage_paths()))
+
+        return list(set.intersection(*sets)) if sets else []

     def _generate_storage_path(self, storage_paths, path):
         return ",".join([p + path for p in storage_paths])
@@ -201,7 +197,7 @@ class HdfsService(Service):
         hdfs_site_config = cluster_spec.configurations['hdfs-site']
         hdfs_site_config['dfs.namenode.name.dir'] = (
             self._generate_storage_path(
-                nn_ng.storage_paths(), '/hadoop/hdfs/namenode'))
+                self._get_common_paths([nn_ng]), '/hadoop/hdfs/namenode'))
         if common_paths:
             hdfs_site_config['dfs.datanode.data.dir'] = (
                 self._generate_storage_path(
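_get_common_paths now intersects the storage paths of every instance in the given node groups and no longer special-cases '/mnt'. A self-contained example using the same paths as the updated unit test further down:

    instance_paths = [
        {'/volume/disk1', '/volume/disk2'},   # server1
        {'/volume/disk1'},                    # server2
        {'/volume/disk1'},                    # server3
    ]
    common = list(set.intersection(*instance_paths)) if instance_paths else []
    # -> ['/volume/disk1']; with disjoint per-instance paths the result is now [].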

View File

@@ -107,7 +107,7 @@ class MapRFS(s.Service):
     def _generate_disk_list_file(self, instance, path_to_disk_setup_script):
         LOG.debug('Creating disk list file')
         g.run_script(instance, path_to_disk_setup_script, 'root',
-                     *instance.node_group.storage_paths())
+                     *instance.storage_paths())

     def _execute_disksetup(self, instance):
         with instance.remote() as rmt:

View File

@@ -163,7 +163,6 @@ class SparkProvider(p.ProvisioningPluginBase):
                                    cluster)

     def _extract_configs_to_extra(self, cluster):
-        nn = utils.get_instance(cluster, "namenode")
         sp_master = utils.get_instance(cluster, "master")
         sp_slaves = utils.get_instances(cluster, "slave")

@@ -186,22 +185,10 @@ class SparkProvider(p.ProvisioningPluginBase):
         config_defaults = c_helper.generate_spark_executor_classpath(cluster)

         extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
-        for ng in cluster.node_groups:
-            extra[ng.id] = {
-                'xml': c_helper.generate_xml_configs(
-                    ng.configuration(),
-                    ng.storage_paths(),
-                    nn.hostname(), None
-                ),
-                'setup_script': c_helper.generate_hadoop_setup_script(
-                    ng.storage_paths(),
-                    c_helper.extract_hadoop_environment_confs(
-                        ng.configuration())
-                ),
-                'sp_master': config_master,
-                'sp_slaves': config_slaves,
-                'sp_defaults': config_defaults
-            }
+        extra['sp_master'] = config_master
+        extra['sp_slaves'] = config_slaves
+        extra['sp_defaults'] = config_defaults

         if c_helper.is_data_locality_enabled(cluster):
             topology_data = th.generate_topology_map(
@@ -211,6 +198,19 @@ class SparkProvider(p.ProvisioningPluginBase):

         return extra

+    def _add_instance_ng_related_to_extra(self, cluster, instance, extra):
+        extra = extra.copy()
+        ng = instance.node_group
+        nn = utils.get_instance(cluster, "namenode")
+
+        extra['xml'] = c_helper.generate_xml_configs(
+            ng.configuration(), instance.storage_paths(), nn.hostname(), None)
+        extra['setup_script'] = c_helper.generate_hadoop_setup_script(
+            instance.storage_paths(),
+            c_helper.extract_hadoop_environment_confs(ng.configuration()))
+
+        return extra
+
     def _start_datanode_processes(self, dn_instances):
         if len(dn_instances) == 0:
             return
@@ -243,6 +243,8 @@ class SparkProvider(p.ProvisioningPluginBase):
             cluster.id, _("Push configs to nodes"), len(all_instances))
         with context.ThreadGroup() as tg:
             for instance in all_instances:
+                extra = self._add_instance_ng_related_to_extra(
+                    cluster, instance, extra)
                 if instance in new_instances:
                     tg.spawn('spark-configure-%s' % instance.instance_name,
                              self._push_configs_to_new_node, cluster,
@@ -254,25 +256,23 @@ class SparkProvider(p.ProvisioningPluginBase):

     @cpo.event_wrapper(mark_successful_on_exit=True)
     def _push_configs_to_new_node(self, cluster, extra, instance):
-        ng_extra = extra[instance.node_group.id]
-
         files_hadoop = {
             os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "core-site.xml"): ng_extra['xml']['core-site'],
+                         "core-site.xml"): extra['xml']['core-site'],
             os.path.join(c_helper.HADOOP_CONF_DIR,
-                         "hdfs-site.xml"): ng_extra['xml']['hdfs-site'],
+                         "hdfs-site.xml"): extra['xml']['hdfs-site'],
         }

         sp_home = self._spark_home(cluster)
         files_spark = {
-            os.path.join(sp_home, 'conf/spark-env.sh'): ng_extra['sp_master'],
-            os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+            os.path.join(sp_home, 'conf/spark-env.sh'): extra['sp_master'],
+            os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
             os.path.join(sp_home,
-                         'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                         'conf/spark-defaults.conf'): extra['sp_defaults']
         }

         files_init = {
-            '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'],
+            '/tmp/sahara-hadoop-init.sh': extra['setup_script'],
             'id_rsa': cluster.management_private_key,
             'authorized_keys': cluster.management_public_key
         }
@@ -283,7 +283,7 @@ class SparkProvider(p.ProvisioningPluginBase):
                 'sudo chown $USER $HOME/.ssh/id_rsa; '
                 'sudo chmod 600 $HOME/.ssh/id_rsa')

-        storage_paths = instance.node_group.storage_paths()
+        storage_paths = instance.storage_paths()
         dn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,
                                                      '/dfs/dn'))
         nn_path = ' '.join(c_helper.make_hadoop_path(storage_paths,
@@ -336,15 +336,14 @@ class SparkProvider(p.ProvisioningPluginBase):
                              'slave' in node_processes)

         if need_update_spark:
-            ng_extra = extra[instance.node_group.id]
             sp_home = self._spark_home(cluster)
             files = {
                 os.path.join(sp_home,
-                             'conf/spark-env.sh'): ng_extra['sp_master'],
-                os.path.join(sp_home, 'conf/slaves'): ng_extra['sp_slaves'],
+                             'conf/spark-env.sh'): extra['sp_master'],
+                os.path.join(sp_home, 'conf/slaves'): extra['sp_slaves'],
                 os.path.join(
                     sp_home,
-                    'conf/spark-defaults.conf'): ng_extra['sp_defaults']
+                    'conf/spark-defaults.conf'): extra['sp_defaults']
             }
             r = remote.get_remote(instance)
             r.write_files_to(files)
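With the per-node-group extra[ng.id] sub-dicts gone, the Spark plugin builds one flat extra dict per instance: the shared keys (sp_master, sp_slaves, sp_defaults, job_cleanup) are computed once, and _add_instance_ng_related_to_extra layers the instance-specific 'xml' and 'setup_script' on a copy. A small, self-contained sketch of that copy-and-extend pattern (values are placeholders):

    base_extra = {'sp_master': '...', 'sp_slaves': '...', 'sp_defaults': '...'}

    def add_instance_extra(base, xml, setup_script):
        # Copy so per-instance keys never leak between instances.
        extra = base.copy()
        extra['xml'] = xml
        extra['setup_script'] = setup_script
        return extra

    extra_a = add_instance_extra(base_extra, {'core-site': '<xml/>'}, '#!/bin/sh')
    extra_b = add_instance_extra(base_extra, {'core-site': '<xml/>'}, '#!/bin/sh')
    assert 'xml' not in base_extra   # the base dict stays instance-agnostic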

View File

@@ -73,24 +73,25 @@ def _configure_instance(pctx, instance):

 def _provisioning_configs(pctx, instance):
-    xmls, env = _generate_configs(pctx, instance.node_group)
+    xmls, env = _generate_configs(pctx, instance)
     _push_xml_configs(instance, xmls)
     _push_env_configs(instance, env)


-def _generate_configs(pctx, node_group):
-    hadoop_xml_confs = _get_hadoop_configs(pctx, node_group)
-    user_xml_confs, user_env_confs = _get_user_configs(pctx, node_group)
+def _generate_configs(pctx, instance):
+    hadoop_xml_confs = _get_hadoop_configs(pctx, instance)
+    user_xml_confs, user_env_confs = _get_user_configs(
+        pctx, instance.node_group)
     xml_confs = s_cfg.merge_configs(user_xml_confs, hadoop_xml_confs)
     env_confs = s_cfg.merge_configs(pctx['env_confs'], user_env_confs)

     return xml_confs, env_confs


-def _get_hadoop_configs(pctx, node_group):
-    cluster = node_group.cluster
+def _get_hadoop_configs(pctx, instance):
+    cluster = instance.node_group.cluster
     nn_hostname = vu.get_instance_hostname(vu.get_namenode(cluster))
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
     confs = {
         'Hadoop': {
             'fs.defaultFS': 'hdfs://%s:9000' % nn_hostname
@@ -282,8 +283,7 @@ def _push_configs_to_instance(instance, configs):

 def _post_configuration(pctx, instance):
-    node_group = instance.node_group
-    dirs = _get_hadoop_dirs(node_group)
+    dirs = _get_hadoop_dirs(instance)
     args = {
         'hadoop_user': HADOOP_USER,
         'hadoop_group': HADOOP_GROUP,
@@ -313,9 +313,9 @@ def _post_configuration(pctx, instance):
         r.execute_command('chmod +x ' + t_script, run_as_root=True)


-def _get_hadoop_dirs(node_group):
+def _get_hadoop_dirs(instance):
     dirs = {}
-    storage_paths = node_group.storage_paths()
+    storage_paths = instance.storage_paths()
     dirs['hadoop_name_dirs'] = _make_hadoop_paths(
         storage_paths, '/hdfs/namenode')
     dirs['hadoop_data_dirs'] = _make_hadoop_paths(

View File

@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import re
+import threading
+
 from oslo_config import cfg
 from oslo_log import log as logging

@@ -21,6 +24,7 @@ from sahara import context
 from sahara import exceptions as ex
 from sahara.i18n import _
 from sahara.i18n import _LE
+from sahara.i18n import _LW
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils.openstack import base as b
 from sahara.utils.openstack import cinder

@@ -51,6 +55,7 @@ def _count_volumes_to_mount(instances):
 def attach_to_instances(instances):
     instances_to_attach = _count_instances_to_attach(instances)
     if instances_to_attach == 0:
+        mount_to_instances(instances)
         return

     cpo.add_provisioning_step(
@@ -66,6 +71,8 @@ def attach_to_instances(instances):
                          % instance.instance_name, _attach_volumes_to_node,
                          instance.node_group, instance)

+    mount_to_instances(instances)
+

 @poll_utils.poll_status(
     'await_attach_volumes', _("Await for attaching volumes to instances"),
@@ -91,13 +98,6 @@ def _attach_volumes_to_node(node_group, instance):

     _await_attach_volumes(instance, devices)

-    paths = instance.node_group.storage_paths()
-    for idx in range(0, instance.node_group.volumes_per_node):
-        LOG.debug("Mounting volume {volume} to instance"
-                  .format(volume=devices[idx]))
-        _mount_volume(instance, devices[idx], paths[idx])
-    LOG.debug("Mounted volume to instance")
-

 @poll_utils.poll_status(
     'volume_available_timeout', _("Await for volume become available"),
@@ -155,51 +155,90 @@ def mount_to_instances(instances):
         instances[0].cluster_id,
         _("Mount volumes to instances"), _count_volumes_to_mount(instances))

-    with context.ThreadGroup() as tg:
-        for instance in instances:
-            with context.set_current_instance_id(instance.instance_id):
-                devices = _find_instance_volume_devices(instance)
-
-                # Since formating can take several minutes (for large disks)
-                # and can be done in parallel, launch one thread per disk.
-                for idx in range(0, instance.node_group.volumes_per_node):
-                    tg.spawn(
-                        'mount-volume-%d-to-node-%s' %
-                        (idx, instance.instance_name),
-                        _mount_volume_to_node, instance, idx, devices[idx])
+    for instance in instances:
+        with context.set_current_instance_id(instance.instance_id):
+            devices = _find_instance_devices(instance)
+            formatted_devices = []
+            lock = threading.Lock()
+            with context.ThreadGroup() as tg:
+                # Since formating can take several minutes (for large disks)
+                # and can be done in parallel, launch one thread per disk.
+                for device in devices:
+                    tg.spawn('format-device-%s' % device, _format_device,
+                             instance, device, formatted_devices, lock)
+
+            conductor.instance_update(
+                context.current(), instance,
+                {"storage_devices_number": len(formatted_devices)})
+            for idx, dev in enumerate(formatted_devices):
+                _mount_volume_to_node(instance, idx+1, dev)


-def _find_instance_volume_devices(instance):
-    volumes = b.execute_with_retries(nova.client().volumes.get_server_volumes,
-                                     instance.instance_id)
-    devices = [volume.device for volume in volumes]
-    return devices
+def _find_instance_devices(instance):
+    with instance.remote() as r:
+        code, attached_info = r.execute_command(
+            "lsblk -r | awk '$6 ~ /disk/ || /part/ {print \"/dev/\" $1}'")
+        attached_dev = attached_info.split()
+        code, mounted_info = r.execute_command(
+            "mount | awk '$1 ~ /^\/dev/ {print $1}'")
+        mounted_dev = mounted_info.split()
+
+    # filtering attached devices, that should not be mounted
+    for dev in attached_dev[:]:
+        idx = re.sub("\D", "", dev)
+        if idx:
+            if dev in mounted_dev:
+                attached_dev.remove(re.sub("\d", "", dev))
+                attached_dev.remove(dev)
+
+    for dev in attached_dev[:]:
+        if re.sub("\D", "", dev):
+            if re.sub("\d", "", dev) in attached_dev:
+                attached_dev.remove(dev)
+
+    return attached_dev


 @cpo.event_wrapper(mark_successful_on_exit=True)
-def _mount_volume_to_node(instance, idx, device):
+def _mount_volume_to_node(instance, index, device):
     LOG.debug("Mounting volume {device} to instance".format(device=device))
-    mount_point = instance.node_group.storage_paths()[idx]
+    mount_point = instance.node_group.volume_mount_prefix + str(index)
     _mount_volume(instance, device, mount_point)
     LOG.debug("Mounted volume to instance")


+def _format_device(instance, device, formatted_devices=None, lock=None):
+    with instance.remote() as r:
+        try:
+            # Format devices with better performance options:
+            # - reduce number of blocks reserved for root to 1%
+            # - use 'dir_index' for faster directory listings
+            # - use 'extents' to work faster with large files
+            # - disable journaling
+            fs_opts = '-F -m 1 -O dir_index,extents,^has_journal'
+            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device))
+            if lock:
+                with lock:
+                    formatted_devices.append(device)
+        except Exception:
+            LOG.warning(
+                _LW("Device {dev} cannot be formatted").format(dev=device))
+
+
 def _mount_volume(instance, device_path, mount_point):
     with instance.remote() as r:
         try:
             # Mount volumes with better performance options:
-            # - reduce number of blocks reserved for root to 1%
-            # - use 'dir_index' for faster directory listings
-            # - use 'extents' to work faster with large files
-            # - disable journaling
             # - enable write-back
             # - do not store access time
-            fs_opts = '-m 1 -O dir_index,extents,^has_journal'
             mount_opts = '-o data=writeback,noatime,nodiratime'

             r.execute_command('sudo mkdir -p %s' % mount_point)
-            r.execute_command('sudo mkfs.ext4 %s %s' % (fs_opts, device_path))
             r.execute_command('sudo mount %s %s %s' %
                               (mount_opts, device_path, mount_point))
+            r.execute_command(
+                'sudo sh -c "grep %s /etc/mtab >> /etc/fstab"' % device_path)
         except Exception:
             LOG.error(_LE("Error mounting volume to instance"))
             raise
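The device discovery deserves a worked example. _find_instance_devices lists every disk and partition via lsblk, then drops (a) any mounted partition together with its parent disk and (b) any remaining partition whose parent disk is still in the list, so only whole, unmounted devices get formatted and mounted. A self-contained replica of that filtering, fed the first input from the new unit test below:

    import re

    def filter_devices(attached_dev, mounted_dev):
        attached_dev = list(attached_dev)
        # Drop mounted partitions and their parent disks (e.g. the root disk).
        for dev in attached_dev[:]:
            if re.sub(r"\D", "", dev) and dev in mounted_dev:
                attached_dev.remove(re.sub(r"\d", "", dev))
                attached_dev.remove(dev)
        # Drop partitions whose parent disk is still present.
        for dev in attached_dev[:]:
            if re.sub(r"\D", "", dev) and re.sub(r"\d", "", dev) in attached_dev:
                attached_dev.remove(dev)
        return attached_dev

    filter_devices(['/dev/vda', '/dev/vda1', '/dev/vdb', '/dev/vdc'],
                   ['/dev/vda1'])
    # -> ['/dev/vdb', '/dev/vdc']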

View File

@@ -540,6 +540,9 @@ class SaharaMigrationsCheckers(object):
         self.assertColumnExists(engine, 'job_executions',
                                 'engine_job_id')

+    def _check_028(self, engine, data):
+        self.assertColumnExists(engine, 'instances', 'storage_devices_number')
+

 class TestMigrationsMySQL(SaharaMigrationsCheckers,
                           base.BaseWalkMigrationTestCase,

View File

@@ -30,8 +30,10 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
         self.ng.node_configs = {}
         self.ng.cluster = mock.Mock()
         self.ng.cluster.hadoop_version = "2.2"
-        self.ng.storage_paths = mock.Mock()
-        self.ng.storage_paths.return_value = ["/data1", "/data2"]
+        self.instance = mock.Mock()
+        self.instance.node_group = self.ng
+        self.instance.storage_paths = mock.Mock()
+        self.instance.storage_paths.return_value = ["/data1", "/data2"]

     def assertConfigEqual(self, expected, actual):
         self.assertEqual(len(expected), len(actual))
@@ -45,8 +47,8 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
         self.assertEqual(len(expected), len(cnt_ex))
         self.assertEqual(len(actual), len(cnt_act))

-    def test_get_ng_params_default(self):
-        ng_configs = configs.get_ng_params(self.ng)
+    def test_get_instance_params_default(self):
+        instance_configs = configs.get_instance_params(self.instance)
         expected = [
             {
                 "hdfs-site": {
@@ -71,16 +73,16 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                 }
             }
         ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)

-    def test_get_ng_params(self):
+    def test_get_instance_params(self):
         self.ng.node_configs = {
             "YARN": {
                 "mapreduce.map.java.opts": "-Dk=v",
                 "yarn.scheduler.minimum-allocation-mb": "256"
             }
         }
-        ng_configs = configs.get_ng_params(self.ng)
+        instance_configs = configs.get_instance_params(self.instance)
         expected = [
             {
                 "hdfs-site": {
@@ -111,4 +113,4 @@ class AmbariConfigsTestCase(base.SaharaTestCase):
                 }
             }
         ]
-        self.assertConfigEqual(expected, ng_configs)
+        self.assertConfigEqual(expected, instance_configs)

View File

@@ -29,6 +29,11 @@ class TestServer(object):
         self.public_ip = public_ip
         self.internal_ip = private_ip
         self.node_group = None
+        self.sahara_instance = self
+        self.storage_path = ['/mnt']
+
+    def storage_paths(self):
+        return self.storage_path

     def fqdn(self):
         return self.inst_fqdn
@@ -81,10 +86,6 @@ class TestNodeGroup(object):
         self.node_processes = node_processes
         self.count = count
         self.id = name
-        self.ng_storage_paths = []
-
-    def storage_paths(self):
-        return self.ng_storage_paths


 class TestUserInputConfig(object):

View File

@@ -763,25 +763,32 @@ class ServicesTest(base.SaharaTestCase):
         for version in versions:
             s = self.get_services_processor(version=version)
             service = s.create_service('AMBARI')
-            ng1 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng2 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng2.ng_storage_paths = ['/mnt']
+            server1 = hdp_test_base.TestServer(
+                'host1', 'test-master', '11111', 3, '1.1.1.1', '2.2.2.2')
+            server2 = hdp_test_base.TestServer(
+                'host2', 'test-slave', '11111', 3, '3.3.3.3', '4.4.4.4')
+            server3 = hdp_test_base.TestServer(
+                'host3', 'another-test', '11111', 3, '6.6.6.6', '5.5.5.5')
+
+            ng1 = hdp_test_base.TestNodeGroup('ng1', [server1], None)
+            ng2 = hdp_test_base.TestNodeGroup('ng2', [server2], None)
+            ng3 = hdp_test_base.TestNodeGroup('ng3', [server3], None)
+
+            server1.storage_path = ['/volume/disk1']
+            server2.storage_path = ['/mnt']

             paths = service._get_common_paths([ng1, ng2])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)

-            ng3 = hdp_test_base.TestNodeGroup(None, None, None)
-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/mnt']
+            server3.storage_path = ['/volume/disk1']

             paths = service._get_common_paths([ng1, ng2, ng3])
-            self.assertEqual(['/mnt'], paths)
+            self.assertEqual([], paths)

-            ng1.ng_storage_paths = ['/mnt', '/volume/disk1', '/volume/disk2']
-            ng2.ng_storage_paths = ['/mnt', '/volume/disk1']
-            ng3.ng_storage_paths = ['/mnt', '/volume/disk1']
+            server1.storage_path = ['/volume/disk1', '/volume/disk2']
+            server2.storage_path = ['/volume/disk1']
+            server3.storage_path = ['/volume/disk1']

             paths = service._get_common_paths([ng1, ng2, ng3])
             self.assertEqual(['/volume/disk1'], paths)

View File

@@ -79,13 +79,13 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
                          volumes.detach_from_instance(instance))

     @base.mock_thread_group
-    @mock.patch('sahara.service.volumes._mount_volume')
+    @mock.patch('sahara.service.volumes.mount_to_instances')
     @mock.patch('sahara.service.volumes._await_attach_volumes')
     @mock.patch('sahara.service.volumes._create_attach_volume')
     @mock.patch('sahara.utils.cluster_progress_ops.add_successful_event')
     @mock.patch('sahara.utils.cluster_progress_ops.add_provisioning_step')
-    def test_attach(self, add_step, add_event,
-                    p_create_attach_vol, p_await, p_mount):
+    def test_attach(self, add_step, add_event, p_create_attach_vol, p_await,
+                    p_mount):
         p_create_attach_vol.side_effect = ['/dev/vdb', '/dev/vdc'] * 2
         p_await.return_value = None
         p_mount.return_value = None
@@ -115,7 +115,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
         volumes.attach_to_instances(cluster_utils.get_instances(cluster))
         self.assertEqual(4, p_create_attach_vol.call_count)
         self.assertEqual(2, p_await.call_count)
-        self.assertEqual(4, p_mount.call_count)
+        self.assertEqual(1, p_mount.call_count)

     @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
     @mock.patch('sahara.context.sleep')
@@ -157,3 +157,35 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
         inst.remote.return_value = inst_remote
         return inst
+
+    def test_find_instance_volume_devices(self):
+        instance = self._get_instance()
+        ex_cmd = instance.remote().execute_command
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdc'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb', '/dev/vdc'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb1'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb2'], diff)
+
+        attached_info = '/dev/vda /dev/vda1 /dev/vdb /dev/vdb1 /dev/vdb2'
+        mounted_info = '/dev/vda1 /dev/vdb2'
+        ex_cmd.side_effect = [(0, attached_info), (0, mounted_info)]
+
+        diff = volumes._find_instance_devices(instance)
+        self.assertEqual(['/dev/vdb1'], diff)