diff --git a/sahara/plugins/spark/config_helper.py b/sahara/plugins/spark/config_helper.py index c9924e1c..b68aa161 100644 --- a/sahara/plugins/spark/config_helper.py +++ b/sahara/plugins/spark/config_helper.py @@ -22,6 +22,7 @@ from sahara.openstack.common import log as logging from sahara.plugins import provisioning as p from sahara.plugins import utils from sahara.topology import topology_helper as topology +from sahara.utils import files as f from sahara.utils import types as types from sahara.utils import xmlutils as x @@ -98,7 +99,28 @@ SPARK_CONFS = { ' (default: /opt/spark)', 'default': '/opt/spark', 'priority': 2, - } + }, + { + 'name': 'Minimum cleanup seconds', + 'description': 'Job data will never be purged before this' + ' amount of time elapses (default: 86400 = 1 day)', + 'default': '86400', + 'priority': 2, + }, + { + 'name': 'Maximum cleanup seconds', + 'description': 'Job data will always be purged after this' + ' amount of time elapses (default: 1209600 = 14 days)', + 'default': '1209600', + 'priority': 2, + }, + { + 'name': 'Minimum cleanup megabytes', + 'description': 'No job data will be purged unless the total' + ' job data exceeds this size (default: 4096 = 4GB)', + 'default': '4096', + 'priority': 2, + }, ] } } @@ -375,6 +397,27 @@ def generate_hadoop_setup_script(storage_paths, env_configs): return "\n".join(script_lines) +def generate_job_cleanup_config(cluster): + args = { + 'minimum_cleanup_megabytes': get_config_value( + "Spark", "Minimum cleanup megabytes", cluster), + 'minimum_cleanup_seconds': get_config_value( + "Spark", "Minimum cleanup seconds", cluster), + 'maximum_cleanup_seconds': get_config_value( + "Spark", "Maximum cleanup seconds", cluster) + } + job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and + (args['minimum_cleanup_megabytes'] > 0 + and args['minimum_cleanup_seconds'] > 0))} + if job_conf['valid']: + job_conf['cron'] = f.get_file_text( + 'plugins/spark/resources/spark-cleanup.cron'), + job_cleanup_script = f.get_file_text( + 'plugins/spark/resources/tmp-cleanup.sh.template') + job_conf['script'] = job_cleanup_script.format(**args) + return job_conf + + def extract_name_values(configs): return dict((cfg['name'], cfg['value']) for cfg in configs) diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py index 22228cc6..23b23008 100644 --- a/sahara/plugins/spark/plugin.py +++ b/sahara/plugins/spark/plugin.py @@ -150,6 +150,7 @@ class SparkProvider(p.ProvisioningPluginBase): else: config_slaves = "\n" + extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster) for ng in cluster.node_groups: extra[ng.id] = { 'xml': c_helper.generate_xml_configs( @@ -273,6 +274,7 @@ class SparkProvider(p.ProvisioningPluginBase): self._write_topology_data(r, cluster, extra) self._push_master_configs(r, cluster, extra, instance) + self._push_cleanup_job(r, cluster, extra, instance) def _push_configs_to_existing_node(self, cluster, extra, instance): node_processes = instance.node_group.node_processes @@ -291,6 +293,7 @@ class SparkProvider(p.ProvisioningPluginBase): } r = remote.get_remote(instance) r.write_files_to(files) + self._push_cleanup_job(r, cluster, extra, instance) if need_update_hadoop: with remote.get_remote(instance) as r: self._write_topology_data(r, cluster, extra) @@ -303,10 +306,22 @@ class SparkProvider(p.ProvisioningPluginBase): def _push_master_configs(self, r, cluster, extra, instance): node_processes = instance.node_group.node_processes - if 'namenode' in node_processes: self._push_namenode_configs(cluster, r) + def _push_cleanup_job(self, r, cluster, extra, instance): + node_processes = instance.node_group.node_processes + if 'master' in node_processes: + if extra['job_cleanup']['valid']: + r.write_file_to('/etc/hadoop/tmp-cleanup.sh', + extra['job_cleanup']['script']) + r.execute_command("chmod 755 /etc/hadoop/tmp-cleanup.sh") + cmd = 'sudo sh -c \'echo "%s" > /etc/cron.d/spark-cleanup\'' + r.execute_command(cmd % extra['job_cleanup']['cron']) + else: + r.execute_command("sudo rm -f /etc/hadoop/tmp-cleanup.sh") + r.execute_command("sudo rm -f /etc/crond.d/spark-cleanup") + def _push_namenode_configs(self, cluster, r): r.write_file_to('/etc/hadoop/dn.incl', utils.generate_fqdn_host_names( diff --git a/sahara/plugins/spark/resources/spark-cleanup.cron b/sahara/plugins/spark/resources/spark-cleanup.cron new file mode 100644 index 00000000..e182a0e2 --- /dev/null +++ b/sahara/plugins/spark/resources/spark-cleanup.cron @@ -0,0 +1,2 @@ +# Cleans up old Spark job directories once per hour. +0 * * * * root /etc/hadoop/tmp-cleanup.sh \ No newline at end of file diff --git a/sahara/plugins/spark/resources/tmp-cleanup.sh.template b/sahara/plugins/spark/resources/tmp-cleanup.sh.template new file mode 100644 index 00000000..e715719b --- /dev/null +++ b/sahara/plugins/spark/resources/tmp-cleanup.sh.template @@ -0,0 +1,48 @@ +#!/bin/sh + +MINIMUM_CLEANUP_MEGABYTES={minimum_cleanup_megabytes} +MINIMUM_CLEANUP_SECONDS={minimum_cleanup_seconds} +MAXIMUM_CLEANUP_SECONDS={maximum_cleanup_seconds} + +CURRENT_TIMESTAMP=`date +%s` +POSSIBLE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MINIMUM_CLEANUP_SECONDS)) +DEFINITE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MAXIMUM_CLEANUP_SECONDS)) + +unset MAY_DELETE +unset WILL_DELETE + +if [ ! -d /tmp/spark-edp ] +then + exit 0 +fi + +cd /tmp/spark-edp +for JOB in $(find . -maxdepth 1 -mindepth 1 -type d -printf '%f\n') +do + for EXECUTION in $(find $JOB -maxdepth 1 -mindepth 1 -type d -printf '%f\n') + do + TIMESTAMP=`stat $JOB/$EXECUTION --printf '%Y'` + if [[ $TIMESTAMP -lt $DEFINITE_CLEANUP_THRESHOLD ]] + then + WILL_DELETE="$WILL_DELETE $JOB/$EXECUTION" + else + if [[ $TIMESTAMP -lt $POSSIBLE_CLEANUP_THRESHOLD ]] + then + MAY_DELETE="$MAY_DELETE $JOB/$EXECUTION" + fi + fi + done +done + +for EXECUTION in $WILL_DELETE +do + rm -Rf $EXECUTION +done + +for EXECUTION in $(ls $MAY_DELETE -trd) +do + if [[ `du -s -BM | grep -o '[0-9]\+'` -le $MINIMUM_CLEANUP_MEGABYTES ]]; then + break + fi + rm -Rf $EXECUTION +done diff --git a/sahara/tests/unit/plugins/spark/test_config_helper.py b/sahara/tests/unit/plugins/spark/test_config_helper.py index 25339815..53909dd9 100644 --- a/sahara/tests/unit/plugins/spark/test_config_helper.py +++ b/sahara/tests/unit/plugins/spark/test_config_helper.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock + from sahara.plugins.spark import config_helper as c_helper from sahara.tests.unit import base as test_base @@ -23,3 +25,34 @@ class ConfigHelperUtilsTest(test_base.SaharaTestCase): paths = c_helper.make_hadoop_path(storage_paths, '/spam') expected = ['/mnt/one/spam', '/mnt/two/spam'] self.assertEqual(expected, paths) + + @mock.patch('sahara.plugins.spark.config_helper.get_config_value') + def test_cleanup_configs(self, get_config_value): + getter = lambda plugin, key, cluster: plugin_configs[key] + get_config_value.side_effect = getter + plugin_configs = {"Minimum cleanup megabytes": 4096, + "Minimum cleanup seconds": 86400, + "Maximum cleanup seconds": 1209600} + configs = c_helper.generate_job_cleanup_config(None) + self.assertTrue(configs['valid']) + expected = ["MINIMUM_CLEANUP_MEGABYTES=4096", + "MINIMUM_CLEANUP_SECONDS=86400", + "MAXIMUM_CLEANUP_SECONDS=1209600"] + for config_value in expected: + self.assertIn(config_value, configs['script']) + self.assertIn("0 * * * * root /etc/hadoop/tmp-cleanup.sh", + configs['cron'][0]) + + plugin_configs['Maximum cleanup seconds'] = 0 + configs = c_helper.generate_job_cleanup_config(None) + self.assertFalse(configs['valid']) + self.assertNotIn(configs, 'script') + self.assertNotIn(configs, 'cron') + + plugin_configs = {"Minimum cleanup megabytes": 0, + "Minimum cleanup seconds": 0, + "Maximum cleanup seconds": 1209600} + configs = c_helper.generate_job_cleanup_config(None) + self.assertFalse(configs['valid']) + self.assertNotIn(configs, 'script') + self.assertNotIn(configs, 'cron') diff --git a/sahara/tests/unit/plugins/spark/test_plugin.py b/sahara/tests/unit/plugins/spark/test_plugin.py index f0880d11..4e129bfa 100644 --- a/sahara/tests/unit/plugins/spark/test_plugin.py +++ b/sahara/tests/unit/plugins/spark/test_plugin.py @@ -65,3 +65,40 @@ class SparkPluginTest(base.SaharaWithDbTestCase): self.assertIsInstance( plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK), engine.SparkJobEngine) + + def test_cleanup_configs(self): + remote = mock.Mock() + instance = mock.Mock() + + extra_conf = {'job_cleanup': { + 'valid': True, + 'script': 'script_text', + 'cron': 'cron_text'}} + instance.node_group.node_processes = ["master"] + instance.node_group.id = id + cluster_dict = { + 'name': 'cluster', + 'plugin_name': 'spark', + 'hadoop_version': '1.0.0', + 'default_image_id': 'image'} + + cluster = conductor.cluster_create(context.ctx(), cluster_dict) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + plugin._push_cleanup_job(remote, cluster, extra_conf, instance) + remote.write_file_to.assert_called_with( + '/etc/hadoop/tmp-cleanup.sh', + 'script_text') + remote.execute_command.assert_called_with( + 'sudo sh -c \'echo "cron_text" > /etc/cron.d/spark-cleanup\'') + + remote.reset_mock() + instance.node_group.node_processes = ["worker"] + plugin._push_cleanup_job(remote, cluster, extra_conf, instance) + self.assertFalse(remote.called) + + remote.reset_mock() + instance.node_group.node_processes = ["master"] + extra_conf['job_cleanup']['valid'] = False + plugin._push_cleanup_job(remote, cluster, extra_conf, instance) + remote.execute_command.assert_called_with( + 'sudo rm -f /etc/crond.d/spark-cleanup')