From 12d2ee6f26996b15162153de891b887537b81969 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Thu, 12 Apr 2018 16:18:18 +1200 Subject: [PATCH] Function versioning API: create This patch supports to create function version. - Versioning feature is only allowed for package type function for now - New version creation code should be wrapped in lock to avoid any race condition - New version number is calculated by Qinling and no need to be provided by end users - Version creation is not allowed if no change happened to function package. - Only function owner can create version. - Maximum version number is 10 by default, we will make it configurable as needed in future. - All function versions will be deleted when function itself is deleted This patch only focuses on version creation, the other version operations and related function operations will be handled in the following patches. Story: #2001829 Task: #14305 Change-Id: I62bb344da237766fc11cce2ffda65945313136b1 --- qinling/api/controllers/v1/function.py | 2 + .../api/controllers/v1/function_version.py | 123 ++++++++++++++++++ qinling/api/controllers/v1/resources.py | 18 +++ qinling/db/api.py | 5 + qinling/db/sqlalchemy/api.py | 23 ++++ .../002_add_function_version_support.py | 5 + qinling/db/sqlalchemy/models.py | 10 +- qinling/storage/base.py | 27 ++++ qinling/storage/file_system.py | 53 ++++++++ .../controllers/v1/test_function_version.py | 93 +++++++++++++ qinling/tests/unit/base.py | 9 ++ .../tests/unit/storage/test_file_system.py | 45 +++++++ qinling/utils/constants.py | 2 + qinling/utils/etcd_util.py | 6 + 14 files changed, 420 insertions(+), 1 deletion(-) create mode 100644 qinling/api/controllers/v1/function_version.py create mode 100644 qinling/tests/unit/api/controllers/v1/test_function_version.py diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 2e5cf975..1e3190c8 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -26,6 +26,7 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from qinling.api import access_control as acl +from qinling.api.controllers.v1 import function_version from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import types from qinling import context @@ -66,6 +67,7 @@ class FunctionWorkerController(rest.RestController): class FunctionsController(rest.RestController): workers = FunctionWorkerController() + versions = function_version.FunctionVersionsController() _custom_actions = { 'scale_up': ['POST'], diff --git a/qinling/api/controllers/v1/function_version.py b/qinling/api/controllers/v1/function_version.py new file mode 100644 index 00000000..cd75be88 --- /dev/null +++ b/qinling/api/controllers/v1/function_version.py @@ -0,0 +1,123 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from pecan import rest +import tenacity +import wsmeext.pecan as wsme_pecan + +from qinling.api import access_control as acl +from qinling.api.controllers.v1 import resources +from qinling.api.controllers.v1 import types +from qinling import context +from qinling.db import api as db_api +from qinling import exceptions as exc +from qinling.storage import base as storage_base +from qinling.utils import constants +from qinling.utils import etcd_util +from qinling.utils import rest_utils + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class FunctionVersionsController(rest.RestController): + def __init__(self, *args, **kwargs): + self.type = 'function_version' + self.storage_provider = storage_base.load_storage_provider(CONF) + + super(FunctionVersionsController, self).__init__(*args, **kwargs) + + @tenacity.retry( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_attempt(30), + retry=(tenacity.retry_if_result(lambda result: result is False)) + ) + def _create_function_version(self, project_id, function_id, **kwargs): + with etcd_util.get_function_version_lock(function_id) as lock: + if not lock.is_acquired(): + return False + + with db_api.transaction(): + # Get latest function package md5 and version number + func_db = db_api.get_function(function_id) + if func_db.code['source'] != constants.PACKAGE_FUNCTION: + raise exc.NotAllowedException( + "Function versioning only allowed for %s type " + "function." % + constants.PACKAGE_FUNCTION + ) + + l_md5 = func_db.code['md5sum'] + l_version = func_db.latest_version + + if len(func_db.versions) >= constants.MAX_VERSION_NUMBER: + raise exc.NotAllowedException( + 'Can not exceed maximum number(%s) of versions' % + constants.MAX_VERSION_NUMBER + ) + + # Check if the latest package changed since last version + changed = self.storage_provider.changed_since(project_id, + function_id, + l_md5, + l_version) + if not changed: + raise exc.NotAllowedException( + 'Function package not changed since the latest ' + 'version %s.' % l_version + ) + + LOG.info("Creating %s, function_id: %s, old_version: %d", + self.type, function_id, l_version) + + # Create new version and copy package. + self.storage_provider.copy(project_id, function_id, l_md5, + l_version) + version = db_api.increase_function_version(function_id, + l_version, + **kwargs) + func_db.latest_version = l_version + 1 + + LOG.info("New version %d for function %s created.", l_version + 1, + function_id) + return version + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.FunctionVersion, + types.uuid, + body=resources.FunctionVersion, + status_code=201 + ) + def post(self, function_id, body): + """Create a new version for the function. + + The supported boy params: + - description: Optional. The description of the new version. + """ + ctx = context.get_ctx() + acl.enforce('function_version:create', ctx) + + params = body.to_dict() + values = { + 'description': params.get('description'), + } + + # Try to create a new function version within lock and db transaction + version = self._create_function_version(ctx.project_id, function_id, + **values) + + return resources.FunctionVersion.from_dict(version.to_dict()) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 3cd87712..decea180 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -412,3 +412,21 @@ class Webhooks(ResourceList): self._type = 'webhooks' super(Webhooks, self).__init__(**kwargs) + + +class FunctionVersion(Resource): + id = types.uuid + description = wtypes.text + function_version = wsme.wsattr(int, readonly=True) + project_id = wsme.wsattr(wtypes.text, readonly=True) + created_at = wsme.wsattr(wtypes.text, readonly=True) + updated_at = wsme.wsattr(wtypes.text, readonly=True) + + +class FunctionVersions(ResourceList): + function_versions = [FunctionVersion] + + def __init__(self, **kwargs): + self._type = 'function_versions' + + super(FunctionVersions, self).__init__(**kwargs) diff --git a/qinling/db/api.py b/qinling/db/api.py index 2444f04d..dc376b7e 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -201,3 +201,8 @@ def update_webhook(id, values): def delete_webhooks(**kwargs): return IMPL.delete_webhooks(**kwargs) + + +def increase_function_version(function_id, old_version, **kwargs): + """This function is meant to be invoked within locking section.""" + return IMPL.increase_function_version(function_id, old_version, **kwargs) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 5b18aef6..e0e23369 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -491,3 +491,26 @@ def update_webhook(id, values, session=None): @db_base.session_aware() def delete_webhooks(session=None, insecure=None, **kwargs): return _delete_all(models.Webhook, insecure=insecure, **kwargs) + + +@db_base.session_aware() +def increase_function_version(function_id, old_version, session=None, + **kwargs): + """This function is supposed to be invoked within locking section.""" + version = models.FunctionVersion() + kwargs.update( + { + "function_id": function_id, + "version_number": old_version + 1 + } + ) + version.update(kwargs.copy()) + + try: + version.save(session=session) + except oslo_db_exc.DBDuplicateEntry as e: + raise exc.DBError( + "Duplicate entry for function_versions: %s" % e.columns + ) + + return version diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py index a656a302..13f6d982 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py @@ -47,6 +47,11 @@ def upgrade(): ) ) + op.add_column( + 'functions', + sa.Column('latest_version', sa.Integer, nullable=False), + ) + op.add_column( 'executions', sa.Column('function_version', sa.Integer, nullable=False), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index bd99d892..cf1944f8 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -48,6 +48,7 @@ class Function(model_base.QinlingSecureModelBase): entry = sa.Column(sa.String(80), nullable=True) count = sa.Column(sa.Integer, default=0) trust_id = sa.Column(sa.String(80)) + latest_version = sa.Column(sa.Integer, default=0) class Execution(model_base.QinlingSecureModelBase): @@ -113,7 +114,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase): function_id = sa.Column( sa.String(36), - sa.ForeignKey(Function.id) + sa.ForeignKey(Function.id, ondelete='CASCADE') ) description = sa.Column(sa.String(255), nullable=True) version_number = sa.Column(sa.Integer, default=0) @@ -131,3 +132,10 @@ Function.jobs = relationship( ) ) Function.webhook = relationship("Webhook", uselist=False, backref="function") +Function.versions = relationship( + "FunctionVersion", + order_by="FunctionVersion.version_number", + uselist=True, + lazy='select', + cascade="all, delete-orphan" +) diff --git a/qinling/storage/base.py b/qinling/storage/base.py index dbeb3c58..ad345abc 100644 --- a/qinling/storage/base.py +++ b/qinling/storage/base.py @@ -54,6 +54,33 @@ class PackageStorage(object): def delete(self, project_id, function, md5sum): raise NotImplementedError + @abc.abstractmethod + def changed_since(self, project_id, function, l_md5, version): + """Check if the function package has changed. + + Check if the function package has changed between lastest and the + specified version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param version: The version number compared with. + :return: True if changed otherwise False. + """ + raise NotImplementedError + + @abc.abstractmethod + def copy(self, project_id, function, l_md5, old_version): + """Copy function package for a new version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param old_version: The version number that should copy from. + :return: None + """ + raise NotImplementedError + def load_storage_provider(conf): global STORAGE_PROVIDER diff --git a/qinling/storage/file_system.py b/qinling/storage/file_system.py index 0e19b523..dc47ffbc 100644 --- a/qinling/storage/file_system.py +++ b/qinling/storage/file_system.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import shutil import zipfile from oslo_log import log as logging @@ -26,6 +27,8 @@ LOG = logging.getLogger(__name__) PACKAGE_NAME_TEMPLATE = "%s_%s.zip" # Package path name including project ID PACKAGE_PATH_TEMPLATE = "%s/%s_%s.zip" +# Package path name including version +PACKAGE_VERSION_TEMPLATE = "%s_%s_%s.zip" class FileSystemStorage(base.PackageStorage): @@ -113,3 +116,53 @@ class FileSystemStorage(base.PackageStorage): if os.path.exists(func_zip): os.remove(func_zip) + + def changed_since(self, project_id, function, l_md5, version): + """Check if the function package has changed. + + Check if the function package has changed between lastest and the + specified version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param version: The version number compared with. + :return: True if changed otherwise False. + """ + # If it's the first version creation, don't check. + if version == 0: + return True + + version_path = os.path.join( + self.base_path, project_id, + PACKAGE_VERSION_TEMPLATE % (function, version, l_md5) + ) + if os.path.exists(version_path): + return False + + return True + + def copy(self, project_id, function, l_md5, old_version): + """Copy function package for a new version. + + :param project_id: Project ID. + :param function: Function ID. + :param l_md5: Latest function package md5sum. + :param old_version: The version number that should copy from. + :return: None + """ + src_package = os.path.join(self.base_path, + project_id, + PACKAGE_NAME_TEMPLATE % (function, l_md5) + ) + dest_package = os.path.join(self.base_path, + project_id, + PACKAGE_VERSION_TEMPLATE % + (function, old_version + 1, l_md5)) + + try: + shutil.copyfile(src_package, dest_package) + except Exception: + msg = "Failed to create new function version." + LOG.exception(msg) + raise exc.StorageProviderException(msg) diff --git a/qinling/tests/unit/api/controllers/v1/test_function_version.py b/qinling/tests/unit/api/controllers/v1/test_function_version.py new file mode 100644 index 00000000..857811df --- /dev/null +++ b/qinling/tests/unit/api/controllers/v1/test_function_version.py @@ -0,0 +1,93 @@ +# Copyright 2018 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from qinling import context +from qinling.db import api as db_api +from qinling.tests.unit.api import base +from qinling.tests.unit import base as unit_base + +TESTCASE_NAME = 'TestFunctionVersionController' + + +class TestFunctionVersionController(base.APITest): + def setUp(self): + super(TestFunctionVersionController, self).setUp() + + db_func = self.create_function(prefix=TESTCASE_NAME) + self.func_id = db_func.id + + @mock.patch('qinling.storage.file_system.FileSystemStorage.copy') + @mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since') + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post(self, mock_etcd_lock, mock_changed, mock_copy): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + mock_changed.return_value = True + + # Getting function and versions needs to happen in a db transaction + with db_api.transaction(): + func_db = db_api.get_function(self.func_id) + self.assertEqual(0, len(func_db.versions)) + + body = {'description': 'new version'} + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + body) + + self.assertEqual(201, resp.status_int) + self._assertDictContainsSubset(resp.json, body) + + mock_changed.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID, + self.func_id, "fake_md5", 0) + mock_copy.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID, + self.func_id, "fake_md5", 0) + + # We need to set context as it was removed after the API call + context.set_ctx(self.ctx) + + with db_api.transaction(): + func_db = db_api.get_function(self.func_id) + self.assertEqual(1, len(func_db.versions)) + + @mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since') + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post_not_change(self, mock_etcd_lock, mock_changed): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + mock_changed.return_value = False + + body = {'description': 'new version'} + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + body, + expect_errors=True) + + self.assertEqual(403, resp.status_int) + + @mock.patch('qinling.utils.etcd_util.get_function_version_lock') + def test_post_max_versions(self, mock_etcd_lock): + lock = mock.Mock() + mock_etcd_lock.return_value.__enter__.return_value = lock + lock.is_acquired.return_value = True + + for i in range(10): + self.create_function_version(i, function_id=self.func_id) + + resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id, + {}, + expect_errors=True) + + self.assertEqual(403, resp.status_int) diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index 1f3a91cd..5f8a82cb 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -238,3 +238,12 @@ class DbTestCase(BaseTest): execution = db_api.create_execution(execution_params) return execution + + def create_function_version(self, old_version, function_id=None, + prefix=None, **kwargs): + if not function_id: + function_id = self.create_function(prefix=prefix).id + + db_api.increase_function_version(function_id, old_version, **kwargs) + db_api.update_function(function_id, + {"latest_version": old_version + 1}) diff --git a/qinling/tests/unit/storage/test_file_system.py b/qinling/tests/unit/storage/test_file_system.py index c89732c9..97cd71c2 100644 --- a/qinling/tests/unit/storage/test_file_system.py +++ b/qinling/tests/unit/storage/test_file_system.py @@ -181,3 +181,48 @@ class TestFileSystemStorage(base.BaseTest): ) exists_mock.assert_called_once_with(package_path) remove_mock.assert_not_called() + + def test_changed_since_first_version(self): + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 0) + + self.assertTrue(ret) + + @mock.patch('os.path.exists') + def test_changed_since_exists(self, mock_exists): + mock_exists.return_value = True + + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 1) + + self.assertFalse(ret) + + expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_exists.assert_called_once_with(expect_path) + + @mock.patch('os.path.exists') + def test_changed_since_not_exists(self, mock_exists): + mock_exists.return_value = False + + ret = self.storage.changed_since(self.project_id, "fake_function", + "fake_md5", 1) + + self.assertTrue(ret) + + expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_exists.assert_called_once_with(expect_path) + + @mock.patch("shutil.copyfile") + def test_copy(self, mock_copy): + self.storage.copy(self.project_id, "fake_function", "fake_md5", 0) + + expect_src = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_fake_md5.zip") + expect_dest = os.path.join(FAKE_STORAGE_PATH, self.project_id, + "fake_function_1_fake_md5.zip") + + mock_copy.assert_called_once_with(expect_src, expect_dest) diff --git a/qinling/utils/constants.py b/qinling/utils/constants.py index cdbacdac..e5655bfb 100644 --- a/qinling/utils/constants.py +++ b/qinling/utils/constants.py @@ -24,3 +24,5 @@ SWIFT_FUNCTION = 'swift' IMAGE_FUNCTION = 'image' MAX_PACKAGE_SIZE = 51 * 1024 * 1024 + +MAX_VERSION_NUMBER = 10 diff --git a/qinling/utils/etcd_util.py b/qinling/utils/etcd_util.py index 753ef46e..bfa4aa4d 100644 --- a/qinling/utils/etcd_util.py +++ b/qinling/utils/etcd_util.py @@ -34,6 +34,12 @@ def get_worker_lock(): return client.lock(id='function_worker') +def get_function_version_lock(function_id): + client = get_client() + lock_id = "function_version_%s" % function_id + return client.lock(id=lock_id) + + def create_worker(function_id, worker): """Create the worker info in etcd.