Support searching job executions by job status

As a convenience, allow job execution searches to be filtered by 'status'.
This search key will refer to the "info['status']" field in a JobExecution
record.  Because the "info" dictionary is stored as a string in the database,
and because it contains values from clients that may also contain 'status'
entries in nested objects, a substring search filter on the query is not reliable.
Instead results are filtered by Sahara after they are returned from sqlalchemy.

Partial-Implements: blueprint enable-result-filtering
Change-Id: I090b46514c7c920c3cac1a59a45e527ff87e488d
This commit is contained in:
Trevor McKay 2014-12-02 10:09:47 -05:00
parent 118edd54e7
commit f7bc1ad25a
5 changed files with 61 additions and 27 deletions

View File

@ -256,11 +256,12 @@ class LocalApi(object):
def job_execution_get_all(self, context, **kwargs):
"""Get all JobExecutions filtered by **kwargs.
kwargs key values are the names of fields in a JobExecution
with the addition of two special fields -- 'cluster.name'
and 'job.name'. These two fields support searching on the
names of the Cluster and/or Job objects referenced by the
JobExecution.
kwargs key values may be the names of fields in a JobExecution
plus the following special values with the indicated meaning:
'cluster.name' -- name of the Cluster referenced by the JobExecution
'job.name' -- name of the Job referenced by the JobExecution
'status' -- JobExecution['info']['status']
e.g. job_execution_get_all(cluster_id=12, input_id=123)
job_execution_get_all(**{'cluster.name': 'test',

View File

@ -310,11 +310,12 @@ class ConductorManager(db_base.Base):
def job_execution_get_all(self, context, **kwargs):
"""Get all JobExecutions filtered by **kwargs.
kwargs key values are the names of fields in a JobExecution
with the addition of two special fields -- 'cluster.name'
and 'job.name'. These two fields support searching on the
names of the Cluster and/or Job objects referenced by the
JobExecution.
kwargs key values may be the names of fields in a JobExecution
plus the following special values with the indicated meaning:
'cluster.name' -- name of the Cluster referenced by the JobExecution
'job.name' -- name of the Job referenced by the JobExecution
'status' -- JobExecution['info']['status']
e.g. job_execution_get_all(cluster_id=12, input_id=123)
job_execution_get_all(**{'cluster.name': 'test',

View File

@ -285,11 +285,13 @@ def job_execution_get(context, job_execution):
def job_execution_get_all(context, **kwargs):
"""Get all JobExecutions filtered by **kwargs.
kwargs key values are the names of fields in a JobExecution
with the addition of two special fields -- 'cluster.name'
and 'job.name'. These two fields support searching on the
names of the Cluster and/or Job objects referenced by the
JobExecution.
kwargs key values may be the names of fields in a JobExecution
plus the following special values with the indicated meaning:
'cluster.name' -- name of the Cluster referenced by the JobExecution
'job.name' -- name of the Job referenced by the JobExecution
'status' -- JobExecution['info']['status']
e.g. job_execution_get_all(cluster_id=12, input_id=123)
job_execution_get_all(**{'cluster.name': 'test',

View File

@ -547,11 +547,12 @@ def job_execution_get(context, job_execution_id):
def job_execution_get_all(context, **kwargs):
"""Get all JobExecutions filtered by **kwargs.
kwargs key values are the names of fields in a JobExecution
with the addition of two special fields -- 'cluster.name'
and 'job.name'. These two fields support searching on the
names of the Cluster and/or Job objects referenced by the
JobExecution.
kwargs key values may be the names of fields in a JobExecution
plus the following special values with the indicated meaning:
'cluster.name' -- name of the Cluster referenced by the JobExecution
'job.name' -- name of the Job referenced by the JobExecution
'status' -- JobExecution['info']['status']
e.g. job_execution_get_all(cluster_id=12, input_id=123)
job_execution_get_all(**{'cluster.name': 'test',
@ -562,7 +563,8 @@ def job_execution_get_all(context, **kwargs):
# Remove the external fields if present, they'll
# be handled with a join and filter
externals = {k: kwargs.pop(k) for k in ['cluster.name',
'job.name'] if k in kwargs}
'job.name',
'status'] if k in kwargs}
# Filter JobExecution by the remaining kwargs. This has to be done
# before application of the joins and filters because those
@ -586,7 +588,20 @@ def job_execution_get_all(context, **kwargs):
query = query.join(m.Job).filter(
m.Job.name == externals['job.name'])
return query.all()
res = query.all()
# 'info' is a JsonDictType which is stored as a string.
# It would be possible to search for the substring containing
# the value of 'status' in 'info', but 'info' also contains
# data returned from a client and not managed by Sahara.
# In the case of Oozie jobs, for example, other fields (actions)
# also contain 'status'. Therefore we can't filter on it reliably
# by a substring search in the query.
if 'status' in externals:
status = externals['status'].lower()
res = [je for je in res if (
je['info'] and je['info'].get('status', '').lower() == status)]
return res
def job_execution_count(context, **kwargs):

View File

@ -302,8 +302,9 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
# Run job on cluster 1
self.api.job_execution_create(ctx, my_sample_job_exec)
# Run the same job on cluster 2
# Run the same job on cluster 2 and set status
my_sample_job_exec['cluster_id'] = cl2['id']
my_sample_job_exec['info'] = {'status': 'KiLLeD'}
self.api.job_execution_create(ctx, my_sample_job_exec)
# Search only with job exeuction fields (finds both)
@ -311,18 +312,26 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
self.assertEqual(len(lst), 2)
# Search on cluster name
kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'],
kwargs = {'cluster.name': cl1['name'],
'return_code': 1}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 1)
# Search on cluster name and job name
kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'],
kwargs = {'cluster.name': cl1['name'],
'job.name': SAMPLE_JOB['name'],
'return_code': 1}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 1)
# Search on cluster name, job name, and status
kwargs = {'cluster.name': cl2['name'],
'job.name': SAMPLE_JOB['name'],
'status': 'killed',
'return_code': 1}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 1)
# Search on job name (finds both)
kwargs = {'job.name': SAMPLE_JOB['name'],
'return_code': 1}
@ -330,17 +339,23 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
self.assertEqual(len(lst), 2)
# invalid cluster name value
kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name']+'foo',
kwargs = {'cluster.name': cl1['name']+'foo',
'job.name': SAMPLE_JOB['name']}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 0)
# invalid job name value
kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'],
kwargs = {'cluster.name': cl1['name'],
'job.name': SAMPLE_JOB['name']+'foo'}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 0)
# invalid status value
kwargs = {'cluster.name': cl1['name'],
'status': 'PENDING'}
lst = self.api.job_execution_get_all(ctx, **kwargs)
self.assertEqual(len(lst), 0)
class JobTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):