Removed dependency on Spark plugin in edp code

The EDP Spark engine was importing a config helper from the Spark
plugin.
The helper has been moved to the common plugin utils and is now
imported from there by both the plugin and the engine.

This is part of the sahara and plugins split.
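
To illustrate, a minimal before/after sketch of a call site (not part of
the patch; the cluster variable stands for a provisioned sahara cluster
object):

# Before: the EDP engine reached into the Spark plugin for its helper.
from sahara.plugins.spark import config_helper as c_helper
port = c_helper.get_config_value("Spark", "Master port", cluster)

# After: both the plugin and the engine use the shared plugin utils.
from sahara.plugins import utils as plugin_utils
port = plugin_utils.get_config_value_or_default(
    "Spark", "Master port", cluster)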

Partially-implements bp: move-plugins-to-separate-repo
Change-Id: Ie84cc163a09bf1e7b58fcdb08e0647a85492593b
Nikita Konovalov 2015-06-10 17:01:04 +03:00
parent 93a6a5822d
commit f7d1ec55a8
7 changed files with 80 additions and 53 deletions

View File

@@ -18,7 +18,6 @@ from oslo_log import log as logging
import six
from sahara import conductor as c
from sahara.i18n import _
from sahara.plugins import provisioning as p
from sahara.plugins import utils
from sahara.swift import swift_helper as swift
@@ -237,22 +236,6 @@ def get_plugin_configs():
return PLUGIN_CONFIGS
def get_config_value(service, name, cluster=None):
if cluster:
for ng in cluster.node_groups:
if (ng.configuration().get(service) and
ng.configuration()[service].get(name)):
return ng.configuration()[service][name]
for configs in PLUGIN_CONFIGS:
if configs.applicable_target == service and configs.name == name:
return configs.default_value
raise RuntimeError(_("Unable to get parameter '%(param_name)s' from "
"service %(service)s"),
{'param_name': name, 'service': service})
def generate_cfg_from_general(cfg, configs, general_config,
rest_excluded=False):
if 'general' in configs:
@@ -336,35 +319,49 @@ def generate_spark_env_configs(cluster):
# to /opt/spark/conf
configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)
masterport = get_config_value("Spark", "Master port", cluster)
masterport = utils.get_config_value_or_default("Spark",
"Master port",
cluster)
if masterport and masterport != _get_spark_opt_default("Master port"):
configs.append('SPARK_MASTER_PORT=' + str(masterport))
masterwebport = get_config_value("Spark", "Master webui port", cluster)
masterwebport = utils.get_config_value_or_default("Spark",
"Master webui port",
cluster)
if (masterwebport and
masterwebport != _get_spark_opt_default("Master webui port")):
configs.append('SPARK_MASTER_WEBUI_PORT=' + str(masterwebport))
# configuration for workers
workercores = get_config_value("Spark", "Worker cores", cluster)
workercores = utils.get_config_value_or_default("Spark",
"Worker cores",
cluster)
if workercores and workercores != _get_spark_opt_default("Worker cores"):
configs.append('SPARK_WORKER_CORES=' + str(workercores))
workermemory = get_config_value("Spark", "Worker memory", cluster)
workermemory = utils.get_config_value_or_default("Spark",
"Worker memory",
cluster)
if (workermemory and
workermemory != _get_spark_opt_default("Worker memory")):
configs.append('SPARK_WORKER_MEMORY=' + str(workermemory))
workerport = get_config_value("Spark", "Worker port", cluster)
workerport = utils.get_config_value_or_default("Spark",
"Worker port",
cluster)
if workerport and workerport != _get_spark_opt_default("Worker port"):
configs.append('SPARK_WORKER_PORT=' + str(workerport))
workerwebport = get_config_value("Spark", "Worker webui port", cluster)
workerwebport = utils.get_config_value_or_default("Spark",
"Worker webui port",
cluster)
if (workerwebport and
workerwebport != _get_spark_opt_default("Worker webui port")):
configs.append('SPARK_WORKER_WEBUI_PORT=' + str(workerwebport))
workerinstances = get_config_value("Spark", "Worker instances", cluster)
workerinstances = utils.get_config_value_or_default("Spark",
"Worker instances",
cluster)
if (workerinstances and
workerinstances != _get_spark_opt_default("Worker instances")):
configs.append('SPARK_WORKER_INSTANCES=' + str(workerinstances))
@@ -377,7 +374,9 @@ def generate_spark_slaves_configs(workernames):
def generate_spark_executor_classpath(cluster):
cp = get_config_value("Spark", "Executor extra classpath", cluster)
cp = utils.get_config_value_or_default("Spark",
"Executor extra classpath",
cluster)
if cp:
return "spark.executor.extraClassPath " + cp
return "\n"
@@ -444,11 +443,11 @@ def generate_hadoop_setup_script(storage_paths, env_configs):
def generate_job_cleanup_config(cluster):
args = {
'minimum_cleanup_megabytes': get_config_value(
'minimum_cleanup_megabytes': utils.get_config_value_or_default(
"Spark", "Minimum cleanup megabytes", cluster),
'minimum_cleanup_seconds': get_config_value(
'minimum_cleanup_seconds': utils.get_config_value_or_default(
"Spark", "Minimum cleanup seconds", cluster),
'maximum_cleanup_seconds': get_config_value(
'maximum_cleanup_seconds': utils.get_config_value_or_default(
"Spark", "Maximum cleanup seconds", cluster)
}
job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
@@ -510,5 +509,5 @@ def get_decommissioning_timeout(cluster):
def get_port_from_config(service, name, cluster=None):
address = get_config_value(service, name, cluster)
address = utils.get_config_value_or_default(service, name, cluster)
return utils.get_port_from_address(address)

View File

@@ -76,8 +76,9 @@ class SparkProvider(p.ProvisioningPluginBase):
raise ex.InvalidComponentCountException("datanode", _("1 or more"),
nn_count)
rep_factor = c_helper.get_config_value('HDFS', "dfs.replication",
cluster)
rep_factor = utils.get_config_value_or_default('HDFS',
"dfs.replication",
cluster)
if dn_count < rep_factor:
raise ex.InvalidComponentCountException(
'datanode', _('%s or more') % rep_factor, dn_count,
@@ -149,7 +150,9 @@ class SparkProvider(p.ProvisioningPluginBase):
self._set_cluster_info(cluster)
def _spark_home(self, cluster):
return c_helper.get_config_value("Spark", "Spark home", cluster)
return utils.get_config_value_or_default("Spark",
"Spark home",
cluster)
def _extract_configs_to_extra(self, cluster):
nn = utils.get_instance(cluster, "namenode")
@@ -378,7 +381,7 @@ class SparkProvider(p.ProvisioningPluginBase):
info = {}
if nn:
address = c_helper.get_config_value(
address = utils.get_config_value_or_default(
'HDFS', 'dfs.http.address', cluster)
port = address[address.rfind(':') + 1:]
info['HDFS'] = {
@@ -387,7 +390,7 @@ class SparkProvider(p.ProvisioningPluginBase):
info['HDFS']['NameNode'] = 'hdfs://%s:8020' % nn.hostname()
if sp_master:
port = c_helper.get_config_value(
port = utils.get_config_value_or_default(
'Spark', 'Master webui port', cluster)
if port is not None:
info['Spark'] = {
@@ -469,8 +472,9 @@ class SparkProvider(p.ProvisioningPluginBase):
' '.join(ng.node_processes))
dn_amount = len(utils.get_instances(cluster, "datanode"))
rep_factor = c_helper.get_config_value('HDFS', "dfs.replication",
cluster)
rep_factor = utils.get_config_value_or_default('HDFS',
"dfs.replication",
cluster)
if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
raise ex.ClusterCannotBeScaled(
@@ -504,14 +508,16 @@ class SparkProvider(p.ProvisioningPluginBase):
'namenode': [8020, 50070, 50470],
'datanode': [50010, 1004, 50075, 1006, 50020],
'master': [
int(c_helper.get_config_value("Spark", "Master port",
cluster)),
int(c_helper.get_config_value("Spark", "Master webui port",
cluster)),
int(utils.get_config_value_or_default("Spark", "Master port",
cluster)),
int(utils.get_config_value_or_default("Spark",
"Master webui port",
cluster)),
],
'slave': [
int(c_helper.get_config_value("Spark", "Worker webui port",
cluster))
int(utils.get_config_value_or_default("Spark",
"Worker webui port",
cluster))
]
}

View File

@@ -38,7 +38,7 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst):
slaves_content = "\n"
cluster = master.cluster
sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
sp_home = utils.get_config_value_or_default("Spark", "Spark home", cluster)
r_master = remote.get_remote(master)
run.stop_spark(r_master, sp_home)

View File

@@ -19,6 +19,7 @@ from oslo_utils import netutils
from six.moves.urllib import parse as urlparse
from sahara.i18n import _
from sahara.plugins import base as plugins_base
from sahara.plugins import exceptions as ex
@@ -74,3 +75,24 @@ def instances_with_services(instances, node_processes):
def start_process_event_message(process):
return _("Start the following process(es): {process}").format(
process=process)
def get_config_value_or_default(service, name, cluster):
# Try getting config from the cluster.
for ng in cluster.node_groups:
if (ng.configuration().get(service) and
ng.configuration()[service].get(name)):
return ng.configuration()[service][name]
# Find and return the default
plugin = plugins_base.PLUGINS.get_plugin(cluster.plugin_name)
configs = plugin.get_configs(cluster.hadoop_version)
for config in configs:
if config.applicable_target == service and config.name == name:
return config.default_value
raise RuntimeError(_("Unable to get parameter '%(param_name)s' from "
"service %(service)s"),
{'param_name': name, 'service': service})
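
For reference, a standalone sketch (not part of this change) of the lookup
order the new helper implements: a value set on any node group for the
service wins; otherwise the plugin's declared default is returned. The stub
classes below are hypothetical stand-ins for sahara's cluster objects.

class _StubConfig(object):
    # Mimics a plugin config entry: applicable_target, name, default_value.
    def __init__(self, target, name, default):
        self.applicable_target = target
        self.name = name
        self.default_value = default

class _StubNodeGroup(object):
    # Mimics a node group whose configuration() returns user overrides.
    def __init__(self, overrides):
        self._overrides = overrides

    def configuration(self):
        return self._overrides

def _lookup(service, name, node_groups, plugin_configs):
    # Same precedence as get_config_value_or_default: node group
    # overrides first, then the plugin's declared defaults.
    for ng in node_groups:
        if (ng.configuration().get(service) and
                ng.configuration()[service].get(name)):
            return ng.configuration()[service][name]
    for config in plugin_configs:
        if config.applicable_target == service and config.name == name:
            return config.default_value
    raise RuntimeError("Unable to get parameter %r from service %r"
                       % (name, service))

defaults = [_StubConfig("Spark", "Master port", 7077)]
overridden = [_StubNodeGroup({"Spark": {"Master port": 7078}})]
print(_lookup("Spark", "Master port", overridden, defaults))  # 7078
print(_lookup("Spark", "Master port", [], defaults))          # 7077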

View File

@@ -23,7 +23,6 @@ from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins import utils as plugin_utils
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
@@ -243,11 +242,13 @@ class SparkJobEngine(base_engine.JobEngine):
# Launch the spark job using spark-submit and deploy_mode = client
host = master.hostname()
port = c_helper.get_config_value("Spark", "Master port", self.cluster)
port = plugin_utils.get_config_value_or_default("Spark",
"Master port",
self.cluster)
spark_submit = os.path.join(
c_helper.get_config_value("Spark",
"Spark home",
self.cluster),
plugin_utils.get_config_value_or_default("Spark",
"Spark home",
self.cluster),
"bin/spark-submit")
# TODO(tmckay): we need to clean up wf_dirs on long running clusters
@@ -332,9 +333,8 @@ class SparkJobEngine(base_engine.JobEngine):
return [edp.JOB_TYPE_SPARK]
def get_driver_classpath(self):
cp = c_helper.get_config_value("Spark",
"Executor extra classpath",
self.cluster)
cp = plugin_utils.get_config_value_or_default(
"Spark", "Executor extra classpath", self.cluster)
if cp:
cp = " --driver-class-path " + cp
return cp

View File

@@ -31,7 +31,7 @@ class ConfigHelperUtilsTest(test_base.SaharaTestCase):
expected = ['/mnt/one/spam', '/mnt/two/spam']
self.assertEqual(expected, paths)
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
def test_cleanup_configs(self, get_config_value):
getter = lambda plugin, key, cluster: plugin_configs[key]
get_config_value.side_effect = getter

View File

@@ -366,7 +366,7 @@ class TestSpark(base.SaharaTestCase):
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.conductor.API.job_get')