Browse Source

Support function version for job

Change-Id: If754e2e04ee3b46f3ca8ea7efe60b6e0c1454d04
Story: 2001829
Task: 14412
changes/05/563805/1
Lingxian Kong 3 years ago
parent
commit
7d79bf3c75
5 changed files with 77 additions and 31 deletions
  1. +5
    -0
      qinling/api/controllers/v1/job.py
  2. +1
    -29
      qinling/api/controllers/v1/resources.py
  3. +6
    -2
      qinling/services/periodics.py
  4. +15
    -0
      qinling/tests/unit/api/controllers/v1/test_job.py
  5. +50
    -0
      qinling/tests/unit/services/test_periodics.py

+ 5
- 0
qinling/api/controllers/v1/job.py View File

@ -61,8 +61,12 @@ class JobsController(rest.RestController):
first_time, next_time, count = jobs.validate_job(params)
LOG.info("Creating %s, params: %s", self.type, params)
version = params.get('function_version', 0)
with db_api.transaction():
db_api.get_function(params['function_id'])
if version > 0:
db_api.get_function_version(params['function_id'], version)
values = {
'name': params.get('name'),
@ -71,6 +75,7 @@ class JobsController(rest.RestController):
'next_execution_time': next_time,
'count': count,
'function_id': params['function_id'],
'function_version': version,
'function_input': params.get('function_input'),
'status': status.RUNNING
}


+ 1
- 29
qinling/api/controllers/v1/resources.py View File

@ -318,6 +318,7 @@ class Job(Resource):
id = types.uuid
name = wtypes.text
function_id = types.uuid
function_version = wsme.wsattr(int, default=0)
function_input = wtypes.text
status = wtypes.text
pattern = wtypes.text
@ -328,23 +329,6 @@ class Job(Resource):
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
@classmethod
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
name='my_job',
function_id='123e4567-e89b-12d3-a456-426655440000',
function_input={'data': 'hello, world'},
status='done',
pattern='* * * * *',
count=0,
first_execution_time='',
next_execution_time='',
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
class Jobs(ResourceList):
jobs = [Job]
@ -354,18 +338,6 @@ class Jobs(ResourceList):
super(Jobs, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.jobs = [Job.sample()]
sample.next = (
"http://localhost:7070/v1/jobs?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000"
)
return sample
class ScaleInfo(Resource):
count = wtypes.IntegerType(minimum=1)


+ 6
- 2
qinling/services/periodics.py View File

@ -96,7 +96,9 @@ def handle_job(engine_client):
for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
job_id = job.id
func_id = job.function_id
LOG.debug("Processing job: %s, function: %s", job_id, func_id)
func_version = job.function_version
LOG.debug("Processing job: %s, function: %s(version %s)", job_id,
func_id, func_version)
func_db = db_api.get_function(func_id, insecure=True)
trust_id = func_db.trust_id
@ -153,11 +155,13 @@ def handle_job(engine_client):
continue
LOG.debug(
"Starting to execute function %s by job %s", func_id, job_id
"Starting to execute function %s(version %s) by job %s",
func_id, func_version, job_id
)
params = {
'function_id': func_id,
'function_version': func_version,
'input': job.function_input,
'sync': False,
'description': constants.EXECUTION_BY_JOB % job_id


+ 15
- 0
qinling/tests/unit/api/controllers/v1/test_job.py View File

@ -41,6 +41,21 @@ class TestJobController(base.APITest):
self.assertEqual(201, resp.status_int)
def test_post_with_version(self):
db_api.increase_function_version(self.function_id, 0)
body = {
'name': self.rand_name('job', prefix=self.prefix),
'first_execution_time': str(
datetime.utcnow() + timedelta(hours=1)),
'function_id': self.function_id,
'function_version': 1,
}
resp = self.app.post_json('/v1/jobs', body)
self.assertEqual(201, resp.status_int)
self.assertEqual(1, resp.json.get('function_version'))
def test_post_pattern(self):
body = {
'name': self.rand_name('job', prefix=self.prefix),


+ 50
- 0
qinling/tests/unit/services/test_periodics.py View File

@ -118,3 +118,53 @@ class TestPeriodics(base.DbTestCase):
self.assertEqual(2, db_func.count)
db_execs = db_api.get_executions(function_id=function_id)
self.assertEqual(2, len(db_execs))
@mock.patch('qinling.utils.jobs.get_next_execution_time')
def test_job_handler_with_version(self, mock_next_time):
db_func = self.create_function()
function_id = db_func.id
new_version = db_api.increase_function_version(function_id, 0)
self.assertEqual(0, new_version.count)
now = datetime.utcnow()
db_job = self.create_job(
function_id,
function_version=1,
status=status.RUNNING,
next_execution_time=now,
count=2
)
job_id = db_job.id
e_client = mock.Mock()
# It doesn't matter what's the returned value, but need to be in
# datetime type.
mock_next_time.return_value = now + timedelta(seconds=1)
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
db_job = db_api.get_job(job_id)
self.assertEqual(1, db_job.count)
db_func = db_api.get_function(function_id)
self.assertEqual(0, db_func.count)
db_version = db_api.get_function_version(function_id, 1)
self.assertEqual(1, db_version.count)
db_execs = db_api.get_executions(function_id=function_id,
function_version=1)
self.assertEqual(1, len(db_execs))
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
db_job = db_api.get_job(job_id)
self.assertEqual(0, db_job.count)
self.assertEqual(status.DONE, db_job.status)
db_func = db_api.get_function(function_id)
self.assertEqual(0, db_func.count)
db_version = db_api.get_function_version(function_id, 1)
self.assertEqual(2, db_version.count)
db_execs = db_api.get_executions(function_id=function_id,
function_version=1)
self.assertEqual(2, len(db_execs))

Loading…
Cancel
Save