Browse Source

Merge "Function versioning API: create"

changes/31/562431/2
Zuul 3 years ago
committed by Gerrit Code Review
parent
commit
1703394888
  1. 2
      qinling/api/controllers/v1/function.py
  2. 123
      qinling/api/controllers/v1/function_version.py
  3. 18
      qinling/api/controllers/v1/resources.py
  4. 5
      qinling/db/api.py
  5. 23
      qinling/db/sqlalchemy/api.py
  6. 5
      qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py
  7. 10
      qinling/db/sqlalchemy/models.py
  8. 27
      qinling/storage/base.py
  9. 53
      qinling/storage/file_system.py
  10. 93
      qinling/tests/unit/api/controllers/v1/test_function_version.py
  11. 9
      qinling/tests/unit/base.py
  12. 45
      qinling/tests/unit/storage/test_file_system.py
  13. 2
      qinling/utils/constants.py
  14. 6
      qinling/utils/etcd_util.py

2
qinling/api/controllers/v1/function.py

@ -26,6 +26,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from qinling.api import access_control as acl
from qinling.api.controllers.v1 import function_version
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling import context
@ -66,6 +67,7 @@ class FunctionWorkerController(rest.RestController):
class FunctionsController(rest.RestController):
workers = FunctionWorkerController()
versions = function_version.FunctionVersionsController()
_custom_actions = {
'scale_up': ['POST'],

123
qinling/api/controllers/v1/function_version.py

@ -0,0 +1,123 @@
# Copyright 2018 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from pecan import rest
import tenacity
import wsmeext.pecan as wsme_pecan
from qinling.api import access_control as acl
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling import context
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling.storage import base as storage_base
from qinling.utils import constants
from qinling.utils import etcd_util
from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class FunctionVersionsController(rest.RestController):
def __init__(self, *args, **kwargs):
self.type = 'function_version'
self.storage_provider = storage_base.load_storage_provider(CONF)
super(FunctionVersionsController, self).__init__(*args, **kwargs)
@tenacity.retry(
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30),
retry=(tenacity.retry_if_result(lambda result: result is False))
)
def _create_function_version(self, project_id, function_id, **kwargs):
with etcd_util.get_function_version_lock(function_id) as lock:
if not lock.is_acquired():
return False
with db_api.transaction():
# Get latest function package md5 and version number
func_db = db_api.get_function(function_id)
if func_db.code['source'] != constants.PACKAGE_FUNCTION:
raise exc.NotAllowedException(
"Function versioning only allowed for %s type "
"function." %
constants.PACKAGE_FUNCTION
)
l_md5 = func_db.code['md5sum']
l_version = func_db.latest_version
if len(func_db.versions) >= constants.MAX_VERSION_NUMBER:
raise exc.NotAllowedException(
'Can not exceed maximum number(%s) of versions' %
constants.MAX_VERSION_NUMBER
)
# Check if the latest package changed since last version
changed = self.storage_provider.changed_since(project_id,
function_id,
l_md5,
l_version)
if not changed:
raise exc.NotAllowedException(
'Function package not changed since the latest '
'version %s.' % l_version
)
LOG.info("Creating %s, function_id: %s, old_version: %d",
self.type, function_id, l_version)
# Create new version and copy package.
self.storage_provider.copy(project_id, function_id, l_md5,
l_version)
version = db_api.increase_function_version(function_id,
l_version,
**kwargs)
func_db.latest_version = l_version + 1
LOG.info("New version %d for function %s created.", l_version + 1,
function_id)
return version
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
resources.FunctionVersion,
types.uuid,
body=resources.FunctionVersion,
status_code=201
)
def post(self, function_id, body):
"""Create a new version for the function.
The supported boy params:
- description: Optional. The description of the new version.
"""
ctx = context.get_ctx()
acl.enforce('function_version:create', ctx)
params = body.to_dict()
values = {
'description': params.get('description'),
}
# Try to create a new function version within lock and db transaction
version = self._create_function_version(ctx.project_id, function_id,
**values)
return resources.FunctionVersion.from_dict(version.to_dict())

18
qinling/api/controllers/v1/resources.py

@ -412,3 +412,21 @@ class Webhooks(ResourceList):
self._type = 'webhooks'
super(Webhooks, self).__init__(**kwargs)
class FunctionVersion(Resource):
id = types.uuid
description = wtypes.text
function_version = wsme.wsattr(int, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
class FunctionVersions(ResourceList):
function_versions = [FunctionVersion]
def __init__(self, **kwargs):
self._type = 'function_versions'
super(FunctionVersions, self).__init__(**kwargs)

5
qinling/db/api.py

@ -201,3 +201,8 @@ def update_webhook(id, values):
def delete_webhooks(**kwargs):
return IMPL.delete_webhooks(**kwargs)
def increase_function_version(function_id, old_version, **kwargs):
"""This function is meant to be invoked within locking section."""
return IMPL.increase_function_version(function_id, old_version, **kwargs)

23
qinling/db/sqlalchemy/api.py

@ -491,3 +491,26 @@ def update_webhook(id, values, session=None):
@db_base.session_aware()
def delete_webhooks(session=None, insecure=None, **kwargs):
return _delete_all(models.Webhook, insecure=insecure, **kwargs)
@db_base.session_aware()
def increase_function_version(function_id, old_version, session=None,
**kwargs):
"""This function is supposed to be invoked within locking section."""
version = models.FunctionVersion()
kwargs.update(
{
"function_id": function_id,
"version_number": old_version + 1
}
)
version.update(kwargs.copy())
try:
version.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for function_versions: %s" % e.columns
)
return version

