From 7d79bf3c7594a0bca6b756234beead5485f4b4f3 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 24 Apr 2018 15:23:38 +1200 Subject: [PATCH] Support function version for job Change-Id: If754e2e04ee3b46f3ca8ea7efe60b6e0c1454d04 Story: 2001829 Task: 14412 --- qinling/api/controllers/v1/job.py | 5 ++ qinling/api/controllers/v1/resources.py | 30 +---------- qinling/services/periodics.py | 8 ++- .../tests/unit/api/controllers/v1/test_job.py | 15 ++++++ qinling/tests/unit/services/test_periodics.py | 50 +++++++++++++++++++ 5 files changed, 77 insertions(+), 31 deletions(-) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index 179f1a30..62bb81fc 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -61,8 +61,12 @@ class JobsController(rest.RestController): first_time, next_time, count = jobs.validate_job(params) LOG.info("Creating %s, params: %s", self.type, params) + version = params.get('function_version', 0) + with db_api.transaction(): db_api.get_function(params['function_id']) + if version > 0: + db_api.get_function_version(params['function_id'], version) values = { 'name': params.get('name'), @@ -71,6 +75,7 @@ class JobsController(rest.RestController): 'next_execution_time': next_time, 'count': count, 'function_id': params['function_id'], + 'function_version': version, 'function_input': params.get('function_input'), 'status': status.RUNNING } diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index b9c95504..7918e610 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -318,6 +318,7 @@ class Job(Resource): id = types.uuid name = wtypes.text function_id = types.uuid + function_version = wsme.wsattr(int, default=0) function_input = wtypes.text status = wtypes.text pattern = wtypes.text @@ -328,23 +329,6 @@ class Job(Resource): created_at = wsme.wsattr(wtypes.text, readonly=True) updated_at = wsme.wsattr(wtypes.text, readonly=True) - @classmethod - def sample(cls): - return cls( - id='123e4567-e89b-12d3-a456-426655440000', - name='my_job', - function_id='123e4567-e89b-12d3-a456-426655440000', - function_input={'data': 'hello, world'}, - status='done', - pattern='* * * * *', - count=0, - first_execution_time='', - next_execution_time='', - project_id='default', - created_at='1970-01-01T00:00:00.000000', - updated_at='1970-01-01T00:00:00.000000' - ) - class Jobs(ResourceList): jobs = [Job] @@ -354,18 +338,6 @@ class Jobs(ResourceList): super(Jobs, self).__init__(**kwargs) - @classmethod - def sample(cls): - sample = cls() - sample.jobs = [Job.sample()] - sample.next = ( - "http://localhost:7070/v1/jobs?" - "sort_keys=id,name&sort_dirs=asc,desc&limit=10&" - "marker=123e4567-e89b-12d3-a456-426655440000" - ) - - return sample - class ScaleInfo(Resource): count = wtypes.IntegerType(minimum=1) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index fb582a88..84dbdf14 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -96,7 +96,9 @@ def handle_job(engine_client): for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)): job_id = job.id func_id = job.function_id - LOG.debug("Processing job: %s, function: %s", job_id, func_id) + func_version = job.function_version + LOG.debug("Processing job: %s, function: %s(version %s)", job_id, + func_id, func_version) func_db = db_api.get_function(func_id, insecure=True) trust_id = func_db.trust_id @@ -153,11 +155,13 @@ def handle_job(engine_client): continue LOG.debug( - "Starting to execute function %s by job %s", func_id, job_id + "Starting to execute function %s(version %s) by job %s", + func_id, func_version, job_id ) params = { 'function_id': func_id, + 'function_version': func_version, 'input': job.function_input, 'sync': False, 'description': constants.EXECUTION_BY_JOB % job_id diff --git a/qinling/tests/unit/api/controllers/v1/test_job.py b/qinling/tests/unit/api/controllers/v1/test_job.py index 6ca26fe4..4d89ebb7 100644 --- a/qinling/tests/unit/api/controllers/v1/test_job.py +++ b/qinling/tests/unit/api/controllers/v1/test_job.py @@ -41,6 +41,21 @@ class TestJobController(base.APITest): self.assertEqual(201, resp.status_int) + def test_post_with_version(self): + db_api.increase_function_version(self.function_id, 0) + + body = { + 'name': self.rand_name('job', prefix=self.prefix), + 'first_execution_time': str( + datetime.utcnow() + timedelta(hours=1)), + 'function_id': self.function_id, + 'function_version': 1, + } + resp = self.app.post_json('/v1/jobs', body) + + self.assertEqual(201, resp.status_int) + self.assertEqual(1, resp.json.get('function_version')) + def test_post_pattern(self): body = { 'name': self.rand_name('job', prefix=self.prefix), diff --git a/qinling/tests/unit/services/test_periodics.py b/qinling/tests/unit/services/test_periodics.py index bda10ada..7b2bf0ba 100644 --- a/qinling/tests/unit/services/test_periodics.py +++ b/qinling/tests/unit/services/test_periodics.py @@ -118,3 +118,53 @@ class TestPeriodics(base.DbTestCase): self.assertEqual(2, db_func.count) db_execs = db_api.get_executions(function_id=function_id) self.assertEqual(2, len(db_execs)) + + @mock.patch('qinling.utils.jobs.get_next_execution_time') + def test_job_handler_with_version(self, mock_next_time): + db_func = self.create_function() + function_id = db_func.id + new_version = db_api.increase_function_version(function_id, 0) + + self.assertEqual(0, new_version.count) + + now = datetime.utcnow() + db_job = self.create_job( + function_id, + function_version=1, + status=status.RUNNING, + next_execution_time=now, + count=2 + ) + job_id = db_job.id + + e_client = mock.Mock() + # It doesn't matter what's the returned value, but need to be in + # datetime type. + mock_next_time.return_value = now + timedelta(seconds=1) + + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + db_job = db_api.get_job(job_id) + self.assertEqual(1, db_job.count) + db_func = db_api.get_function(function_id) + self.assertEqual(0, db_func.count) + db_version = db_api.get_function_version(function_id, 1) + self.assertEqual(1, db_version.count) + db_execs = db_api.get_executions(function_id=function_id, + function_version=1) + self.assertEqual(1, len(db_execs)) + + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + db_job = db_api.get_job(job_id) + self.assertEqual(0, db_job.count) + self.assertEqual(status.DONE, db_job.status) + db_func = db_api.get_function(function_id) + self.assertEqual(0, db_func.count) + db_version = db_api.get_function_version(function_id, 1) + self.assertEqual(2, db_version.count) + db_execs = db_api.get_executions(function_id=function_id, + function_version=1) + self.assertEqual(2, len(db_execs))