diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index bce95f98..328d3ef2 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -26,6 +26,7 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from qinling.api import access_control as acl +from qinling.api.controllers.v1 import function_version from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import types from qinling import context @@ -66,6 +67,7 @@ class FunctionWorkerController(rest.RestController): class FunctionsController(rest.RestController): workers = FunctionWorkerController() + versions = function_version.FunctionVersionsController() _custom_actions = { 'scale_up': ['POST'], diff --git a/qinling/api/controllers/v1/function_version.py b/qinling/api/controllers/v1/function_version.py new file mode 100644 index 00000000..cd75be88 --- /dev/null +++ b/qinling/api/controllers/v1/function_version.py @@ -0,0 +1,123 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from pecan import rest +import tenacity +import wsmeext.pecan as wsme_pecan + +from qinling.api import access_control as acl +from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling import context +from qinling.db import api as db_api +from qinling import exceptions as exc +from qinling.storage import base as storage_base +from qinling.utils import constants +from qinling.utils import etcd_util +from qinling.utils import rest_utils + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class FunctionVersionsController(rest.RestController): + def __init__(self, *args, **kwargs): + self.type = 'function_version' + self.storage_provider = storage_base.load_storage_provider(CONF) + + super(FunctionVersionsController, self).__init__(*args, **kwargs) + + @tenacity.retry( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_attempt(30), + retry=(tenacity.retry_if_result(lambda result: result is False)) + ) + def _create_function_version(self, project_id, function_id, **kwargs): + with etcd_util.get_function_version_lock(function_id) as lock: + if not lock.is_acquired(): + return False + + with db_api.transaction(): + # Get latest function package md5 and version number + func_db = db_api.get_function(function_id) + if func_db.code['source'] != constants.PACKAGE_FUNCTION: + raise exc.NotAllowedException( + "Function versioning only allowed for %s type " + "function." % + constants.PACKAGE_FUNCTION + ) + + l_md5 = func_db.code['md5sum'] + l_version = func_db.latest_version + + if len(func_db.versions) >= constants.MAX_VERSION_NUMBER: + raise exc.NotAllowedException( + 'Can not exceed maximum number(%s) of versions' % + constants.MAX_VERSION_NUMBER + ) + + # Check if the latest package changed since last version + changed = self.storage_provider.changed_since(project_id, + function_id, + l_md5, + l_version) + if not changed: + raise exc.NotAllowedException( + 'Function package not changed since the latest ' + 'version %s.' % l_version + ) + + LOG.info("Creating %s, function_id: %s, old_version: %d", + self.type, function_id, l_version) + + # Create new version and copy package. + self.storage_provider.copy(project_id, function_id, l_md5, + l_version) + version = db_api.increase_function_version(function_id, + l_version, + **kwargs) + func_db.latest_version = l_version + 1 + + LOG.info("New version %d for function %s created.", l_version + 1, + function_id) + return version + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.FunctionVersion, + types.uuid, + body=resources.FunctionVersion, + status_code=201 + ) + def post(self, function_id, body): + """Create a new version for the function. + + The supported boy params: + - description: Optional. The description of the new version. + """ + ctx = context.get_ctx() + acl.enforce('function_version:create', ctx) + + params = body.to_dict() + values = { + 'description': params.get('description'), + } + + # Try to create a new function version within lock and db transaction + version = self._create_function_version(ctx.project_id, function_id, + **values) + + return resources.FunctionVersion.from_dict(version.to_dict()) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 3cd87712..decea180 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -412,3 +412,21 @@ class Webhooks(ResourceList): self._type = 'webhooks' super(Webhooks, self).__init__(**kwargs) + + +class FunctionVersion(Resource): + id = types.uuid + description = wtypes.text + function_version = wsme.wsattr(int, readonly=True) + project_id = wsme.wsattr(wtypes.text, readonly=True) + created_at = wsme.wsattr(wtypes.text, readonly=True) + updated_at = wsme.wsattr(wtypes.text, readonly=True) + + +class FunctionVersions(ResourceList): + function_versions = [FunctionVersion] + + def __init__(self, **kwargs): + self._type = 'function_versions' + + super(FunctionVersions, self).__init__(**kwargs) diff --git a/qinling/db/api.py b/qinling/db/api.py index 2444f04d..dc376b7e 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -201,3 +201,8 @@ def update_webhook(id, values): def delete_webhooks(**kwargs): return IMPL.delete_webhooks(**kwargs) + + +def increase_function_version(function_id, old_version, **kwargs): + """This function is meant to be invoked within locking section.""" + return IMPL.increase_function_version(function_id, old_version, **kwargs) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 5b18aef6..e0e23369 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -491,3 +491,26 @@ def update_webhook(id, values, session=None): @db_base.session_aware() def delete_webhooks(session=None, insecure=None, **kwargs): return _delete_all(models.Webhook, insecure=insecure, **kwargs) + + +@db_base.session_aware() +def increase_function_version(function_id, old_version, session=None, + **kwargs): + """This function is supposed to be invoked within locking section.""" + version = models.FunctionVersion() + kwargs.update( + { + "function_id": function_id, + "version_number": old_version + 1 + } + ) + version.update(kwargs.copy()) + + try: + version.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for function_versions: %s" % e.columns + ) + + return version diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py index a656a302..13f6d982 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py @@ -47,6 +47,11 @@ def upgrade(): ) ) + op.add_column( + 'functions', + sa.Column('latest_version', sa.Integer, nullable=False), + ) + op.add_column( 'executions', sa.Column('function_version', sa.Integer, nullable=False), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index bd99d892..cf1944f8 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -48,6 +48,7 @@ class Function(model_base.QinlingSecureModelBase): entry = sa.Column(sa.String(80), nullable=True) count = sa.Column(sa.Integer, default=0) trust_id = sa.Column(sa.String(80)) + latest_version = sa.Column(sa.Integer, default=0) class Execution(model_base.QinlingSecureModelBase): @@ -113,7 +114,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase): function_id = sa.Column( sa.String(36), - sa.ForeignKey(Function.id) + sa.ForeignKey(Function.id, ondelete='CASCADE') ) description = sa.Column(sa.String(255), nullable=True) version_number = sa.Column(sa.Integer, default=0) @@ -131,3 +132,10 @@ Function.jobs = relationship( ) ) Function.webhook = relationship("Webhook", uselist=False, backref="function") +Function.versions = relationship( + "FunctionVersion", + order_by="FunctionVersion.version_number", + uselist=True, + lazy='select', + cascade="all, delete-orphan" +) diff --git a/qinling/storage/base.py b/qinling/storage/base.py index dbeb3c58..ad345abc 100644 --- a/qinling/storage/base.py +++ b/qinling/storage/base.py @@ -54,6 +54,33 @@ class PackageStorage(object): def delete(self, project_id, function, md5sum): raise NotImplementedError + @abc.abstractmethod + def changed_since(self, project_id, function, l_md5, version): + """Check if the function package has changed. + + Check if the function package has changed between lastest and the + specified version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param version: The version number compared with. + :return: True if changed otherwise False. + """ + raise NotImplementedError + + @abc.abstractmethod + def copy(self, project_id, function, l_md5, old_version): + """Copy function package for a new version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param old_version: The version number that should copy from. + :return: None + """ + raise NotImplementedError + def load_storage_provider(conf): global STORAGE_PROVIDER diff --git a/qinling/storage/file_system.py b/qinling/storage/file_system.py index 0e19b523..dc47ffbc 100644 --- a/qinling/storage/file_system.py +++ b/qinling/storage/file_system.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import shutil import zipfile from oslo_log import log as logging @@ -26,6 +27,8 @@ LOG = logging.getLogger(__name__) PACKAGE_NAME_TEMPLATE = "%s_%s.zip" # Package path name including project ID PACKAGE_PATH_TEMPLATE = "%s/%s_%s.zip" +# Package path name including version +PACKAGE_VERSION_TEMPLATE = "%s_%s_%s.zip" class FileSystemStorage(base.PackageStorage): @@ -113,3 +116,53 @@ class FileSystemStorage(base.PackageStorage): if os.path.exists(func_zip): os.remove(func_zip) + + def changed_since(self, project_id, function, l_md5, version): + """Check if the function package has changed. + + Check if the function package has changed between lastest and the + specified version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param version: The version number compared with. + :return: True if changed otherwise False. + """ + # If it's the first version creation, don't check. + if version == 0: + return True + + version_path = os.path.join( + self.base_path, project_id, + PACKAGE_VERSION_TEMPLATE % (function, version, l_md5) + ) + if os.path.exists(version_path): + return False + + return True + + def copy(self, project_id, function, l_md5, old_version): + """Copy function package for a new version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param old_version: The version number that should copy from. + :return: None + """ + src_package = os.path.join(self.base_path, + project_id, + PACKAGE_NAME_TEMPLATE % (function, l_md5) + ) + dest_package = os.path.join(self.base_path, + project_id, + PACKAGE_VERSION_TEMPLATE % + (function, old_version + 1, l_md5)) + + try: + shutil.copyfile(src_package, dest_package) + except Exception: + msg = "Failed to create new function version." + LOG.exception(msg) + raise exc.StorageProviderException(msg) diff --git a/qinling/tests/unit/api/controllers/v1/test_function_version.py b/qinling/tests/unit/api/controllers/v1/test_function_version.py new file mode 100644 index 00000000..857811df --- /dev/null +++ b/qinling/tests/unit/api/controllers/v1/test_function_version.py @@ -0,0 +1,93 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from qinling import context +from qinling.db import api as db_api +from qinling.tests.unit.api import base +from qinling.tests.unit import base as unit_base + +TESTCASE_NAME = 'TestFunctionVersionController' + + +class TestFunctionVersionController(base.APITest): + def setUp(self): + super(TestFunctionVersionController, self).setUp() + + db_func = self.create_function(prefix=TESTCASE_NAME) + self.func_id = db_func.id + + @mock.patch('qinling.storage.file_system.FileSystemStorage.copy') + @mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since') + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post(self, mock_etcd_lock, mock_changed, mock_copy): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + mock_changed.return_value = True + + # Getting function and versions needs to happen in a db transaction + with db_api.transaction(): + func_db = db_api.get_function(self.func_id) + self.assertEqual(0, len(func_db.versions)) + + body = {'description': 'new version'} + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + body) + + self.assertEqual(201, resp.status_int) + self._assertDictContainsSubset(resp.json, body) + + mock_changed.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID, + self.func_id, "fake_md5", 0) + mock_copy.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID, + self.func_id, "fake_md5", 0) + + # We need to set context as it was removed after the API call + context.set_ctx(self.ctx) + + with db_api.transaction(): + func_db = db_api.get_function(self.func_id) + self.assertEqual(1, len(func_db.versions)) + + @mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since') + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post_not_change(self, mock_etcd_lock, mock_changed): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + mock_changed.return_value = False + + body = {'description': 'new version'} + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + body, + expect_errors=True) + + self.assertEqual(403, resp.status_int) + + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post_max_versions(self, mock_etcd_lock): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + + for i in range(10): + self.create_function_version(i, function_id=self.func_id) + + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + {}, + expect_errors=True) + + self.assertEqual(403, resp.status_int) diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index 1f3a91cd..5f8a82cb 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -238,3 +238,12 @@ class DbTestCase(BaseTest): execution = db_api.create_execution(execution_params) return execution + + def create_function_version(self, old_version, function_id=None, + prefix=None, **kwargs): + if not function_id: + function_id = self.create_function(prefix=prefix).id + + db_api.increase_function_version(function_id, old_version, **kwargs) + db_api.update_function(function_id, + {"latest_version": old_version + 1}) diff --git a/qinling/tests/unit/storage/test_file_system.py b/qinling/tests/unit/storage/test_file_system.py index c89732c9..97cd71c2 100644 --- a/qinling/tests/unit/storage/test_file_system.py +++ b/qinling/tests/unit/storage/test_file_system.py @@ -181,3 +181,48 @@ class TestFileSystemStorage(base.BaseTest): ) exists_mock.assert_called_once_with(package_path) remove_mock.assert_not_called() + + def test_changed_since_first_version(self): + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 0) + + self.assertTrue(ret) + + @mock.patch('os.path.exists') + def test_changed_since_exists(self, mock_exists): + mock_exists.return_value = True + + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 1) + + self.assertFalse(ret) + + expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_exists.assert_called_once_with(expect_path) + + @mock.patch('os.path.exists') + def test_changed_since_not_exists(self, mock_exists): + mock_exists.return_value = False + + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 1) + + self.assertTrue(ret) + + expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_exists.assert_called_once_with(expect_path) + + @mock.patch("shutil.copyfile") + def test_copy(self, mock_copy): + self.storage.copy(self.project_id, "fake_function", "fake_md5", 0) + + expect_src = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_fake_md5.zip") + expect_dest = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_copy.assert_called_once_with(expect_src, expect_dest) diff --git a/qinling/utils/constants.py b/qinling/utils/constants.py index cdbacdac..e5655bfb 100644 --- a/qinling/utils/constants.py +++ b/qinling/utils/constants.py @@ -24,3 +24,5 @@ SWIFT_FUNCTION = 'swift' IMAGE_FUNCTION = 'image' MAX_PACKAGE_SIZE = 51 * 1024 * 1024 + +MAX_VERSION_NUMBER = 10 diff --git a/qinling/utils/etcd_util.py b/qinling/utils/etcd_util.py index 753ef46e..bfa4aa4d 100644 --- a/qinling/utils/etcd_util.py +++ b/qinling/utils/etcd_util.py @@ -34,6 +34,12 @@ def get_worker_lock(): return client.lock(id='function_worker') +def get_function_version_lock(function_id): + client = get_client() + lock_id = "function_version_%s" % function_id + return client.lock(id=lock_id) + + def create_worker(function_id, worker): """Create the worker info in etcd.