diff --git a/sahara/conductor/api.py b/sahara/conductor/api.py index a4b3cde2..7dc7179c 100644 --- a/sahara/conductor/api.py +++ b/sahara/conductor/api.py @@ -256,11 +256,12 @@ class LocalApi(object): def job_execution_get_all(self, context, **kwargs): """Get all JobExecutions filtered by **kwargs. - kwargs key values are the names of fields in a JobExecution - with the addition of two special fields -- 'cluster.name' - and 'job.name'. These two fields support searching on the - names of the Cluster and/or Job objects referenced by the - JobExecution. + kwargs key values may be the names of fields in a JobExecution + plus the following special values with the indicated meaning: + + 'cluster.name' -- name of the Cluster referenced by the JobExecution + 'job.name' -- name of the Job referenced by the JobExecution + 'status' -- JobExecution['info']['status'] e.g. job_execution_get_all(cluster_id=12, input_id=123) job_execution_get_all(**{'cluster.name': 'test', diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index 72124a5b..d00f83bf 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -310,11 +310,12 @@ class ConductorManager(db_base.Base): def job_execution_get_all(self, context, **kwargs): """Get all JobExecutions filtered by **kwargs. - kwargs key values are the names of fields in a JobExecution - with the addition of two special fields -- 'cluster.name' - and 'job.name'. These two fields support searching on the - names of the Cluster and/or Job objects referenced by the - JobExecution. + kwargs key values may be the names of fields in a JobExecution + plus the following special values with the indicated meaning: + + 'cluster.name' -- name of the Cluster referenced by the JobExecution + 'job.name' -- name of the Job referenced by the JobExecution + 'status' -- JobExecution['info']['status'] e.g. job_execution_get_all(cluster_id=12, input_id=123) job_execution_get_all(**{'cluster.name': 'test', diff --git a/sahara/db/api.py b/sahara/db/api.py index ef0320c7..c6674944 100644 --- a/sahara/db/api.py +++ b/sahara/db/api.py @@ -285,11 +285,13 @@ def job_execution_get(context, job_execution): def job_execution_get_all(context, **kwargs): """Get all JobExecutions filtered by **kwargs. - kwargs key values are the names of fields in a JobExecution - with the addition of two special fields -- 'cluster.name' - and 'job.name'. These two fields support searching on the - names of the Cluster and/or Job objects referenced by the - JobExecution. + + kwargs key values may be the names of fields in a JobExecution + plus the following special values with the indicated meaning: + + 'cluster.name' -- name of the Cluster referenced by the JobExecution + 'job.name' -- name of the Job referenced by the JobExecution + 'status' -- JobExecution['info']['status'] e.g. job_execution_get_all(cluster_id=12, input_id=123) job_execution_get_all(**{'cluster.name': 'test', diff --git a/sahara/db/sqlalchemy/api.py b/sahara/db/sqlalchemy/api.py index f2940608..13ea50e6 100644 --- a/sahara/db/sqlalchemy/api.py +++ b/sahara/db/sqlalchemy/api.py @@ -547,11 +547,12 @@ def job_execution_get(context, job_execution_id): def job_execution_get_all(context, **kwargs): """Get all JobExecutions filtered by **kwargs. - kwargs key values are the names of fields in a JobExecution - with the addition of two special fields -- 'cluster.name' - and 'job.name'. These two fields support searching on the - names of the Cluster and/or Job objects referenced by the - JobExecution. + kwargs key values may be the names of fields in a JobExecution + plus the following special values with the indicated meaning: + + 'cluster.name' -- name of the Cluster referenced by the JobExecution + 'job.name' -- name of the Job referenced by the JobExecution + 'status' -- JobExecution['info']['status'] e.g. job_execution_get_all(cluster_id=12, input_id=123) job_execution_get_all(**{'cluster.name': 'test', @@ -562,7 +563,8 @@ def job_execution_get_all(context, **kwargs): # Remove the external fields if present, they'll # be handled with a join and filter externals = {k: kwargs.pop(k) for k in ['cluster.name', - 'job.name'] if k in kwargs} + 'job.name', + 'status'] if k in kwargs} # Filter JobExecution by the remaining kwargs. This has to be done # before application of the joins and filters because those @@ -586,7 +588,20 @@ def job_execution_get_all(context, **kwargs): query = query.join(m.Job).filter( m.Job.name == externals['job.name']) - return query.all() + res = query.all() + + # 'info' is a JsonDictType which is stored as a string. + # It would be possible to search for the substring containing + # the value of 'status' in 'info', but 'info' also contains + # data returned from a client and not managed by Sahara. + # In the case of Oozie jobs, for example, other fields (actions) + # also contain 'status'. Therefore we can't filter on it reliably + # by a substring search in the query. + if 'status' in externals: + status = externals['status'].lower() + res = [je for je in res if ( + je['info'] and je['info'].get('status', '').lower() == status)] + return res def job_execution_count(context, **kwargs): diff --git a/sahara/tests/unit/conductor/manager/test_edp.py b/sahara/tests/unit/conductor/manager/test_edp.py index ef704d18..0c49dd08 100644 --- a/sahara/tests/unit/conductor/manager/test_edp.py +++ b/sahara/tests/unit/conductor/manager/test_edp.py @@ -302,8 +302,9 @@ class JobExecutionTest(test_base.ConductorManagerTestCase): # Run job on cluster 1 self.api.job_execution_create(ctx, my_sample_job_exec) - # Run the same job on cluster 2 + # Run the same job on cluster 2 and set status my_sample_job_exec['cluster_id'] = cl2['id'] + my_sample_job_exec['info'] = {'status': 'KiLLeD'} self.api.job_execution_create(ctx, my_sample_job_exec) # Search only with job exeuction fields (finds both) @@ -311,18 +312,26 @@ class JobExecutionTest(test_base.ConductorManagerTestCase): self.assertEqual(len(lst), 2) # Search on cluster name - kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'], + kwargs = {'cluster.name': cl1['name'], 'return_code': 1} lst = self.api.job_execution_get_all(ctx, **kwargs) self.assertEqual(len(lst), 1) # Search on cluster name and job name - kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'], + kwargs = {'cluster.name': cl1['name'], 'job.name': SAMPLE_JOB['name'], 'return_code': 1} lst = self.api.job_execution_get_all(ctx, **kwargs) self.assertEqual(len(lst), 1) + # Search on cluster name, job name, and status + kwargs = {'cluster.name': cl2['name'], + 'job.name': SAMPLE_JOB['name'], + 'status': 'killed', + 'return_code': 1} + lst = self.api.job_execution_get_all(ctx, **kwargs) + self.assertEqual(len(lst), 1) + # Search on job name (finds both) kwargs = {'job.name': SAMPLE_JOB['name'], 'return_code': 1} @@ -330,17 +339,23 @@ class JobExecutionTest(test_base.ConductorManagerTestCase): self.assertEqual(len(lst), 2) # invalid cluster name value - kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name']+'foo', + kwargs = {'cluster.name': cl1['name']+'foo', 'job.name': SAMPLE_JOB['name']} lst = self.api.job_execution_get_all(ctx, **kwargs) self.assertEqual(len(lst), 0) # invalid job name value - kwargs = {'cluster.name': test_clusters.SAMPLE_CLUSTER['name'], + kwargs = {'cluster.name': cl1['name'], 'job.name': SAMPLE_JOB['name']+'foo'} lst = self.api.job_execution_get_all(ctx, **kwargs) self.assertEqual(len(lst), 0) + # invalid status value + kwargs = {'cluster.name': cl1['name'], + 'status': 'PENDING'} + lst = self.api.job_execution_get_all(ctx, **kwargs) + self.assertEqual(len(lst), 0) + class JobTest(test_base.ConductorManagerTestCase): def __init__(self, *args, **kwargs):