Avoid deleting transient cluster before job is started
To run a job on a transient cluster, a client needs to execute two
commands:

* create a cluster with is_transient=true
* run a job on it in the regular manner

We terminate unneeded transient clusters in a periodic job, which
terminates a cluster if it is transient and no job is running on it at
the time. We also do not terminate a cluster if its lifetime is smaller
than the config parameter min_transient_cluster_active_time. For some
reason the parameter was set to 0 by default, which could cause
premature cluster termination if the periodic task ran between cluster
creation and job execution.

Also added unit tests to verify min_transient_cluster_active_time.

Change-Id: I5330a969ea7cd81ec2759a7fe32bab4a5de3fb4c
This commit is contained in:
parent 52110abbe3
commit 400d90da07
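
In rough outline, the reaping rule the message describes comes down to
the sketch below. This is a minimal illustration only: the attribute
names on the cluster and job-execution objects and the helper name
_should_terminate are assumptions, and the actual periodic-task code is
not part of this diff.

    from datetime import timedelta

    from oslo.config import cfg

    # min_transient_cluster_active_time is registered in the
    # periodic_opts hunk below; assumed readable off cfg.CONF here.
    CONF = cfg.CONF


    def _should_terminate(cluster, job_executions, now):
        # Only transient clusters are ever reaped automatically.
        if not cluster.is_transient:
            return False
        # A cluster with an unfinished job is still needed.
        if any(je.end_time is None for je in job_executions):
            return False
        # Grace period: a freshly created cluster may not have
        # received its job yet, so it is guaranteed to stay alive
        # for at least min_transient_cluster_active_time seconds.
        min_alive = timedelta(
            seconds=CONF.min_transient_cluster_active_time)
        return now - cluster.updated_at >= min_alive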
@@ -258,7 +258,7 @@
 # Minimal "lifetime" in seconds for a transient cluster.
 # Cluster is guarantied to be "alive" within this time period.
 # (integer value)
-#min_transient_cluster_active_time=0
+#min_transient_cluster_active_time=30


 #
@@ -44,7 +44,7 @@ periodic_opts = [
                help='Max interval size between periodic tasks execution in '
                     'seconds'),
     cfg.IntOpt('min_transient_cluster_active_time',
-               default=0,
+               default=30,
                help='Minimal "lifetime" in seconds for a transient cluster. '
                     'Cluster is guarantied to be "alive" within this time '
                     'period.'),
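
Operators who need a longer grace period can raise the value in their
configuration (the regenerated sample above shows the commented
default), and tests can flip the same knob with the override_config
helper visible elsewhere in this diff. For example, with an arbitrary
illustrative value:

    # Inside a SaharaWithDbTestCase-based test; 120 is illustrative.
    self.override_config("min_transient_cluster_active_time", 120)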
@@ -51,23 +51,20 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
         get_job_status.assert_has_calls([mock.call(u'2'),
                                          mock.call(u'3')])

-    @mock.patch('sahara.service.edp.job_manager.get_job_status')
+    @mock.patch('sahara.openstack.common.timeutils.utcnow')
     @mock.patch('sahara.service.api.terminate_cluster')
-    def test_cluster_terminate(self, terminate_cluster, get_job_status):
-        self.override_config("use_identity_api_v3", True)
+    def test_cluster_terminate(self, terminate_cluster, utcnow):
+
+        utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 0)
+
         ctx = context.ctx()
         job = self.api.job_create(ctx, te.SAMPLE_JOB)
         ds = self.api.data_source_create(ctx, te.SAMPLE_DATA_SOURCE)
-        c = tc.SAMPLE_CLUSTER.copy()
-        c["status"] = "Active"
-        c["id"] = "1"
-        c["name"] = "1"
-        c['updated_at'] = timeutils.utcnow()
-        self.api.cluster_create(ctx, c)
-        c["id"] = "2"
-        c["name"] = "2"
-        self.api.cluster_create(ctx, c)
-        self._create_job_execution({"end_time": datetime.datetime.now(),
+
+        self._make_cluster('1')
+        self._make_cluster('2')
+
+        self._create_job_execution({"end_time": timeutils.utcnow(),
                                     "id": 1,
                                     "cluster_id": "1"},
                                    job, ds, ds)
@@ -79,10 +76,50 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
                                     "id": 3,
                                     "cluster_id": "2"},
                                    job, ds, ds)

+        utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 1)
+
         p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
         self.assertEqual(terminate_cluster.call_count, 1)
         terminate_cluster.assert_has_calls([mock.call(u'1')])
+
+    @mock.patch('sahara.openstack.common.timeutils.utcnow')
+    @mock.patch('sahara.service.api.terminate_cluster')
+    def test_cluster_not_killed_too_early(self, terminate_cluster, utcnow):
+
+        utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
+
+        self._make_cluster('1')
+
+        utcnow.return_value = datetime.datetime(2005, 2, 1, second=20)
+
+        p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
+        self.assertEqual(terminate_cluster.call_count, 0)
+
+    @mock.patch('sahara.openstack.common.timeutils.utcnow')
+    @mock.patch('sahara.service.api.terminate_cluster')
+    def test_cluster_killed_in_time(self, terminate_cluster, utcnow):
+
+        utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
+
+        self._make_cluster('1')
+
+        utcnow.return_value = datetime.datetime(2005, 2, 1, second=40)
+
+        p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
+        self.assertEqual(terminate_cluster.call_count, 1)
+        terminate_cluster.assert_has_calls([mock.call(u'1')])
+
+    def _make_cluster(self, id_name):
+        ctx = context.ctx()
+
+        c = tc.SAMPLE_CLUSTER.copy()
+        c["status"] = "Active"
+        c["id"] = id_name
+        c["name"] = id_name
+        c['updated_at'] = timeutils.utcnow()
+        self.api.cluster_create(ctx, c)
+
     def _create_job_execution(self, values, job, input, output):
         values.update({"job_id": job['id'],
                        "input_id": input['id'],
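
The two new tests work by freezing the clock the periodic task consults
(timeutils.utcnow) and advancing it to either side of the new 30-second
default: at 20 seconds the idle cluster must survive, at 40 seconds it
may be reaped. The pattern in isolation, as a standalone sketch where
run_periodic_task stands in for the real call:

    import datetime

    import mock


    def clock_pattern_sketch(run_periodic_task):
        # Freeze "now" at cluster-creation time.
        with mock.patch(
                'sahara.openstack.common.timeutils.utcnow') as utcnow:
            utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
            # ... create the transient cluster here ...

            # 20s later: below the 30s default, nothing is terminated.
            utcnow.return_value = datetime.datetime(2005, 2, 1, second=20)
            run_periodic_task()

            # 40s later: past the default, the idle cluster is reaped.
            utcnow.return_value = datetime.datetime(2005, 2, 1, second=40)
            run_periodic_task()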