Added unit tests for workflow executions and task executions filtering

Change-Id: I2892c6c3d02689d11131c4955b56e705de89dfe6
This commit is contained in:
Hardik Parekh 2016-09-19 16:50:22 +05:30
parent 8fa0fd2654
commit 17b892da92
1 changed files with 277 additions and 0 deletions

View File

@ -1405,6 +1405,141 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_equal_value(self):
db_api.create_workflow_execution(WF_EXECS[0])
created = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'id',
created.id,
'eq'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_workflow_execution_by_not_equal_value(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'id',
created0.id,
'neq'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_greater_than_value(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
created0['created_at'],
'gt'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_greater_than_equal_value(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
created0['created_at'],
'gte'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(2, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_filter_workflow_execution_by_less_than_value(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
created1['created_at'],
'lt'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_less_than_equal_value(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
created1['created_at'],
'lte'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(2, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_filter_workflow_execution_by_values_in_list(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
[created0['created_at']],
'in'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_workflow_execution_by_values_notin_list(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
[created0['created_at']],
'nin'
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_workflow_execution_by_multiple_columns(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
_filter = filter_utils.create_or_update_filter(
'created_at',
[created0['created_at'], created1['created_at']],
'in'
)
_filter = filter_utils.create_or_update_filter(
'id',
created1.id,
'eq',
_filter
)
fetched = db_api.get_workflow_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_delete_workflow_execution(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
@ -1515,6 +1650,7 @@ TASK_EXECS = [
class TaskExecutionTest(SQLAlchemyTest):
def test_create_and_get_and_load_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -1653,6 +1789,132 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_filter_task_execution_by_equal_value(self):
created, _ = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'name',
created.name,
'eq'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_not_equal_value(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'name',
created0.name,
'neq'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_value(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
created0['created_at'],
'gt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_equal_value(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
created0['created_at'],
'gte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_filter_task_execution_by_less_than_value(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
created1['created_at'],
'lt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_task_execution_by_less_than_equal_value(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
created1['created_at'],
'lte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_filter_task_execution_by_values_in_list(self):
created, _ = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
[created['created_at']],
'in'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_values_notin_list(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
[created0['created_at']],
'nin'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_multiple_columns(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
'created_at',
[created0['created_at'], created1['created_at']],
'in'
)
_filter = filter_utils.create_or_update_filter(
'name',
created1.name,
'eq',
_filter
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_delete_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -1730,6 +1992,21 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertIn("'state': 'IDLE'", s)
self.assertIn("'name': 'my_task1'", s)
def _create_task_executions(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created0 = db_api.create_task_execution(values)
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
created1 = db_api.create_task_execution(values)
return created0, created1
CRON_TRIGGERS = [
{