Merge "Added periodic clean up of old inactive clusters"

This commit is contained in:
Jenkins 2015-02-19 03:49:13 +00:00 committed by Gerrit Code Review
commit 96d992767b
3 changed files with 143 additions and 49 deletions

View File

@ -427,6 +427,14 @@
# guaranteed to be "alive" within this time period. (integer value)
#min_transient_cluster_active_time = 30
# Maximal time (in hours) for clusters allowed to be in states other
# than "Active", "Deleting" or "Error". If a cluster is not in
# "Active", "Deleting" or "Error" state and its last update was
# more than "cleanup_time_for_incomplete_clusters" hours ago, it
# will be deleted automatically. (A value of 0 disables automatic
# clean up.) (integer value)
#cleanup_time_for_incomplete_clusters = 0
# Timeout for detaching volumes from instance (in seconds). (integer
# value)
#detach_volume_timeout = 300

View File

@ -22,7 +22,7 @@ import six
from sahara import conductor as c
from sahara import context
from sahara.i18n import _LI
from sahara.i18n import _LW
from sahara.openstack.common import periodic_task
from sahara.openstack.common import threadgroup
from sahara.service.edp import job_manager
@ -52,6 +52,15 @@ periodic_opts = [
help='Minimal "lifetime" in seconds for a transient cluster. '
'Cluster is guaranteed to be "alive" within this time '
'period.'),
cfg.IntOpt('cleanup_time_for_incomplete_clusters',
default=0,
help='Maximal time (in hours) for clusters allowed to be in '
'states other than "Active", "Deleting" or "Error". If a '
'cluster is not in "Active", "Deleting" or "Error" state '
'and last update of it was longer than '
'"cleanup_time_for_incomplete_clusters" hours ago then it '
'will be deleted automatically. (0 value means that '
'automatic clean up is disabled).')
]
CONF = cfg.CONF
@ -60,6 +69,45 @@ CONF.register_opts(periodic_opts)
conductor = c.API
def get_time_since_last_update(cluster):
    """Return the number of seconds elapsed since the cluster's last update.

    The cluster's ``updated_at`` field is an ISO-8601 timestamp; it is
    normalized before being compared against the current UTC time.
    """
    last_update = timeutils.normalize_time(
        timeutils.parse_isotime(cluster.updated_at))
    return timeutils.delta_seconds(last_update, timeutils.utcnow())
def terminate_cluster(ctx, cluster, description):
    """Ask the ops layer to terminate a cluster, logging the outcome.

    On success the cluster status is moved to "AwaitingTermination"
    (unless it is already there); on failure only a warning is logged so
    the periodic task can retry on a later run.
    """
    if CONF.use_identity_api_v3:
        trusts.use_os_admin_auth_token(cluster)

    log_info = {'cluster': cluster.name,
                'id': cluster.id,
                'status': cluster.status,
                'description': description}
    LOG.debug('Terminating %(description)s cluster %(cluster)s '
              'in "%(status)s" state with id %(id)s', log_info)

    try:
        ops.terminate_cluster(cluster.id)
    except Exception as e:
        # Best effort: swallow the error, the next periodic run will retry.
        LOG.warn(_LW('Failed to terminate %(description)s cluster '
                     '%(cluster)s in "%(status)s" state with id %(id)s: '
                     '%(error)s.'),
                 dict(log_info, error=six.text_type(e)))
    else:
        if cluster.status != 'AwaitingTermination':
            conductor.cluster_update(
                ctx, cluster, {'status': 'AwaitingTermination'})
