Refactor the vanilla 2 plugin

Split the vanilla 2 plugin into two parts:
* version-specific code (configs)
* common code (provisioning, scaling, validation, etc.)

This patch is needed to support multiple Hadoop versions; the sketch after the change summary below illustrates the resulting layout.

Partially implements blueprint: add-vanilla-2-hadoop-2-4-0

Change-Id: I41678aed2bfb767c04af8bd52d28cd265fa125d7
Sergey Reshetnyak 2014-04-28 20:37:13 +04:00
parent 6cb3d14b88
commit a0503b839d
20 changed files with 254 additions and 219 deletions
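To make the split concrete, here is a minimal, self-contained sketch of the plugin-context pattern this commit introduces. Only the pctx keys ('env_confs', 'all_confs'), the configure_cluster(pctx, cluster) signature, and the VersionHandler.__init__ change come from the diff below; the stub helper, its values, and the cluster_name parameter are hypothetical illustrations, not actual sahara code.

# Minimal sketch (assumed names, illustrative values): the common hadoop2-style
# code receives a plugin context (pctx) built by a version-specific handler,
# so the same provisioning logic can serve several Hadoop versions.


def get_env_configs_v2_3_0():
    # Stands in for a versioned config_helper.get_env_configs(); the
    # service/parameter names and values here are illustrative only.
    return {'YARN': {'NodeManager Heap Size': 1024}}


def configure_cluster(pctx, cluster_name):
    # Common code: version-agnostic because all version-specific defaults
    # arrive through the plugin context rather than a hard-coded import.
    env_defaults = pctx['env_confs']
    print("configuring %s with env defaults %s" % (cluster_name, env_defaults))


class VersionHandler(object):
    """Version-specific entry point, mirroring VersionHandler.__init__ below."""

    def __init__(self):
        self.pctx = {
            'env_confs': get_env_configs_v2_3_0(),
            'all_confs': [],  # the real plugin stores provisioning.Config objects here
        }

    def configure_cluster(self, cluster_name):
        configure_cluster(self.pctx, cluster_name)


if __name__ == '__main__':
    VersionHandler().configure_cluster("demo-cluster")

With this structure, supporting a new version (e.g. Hadoop 2.4.0) only requires a new versioned package that supplies its own configs to the shared hadoop2 code, which is the point of the blueprint referenced above.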


@@ -9,13 +9,13 @@ include sahara/db/migration/alembic_migrations/versions/README
recursive-include sahara/locale *
include sahara/plugins/vanilla/v1_2_1/resources/*.xml
include sahara/plugins/vanilla/hadoop2/resources/*.sh
include sahara/plugins/vanilla/hadoop2/resources/*.sql
include sahara/plugins/vanilla/hadoop2/resources/*.template
include sahara/plugins/vanilla/v1_2_1/resources/*.sh
include sahara/plugins/vanilla/v1_2_1/resources/*.sql
include sahara/plugins/vanilla/v1_2_1/resources/*.xml
include sahara/plugins/vanilla/v2_3_0/resources/*.xml
include sahara/plugins/vanilla/v2_3_0/resources/*.sh
include sahara/plugins/vanilla/v2_3_0/resources/*.sql
include sahara/plugins/vanilla/v2_3_0/resources/*.template
include sahara/plugins/hdp/versions/version_1_3_2/resources/*.template
include sahara/plugins/hdp/versions/version_1_3_2/resources/*.json
include sahara/plugins/hdp/versions/version_1_3_2/resources/*.sh
@@ -28,7 +28,7 @@ include sahara/plugins/spark/resources/*.template
include sahara/resources/*.heat
include sahara/service/edp/resources/*.xml
include sahara/swift/resources/*.xml
include sahara/tests/unit/plugins/vanilla/v2_3_0/resources/*.txt
include sahara/tests/unit/plugins/vanilla/hadoop2/resources/*.txt
include sahara/tests/unit/resources/*.heat
include sahara/tests/unit/resources/*.xml
include sahara/tests/unit/resources/*.txt


@@ -16,9 +16,9 @@
import six
from sahara.openstack.common import log as logging
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.plugins.vanilla.hadoop2 import oozie_helper as o_helper
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
from sahara.plugins.vanilla.v2_3_0 import oozie_helper as o_helper
from sahara.swift import swift_helper as swift
from sahara.topology import topology_helper as th
from sahara.utils import files as f
@@ -32,40 +32,39 @@ HADOOP_USER = 'hadoop'
HADOOP_GROUP = 'hadoop'
def configure_cluster(cluster):
def configure_cluster(pctx, cluster):
LOG.debug("Configuring cluster \"%s\"", cluster.name)
instances = []
for node_group in cluster.node_groups:
for instance in node_group.instances:
instances.append(instance)
configure_instances(instances)
configure_topology_data(cluster)
configure_instances(pctx, instances)
configure_topology_data(pctx, cluster)
def configure_instances(instances):
def configure_instances(pctx, instances):
for instance in instances:
_provisioning_configs(instance)
_post_configuration(instance)
_provisioning_configs(pctx, instance)
_post_configuration(pctx, instance)
def _provisioning_configs(instance):
xmls, env = _generate_configs(instance.node_group)
def _provisioning_configs(pctx, instance):
xmls, env = _generate_configs(pctx, instance.node_group)
_push_xml_configs(instance, xmls)
_push_env_configs(instance, env)
def _generate_configs(node_group):
user_xml_confs, user_env_confs = _get_user_configs(node_group)
hadoop_xml_confs, default_env_confs = _get_hadoop_configs(node_group)
def _generate_configs(pctx, node_group):
hadoop_xml_confs = _get_hadoop_configs(pctx, node_group)
user_xml_confs, user_env_confs = _get_user_configs(pctx, node_group)
xml_confs = _merge_configs(user_xml_confs, hadoop_xml_confs)
env_confs = _merge_configs(default_env_confs, user_env_confs)
env_confs = _merge_configs(pctx['env_confs'], user_env_confs)
return xml_confs, env_confs
def _get_hadoop_configs(node_group):
def _get_hadoop_configs(pctx, node_group):
cluster = node_group.cluster
nn_hostname = vu.get_instance_hostname(vu.get_namenode(cluster))
dirs = _get_hadoop_dirs(node_group)
@@ -108,39 +107,38 @@ def _get_hadoop_configs(node_group):
confs['Hadoop'].update(hadoop_cfg)
oozie_cfg = o_helper.get_oozie_required_xml_configs(HADOOP_CONF_DIR)
if c_helper.is_mysql_enabled(cluster):
if c_helper.is_mysql_enabled(pctx, cluster):
oozie_cfg.update(o_helper.get_oozie_mysql_configs())
confs['JobFlow'] = oozie_cfg
if c_helper.get_config_value(c_helper.ENABLE_SWIFT.applicable_target,
c_helper.ENABLE_SWIFT.name, cluster):
if c_helper.is_swift_enabled(pctx, cluster):
swift_configs = {}
for config in swift.get_swift_configs():
swift_configs[config['name']] = config['value']
confs['Hadoop'].update(swift_configs)
if c_helper.is_data_locality_enabled(cluster):
if c_helper.is_data_locality_enabled(pctx, cluster):
confs['Hadoop'].update(th.TOPOLOGY_CONFIG)
confs['Hadoop'].update({"topology.script.file.name":
HADOOP_CONF_DIR + "/topology.sh"})
return confs, c_helper.get_env_configs()
return confs
def _get_user_configs(node_group):
ng_xml_confs, ng_env_confs = _separate_configs(node_group.node_configs)
def _get_user_configs(pctx, node_group):
ng_xml_confs, ng_env_confs = _separate_configs(node_group.node_configs,
pctx['env_confs'])
cl_xml_confs, cl_env_confs = _separate_configs(
node_group.cluster.cluster_configs)
node_group.cluster.cluster_configs, pctx['env_confs'])
xml_confs = _merge_configs(cl_xml_confs, ng_xml_confs)
env_confs = _merge_configs(cl_env_confs, ng_env_confs)
return xml_confs, env_confs
def _separate_configs(configs):
all_env_configs = c_helper.get_env_configs()
def _separate_configs(configs, all_env_configs):
xml_configs = {}
env_configs = {}
for service, params in six.iteritems(configs):
@@ -228,7 +226,7 @@ def _push_configs_to_instance(instance, configs):
r.write_file_to(fl, data, run_as_root=True)
def _post_configuration(instance):
def _post_configuration(pctx, instance):
node_group = instance.node_group
dirs = _get_hadoop_dirs(node_group)
args = {
@@ -243,7 +241,7 @@ def _post_configuration(instance):
'yarn_log_dir': dirs['yarn_log_dir']
}
post_conf_script = f.get_file_text(
'plugins/vanilla/v2_3_0/resources/post_conf.template')
'plugins/vanilla/hadoop2/resources/post_conf.template')
post_conf_script = post_conf_script.format(**args)
with instance.remote() as r:
@@ -251,10 +249,11 @@ def _post_configuration(instance):
r.execute_command('chmod +x /tmp/post_conf.sh')
r.execute_command('sudo /tmp/post_conf.sh')
if c_helper.is_data_locality_enabled(instance.node_group.cluster):
if c_helper.is_data_locality_enabled(pctx,
instance.node_group.cluster):
t_script = HADOOP_CONF_DIR + '/topology.sh'
r.write_file_to(t_script, f.get_file_text(
'plugins/vanilla/v2_3_0/resources/topology.sh'),
'plugins/vanilla/hadoop2/resources/topology.sh'),
run_as_root=True)
r.execute_command('chmod +x ' + t_script, run_as_root=True)
@@ -295,8 +294,8 @@ def _merge_configs(a, b):
return res
def configure_topology_data(cluster):
if c_helper.is_data_locality_enabled(cluster):
def configure_topology_data(pctx, cluster):
if c_helper.is_data_locality_enabled(pctx, cluster):
LOG.info("Node group awareness is not implemented in YARN yet "
"so enable_hypervisor_awareness set to False explicitly")
tpl_map = th.generate_topology_map(cluster, is_node_awareness=False)


@@ -0,0 +1,177 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from sahara import exceptions as ex
from sahara.plugins import provisioning as p
from sahara.utils import types
CONF = cfg.CONF
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
HIDDEN_CONFS = [
'dfs.hosts',
'dfs.hosts.exclude',
'dfs.namenode.data.dir',
'dfs.namenode.name.dir',
'fs.default.name',
'fs.defaultFS',
'fs.swift.impl',
'hadoop.proxyuser.hadoop.groups',
'hadoop.proxyuser.hadoop.hosts',
'mapreduce.framework.name',
'mapreduce.jobhistory.address',
'mapreduce.jobhistory.done.dir',
'mapreduce.jobhistory.intermediate-done-dir',
'mapreduce.jobhistory.webapp.address',
'yarn.nodemanager.aux-services',
'yarn.resourcemanager.address',
'yarn.resourcemanager.admin.address',
'yarn.resourcemanager.hostname',
'yarn.resourcemanager.nodes.exclude-path',
'yarn.resourcemanager.nodes.include-path',
'yarn.resourcemanager.resource-tracker.address',
'yarn.resourcemanager.scheduler.address',
'yarn.resourcemanager.webapp.address'
]
CLUSTER_WIDE_CONFS = [
'dfs.blocksize', 'dfs.namenode.replication.min', 'dfs.permissions.enabled',
'dfs.replication', 'dfs.replication.max', 'io.compression.codecs',
'io.file.buffer.size', 'mapreduce.job.counters.max',
'mapreduce.map.output.compress.codec',
'mapreduce.output.fileoutputformat.compress.codec',
'mapreduce.output.fileoutputformat.compress.type',
'mapredude.map.output.compress',
'mapredude.output.fileoutputformat.compress'
]
PRIORITY_1_CONFS = [
'dfs.datanode.du.reserved',
'dfs.datanode.failed.volumes.tolerated',
'dfs.datanode.handler.count',
'dfs.datanode.max.transfer.threads',
'dfs.namenode.handler.count',
'mapred.child.java.opts',
'mapred.jobtracker.maxtasks.per.job',
'mapreduce.jobtracker.handler.count',
'mapreduce.map.java.opts',
'mapreduce.reduce.java.opts',
'mapreduce.task.io.sort.mb',
'mapreduce.tasktracker.map.tasks.maximum',
'mapreduce.tasktracker.reduce.tasks.maximum',
'yarn.nodemanager.resource.cpu-vcores',
'yarn.nodemanager.resource.memory-mb',
'yarn.scheduler.maximum-allocation-mb',
'yarn.scheduler.maximum-allocation-vcores',
'yarn.scheduler.minimum-allocation-mb',
'yarn.scheduler.minimum-allocation-vcores'
]
# for now we have not so many cluster-wide configs
# lets consider all of them having high priority
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
def init_xml_configs(xml_confs):
configs = []
for service, config_lists in xml_confs.iteritems():
for config_list in config_lists:
for config in config_list:
if config['name'] not in HIDDEN_CONFS:
cfg = p.Config(config['name'], service, "node",
is_optional=True, config_type="string",
default_value=str(config['value']),
description=config['description'])
if cfg.default_value in ["true", "false"]:
cfg.config_type = "bool"
cfg.default_value = (cfg.default_value == 'true')
elif types.is_int(cfg.default_value):
cfg.config_type = "int"
cfg.default_value = int(cfg.default_value)
if config['name'] in CLUSTER_WIDE_CONFS:
cfg.scope = 'cluster'
if config['name'] in PRIORITY_1_CONFS:
cfg.priority = 1
configs.append(cfg)
return configs
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=False)
ENABLE_MYSQL = p.Config('Enable MySQL', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
def init_env_configs(env_confs):
configs = []
for service, config_items in env_confs.iteritems():
for name, value in config_items.iteritems():
configs.append(p.Config(name, service, "node",
default_value=value, priority=1,
config_type="int"))
return configs
def _init_general_configs():
configs = [ENABLE_SWIFT, ENABLE_MYSQL]
if CONF.enable_data_locality:
configs.append(ENABLE_DATA_LOCALITY)
return configs
PLUGIN_GENERAL_CONFIGS = _init_general_configs()
def get_config_value(pctx, service, name, cluster=None):
if cluster:
for ng in cluster.node_groups:
cl_param = ng.configuration().get(service, {}).get(name)
if cl_param is not None:
return cl_param
for c in pctx['all_confs']:
if c.applicable_target == service and c.name == name:
return c.default_value
raise ex.NotFoundException(
name, "Unable to get parameter '%s' from service %s" % (name, service))
def is_swift_enabled(pctx, cluster):
return get_config_value(pctx, ENABLE_SWIFT.applicable_target,
ENABLE_SWIFT.name, cluster)
def is_mysql_enabled(pctx, cluster):
return get_config_value(
pctx, ENABLE_MYSQL.applicable_target, ENABLE_MYSQL.name, cluster)
def is_data_locality_enabled(pctx, cluster):
if not CONF.enable_data_locality:
return False
return get_config_value(pctx, ENABLE_DATA_LOCALITY.applicable_target,
ENABLE_DATA_LOCALITY.name, cluster)


@@ -16,8 +16,8 @@
from sahara import context
from sahara.openstack.common import log as logging
from sahara.plugins.general import exceptions as ex
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
from sahara.utils import files
from sahara.utils import general as g
@@ -51,12 +51,12 @@ def start_historyserver(instance):
'sudo su - -c "mr-jobhistory-daemon.sh start historyserver" hadoop')
def start_oozie_process(instance):
def start_oozie_process(pctx, instance):
with instance.remote() as r:
if c_helper.is_mysql_enabled(instance.node_group.cluster):
if c_helper.is_mysql_enabled(pctx, instance.node_group.cluster):
_start_mysql(r)
sql_script = files.get_file_text(
'plugins/vanilla/v2_3_0/resources/create_oozie_db.sql')
'plugins/vanilla/hadoop2/resources/create_oozie_db.sql')
r.write_file_to('/tmp/create_oozie_db.sql', sql_script)
_oozie_create_db(r)
@@ -92,7 +92,7 @@ def _oozie_share_lib(remote):
remote.execute_command(
'sudo su - -c "mkdir /tmp/oozielib && '
'tar zxf /opt/oozie/oozie-sharelib-4.0.0.tar.gz -C '
'tar zxf /opt/oozie/oozie-sharelib-*.tar.gz -C '
'/tmp/oozielib && '
'hadoop fs -mkdir /user && '
'hadoop fs -mkdir /user/hadoop && '


@@ -17,20 +17,20 @@ from sahara import context
from sahara.openstack.common import timeutils
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins.vanilla.hadoop2 import config
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
from sahara.plugins.vanilla.hadoop2 import utils as pu
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_3_0 import config
from sahara.plugins.vanilla.v2_3_0 import run_scripts as run
from sahara.plugins.vanilla.v2_3_0 import utils as pu
HADOOP_CONF_DIR = config.HADOOP_CONF_DIR
def scale_cluster(cluster, instances):
config.configure_instances(instances)
def scale_cluster(pctx, cluster, instances):
config.configure_instances(pctx, instances)
_update_include_files(cluster)
run.refresh_hadoop_nodes(cluster)
run.refresh_yarn_nodes(cluster)
config.configure_topology_data(cluster)
config.configure_topology_data(pctx, cluster)
for instance in instances:
run.start_instance(instance)
@@ -61,7 +61,7 @@ def _update_include_files(cluster):
nm_hosts, HADOOP_CONF_DIR))
def decommission_nodes(cluster, instances):
def decommission_nodes(pctx, cluster, instances):
datanodes = _get_instances_with_service(instances, 'datanode')
nodemanagers = _get_instances_with_service(instances, 'nodemanager')
_update_exclude_files(cluster, instances)
@@ -75,7 +75,7 @@ def decommission_nodes(cluster, instances):
_update_include_files(cluster)
_clear_exclude_files(cluster)
config.configure_topology_data(cluster)
config.configure_topology_data(pctx, cluster)
def _update_exclude_files(cluster, instances):


@@ -15,8 +15,8 @@
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins.vanilla.hadoop2 import config_helper as cu
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
from sahara.utils import general as gu
@@ -82,7 +82,7 @@ def validate_additional_ng_scaling(cluster, additional):
raise ex.NodeGroupCannotBeScaled(ng.name, msg)
def validate_existing_ng_scaling(cluster, existing):
def validate_existing_ng_scaling(pctx, cluster, existing):
scalable_processes = _get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
@@ -97,7 +97,7 @@ def validate_existing_ng_scaling(cluster, existing):
ng.name, msg % ' '.join(ng.node_processes))
dn_amount = len(vu.get_datanodes(cluster))
rep_factor = c_helper.get_config_value('HDFS', 'dfs.replication', cluster)
rep_factor = cu.get_config_value(pctx, 'HDFS', 'dfs.replication', cluster)
if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
msg = ("Vanilla plugin cannot shrink cluster because it would be not "


@@ -15,10 +15,8 @@
from oslo.config import cfg
from sahara import exceptions as ex
from sahara.openstack.common import log as logging
from sahara.plugins import provisioning as p
from sahara.utils import types as types
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.utils import xmlutils as x
CONF = cfg.CONF
@@ -67,136 +65,17 @@ ENV_CONFS = {
}
}
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=False)
ENABLE_MYSQL = p.Config('Enable MySQL', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
config_type="bool", priority=1,
default_value=True, is_optional=True)
HIDDEN_CONFS = [
'dfs.hosts',
'dfs.hosts.exclude',
'dfs.namenode.data.dir',
'dfs.namenode.name.dir',
'fs.default.name',
'fs.defaultFS',
'fs.swift.impl',
'hadoop.proxyuser.hadoop.groups',
'hadoop.proxyuser.hadoop.hosts',
'mapreduce.framework.name',
'mapreduce.jobhistory.address',
'mapreduce.jobhistory.done.dir',
'mapreduce.jobhistory.intermediate-done-dir',
'mapreduce.jobhistory.webapp.address',
'yarn.nodemanager.aux-services',
'yarn.resourcemanager.address',
'yarn.resourcemanager.admin.address',
'yarn.resourcemanager.hostname',
'yarn.resourcemanager.nodes.exclude-path',
'yarn.resourcemanager.nodes.include-path',
'yarn.resourcemanager.resource-tracker.address',
'yarn.resourcemanager.scheduler.address',
'yarn.resourcemanager.webapp.address'
]
CLUSTER_WIDE_CONFS = [
'dfs.blocksize', 'dfs.namenode.replication.min', 'dfs.permissions.enabled',
'dfs.replication', 'dfs.replication.max', 'io.compression.codecs',
'io.file.buffer.size', 'mapreduce.job.counters.max',
'mapreduce.map.output.compress.codec',
'mapreduce.output.fileoutputformat.compress.codec',
'mapreduce.output.fileoutputformat.compress.type',
'mapredude.map.output.compress',
'mapredude.output.fileoutputformat.compress'
]
PRIORITY_1_CONFS = [
'dfs.datanode.du.reserved',
'dfs.datanode.failed.volumes.tolerated',
'dfs.datanode.handler.count',
'dfs.datanode.max.transfer.threads',
'dfs.namenode.handler.count',
'mapred.child.java.opts',
'mapred.jobtracker.maxtasks.per.job',
'mapreduce.jobtracker.handler.count',
'mapreduce.map.java.opts',
'mapreduce.reduce.java.opts',
'mapreduce.task.io.sort.mb',
'mapreduce.tasktracker.map.tasks.maximum',
'mapreduce.tasktracker.reduce.tasks.maximum',
'yarn.nodemanager.resource.cpu-vcores',
'yarn.nodemanager.resource.memory-mb',
'yarn.scheduler.maximum-allocation-mb',
'yarn.scheduler.maximum-allocation-vcores',
'yarn.scheduler.minimum-allocation-mb',
'yarn.scheduler.minimum-allocation-vcores'
]
# for now we have not so many cluster-wide configs
# lets consider all of them having high priority
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
def _init_xml_configs():
configs = []
for service, config_lists in XML_CONFS.iteritems():
for config_list in config_lists:
for config in config_list:
if config['name'] not in HIDDEN_CONFS:
cfg = p.Config(config['name'], service, "node",
is_optional=True, config_type="string",
default_value=str(config['value']),
description=config['description'])
if cfg.default_value in ["true", "false"]:
cfg.config_type = "bool"
cfg.default_value = (cfg.default_value == 'true')
elif types.is_int(cfg.default_value):
cfg.config_type = "int"
cfg.default_value = int(cfg.default_value)
if config['name'] in CLUSTER_WIDE_CONFS:
cfg.scope = 'cluster'
if config['name'] in PRIORITY_1_CONFS:
cfg.priority = 1
configs.append(cfg)
return configs
def _init_env_configs():
configs = []
for service, config_items in ENV_CONFS.iteritems():
for name, value in config_items.iteritems():
configs.append(p.Config(name, service, "node",
default_value=value, priority=1,
config_type="int"))
return configs
def _init_general_configs():
configs = [ENABLE_SWIFT, ENABLE_MYSQL]
if CONF.enable_data_locality:
configs.append(ENABLE_DATA_LOCALITY)
return configs
# Initialise plugin Hadoop configurations
PLUGIN_XML_CONFIGS = _init_xml_configs()
PLUGIN_ENV_CONFIGS = _init_env_configs()
PLUGIN_GENERAL_CONFIGS = _init_general_configs()
PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
def _init_all_configs():
configs = []
configs.extend(PLUGIN_XML_CONFIGS)
configs.extend(PLUGIN_ENV_CONFIGS)
configs.extend(PLUGIN_GENERAL_CONFIGS)
configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
return configs
@@ -213,30 +92,3 @@ def get_xml_configs():
def get_env_configs():
return ENV_CONFS
def get_config_value(service, name, cluster=None):
if cluster:
for ng in cluster.node_groups:
cl_param = ng.configuration().get(service, {}).get(name)
if cl_param is not None:
return cl_param
for c in get_plugin_configs():
if c.applicable_target == service and c.name == name:
return c.default_value
raise ex.NotFoundException(
name, "Unable to get parameter '%s' from service %s" % (name, service))
def is_mysql_enabled(cluster):
return get_config_value(
ENABLE_MYSQL.applicable_target, ENABLE_MYSQL.name, cluster)
def is_data_locality_enabled(cluster):
if not CONF.enable_data_locality:
return False
return get_config_value(ENABLE_DATA_LOCALITY.applicable_target,
ENABLE_DATA_LOCALITY.name, cluster)


@@ -19,12 +19,13 @@ from sahara import conductor
from sahara import context
from sahara.openstack.common import log as logging
from sahara.plugins.vanilla import abstractversionhandler as avm
from sahara.plugins.vanilla.hadoop2 import config as c
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
from sahara.plugins.vanilla.hadoop2 import scaling as sc
from sahara.plugins.vanilla.hadoop2 import validation as vl
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_3_0 import config as c
from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
from sahara.plugins.vanilla.v2_3_0 import run_scripts as run
from sahara.plugins.vanilla.v2_3_0 import scaling as sc
from sahara.plugins.vanilla.v2_3_0 import validation as vl
conductor = conductor.API
LOG = logging.getLogger(__name__)
@@ -32,8 +33,14 @@ CONF = cfg.CONF
class VersionHandler(avm.AbstractVersionHandler):
def __init__(self):
self.pctx = {
'env_confs': c_helper.get_env_configs(),
'all_confs': c_helper.get_plugin_configs()
}
def get_plugin_configs(self):
return c_helper.get_plugin_configs()
return self.pctx['all_confs']
def get_node_processes(self):
return {
@@ -51,7 +58,7 @@ class VersionHandler(avm.AbstractVersionHandler):
pass
def configure_cluster(self, cluster):
c.configure_cluster(cluster)
c.configure_cluster(self.pctx, cluster)
def start_cluster(self, cluster):
nn = vu.get_namenode(cluster)
@@ -79,19 +86,19 @@ class VersionHandler(avm.AbstractVersionHandler):
oo = vu.get_oozie(cluster)
if oo:
run.start_oozie_process(oo)
run.start_oozie_process(self.pctx, oo)
self._set_cluster_info(cluster)
def decommission_nodes(self, cluster, instances):
sc.decommission_nodes(cluster, instances)
sc.decommission_nodes(self.pctx, cluster, instances)
def validate_scaling(self, cluster, existing, additional):
vl.validate_additional_ng_scaling(cluster, additional)
vl.validate_existing_ng_scaling(cluster, existing)
vl.validate_existing_ng_scaling(self.pctx, cluster, existing)
def scale_cluster(self, cluster, instances):
sc.scale_cluster(cluster, instances)
sc.scale_cluster(self.pctx, cluster, instances)
def _set_cluster_info(self, cluster):
nn = vu.get_namenode(cluster)


@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.vanilla.v2_3_0 import config as c
from sahara.plugins.vanilla.hadoop2 import config as c
from sahara.tests.unit import base


@@ -15,7 +15,7 @@
import mock
from sahara.plugins.vanilla.v2_3_0 import utils as u
from sahara.plugins.vanilla.hadoop2 import utils as u
from sahara.tests.unit import base
from sahara.utils import files
@@ -24,7 +24,7 @@ class UtilsTestCase(base.SaharaTestCase):
@mock.patch('sahara.plugins.vanilla.utils.get_namenode')
def test_datanodes_status(self, nn):
report = files.get_file_text(
'tests/unit/plugins/vanilla/v2_3_0/resources/dfs-report.txt')
'tests/unit/plugins/vanilla/hadoop2/resources/dfs-report.txt')
nn.return_value = self._get_instance(report)
statuses = u.get_datanodes_status(None)
@@ -41,7 +41,7 @@ class UtilsTestCase(base.SaharaTestCase):
@mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
def test_nodemanagers_status(self, rm):
report = files.get_file_text(
'tests/unit/plugins/vanilla/v2_3_0/resources/yarn-report.txt')
'tests/unit/plugins/vanilla/hadoop2/resources/yarn-report.txt')
rm.return_value = self._get_instance(report)
statuses = u.get_nodemanagers_status(None)