Spark Temporary Job Data Retention and Cleanup

Introduces a periodic task that cleans up temporary data left behind by
Spark jobs, easing maintenance of long-lived clusters.

Change-Id: Ia7dc2dde54ab62199a630c3d1b64c76f08698181
Implements: blueprint spark-cleanup
Ethan Gafford 2015-01-26 13:25:04 -05:00
parent d4fd4ca997
commit 7cf61bd313
6 changed files with 180 additions and 2 deletions
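
For context on how these settings are consumed: the three new options sit in the existing "Spark" config group, so an operator overrides them through a cluster's cluster_configs like any other Spark option. A minimal sketch, assuming the usual Sahara layout of cluster_configs as {applicable target: {config name: value}}; the surrounding template fields are illustrative only:

# Hypothetical cluster template body; only the "Spark" entries below come
# from this change, the rest is a generic Sahara template skeleton.
cluster_template = {
    'name': 'spark-with-cleanup',
    'plugin_name': 'spark',
    'hadoop_version': '1.0.0',
    'cluster_configs': {
        'Spark': {
            # Never purge job data younger than 12 hours.
            'Minimum cleanup seconds': 43200,
            # Always purge job data older than 7 days.
            'Maximum cleanup seconds': 604800,
            # In between, purge oldest-first only while the job data
            # directory exceeds 2 GB.
            'Minimum cleanup megabytes': 2048,
        },
    },
}

Setting any of the three values to 0 disables the cleanup job entirely, per the validity check in generate_job_cleanup_config below.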

View File

@@ -22,6 +22,7 @@ from sahara.openstack.common import log as logging
from sahara.plugins import provisioning as p
from sahara.plugins import utils
from sahara.topology import topology_helper as topology
from sahara.utils import files as f
from sahara.utils import types as types
from sahara.utils import xmlutils as x
@@ -98,7 +99,28 @@ SPARK_CONFS = {
' (default: /opt/spark)',
'default': '/opt/spark',
'priority': 2,
}
},
{
'name': 'Minimum cleanup seconds',
'description': 'Job data will never be purged before this'
' amount of time elapses (default: 86400 = 1 day)',
'default': '86400',
'priority': 2,
},
{
'name': 'Maximum cleanup seconds',
'description': 'Job data will always be purged after this'
' amount of time elapses (default: 1209600 = 14 days)',
'default': '1209600',
'priority': 2,
},
{
'name': 'Minimum cleanup megabytes',
'description': 'No job data will be purged unless the total'
' job data exceeds this size (default: 4096 = 4GB)',
'default': '4096',
'priority': 2,
},
]
}
}
@@ -375,6 +397,27 @@ def generate_hadoop_setup_script(storage_paths, env_configs):
return "\n".join(script_lines)
def generate_job_cleanup_config(cluster):
args = {
'minimum_cleanup_megabytes': get_config_value(
"Spark", "Minimum cleanup megabytes", cluster),
'minimum_cleanup_seconds': get_config_value(
"Spark", "Minimum cleanup seconds", cluster),
'maximum_cleanup_seconds': get_config_value(
"Spark", "Maximum cleanup seconds", cluster)
}
job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
(args['minimum_cleanup_megabytes'] > 0
and args['minimum_cleanup_seconds'] > 0))}
if job_conf['valid']:
job_conf['cron'] = f.get_file_text(
'plugins/spark/resources/spark-cleanup.cron')
job_cleanup_script = f.get_file_text(
'plugins/spark/resources/tmp-cleanup.sh.template')
job_conf['script'] = job_cleanup_script.format(**args)
return job_conf
def extract_name_values(configs):
return dict((cfg['name'], cfg['value']) for cfg in configs)
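
For readers tracing the data flow: generate_job_cleanup_config returns a small dict that _push_cleanup_job in the plugin (next file) consumes. Roughly, with the default settings, the result looks like the sketch below; the 'script' value is elided, it is simply tmp-cleanup.sh.template with the three thresholds substituted:

# Illustrative shape only, not captured output.
job_conf = {
    'valid': True,   # False if any threshold is 0, which disables cleanup
    'cron': '# Cleans up old Spark job directories once per hour.\n'
            '0 * * * * root /etc/hadoop/tmp-cleanup.sh\n',
    'script': '<tmp-cleanup.sh.template rendered with the thresholds>',
}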

View File

@@ -150,6 +150,7 @@ class SparkProvider(p.ProvisioningPluginBase):
else:
config_slaves = "\n"
extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
for ng in cluster.node_groups:
extra[ng.id] = {
'xml': c_helper.generate_xml_configs(
@@ -273,6 +274,7 @@ class SparkProvider(p.ProvisioningPluginBase):
self._write_topology_data(r, cluster, extra)
self._push_master_configs(r, cluster, extra, instance)
self._push_cleanup_job(r, cluster, extra, instance)
def _push_configs_to_existing_node(self, cluster, extra, instance):
node_processes = instance.node_group.node_processes
@@ -291,6 +293,7 @@
}
r = remote.get_remote(instance)
r.write_files_to(files)
self._push_cleanup_job(r, cluster, extra, instance)
if need_update_hadoop:
with remote.get_remote(instance) as r:
self._write_topology_data(r, cluster, extra)
@@ -303,10 +306,22 @@
def _push_master_configs(self, r, cluster, extra, instance):
node_processes = instance.node_group.node_processes
if 'namenode' in node_processes:
self._push_namenode_configs(cluster, r)
def _push_cleanup_job(self, r, cluster, extra, instance):
node_processes = instance.node_group.node_processes
if 'master' in node_processes:
if extra['job_cleanup']['valid']:
r.write_file_to('/etc/hadoop/tmp-cleanup.sh',
extra['job_cleanup']['script'])
r.execute_command("chmod 755 /etc/hadoop/tmp-cleanup.sh")
cmd = 'sudo sh -c \'echo "%s" > /etc/cron.d/spark-cleanup\''
r.execute_command(cmd % extra['job_cleanup']['cron'])
else:
r.execute_command("sudo rm -f /etc/hadoop/tmp-cleanup.sh")
r.execute_command("sudo rm -f /etc/cron.d/spark-cleanup")
def _push_namenode_configs(self, cluster, r):
r.write_file_to('/etc/hadoop/dn.incl',
utils.generate_fqdn_host_names(

View File

@@ -0,0 +1,2 @@
# Cleans up old Spark job directories once per hour.
0 * * * * root /etc/hadoop/tmp-cleanup.sh

View File

@@ -0,0 +1,48 @@
#!/bin/bash
# Purges Spark EDP job data under /tmp/spark-edp based on age and total size.
# Bash is required for the [[ ]] tests below.
MINIMUM_CLEANUP_MEGABYTES={minimum_cleanup_megabytes}
MINIMUM_CLEANUP_SECONDS={minimum_cleanup_seconds}
MAXIMUM_CLEANUP_SECONDS={maximum_cleanup_seconds}
CURRENT_TIMESTAMP=`date +%s`
POSSIBLE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MINIMUM_CLEANUP_SECONDS))
DEFINITE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MAXIMUM_CLEANUP_SECONDS))
unset MAY_DELETE
unset WILL_DELETE
if [ ! -d /tmp/spark-edp ]
then
exit 0
fi
cd /tmp/spark-edp
for JOB in $(find . -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
do
for EXECUTION in $(find $JOB -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
do
TIMESTAMP=`stat $JOB/$EXECUTION --printf '%Y'`
if [[ $TIMESTAMP -lt $DEFINITE_CLEANUP_THRESHOLD ]]
then
WILL_DELETE="$WILL_DELETE $JOB/$EXECUTION"
else
if [[ $TIMESTAMP -lt $POSSIBLE_CLEANUP_THRESHOLD ]]
then
MAY_DELETE="$MAY_DELETE $JOB/$EXECUTION"
fi
fi
done
done
# Always remove executions older than the maximum age.
for EXECUTION in $WILL_DELETE
do
rm -Rf $EXECUTION
done
# Remove candidates older than the minimum age, oldest first, until the
# total size of the job data directory drops to the minimum. Skip this
# entirely when there are no candidates, so the bare "ls" below cannot
# resolve to the current directory.
if [ -n "$MAY_DELETE" ]
then
for EXECUTION in $(ls -trd $MAY_DELETE)
do
if [[ `du -s -BM | grep -o '[0-9]\+'` -le $MINIMUM_CLEANUP_MEGABYTES ]]; then
break
fi
rm -Rf $EXECUTION
done
fi
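
To make the retention policy explicit: an execution directory is always removed once it is older than the maximum age, never removed while younger than the minimum age, and in the window between the two it is removed oldest-first only while the total job data still exceeds the minimum size. A rough Python equivalent of the per-directory decision (a hypothetical helper, not part of this change):

import time

def cleanup_decision(mtime, total_megabytes, now=None,
                     min_seconds=86400, max_seconds=1209600,
                     min_megabytes=4096):
    """Classify one execution directory as 'delete', 'maybe' or 'keep'.

    Mirrors tmp-cleanup.sh.template: 'maybe' directories are removed,
    oldest first, only while the job data directory is still larger
    than min_megabytes.
    """
    now = now if now is not None else time.time()
    age = now - mtime
    if age > max_seconds:
        return 'delete'
    if age > min_seconds and total_megabytes > min_megabytes:
        return 'maybe'
    return 'keep'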

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.spark import config_helper as c_helper
from sahara.tests.unit import base as test_base
@@ -23,3 +25,34 @@ class ConfigHelperUtilsTest(test_base.SaharaTestCase):
paths = c_helper.make_hadoop_path(storage_paths, '/spam')
expected = ['/mnt/one/spam', '/mnt/two/spam']
self.assertEqual(expected, paths)
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
def test_cleanup_configs(self, get_config_value):
getter = lambda plugin, key, cluster: plugin_configs[key]
get_config_value.side_effect = getter
plugin_configs = {"Minimum cleanup megabytes": 4096,
"Minimum cleanup seconds": 86400,
"Maximum cleanup seconds": 1209600}
configs = c_helper.generate_job_cleanup_config(None)
self.assertTrue(configs['valid'])
expected = ["MINIMUM_CLEANUP_MEGABYTES=4096",
"MINIMUM_CLEANUP_SECONDS=86400",
"MAXIMUM_CLEANUP_SECONDS=1209600"]
for config_value in expected:
self.assertIn(config_value, configs['script'])
self.assertIn("0 * * * * root /etc/hadoop/tmp-cleanup.sh",
configs['cron'])
plugin_configs['Maximum cleanup seconds'] = 0
configs = c_helper.generate_job_cleanup_config(None)
self.assertFalse(configs['valid'])
self.assertNotIn('script', configs)
self.assertNotIn('cron', configs)
plugin_configs = {"Minimum cleanup megabytes": 0,
"Minimum cleanup seconds": 0,
"Maximum cleanup seconds": 1209600}
configs = c_helper.generate_job_cleanup_config(None)
self.assertFalse(configs['valid'])
self.assertNotIn('script', configs)
self.assertNotIn('cron', configs)

View File

@@ -65,3 +65,40 @@ class SparkPluginTest(base.SaharaWithDbTestCase):
self.assertIsInstance(
plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
engine.SparkJobEngine)
def test_cleanup_configs(self):
remote = mock.Mock()
instance = mock.Mock()
extra_conf = {'job_cleanup': {
'valid': True,
'script': 'script_text',
'cron': 'cron_text'}}
instance.node_group.node_processes = ["master"]
instance.node_group.id = id
cluster_dict = {
'name': 'cluster',
'plugin_name': 'spark',
'hadoop_version': '1.0.0',
'default_image_id': 'image'}
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
plugin._push_cleanup_job(remote, cluster, extra_conf, instance)
remote.write_file_to.assert_called_with(
'/etc/hadoop/tmp-cleanup.sh',
'script_text')
remote.execute_command.assert_called_with(
'sudo sh -c \'echo "cron_text" > /etc/cron.d/spark-cleanup\'')
remote.reset_mock()
instance.node_group.node_processes = ["worker"]
plugin._push_cleanup_job(remote, cluster, extra_conf, instance)
self.assertFalse(remote.method_calls)
remote.reset_mock()
instance.node_group.node_processes = ["master"]
extra_conf['job_cleanup']['valid'] = False
plugin._push_cleanup_job(remote, cluster, extra_conf, instance)
remote.execute_command.assert_called_with(
'sudo rm -f /etc/cron.d/spark-cleanup')