5
qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py

@ -47,6 +47,11 @@ def upgrade():
)
)
op.add_column(
'functions',
sa.Column('latest_version', sa.Integer, nullable=False),
)
op.add_column(
'executions',
sa.Column('function_version', sa.Integer, nullable=False),

10
qinling/db/sqlalchemy/models.py

@ -48,6 +48,7 @@ class Function(model_base.QinlingSecureModelBase):
entry = sa.Column(sa.String(80), nullable=True)
count = sa.Column(sa.Integer, default=0)
trust_id = sa.Column(sa.String(80))
latest_version = sa.Column(sa.Integer, default=0)
class Execution(model_base.QinlingSecureModelBase):
@ -113,7 +114,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase):
function_id = sa.Column(
sa.String(36),
sa.ForeignKey(Function.id)
sa.ForeignKey(Function.id, ondelete='CASCADE')
)
description = sa.Column(sa.String(255), nullable=True)
version_number = sa.Column(sa.Integer, default=0)
@ -131,3 +132,10 @@ Function.jobs = relationship(
)
)
Function.webhook = relationship("Webhook", uselist=False, backref="function")
Function.versions = relationship(
"FunctionVersion",
order_by="FunctionVersion.version_number",
uselist=True,
lazy='select',
cascade="all, delete-orphan"
)

27
qinling/storage/base.py

@ -54,6 +54,33 @@ class PackageStorage(object):
def delete(self, project_id, function, md5sum):
raise NotImplementedError
@abc.abstractmethod
def changed_since(self, project_id, function, l_md5, version):
"""Check if the function package has changed.
Check if the function package has changed between lastest and the
specified version.
:param project_id: Project ID.
:param function: Function ID.
:param l_md5: Latest function package md5sum.
:param version: The version number compared with.
:return: True if changed otherwise False.
"""
raise NotImplementedError
@abc.abstractmethod
def copy(self, project_id, function, l_md5, old_version):
"""Copy function package for a new version.
:param project_id: Project ID.
:param function: Function ID.
:param l_md5: Latest function package md5sum.
:param old_version: The version number that should copy from.
:return: None
"""
raise NotImplementedError
def load_storage_provider(conf):
global STORAGE_PROVIDER

53
qinling/storage/file_system.py

@ -13,6 +13,7 @@
# limitations under the License.
import os
import shutil
import zipfile
from oslo_log import log as logging
@ -26,6 +27,8 @@ LOG = logging.getLogger(__name__)
PACKAGE_NAME_TEMPLATE = "%s_%s.zip"
# Package path name including project ID
PACKAGE_PATH_TEMPLATE = "%s/%s_%s.zip"
# Package path name including version
PACKAGE_VERSION_TEMPLATE = "%s_%s_%s.zip"
class FileSystemStorage(base.PackageStorage):
@ -113,3 +116,53 @@ class FileSystemStorage(base.PackageStorage):
if os.path.exists(func_zip):
os.remove(func_zip)
def changed_since(self, project_id, function, l_md5, version):
"""Check if the function package has changed.
Check if the function package has changed between lastest and the
specified version.
:param project_id: Project ID.
:param function: Function ID.
:param l_md5: Latest function package md5sum.
:param version: The version number compared with.
:return: True if changed otherwise False.
"""
# If it's the first version creation, don't check.
if version == 0:
return True
version_path = os.path.join(
self.base_path, project_id,
PACKAGE_VERSION_TEMPLATE % (function, version, l_md5)
)
if os.path.exists(version_path):
return False
return True
def copy(self, project_id, function, l_md5, old_version):
"""Copy function package for a new version.
:param project_id: Project ID.
:param function: Function ID.
:param l_md5: Latest function package md5sum.
:param old_version: The version number that should copy from.
:return: None
"""
src_package = os.path.join(self.base_path,
project_id,
PACKAGE_NAME_TEMPLATE % (function, l_md5)
)
dest_package = os.path.join(self.base_path,
project_id,
PACKAGE_VERSION_TEMPLATE %
(function, old_version + 1, l_md5))
try:
shutil.copyfile(src_package, dest_package)
except Exception:
msg = "Failed to create new function version."
LOG.exception(msg)
raise exc.StorageProviderException(msg)

93
qinling/tests/unit/api/controllers/v1/test_function_version.py