def _make_periodic_tasks():
'''Return the periodic tasks object
@ -79,7 +127,7 @@ def _make_periodic_tasks():
context.set_ctx(None)
@periodic_task.periodic_task(spacing=90)
def terminate_unneeded_clusters(self, ctx):
def terminate_unneeded_transient_clusters(self, ctx):
LOG.debug('Terminating unneeded transient clusters')
ctx = context.get_admin_context()
context.set_ctx(ctx)
@ -94,36 +142,11 @@ def _make_periodic_tasks():
if jc > 0:
continue
cluster_updated_at = timeutils.normalize_time(
timeutils.parse_isotime(cluster.updated_at))
current_time = timeutils.utcnow()
spacing = timeutils.delta_seconds(cluster_updated_at,
current_time)
spacing = get_time_since_last_update(cluster)
if spacing < CONF.min_transient_cluster_active_time:
continue
if CONF.use_identity_api_v3:
trusts.use_os_admin_auth_token(cluster)
LOG.info(_LI('Terminating transient cluster %(cluster)s '
'with id %(id)s'),
{'cluster': cluster.name, 'id': cluster.id})
try:
ops.terminate_cluster(cluster.id)
except Exception as e:
LOG.info(_LI('Failed to terminate transient cluster '
'%(cluster)s with id %(id)s: %(error)s.'),
{'cluster': cluster.name,
'id': cluster.id,
'error': six.text_type(e)})
else:
if cluster.status != 'AwaitingTermination':
conductor.cluster_update(
ctx,
cluster,
{'status': 'AwaitingTermination'})
terminate_cluster(ctx, cluster, description='transient')
context.set_ctx(None)
@periodic_task.periodic_task(spacing=zombie_task_spacing)
@ -141,6 +164,30 @@ def _make_periodic_tasks():
p.proxy_user_delete(user_id=user.id)
context.set_ctx(None)
@periodic_task.periodic_task(spacing=3600)
def terminate_incomplete_clusters(self, ctx):
    """Delete clusters stuck in a non-final state for too long.

    Disabled when 'cleanup_time_for_incomplete_clusters' is 0 (the
    default). Clusters in "Active", "Error" or "Deleting" state are
    never touched.
    """
    max_idle_hours = CONF.cleanup_time_for_incomplete_clusters
    if max_idle_hours <= 0:
        return

    LOG.debug('Terminating old clusters in non-final state')

    ctx = context.get_admin_context()
    context.set_ctx(ctx)
    # NOTE(alazarev) All clusters are fetched once an hour for now;
    # SQL-level filtering requires criteria support in the sahara
    # db API.
    for cluster in conductor.cluster_get_all(ctx):
        if cluster.status in ['Active', 'Error', 'Deleting']:
            continue
        idle_seconds = get_time_since_last_update(cluster)
        if idle_seconds >= max_idle_hours * 3600:
            terminate_cluster(ctx, cluster, description='incomplete')
    context.set_ctx(None)
return SaharaPeriodicTasks()

View File

@ -51,12 +51,10 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
get_job_status.assert_has_calls([mock.call(u'2'),
mock.call(u'3')])
@mock.patch('oslo_utils.timeutils.utcnow')
@mock.patch('sahara.service.ops.terminate_cluster')
def test_cluster_terminate(self, terminate_cluster, utcnow):
def test_transient_cluster_terminate(self, terminate_cluster):
utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 0)
utcnow.override_time = False
timeutils.set_time_override(datetime.datetime(2005, 2, 1, 0, 0))
ctx = context.ctx()
job = self.api.job_create(ctx, te.SAMPLE_JOB)
@ -78,46 +76,87 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
"cluster_id": "2"},
job, ds, ds)
utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 1)
timeutils.set_time_override(datetime.datetime(2005, 2, 1, 0, 1))
p._make_periodic_tasks().terminate_unneeded_clusters(None)
p._make_periodic_tasks().terminate_unneeded_transient_clusters(None)
self.assertEqual(terminate_cluster.call_count, 1)
terminate_cluster.assert_has_calls([mock.call(u'1')])
@mock.patch('oslo_utils.timeutils.utcnow')
@mock.patch('sahara.service.ops.terminate_cluster')
def test_cluster_not_killed_too_early(self, terminate_cluster, utcnow):
def test_transient_cluster_not_killed_too_early(self, terminate_cluster):
utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
utcnow.override_time = False
timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=0))
self._make_cluster('1')
utcnow.return_value = datetime.datetime(2005, 2, 1, second=20)
timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=20))
p._make_periodic_tasks().terminate_unneeded_clusters(None)
p._make_periodic_tasks().terminate_unneeded_transient_clusters(None)
self.assertEqual(terminate_cluster.call_count, 0)
@mock.patch('oslo_utils.timeutils.utcnow')
@mock.patch('sahara.service.ops.terminate_cluster')
def test_cluster_killed_in_time(self, terminate_cluster, utcnow):
def test_transient_cluster_killed_in_time(self, terminate_cluster):
utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
utcnow.override_time = False
timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=0))
self._make_cluster('1')
utcnow.return_value = datetime.datetime(2005, 2, 1, second=40)
timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=40))
p._make_periodic_tasks().terminate_unneeded_clusters(None)
p._make_periodic_tasks().terminate_unneeded_transient_clusters(None)
self.assertEqual(terminate_cluster.call_count, 1)
terminate_cluster.assert_has_calls([mock.call(u'1')])
def _make_cluster(self, id_name):
@mock.patch('sahara.service.ops.terminate_cluster')
def test_incomplete_cluster_not_killed_too_early(self, terminate_cluster):
    """A non-final cluster must survive until the cleanup timeout elapses."""
    self.override_config('cleanup_time_for_incomplete_clusters', 1)

    timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=0))
    self._make_cluster('1', status='Pending')

    # Advance to 59m50s — just short of the 1-hour cleanup threshold.
    timeutils.set_time_override(
        datetime.datetime(2005, 2, 1, minute=59, second=50))
    p._make_periodic_tasks().terminate_incomplete_clusters(None)

    self.assertEqual(terminate_cluster.call_count, 0)
@mock.patch('sahara.service.ops.terminate_cluster')
def test_incomplete_cluster_killed_in_time(self, terminate_cluster):
    """A non-final cluster older than the timeout must be terminated."""
    self.override_config('cleanup_time_for_incomplete_clusters', 1)

    timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=0))
    self._make_cluster('1', status='Pending')

    # Advance past the 1-hour cleanup threshold.
    timeutils.set_time_override(
        datetime.datetime(2005, 2, 1, hour=1, second=10))
    p._make_periodic_tasks().terminate_incomplete_clusters(None)

    self.assertEqual(terminate_cluster.call_count, 1)
    terminate_cluster.assert_has_calls([mock.call(u'1')])
@mock.patch('sahara.service.ops.terminate_cluster')
def test_active_cluster_not_killed_as_inactive(
        self, terminate_cluster):
    """An "Active" cluster is exempt from incomplete-cluster cleanup."""
    self.override_config('cleanup_time_for_incomplete_clusters', 1)

    timeutils.set_time_override(datetime.datetime(2005, 2, 1, second=0))
    # Default status is 'Active', which the cleanup task must skip.
    self._make_cluster('1')

    timeutils.set_time_override(
        datetime.datetime(2005, 2, 1, hour=1, second=10))
    p._make_periodic_tasks().terminate_incomplete_clusters(None)

    self.assertEqual(terminate_cluster.call_count, 0)
def _make_cluster(self, id_name, status='Active'):
ctx = context.ctx()
c = tc.SAMPLE_CLUSTER.copy()
c["status"] = "Active"
c["status"] = status
c["id"] = id_name
c["name"] = id_name
c['updated_at'] = timeutils.utcnow()