From f7d1ec55a868a722f9f231f7fdbbb007b2552e57 Mon Sep 17 00:00:00 2001
From: Nikita Konovalov
Date: Wed, 10 Jun 2015 17:01:04 +0300
Subject: [PATCH] Removed dependency on Spark plugin in edp code

The EDP Spark engine was importing a config helper from the Spark
plugin. The helper was moved to common plugin utils and is now
imported from there by both the plugin and the engine.

This is part of the sahara and plugins split.

Partially-implements bp: move-plugins-to-separate-repo

Change-Id: Ie84cc163a09bf1e7b58fcdb08e0647a85492593b
---
 sahara/plugins/spark/config_helper.py          | 57 +++++++++----------
 sahara/plugins/spark/plugin.py                 | 32 ++++++-----
 sahara/plugins/spark/scaling.py                |  2 +-
 sahara/plugins/utils.py                        | 22 +++++++
 sahara/service/edp/spark/engine.py             | 16 +++---
 .../unit/plugins/spark/test_config_helper.py   |  2 +-
 .../unit/service/edp/spark/test_spark.py       |  2 +-
 7 files changed, 80 insertions(+), 53 deletions(-)

diff --git a/sahara/plugins/spark/config_helper.py b/sahara/plugins/spark/config_helper.py
index 6e3abb7a..6c783bb3 100644
--- a/sahara/plugins/spark/config_helper.py
+++ b/sahara/plugins/spark/config_helper.py
@@ -18,7 +18,6 @@ from oslo_log import log as logging
 import six
 
 from sahara import conductor as c
-from sahara.i18n import _
 from sahara.plugins import provisioning as p
 from sahara.plugins import utils
 from sahara.swift import swift_helper as swift
@@ -237,22 +236,6 @@ def get_plugin_configs():
     return PLUGIN_CONFIGS
 
-
-def get_config_value(service, name, cluster=None):
-    if cluster:
-        for ng in cluster.node_groups:
-            if (ng.configuration().get(service) and
-                    ng.configuration()[service].get(name)):
-                return ng.configuration()[service][name]
-
-    for configs in PLUGIN_CONFIGS:
-        if configs.applicable_target == service and configs.name == name:
-            return configs.default_value
-
-    raise RuntimeError(_("Unable to get parameter '%(param_name)s' from "
-                         "service %(service)s"),
-                       {'param_name': name, 'service': service})
-
 
 def generate_cfg_from_general(cfg, configs, general_config,
                               rest_excluded=False):
     if 'general' in configs:
@@ -336,35 +319,49 @@ def generate_spark_env_configs(cluster):
     # to /opt/spark/conf
     configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)
 
-    masterport = get_config_value("Spark", "Master port", cluster)
+    masterport = utils.get_config_value_or_default("Spark",
+                                                   "Master port",
+                                                   cluster)
     if masterport and masterport != _get_spark_opt_default("Master port"):
         configs.append('SPARK_MASTER_PORT=' + str(masterport))
 
-    masterwebport = get_config_value("Spark", "Master webui port", cluster)
+    masterwebport = utils.get_config_value_or_default("Spark",
+                                                      "Master webui port",
+                                                      cluster)
     if (masterwebport and
             masterwebport != _get_spark_opt_default("Master webui port")):
         configs.append('SPARK_MASTER_WEBUI_PORT=' + str(masterwebport))
 
     # configuration for workers
-    workercores = get_config_value("Spark", "Worker cores", cluster)
+    workercores = utils.get_config_value_or_default("Spark",
+                                                    "Worker cores",
+                                                    cluster)
     if workercores and workercores != _get_spark_opt_default("Worker cores"):
         configs.append('SPARK_WORKER_CORES=' + str(workercores))
 
-    workermemory = get_config_value("Spark", "Worker memory", cluster)
+    workermemory = utils.get_config_value_or_default("Spark",
+                                                     "Worker memory",
+                                                     cluster)
     if (workermemory and
             workermemory != _get_spark_opt_default("Worker memory")):
         configs.append('SPARK_WORKER_MEMORY=' + str(workermemory))
 
-    workerport = get_config_value("Spark", "Worker port", cluster)
+    workerport = utils.get_config_value_or_default("Spark",
+                                                   "Worker port",
+                                                   cluster)
     if workerport and workerport != _get_spark_opt_default("Worker port"):
         configs.append('SPARK_WORKER_PORT=' + str(workerport))
 
-    workerwebport = get_config_value("Spark", "Worker webui port", cluster)
+    workerwebport = utils.get_config_value_or_default("Spark",
+                                                      "Worker webui port",
+                                                      cluster)
     if (workerwebport and
             workerwebport != _get_spark_opt_default("Worker webui port")):
         configs.append('SPARK_WORKER_WEBUI_PORT=' + str(workerwebport))
 
-    workerinstances = get_config_value("Spark", "Worker instances", cluster)
+    workerinstances = utils.get_config_value_or_default("Spark",
+                                                        "Worker instances",
+                                                        cluster)
     if (workerinstances and
             workerinstances != _get_spark_opt_default("Worker instances")):
         configs.append('SPARK_WORKER_INSTANCES=' + str(workerinstances))
@@ -377,7 +374,9 @@ def generate_spark_slaves_configs(workernames):
 
 
 def generate_spark_executor_classpath(cluster):
-    cp = get_config_value("Spark", "Executor extra classpath", cluster)
+    cp = utils.get_config_value_or_default("Spark",
+                                           "Executor extra classpath",
+                                           cluster)
     if cp:
         return "spark.executor.extraClassPath " + cp
     return "\n"
@@ -444,11 +443,11 @@ def generate_hadoop_setup_script(storage_paths, env_configs):
 
 def generate_job_cleanup_config(cluster):
     args = {
-        'minimum_cleanup_megabytes': get_config_value(
+        'minimum_cleanup_megabytes': utils.get_config_value_or_default(
             "Spark", "Minimum cleanup megabytes", cluster),
-        'minimum_cleanup_seconds': get_config_value(
+        'minimum_cleanup_seconds': utils.get_config_value_or_default(
             "Spark", "Minimum cleanup seconds", cluster),
-        'maximum_cleanup_seconds': get_config_value(
+        'maximum_cleanup_seconds': utils.get_config_value_or_default(
             "Spark", "Maximum cleanup seconds", cluster)
     }
     job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
@@ -510,5 +509,5 @@ def get_decommissioning_timeout(cluster):
 
 
 def get_port_from_config(service, name, cluster=None):
-    address = get_config_value(service, name, cluster)
+    address = utils.get_config_value_or_default(service, name, cluster)
     return utils.get_port_from_address(address)
diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py
index a9ae13a6..15d5247d 100644
--- a/sahara/plugins/spark/plugin.py
+++ b/sahara/plugins/spark/plugin.py
@@ -76,8 +76,9 @@ class SparkProvider(p.ProvisioningPluginBase):
             raise ex.InvalidComponentCountException("datanode", _("1 or more"),
                                                     nn_count)
 
-        rep_factor = c_helper.get_config_value('HDFS', "dfs.replication",
-                                               cluster)
+        rep_factor = utils.get_config_value_or_default('HDFS',
+                                                       "dfs.replication",
+                                                       cluster)
         if dn_count < rep_factor:
             raise ex.InvalidComponentCountException(
                 'datanode', _('%s or more') % rep_factor, dn_count,
@@ -149,7 +150,9 @@ class SparkProvider(p.ProvisioningPluginBase):
         self._set_cluster_info(cluster)
 
     def _spark_home(self, cluster):
-        return c_helper.get_config_value("Spark", "Spark home", cluster)
+        return utils.get_config_value_or_default("Spark",
+                                                 "Spark home",
+                                                 cluster)
 
     def _extract_configs_to_extra(self, cluster):
         nn = utils.get_instance(cluster, "namenode")
@@ -378,7 +381,7 @@ class SparkProvider(p.ProvisioningPluginBase):
         info = {}
 
         if nn:
-            address = c_helper.get_config_value(
+            address = utils.get_config_value_or_default(
                 'HDFS', 'dfs.http.address', cluster)
             port = address[address.rfind(':') + 1:]
             info['HDFS'] = {
@@ -387,7 +390,7 @@ class SparkProvider(p.ProvisioningPluginBase):
             info['HDFS']['NameNode'] = 'hdfs://%s:8020' % nn.hostname()
 
         if sp_master:
-            port = c_helper.get_config_value(
+            port = utils.get_config_value_or_default(
                 'Spark', 'Master webui port', cluster)
             if port is not None:
                 info['Spark'] = {
@@ -469,8 +472,9 @@ class SparkProvider(p.ProvisioningPluginBase):
                     ' '.join(ng.node_processes))
 
         dn_amount = len(utils.get_instances(cluster, "datanode"))
-        rep_factor = c_helper.get_config_value('HDFS', "dfs.replication",
-                                               cluster)
+        rep_factor = utils.get_config_value_or_default('HDFS',
+                                                       "dfs.replication",
+                                                       cluster)
 
         if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
             raise ex.ClusterCannotBeScaled(
@@ -504,14 +508,16 @@ class SparkProvider(p.ProvisioningPluginBase):
             'namenode': [8020, 50070, 50470],
             'datanode': [50010, 1004, 50075, 1006, 50020],
             'master': [
-                int(c_helper.get_config_value("Spark", "Master port",
-                                              cluster)),
-                int(c_helper.get_config_value("Spark", "Master webui port",
-                                              cluster)),
+                int(utils.get_config_value_or_default("Spark", "Master port",
+                                                      cluster)),
+                int(utils.get_config_value_or_default("Spark",
+                                                      "Master webui port",
+                                                      cluster)),
             ],
             'slave': [
-                int(c_helper.get_config_value("Spark", "Worker webui port",
-                                              cluster))
+                int(utils.get_config_value_or_default("Spark",
+                                                      "Worker webui port",
+                                                      cluster))
             ]
         }
 
diff --git a/sahara/plugins/spark/scaling.py b/sahara/plugins/spark/scaling.py
index 21742e7e..5389523c 100644
--- a/sahara/plugins/spark/scaling.py
+++ b/sahara/plugins/spark/scaling.py
@@ -38,7 +38,7 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst):
         slaves_content = "\n"
 
     cluster = master.cluster
-    sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
+    sp_home = utils.get_config_value_or_default("Spark", "Spark home", cluster)
     r_master = remote.get_remote(master)
     run.stop_spark(r_master, sp_home)
 
diff --git a/sahara/plugins/utils.py b/sahara/plugins/utils.py
index 4f2a9e83..d2cf7f4b 100644
--- a/sahara/plugins/utils.py
+++ b/sahara/plugins/utils.py
@@ -19,6 +19,7 @@ from oslo_utils import netutils
 from six.moves.urllib import parse as urlparse
 
 from sahara.i18n import _
+from sahara.plugins import base as plugins_base
 from sahara.plugins import exceptions as ex
 
 
@@ -74,3 +75,24 @@ def instances_with_services(instances, node_processes):
 def start_process_event_message(process):
     return _("Start the following process(es): {process}").format(
         process=process)
+
+
+def get_config_value_or_default(service, name, cluster):
+    # Try getting the config from the cluster first.
+    for ng in cluster.node_groups:
+        if (ng.configuration().get(service) and
+                ng.configuration()[service].get(name)):
+            return ng.configuration()[service][name]
+
+    # Find and return the default declared by the plugin.
+
+    plugin = plugins_base.PLUGINS.get_plugin(cluster.plugin_name)
+    configs = plugin.get_configs(cluster.hadoop_version)
+
+    for config in configs:
+        if config.applicable_target == service and config.name == name:
+            return config.default_value
+
+    raise RuntimeError(_("Unable to get parameter '%(param_name)s' from "
+                         "service %(service)s") %
+                       {'param_name': name, 'service': service})
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index 3efa7195..bbda8101 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -23,7 +23,6 @@ from sahara import conductor as c
 from sahara import context
 from sahara import exceptions as e
 from sahara.i18n import _
-from sahara.plugins.spark import config_helper as c_helper
 from sahara.plugins import utils as plugin_utils
 from sahara.service.edp import base_engine
 from sahara.service.edp.binary_retrievers import dispatch
@@ -243,11 +242,13 @@ class SparkJobEngine(base_engine.JobEngine):
 
         # Launch the spark job using spark-submit and deploy_mode = client
         host = master.hostname()
-        port = c_helper.get_config_value("Spark", "Master port", self.cluster)
+        port = plugin_utils.get_config_value_or_default("Spark",
+                                                        "Master port",
+                                                        self.cluster)
         spark_submit = os.path.join(
-            c_helper.get_config_value("Spark",
-                                      "Spark home",
-                                      self.cluster),
+            plugin_utils.get_config_value_or_default("Spark",
+                                                     "Spark home",
+                                                     self.cluster),
             "bin/spark-submit")
 
         # TODO(tmckay): we need to clean up wf_dirs on long running clusters
@@ -332,9 +333,8 @@ class SparkJobEngine(base_engine.JobEngine):
         return [edp.JOB_TYPE_SPARK]
 
     def get_driver_classpath(self):
-        cp = c_helper.get_config_value("Spark",
-                                       "Executor extra classpath",
-                                       self.cluster)
+        cp = plugin_utils.get_config_value_or_default(
+            "Spark", "Executor extra classpath", self.cluster)
         if cp:
             cp = " --driver-class-path " + cp
         return cp
diff --git a/sahara/tests/unit/plugins/spark/test_config_helper.py b/sahara/tests/unit/plugins/spark/test_config_helper.py
index 9743207a..793056a1 100644
--- a/sahara/tests/unit/plugins/spark/test_config_helper.py
+++ b/sahara/tests/unit/plugins/spark/test_config_helper.py
@@ -31,7 +31,7 @@ class ConfigHelperUtilsTest(test_base.SaharaTestCase):
         expected = ['/mnt/one/spam', '/mnt/two/spam']
         self.assertEqual(expected, paths)
 
-    @mock.patch('sahara.plugins.spark.config_helper.get_config_value')
+    @mock.patch('sahara.plugins.utils.get_config_value_or_default')
     def test_cleanup_configs(self, get_config_value):
         getter = lambda plugin, key, cluster: plugin_configs[key]
         get_config_value.side_effect = getter
diff --git a/sahara/tests/unit/service/edp/spark/test_spark.py b/sahara/tests/unit/service/edp/spark/test_spark.py
index cb9ead4b..7d672865 100644
--- a/sahara/tests/unit/service/edp/spark/test_spark.py
+++ b/sahara/tests/unit/service/edp/spark/test_spark.py
@@ -366,7 +366,7 @@ class TestSpark(base.SaharaTestCase):
     @mock.patch('sahara.conductor.API.job_execution_update')
     @mock.patch('sahara.conductor.API.job_execution_get')
     @mock.patch('sahara.utils.remote.get_remote')
-    @mock.patch('sahara.plugins.spark.config_helper.get_config_value')
+    @mock.patch('sahara.plugins.utils.get_config_value_or_default')
    @mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('sahara.conductor.API.job_get')
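
For orientation, a rough usage sketch of the relocated helper follows; it is
illustrative only and not part of the patch. After this change both the Spark
plugin and the EDP Spark engine resolve configuration values through
sahara.plugins.utils instead of the plugin-private config_helper. The
build_master_url() function below is hypothetical; the "Spark"/"Master port"
keys, get_instance() and hostname() mirror calls that already appear above.

    # Hypothetical caller, shown only to illustrate the relocated helper.
    from sahara.plugins import utils as plugin_utils


    def build_master_url(cluster):
        # Returns the node group's override if one is set, otherwise the
        # default declared in the plugin's config definitions.
        port = plugin_utils.get_config_value_or_default("Spark",
                                                        "Master port",
                                                        cluster)
        master = plugin_utils.get_instance(cluster, "master")
        return "spark://%s:%s" % (master.hostname(), port)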