@ -0,0 +1,93 @@
# Copyright 2018 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from qinling import context
from qinling.db import api as db_api
from qinling.tests.unit.api import base
from qinling.tests.unit import base as unit_base
TESTCASE_NAME = 'TestFunctionVersionController'
class TestFunctionVersionController(base.APITest):
def setUp(self):
super(TestFunctionVersionController, self).setUp()
db_func = self.create_function(prefix=TESTCASE_NAME)
self.func_id = db_func.id
@mock.patch('qinling.storage.file_system.FileSystemStorage.copy')
@mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since')
@mock.patch('qinling.utils.etcd_util.get_function_version_lock')
def test_post(self, mock_etcd_lock, mock_changed, mock_copy):
lock = mock.Mock()
mock_etcd_lock.return_value.__enter__.return_value = lock
lock.is_acquired.return_value = True
mock_changed.return_value = True
# Getting function and versions needs to happen in a db transaction
with db_api.transaction():
func_db = db_api.get_function(self.func_id)
self.assertEqual(0, len(func_db.versions))
body = {'description': 'new version'}
resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id,
body)
self.assertEqual(201, resp.status_int)
self._assertDictContainsSubset(resp.json, body)
mock_changed.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID,
self.func_id, "fake_md5", 0)
mock_copy.assert_called_once_with(unit_base.DEFAULT_PROJECT_ID,
self.func_id, "fake_md5", 0)
# We need to set context as it was removed after the API call
context.set_ctx(self.ctx)
with db_api.transaction():
func_db = db_api.get_function(self.func_id)
self.assertEqual(1, len(func_db.versions))
@mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since')
@mock.patch('qinling.utils.etcd_util.get_function_version_lock')
def test_post_not_change(self, mock_etcd_lock, mock_changed):
lock = mock.Mock()
mock_etcd_lock.return_value.__enter__.return_value = lock
lock.is_acquired.return_value = True
mock_changed.return_value = False
body = {'description': 'new version'}
resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id,
body,
expect_errors=True)
self.assertEqual(403, resp.status_int)
@mock.patch('qinling.utils.etcd_util.get_function_version_lock')
def test_post_max_versions(self, mock_etcd_lock):
lock = mock.Mock()
mock_etcd_lock.return_value.__enter__.return_value = lock
lock.is_acquired.return_value = True
for i in range(10):
self.create_function_version(i, function_id=self.func_id)
resp = self.app.post_json('/v1/functions/%s/versions' % self.func_id,
{},
expect_errors=True)
self.assertEqual(403, resp.status_int)

9
qinling/tests/unit/base.py

@ -238,3 +238,12 @@ class DbTestCase(BaseTest):
execution = db_api.create_execution(execution_params)
return execution
def create_function_version(self, old_version, function_id=None,
prefix=None, **kwargs):
if not function_id:
function_id = self.create_function(prefix=prefix).id
db_api.increase_function_version(function_id, old_version, **kwargs)
db_api.update_function(function_id,
{"latest_version": old_version + 1})

45
qinling/tests/unit/storage/test_file_system.py

@ -181,3 +181,48 @@ class TestFileSystemStorage(base.BaseTest):
)
exists_mock.assert_called_once_with(package_path)
remove_mock.assert_not_called()
def test_changed_since_first_version(self):
ret = self.storage.changed_since(self.project_id, "fake_function",
"fake_md5", 0)
self.assertTrue(ret)
@mock.patch('os.path.exists')
def test_changed_since_exists(self, mock_exists):
mock_exists.return_value = True
ret = self.storage.changed_since(self.project_id, "fake_function",
"fake_md5", 1)
self.assertFalse(ret)
expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id,
"fake_function_1_fake_md5.zip")
mock_exists.assert_called_once_with(expect_path)
@mock.patch('os.path.exists')
def test_changed_since_not_exists(self, mock_exists):
mock_exists.return_value = False
ret = self.storage.changed_since(self.project_id, "fake_function",
"fake_md5", 1)
self.assertTrue(ret)
expect_path = os.path.join(FAKE_STORAGE_PATH, self.project_id,
"fake_function_1_fake_md5.zip")
mock_exists.assert_called_once_with(expect_path)
@mock.patch("shutil.copyfile")
def test_copy(self, mock_copy):
self.storage.copy(self.project_id, "fake_function", "fake_md5", 0)
expect_src = os.path.join(FAKE_STORAGE_PATH, self.project_id,
"fake_function_fake_md5.zip")
expect_dest = os.path.join(FAKE_STORAGE_PATH, self.project_id,
"fake_function_1_fake_md5.zip")
mock_copy.assert_called_once_with(expect_src, expect_dest)

2
qinling/utils/constants.py

@ -24,3 +24,5 @@ SWIFT_FUNCTION = 'swift'
IMAGE_FUNCTION = 'image'
MAX_PACKAGE_SIZE = 51 * 1024 * 1024
MAX_VERSION_NUMBER = 10

6
qinling/utils/etcd_util.py

@ -34,6 +34,12 @@ def get_worker_lock():
return client.lock(id='function_worker')
def get_function_version_lock(function_id):
client = get_client()
lock_id = "function_version_%s" % function_id
return client.lock(id=lock_id)
def create_worker(function_id, worker):
"""Create the worker info in etcd.

Loading…
Cancel
Save