From 400d90da07ecabeffca17bf9ca7bbe85339345cf Mon Sep 17 00:00:00 2001 From: Dmitry Mescheryakov Date: Tue, 15 Apr 2014 18:22:50 +0400 Subject: [PATCH] Avoid deleting transient cluster before job is started In order to run job on a transient cluster client needs to execute two commands: * create cluster with is_transient=true * run job on it in a regular manner We terminate unneeded transient clusters in a periodic job, which terminates cluster if cluster is transient and no job is running on it at a time. We also do not terminate cluster if its lifetime is smaller then config parameter min_transient_cluster_active_time. For some reason the parameter is set to 0 by default, which could cause premature cluster termination if periodic task runs between cluster creation and job execution. Also added unit tests to verify min_transient_cluster_active_time. Change-Id: I5330a969ea7cd81ec2759a7fe32bab4a5de3fb4c --- etc/sahara/sahara.conf.sample | 2 +- sahara/service/periodic.py | 2 +- sahara/tests/unit/service/test_periodic.py | 63 +++++++++++++++++----- 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/etc/sahara/sahara.conf.sample b/etc/sahara/sahara.conf.sample index 7d14ed5b..a14c6b2e 100644 --- a/etc/sahara/sahara.conf.sample +++ b/etc/sahara/sahara.conf.sample @@ -258,7 +258,7 @@ # Minimal "lifetime" in seconds for a transient cluster. # Cluster is guarantied to be "alive" within this time period. # (integer value) -#min_transient_cluster_active_time=0 +#min_transient_cluster_active_time=30 # diff --git a/sahara/service/periodic.py b/sahara/service/periodic.py index ab897853..a60d2449 100644 --- a/sahara/service/periodic.py +++ b/sahara/service/periodic.py @@ -44,7 +44,7 @@ periodic_opts = [ help='Max interval size between periodic tasks execution in ' 'seconds'), cfg.IntOpt('min_transient_cluster_active_time', - default=0, + default=30, help='Minimal "lifetime" in seconds for a transient cluster. ' 'Cluster is guarantied to be "alive" within this time ' 'period.'), diff --git a/sahara/tests/unit/service/test_periodic.py b/sahara/tests/unit/service/test_periodic.py index 78d29eb1..23c640a7 100644 --- a/sahara/tests/unit/service/test_periodic.py +++ b/sahara/tests/unit/service/test_periodic.py @@ -51,23 +51,20 @@ class TestPeriodicBack(base.SaharaWithDbTestCase): get_job_status.assert_has_calls([mock.call(u'2'), mock.call(u'3')]) - @mock.patch('sahara.service.edp.job_manager.get_job_status') + @mock.patch('sahara.openstack.common.timeutils.utcnow') @mock.patch('sahara.service.api.terminate_cluster') - def test_cluster_terminate(self, terminate_cluster, get_job_status): - self.override_config("use_identity_api_v3", True) + def test_cluster_terminate(self, terminate_cluster, utcnow): + + utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 0) + ctx = context.ctx() job = self.api.job_create(ctx, te.SAMPLE_JOB) ds = self.api.data_source_create(ctx, te.SAMPLE_DATA_SOURCE) - c = tc.SAMPLE_CLUSTER.copy() - c["status"] = "Active" - c["id"] = "1" - c["name"] = "1" - c['updated_at'] = timeutils.utcnow() - self.api.cluster_create(ctx, c) - c["id"] = "2" - c["name"] = "2" - self.api.cluster_create(ctx, c) - self._create_job_execution({"end_time": datetime.datetime.now(), + + self._make_cluster('1') + self._make_cluster('2') + + self._create_job_execution({"end_time": timeutils.utcnow(), "id": 1, "cluster_id": "1"}, job, ds, ds) @@ -79,10 +76,50 @@ class TestPeriodicBack(base.SaharaWithDbTestCase): "id": 3, "cluster_id": "2"}, job, ds, ds) + + utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 1) + p.SaharaPeriodicTasks().terminate_unneeded_clusters(None) self.assertEqual(terminate_cluster.call_count, 1) terminate_cluster.assert_has_calls([mock.call(u'1')]) + @mock.patch('sahara.openstack.common.timeutils.utcnow') + @mock.patch('sahara.service.api.terminate_cluster') + def test_cluster_not_killed_too_early(self, terminate_cluster, utcnow): + + utcnow.return_value = datetime.datetime(2005, 2, 1, second=0) + + self._make_cluster('1') + + utcnow.return_value = datetime.datetime(2005, 2, 1, second=20) + + p.SaharaPeriodicTasks().terminate_unneeded_clusters(None) + self.assertEqual(terminate_cluster.call_count, 0) + + @mock.patch('sahara.openstack.common.timeutils.utcnow') + @mock.patch('sahara.service.api.terminate_cluster') + def test_cluster_killed_in_time(self, terminate_cluster, utcnow): + + utcnow.return_value = datetime.datetime(2005, 2, 1, second=0) + + self._make_cluster('1') + + utcnow.return_value = datetime.datetime(2005, 2, 1, second=40) + + p.SaharaPeriodicTasks().terminate_unneeded_clusters(None) + self.assertEqual(terminate_cluster.call_count, 1) + terminate_cluster.assert_has_calls([mock.call(u'1')]) + + def _make_cluster(self, id_name): + ctx = context.ctx() + + c = tc.SAMPLE_CLUSTER.copy() + c["status"] = "Active" + c["id"] = id_name + c["name"] = id_name + c['updated_at'] = timeutils.utcnow() + self.api.cluster_create(ctx, c) + def _create_job_execution(self, values, job, input, output): values.update({"job_id": job['id'], "input_id": input['id'],