diff --git a/qinling/api/controllers/v1/function_alias.py b/qinling/api/controllers/v1/function_alias.py index 0627f5d5..d0915110 100644 --- a/qinling/api/controllers/v1/function_alias.py +++ b/qinling/api/controllers/v1/function_alias.py @@ -152,5 +152,5 @@ class FunctionAliasesController(rest.RestController): alias = db_api.update_function_alias(alias_name, **values) - LOG.info("Alias updated.") + LOG.info("Alias %s updated.", alias_name) return resources.FunctionAlias.from_db_obj(alias) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index d1613416..09acae7f 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -56,37 +56,37 @@ class JobsController(rest.RestController): 'Either function_alias or function_id must be provided.' ) - # Check the input params. - first_time, next_time, count = jobs.validate_job(params) LOG.info("Creating %s, params: %s", self.type, params) - version = params.get('function_version', 0) - # if function_alias provided - function_alias = params.get('function_alias') - if function_alias: - alias_db = db_api.get_function_alias(function_alias) - function_id = alias_db.function_id - version = alias_db.function_version - params.update({'function_id': function_id, - 'version': version}) + # Check the input params. + first_time, next_time, count = jobs.validate_job(params) - with db_api.transaction(): + version = params.get('function_version', 0) + function_alias = params.get('function_alias') + + if function_alias: + # Check if the alias exists. + db_api.get_function_alias(function_alias) + else: + # Check the function(version) exists. db_api.get_function(params['function_id']) if version > 0: + # Check if the version exists. db_api.get_function_version(params['function_id'], version) - values = { - 'name': params.get('name'), - 'pattern': params.get('pattern'), - 'first_execution_time': first_time, - 'next_execution_time': next_time, - 'count': count, - 'function_id': params['function_id'], - 'function_version': version, - 'function_input': params.get('function_input'), - 'status': status.RUNNING - } - db_job = db_api.create_job(values) + values = { + 'name': params.get('name'), + 'pattern': params.get('pattern'), + 'first_execution_time': first_time, + 'next_execution_time': next_time, + 'count': count, + 'function_alias': function_alias, + 'function_id': params.get("function_id"), + 'function_version': version, + 'function_input': params.get('function_input'), + 'status': status.RUNNING + } + db_job = db_api.create_job(values) return resources.Job.from_db_obj(db_job) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/007_function_id_for_job.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/007_function_id_for_job.py new file mode 100644 index 00000000..ec3e2650 --- /dev/null +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/007_function_id_for_job.py @@ -0,0 +1,34 @@ +# Copyright 2019 Catalyst Cloud Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Make function id nullable for jobs table + +Revision ID: 007 +Revises: 006 +""" + +revision = '007' +down_revision = '006' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.alter_column( + 'jobs', + 'function_id', + existing_type=sa.String(length=36), + nullable=True + ) diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 73a3ee92..4af7bc3d 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -73,7 +73,8 @@ class Job(model_base.QinlingSecureModelBase): count = sa.Column(sa.Integer) function_id = sa.Column( sa.String(36), - sa.ForeignKey(Function.id) + sa.ForeignKey(Function.id), + nullable=True ) function = relationship('Function', back_populates="jobs") function_input = sa.Column(sa.String(255), nullable=True) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index a605ad5e..c3988037 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -98,8 +98,16 @@ def handle_job(engine_client): for job in jobs_db: job_id = job.id - func_id = job.function_id - func_version = job.function_version + func_alias = job.function_alias + + if func_alias: + alias = db_api.get_function_alias(func_alias, insecure=True) + func_id = alias.function_id + func_version = alias.function_version + else: + func_id = job.function_id + func_version = job.function_version + LOG.debug("Processing job: %s, function: %s(version %s)", job_id, func_id, func_version) diff --git a/qinling/tests/unit/api/controllers/v1/test_job.py b/qinling/tests/unit/api/controllers/v1/test_job.py index c2ea6a5a..e3728a1d 100644 --- a/qinling/tests/unit/api/controllers/v1/test_job.py +++ b/qinling/tests/unit/api/controllers/v1/test_job.py @@ -32,7 +32,7 @@ class TestJobController(base.APITest): db_function = self.create_function() self.function_id = db_function.id - def test_post(self): + def test_create_with_function(self): body = { 'name': self.rand_name('job', prefix=self.prefix), 'first_execution_time': str( @@ -43,7 +43,7 @@ class TestJobController(base.APITest): self.assertEqual(201, resp.status_int) - def test_post_with_version(self): + def test_create_with_version(self): db_api.increase_function_version(self.function_id, 0) body = { @@ -58,13 +58,10 @@ class TestJobController(base.APITest): self.assertEqual(201, resp.status_int) self.assertEqual(1, resp.json.get('function_version')) - def test_post_with_alias(self): - db_api.increase_function_version(self.function_id, 0, - description="version 1") + def test_create_with_alias(self): name = self.rand_name(name="alias", prefix=self.prefix) body = { 'function_id': self.function_id, - 'function_version': 1, 'name': name } db_api.create_function_alias(**body) @@ -79,9 +76,21 @@ class TestJobController(base.APITest): resp = self.app.post_json('/v1/jobs', job_body) self.assertEqual(201, resp.status_int) - self.assertEqual(1, resp.json.get('function_version')) + self.assertEqual(name, resp.json.get('function_alias')) + self.assertIsNone(resp.json.get('function_id')) - def test_post_without_required_params(self): + def test_create_with_invalid_alias(self): + body = { + 'function_alias': 'fake_alias', + 'first_execution_time': str( + datetime.utcnow() + timedelta(hours=1)), + } + + resp = self.app.post_json('/v1/jobs', body, expect_errors=True) + + self.assertEqual(404, resp.status_int) + + def test_create_without_required_params(self): resp = self.app.post( '/v1/jobs', params={}, @@ -90,7 +99,7 @@ class TestJobController(base.APITest): self.assertEqual(400, resp.status_int) - def test_post_pattern(self): + def test_create_pattern(self): body = { 'name': self.rand_name('job', prefix=self.prefix), 'function_id': self.function_id, @@ -107,7 +116,7 @@ class TestJobController(base.APITest): res["next_execution_time"] ) - def test_post_both_pattern_and_first_execution_time(self): + def test_create_both_pattern_and_first_execution_time(self): body = { 'name': self.rand_name('job', prefix=self.prefix), 'function_id': self.function_id, diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index 014966b0..3f31d23a 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -203,12 +203,13 @@ class DbTestCase(BaseTest): return function - def create_job(self, function_id=None, **kwargs): - if not function_id: + def create_job(self, function_id=None, function_alias=None, **kwargs): + if not function_id and not function_alias: function_id = self.create_function().id job_params = { 'name': self.rand_name('job', prefix=self.prefix), + 'function_alias': function_alias, 'function_id': function_id, # 'auth_enable' is disabled by default 'project_id': DEFAULT_PROJECT_ID, diff --git a/qinling/tests/unit/services/test_periodics.py b/qinling/tests/unit/services/test_periodics.py index c2a54ebe..ae16204f 100644 --- a/qinling/tests/unit/services/test_periodics.py +++ b/qinling/tests/unit/services/test_periodics.py @@ -117,7 +117,7 @@ class TestPeriodics(base.DbTestCase): now = datetime.utcnow() db_job = self.create_job( - function_id, + function_id=function_id, status=status.RUNNING, next_execution_time=now, count=2 @@ -197,3 +197,40 @@ class TestPeriodics(base.DbTestCase): db_execs = db_api.get_executions(function_id=function_id, function_version=1) self.assertEqual(2, len(db_execs)) + + @mock.patch('qinling.utils.jobs.get_next_execution_time') + def test_job_handler_with_alias(self, mock_next_time): + e_client = mock.Mock() + now = datetime.utcnow() + # It doesn't matter what's the returned value, but need to be in + # datetime type. + mock_next_time.return_value = now + timedelta(seconds=1) + + # Create a alias for a function. + alias_name = self.rand_name(name="alias", prefix=self.prefix) + db_func = self.create_function() + function_id = db_func.id + db_api.create_function_alias(name=alias_name, function_id=function_id) + + self.create_job( + function_alias=alias_name, + status=status.RUNNING, + next_execution_time=now, + ) + + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + # Create function version 1 and update the alias. + db_api.increase_function_version(function_id, 0) + db_api.update_function_alias(alias_name, function_version=1) + + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + db_func = db_api.get_function(function_id) + self.assertEqual(1, db_func.count) + db_version = db_api.get_function_version(function_id, 1) + self.assertEqual(1, db_version.count) + db_execs = db_api.get_executions(function_id=function_id) + self.assertEqual(2, len(db_execs))