Browse Source

Merge "Parse function alias when job is running"

changes/48/671648/1
Zuul 2 years ago
committed by Gerrit Code Review
parent
commit
05387ff01a
8 changed files with 128 additions and 38 deletions
  1. +1
    -1
      qinling/api/controllers/v1/function_alias.py
  2. +21
    -21
      qinling/api/controllers/v1/job.py
  3. +34
    -0
      qinling/db/sqlalchemy/migration/alembic_migrations/versions/007_function_id_for_job.py
  4. +2
    -1
      qinling/db/sqlalchemy/models.py
  5. +10
    -2
      qinling/services/periodics.py
  6. +19
    -10
      qinling/tests/unit/api/controllers/v1/test_job.py
  7. +3
    -2
      qinling/tests/unit/base.py
  8. +38
    -1
      qinling/tests/unit/services/test_periodics.py

+ 1
- 1
qinling/api/controllers/v1/function_alias.py View File

@ -152,5 +152,5 @@ class FunctionAliasesController(rest.RestController):
alias = db_api.update_function_alias(alias_name, **values)
LOG.info("Alias updated.")
LOG.info("Alias %s updated.", alias_name)
return resources.FunctionAlias.from_db_obj(alias)

+ 21
- 21
qinling/api/controllers/v1/job.py View File

@ -56,37 +56,37 @@ class JobsController(rest.RestController):
'Either function_alias or function_id must be provided.'
)
LOG.info("Creating %s, params: %s", self.type, params)
# Check the input params.
first_time, next_time, count = jobs.validate_job(params)
LOG.info("Creating %s, params: %s", self.type, params)
version = params.get('function_version', 0)
# if function_alias provided
function_alias = params.get('function_alias')
if function_alias:
alias_db = db_api.get_function_alias(function_alias)
function_id = alias_db.function_id
version = alias_db.function_version
params.update({'function_id': function_id,
'version': version})
with db_api.transaction():
if function_alias:
# Check if the alias exists.
db_api.get_function_alias(function_alias)
else:
# Check the function(version) exists.
db_api.get_function(params['function_id'])
if version > 0:
# Check if the version exists.
db_api.get_function_version(params['function_id'], version)
values = {
'name': params.get('name'),
'pattern': params.get('pattern'),
'first_execution_time': first_time,
'next_execution_time': next_time,
'count': count,
'function_id': params['function_id'],
'function_version': version,
'function_input': params.get('function_input'),
'status': status.RUNNING
}
db_job = db_api.create_job(values)
values = {
'name': params.get('name'),
'pattern': params.get('pattern'),
'first_execution_time': first_time,
'next_execution_time': next_time,
'count': count,
'function_alias': function_alias,
'function_id': params.get("function_id"),
'function_version': version,
'function_input': params.get('function_input'),
'status': status.RUNNING
}
db_job = db_api.create_job(values)
return resources.Job.from_db_obj(db_job)


+ 34
- 0
qinling/db/sqlalchemy/migration/alembic_migrations/versions/007_function_id_for_job.py View File

