Merge "Add db api tests for scheduled jobs"

This commit is contained in:
Zuul 2019-09-11 06:40:01 +00:00 committed by Gerrit Code Review
commit 7698c3def0
2 changed files with 111 additions and 0 deletions

View File

@ -1221,6 +1221,8 @@ def delete_delayed_calls(session=None, **kwargs):
return _delete_all(models.DelayedCall, **kwargs)
# Scheduled jobs.
@b.session_aware()
def create_scheduled_job(values, session=None):
job = models.ScheduledJob()

View File

@ -2654,6 +2654,115 @@ class CronTriggerTest(SQLAlchemyTest):
self.assertIn("'name': 'trigger1'", s)
SCHEDULED_JOBS = [
{
'run_after': 30,
'func_name': 'test_module.test_func',
'func_args': {
'server': 'localhost',
'database': 'test',
'timeout': 600,
'verbose': True
},
'execute_at': datetime.datetime(2019, 7, 6, 15, 1, 0)
},
{
'run_after': 50,
'target_factory_func_name': 'test_target_factory_func',
'func_name': 'test_func',
'func_args': {
'server': 'localhost',
'database': 'test',
'timeout': 600,
'verbose': True
},
'execute_at': datetime.datetime(2019, 7, 6, 20, 30, 0)
},
]
class ScheduledJobTest(SQLAlchemyTest):
def setUp(self):
super(ScheduledJobTest, self).setUp()
db_api.delete_scheduled_jobs()
def test_create_and_get_scheduled_job(self):
created = db_api.create_scheduled_job(SCHEDULED_JOBS[0])
fetched = db_api.get_scheduled_job(created.id)
self.assertEqual(created, fetched)
def test_create_scheduled_job_duplicate_without_auth(self):
cfg.CONF.set_default('auth_enable', False, group='pecan')
db_api.create_scheduled_job(SCHEDULED_JOBS[0])
db_api.create_scheduled_job(SCHEDULED_JOBS[0])
def test_update_scheduled_job(self):
created = db_api.create_scheduled_job(SCHEDULED_JOBS[0])
self.assertIsNone(created.updated_at)
updated = db_api.update_scheduled_job(
created.id,
{'captured_at': datetime.datetime(2019, 7, 6, 20, 30, 0)}
)
self.assertEqual(
datetime.datetime(2019, 7, 6, 20, 30, 0),
updated[0].captured_at
)
fetched = db_api.get_scheduled_job(created.id)
self.assertEqual(updated[0], fetched)
self.assertIsNotNone(fetched.updated_at)
def test_get_scheduled_jobs(self):
created0 = db_api.create_scheduled_job(SCHEDULED_JOBS[0])
created1 = db_api.create_scheduled_job(SCHEDULED_JOBS[1])
fetched = db_api.get_scheduled_jobs()
self.assertEqual(2, len(fetched))
self._assert_single_item(fetched, func_name=created0['func_name'])
self._assert_single_item(fetched, func_name=created1['func_name'])
def test_delete_scheduled_job(self):
created = db_api.create_scheduled_job(SCHEDULED_JOBS[0])
fetched = db_api.get_scheduled_job(created.id)
self.assertEqual(created, fetched)
db_api.delete_scheduled_job(created.id)
self.assertRaises(
exc.DBEntityNotFoundError,
db_api.get_environment,
created.id
)
def test_get_scheduled_jobs_count(self):
res = db_api.get_scheduled_jobs_count()
self.assertEqual(0, res)
created0 = db_api.create_scheduled_job(SCHEDULED_JOBS[0])
created1 = db_api.create_scheduled_job(SCHEDULED_JOBS[1])
res = db_api.get_scheduled_jobs_count()
self.assertEqual(2, res)
db_api.delete_scheduled_job(created0.id)
res = db_api.get_scheduled_jobs_count()
self.assertEqual(1, res)
db_api.delete_scheduled_job(created1.id)
res = db_api.get_scheduled_jobs_count()
self.assertEqual(0, res)
ENVIRONMENTS = [
{
'name': 'env1',