Merge "Support function version for job"
This commit is contained in:
commit
e423187796
qinling
api/controllers/v1
services
tests/unit
@ -61,8 +61,12 @@ class JobsController(rest.RestController):
|
||||
first_time, next_time, count = jobs.validate_job(params)
|
||||
LOG.info("Creating %s, params: %s", self.type, params)
|
||||
|
||||
version = params.get('function_version', 0)
|
||||
|
||||
with db_api.transaction():
|
||||
db_api.get_function(params['function_id'])
|
||||
if version > 0:
|
||||
db_api.get_function_version(params['function_id'], version)
|
||||
|
||||
values = {
|
||||
'name': params.get('name'),
|
||||
@ -71,6 +75,7 @@ class JobsController(rest.RestController):
|
||||
'next_execution_time': next_time,
|
||||
'count': count,
|
||||
'function_id': params['function_id'],
|
||||
'function_version': version,
|
||||
'function_input': params.get('function_input'),
|
||||
'status': status.RUNNING
|
||||
}
|
||||
|
@ -318,6 +318,7 @@ class Job(Resource):
|
||||
id = types.uuid
|
||||
name = wtypes.text
|
||||
function_id = types.uuid
|
||||
function_version = wsme.wsattr(int, default=0)
|
||||
function_input = wtypes.text
|
||||
status = wtypes.text
|
||||
pattern = wtypes.text
|
||||
@ -328,23 +329,6 @@ class Job(Resource):
|
||||
created_at = wsme.wsattr(wtypes.text, readonly=True)
|
||||
updated_at = wsme.wsattr(wtypes.text, readonly=True)
|
||||
|
||||
@classmethod
|
||||
def sample(cls):
|
||||
return cls(
|
||||
id='123e4567-e89b-12d3-a456-426655440000',
|
||||
name='my_job',
|
||||
function_id='123e4567-e89b-12d3-a456-426655440000',
|
||||
function_input={'data': 'hello, world'},
|
||||
status='done',
|
||||
pattern='* * * * *',
|
||||
count=0,
|
||||
first_execution_time='',
|
||||
next_execution_time='',
|
||||
project_id='default',
|
||||
created_at='1970-01-01T00:00:00.000000',
|
||||
updated_at='1970-01-01T00:00:00.000000'
|
||||
)
|
||||
|
||||
|
||||
class Jobs(ResourceList):
|
||||
jobs = [Job]
|
||||
@ -354,18 +338,6 @@ class Jobs(ResourceList):
|
||||
|
||||
super(Jobs, self).__init__(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def sample(cls):
|
||||
sample = cls()
|
||||
sample.jobs = [Job.sample()]
|
||||
sample.next = (
|
||||
"http://localhost:7070/v1/jobs?"
|
||||
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
|
||||
"marker=123e4567-e89b-12d3-a456-426655440000"
|
||||
)
|
||||
|
||||
return sample
|
||||
|
||||
|
||||
class ScaleInfo(Resource):
|
||||
count = wtypes.IntegerType(minimum=1)
|
||||
|
@ -96,7 +96,9 @@ def handle_job(engine_client):
|
||||
for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
|
||||
job_id = job.id
|
||||
func_id = job.function_id
|
||||
LOG.debug("Processing job: %s, function: %s", job_id, func_id)
|
||||
func_version = job.function_version
|
||||
LOG.debug("Processing job: %s, function: %s(version %s)", job_id,
|
||||
func_id, func_version)
|
||||
|
||||
func_db = db_api.get_function(func_id, insecure=True)
|
||||
trust_id = func_db.trust_id
|
||||
@ -153,11 +155,13 @@ def handle_job(engine_client):
|
||||
continue
|
||||
|
||||
LOG.debug(
|
||||
"Starting to execute function %s by job %s", func_id, job_id
|
||||
"Starting to execute function %s(version %s) by job %s",
|
||||
func_id, func_version, job_id
|
||||
)
|
||||
|
||||
params = {
|
||||
'function_id': func_id,
|
||||
'function_version': func_version,
|
||||
'input': job.function_input,
|
||||
'sync': False,
|
||||
'description': constants.EXECUTION_BY_JOB % job_id
|
||||
|
@ -41,6 +41,21 @@ class TestJobController(base.APITest):
|
||||
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
def test_post_with_version(self):
|
||||
db_api.increase_function_version(self.function_id, 0)
|
||||
|
||||
body = {
|
||||
'name': self.rand_name('job', prefix=self.prefix),
|
||||
'first_execution_time': str(
|
||||
datetime.utcnow() + timedelta(hours=1)),
|
||||
'function_id': self.function_id,
|
||||
'function_version': 1,
|
||||
}
|
||||
resp = self.app.post_json('/v1/jobs', body)
|
||||
|
||||
self.assertEqual(201, resp.status_int)
|
||||
self.assertEqual(1, resp.json.get('function_version'))
|
||||
|
||||
def test_post_pattern(self):
|
||||
body = {
|
||||
'name': self.rand_name('job', prefix=self.prefix),
|
||||
|
@ -118,3 +118,53 @@ class TestPeriodics(base.DbTestCase):
|
||||
self.assertEqual(2, db_func.count)
|
||||
db_execs = db_api.get_executions(function_id=function_id)
|
||||
self.assertEqual(2, len(db_execs))
|
||||
|
||||
@mock.patch('qinling.utils.jobs.get_next_execution_time')
|
||||
def test_job_handler_with_version(self, mock_next_time):
|
||||
db_func = self.create_function()
|
||||
function_id = db_func.id
|
||||
new_version = db_api.increase_function_version(function_id, 0)
|
||||
|
||||
self.assertEqual(0, new_version.count)
|
||||
|
||||
now = datetime.utcnow()
|
||||
db_job = self.create_job(
|
||||
function_id,
|
||||
function_version=1,
|
||||
status=status.RUNNING,
|
||||
next_execution_time=now,
|
||||
count=2
|
||||
)
|
||||
job_id = db_job.id
|
||||
|
||||
e_client = mock.Mock()
|
||||
# It doesn't matter what's the returned value, but need to be in
|
||||
# datetime type.
|
||||
mock_next_time.return_value = now + timedelta(seconds=1)
|
||||
|
||||
periodics.handle_job(e_client)
|
||||
context.set_ctx(self.ctx)
|
||||
|
||||
db_job = db_api.get_job(job_id)
|
||||
self.assertEqual(1, db_job.count)
|
||||
db_func = db_api.get_function(function_id)
|
||||
self.assertEqual(0, db_func.count)
|
||||
db_version = db_api.get_function_version(function_id, 1)
|
||||
self.assertEqual(1, db_version.count)
|
||||
db_execs = db_api.get_executions(function_id=function_id,
|
||||
function_version=1)
|
||||
self.assertEqual(1, len(db_execs))
|
||||
|
||||
periodics.handle_job(e_client)
|
||||
context.set_ctx(self.ctx)
|
||||
|
||||
db_job = db_api.get_job(job_id)
|
||||
self.assertEqual(0, db_job.count)
|
||||
self.assertEqual(status.DONE, db_job.status)
|
||||
db_func = db_api.get_function(function_id)
|
||||
self.assertEqual(0, db_func.count)
|
||||
db_version = db_api.get_function_version(function_id, 1)
|
||||
self.assertEqual(2, db_version.count)
|
||||
db_execs = db_api.get_executions(function_id=function_id,
|
||||
function_version=1)
|
||||
self.assertEqual(2, len(db_execs))
|
||||
|
Loading…
x
Reference in New Issue
Block a user