@ -0,0 +1,34 @@
# Copyright 2019 Catalyst Cloud Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Make function id nullable for jobs table
Revision ID: 007
Revises: 006
"""
revision = '007'
down_revision = '006'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column(
'jobs',
'function_id',
existing_type=sa.String(length=36),
nullable=True
)

+ 2
- 1
qinling/db/sqlalchemy/models.py View File

@ -73,7 +73,8 @@ class Job(model_base.QinlingSecureModelBase):
count = sa.Column(sa.Integer)
function_id = sa.Column(
sa.String(36),
sa.ForeignKey(Function.id)
sa.ForeignKey(Function.id),
nullable=True
)
function = relationship('Function', back_populates="jobs")
function_input = sa.Column(sa.String(255), nullable=True)


+ 10
- 2
qinling/services/periodics.py View File

@ -98,8 +98,16 @@ def handle_job(engine_client):
for job in jobs_db:
job_id = job.id
func_id = job.function_id
func_version = job.function_version
func_alias = job.function_alias
if func_alias:
alias = db_api.get_function_alias(func_alias, insecure=True)
func_id = alias.function_id
func_version = alias.function_version
else:
func_id = job.function_id
func_version = job.function_version
LOG.debug("Processing job: %s, function: %s(version %s)", job_id,
func_id, func_version)


+ 19
- 10
qinling/tests/unit/api/controllers/v1/test_job.py View File

@ -32,7 +32,7 @@ class TestJobController(base.APITest):
db_function = self.create_function()
self.function_id = db_function.id
def test_post(self):
def test_create_with_function(self):
body = {
'name': self.rand_name('job', prefix=self.prefix),
'first_execution_time': str(
@ -43,7 +43,7 @@ class TestJobController(base.APITest):
self.assertEqual(201, resp.status_int)
def test_post_with_version(self):
def test_create_with_version(self):
db_api.increase_function_version(self.function_id, 0)
body = {
@ -58,13 +58,10 @@ class TestJobController(base.APITest):
self.assertEqual(201, resp.status_int)
self.assertEqual(1, resp.json.get('function_version'))
def test_post_with_alias(self):
db_api.increase_function_version(self.function_id, 0,
description="version 1")
def test_create_with_alias(self):
name = self.rand_name(name="alias", prefix=self.prefix)
body = {
'function_id': self.function_id,
'function_version': 1,
'name': name
}
db_api.create_function_alias(**body)
@ -79,9 +76,21 @@ class TestJobController(base.APITest):
resp = self.app.post_json('/v1/jobs', job_body)
self.assertEqual(201, resp.status_int)
self.assertEqual(1, resp.json.get('function_version'))
self.assertEqual(name, resp.json.get('function_alias'))
self.assertIsNone(resp.json.get('function_id'))
def test_create_with_invalid_alias(self):
body = {
'function_alias': 'fake_alias',
'first_execution_time': str(
datetime.utcnow() + timedelta(hours=1)),
}
resp = self.app.post_json('/v1/jobs', body, expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_post_without_required_params(self):
def test_create_without_required_params(self):
resp = self.app.post(
'/v1/jobs',
params={},
@ -90,7 +99,7 @@ class TestJobController(base.APITest):
self.assertEqual(400, resp.status_int)
def test_post_pattern(self):
def test_create_pattern(self):
body = {
'name': self.rand_name('job', prefix=self.prefix),
'function_id': self.function_id,
@ -107,7 +116,7 @@ class TestJobController(base.APITest):
res["next_execution_time"]
)
def test_post_both_pattern_and_first_execution_time(self):
def test_create_both_pattern_and_first_execution_time(self):
body = {
'name': self.rand_name('job', prefix=self.prefix),
'function_id': self.function_id,


+ 3
- 2
qinling/tests/unit/base.py View File

@ -203,12 +203,13 @@ class DbTestCase(BaseTest):
return function
def create_job(self, function_id=None, **kwargs):
if not function_id:
def create_job(self, function_id=None, function_alias=None, **kwargs):
if not function_id and not function_alias:
function_id = self.create_function().id
job_params = {
'name': self.rand_name('job', prefix=self.prefix),
'function_alias': function_alias,
'function_id': function_id,
# 'auth_enable' is disabled by default
'project_id': DEFAULT_PROJECT_ID,


+ 38
- 1
qinling/tests/unit/services/test_periodics.py View File

@ -117,7 +117,7 @@ class TestPeriodics(base.DbTestCase):
now = datetime.utcnow()
db_job = self.create_job(
function_id,
function_id=function_id,
status=status.RUNNING,
next_execution_time=now,
count=2
@ -197,3 +197,40 @@ class TestPeriodics(base.DbTestCase):
db_execs = db_api.get_executions(function_id=function_id,
function_version=1)
self.assertEqual(2, len(db_execs))
@mock.patch('qinling.utils.jobs.get_next_execution_time')
def test_job_handler_with_alias(self, mock_next_time):
e_client = mock.Mock()
now = datetime.utcnow()
# It doesn't matter what's the returned value, but need to be in
# datetime type.
mock_next_time.return_value = now + timedelta(seconds=1)
# Create a alias for a function.
alias_name = self.rand_name(name="alias", prefix=self.prefix)
db_func = self.create_function()
function_id = db_func.id
db_api.create_function_alias(name=alias_name, function_id=function_id)
self.create_job(
function_alias=alias_name,
status=status.RUNNING,
next_execution_time=now,
)
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
# Create function version 1 and update the alias.
db_api.increase_function_version(function_id, 0)
db_api.update_function_alias(alias_name, function_version=1)
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
db_func = db_api.get_function(function_id)
self.assertEqual(1, db_func.count)
db_version = db_api.get_function_version(function_id, 1)
self.assertEqual(1, db_version.count)
db_execs = db_api.get_executions(function_id=function_id)
self.assertEqual(2, len(db_execs))

Loading…
Cancel
Save