Delete all its versions when deleting function

Change-Id: Ic0132619b0aa513cd84abc278006623b42ef15b1
Story: 2001829
Task: 14423
This commit is contained in:
Lingxian Kong 2018-04-25 20:03:58 +12:00
parent e0e8f3d13d
commit 3f303eb60c
6 changed files with 160 additions and 70 deletions

View File

@ -254,7 +254,10 @@ class FunctionsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, id):
"""Delete the specified function."""
"""Delete the specified function.
Delete function will also delete all its versions.
"""
LOG.info("Delete function %s.", id)
with db_api.transaction():
@ -263,9 +266,9 @@ class FunctionsController(rest.RestController):
raise exc.NotAllowedException(
'The function is still associated with running job(s).'
)
if func_db.webhook:
if len(func_db.webhooks) > 0:
raise exc.NotAllowedException(
'The function is still associated with webhook.'
'The function is still associated with webhook(s).'
)
# Even admin user can not delete other project's function because
@ -275,22 +278,41 @@ class FunctionsController(rest.RestController):
'Function can only be deleted by its owner.'
)
# Delete trust if needed
if func_db.trust_id:
keystone_util.delete_trust(func_db.trust_id)
for version_db in func_db.versions:
# Delete all resources created by orchestrator asynchronously.
self.engine_client.delete_function(
id,
version=version_db.version_number
)
# Delete etcd keys
etcd_util.delete_function(
id,
version=version_db.version_number
)
# Delete function version packages. Versions is only supported
# for package type function.
self.storage_provider.delete(
func_db.project_id,
id,
None,
version=version_db.version_number
)
# Delete resources for function version 0(func_db.versions==[])
self.engine_client.delete_function(id)
etcd_util.delete_function(id)
source = func_db.code['source']
if source == constants.PACKAGE_FUNCTION:
self.storage_provider.delete(func_db.project_id, id,
func_db.code['md5sum'])
# Delete all resources created by orchestrator asynchronously.
self.engine_client.delete_function(id)
# Delete trust if needed
if func_db.trust_id:
keystone_util.delete_trust(func_db.trust_id)
# Delete etcd keys
etcd_util.delete_function(id)
# This will also delete function service mapping as well.
# This will also delete function service mapping and function
# versions as well.
db_api.delete_function(id)
@rest_utils.wrap_pecan_controller_exception

View File

@ -56,12 +56,11 @@ def transaction():
def delete_all():
"""A helper function for testing."""
with transaction():
delete_jobs(insecure=True)
delete_webhooks(insecure=True)
delete_executions(insecure=True)
delete_functions(insecure=True)
delete_runtimes(insecure=True)
delete_jobs(insecure=True)
delete_webhooks(insecure=True)
delete_executions(insecure=True)
delete_functions(insecure=True)
delete_runtimes(insecure=True)
def conditional_update(model, values, expected_values, **kwargs):

View File

@ -193,11 +193,11 @@ def _get_db_object_by_id(model, id, insecure=None):
@db_base.insecure_aware()
def _delete_all(model, insecure=None, **kwargs):
# NOTE(kong): Because we use 'in_' operator in _secure_query(), delete()
# NOTE(kong): If we use 'in_' operator in _secure_query(), delete()
# method will raise error with default parameter. Please refer to
# http://docs.sqlalchemy.org/en/rel_1_0/orm/query.html#sqlalchemy.orm.query.Query.delete
query = db_base.model_query(model) if insecure else _secure_query(model)
query.filter_by(**kwargs).delete(synchronize_session=False)
query.filter_by(**kwargs).delete(synchronize_session="fetch")
@db_base.session_aware()

View File

@ -134,7 +134,7 @@ Function.jobs = relationship(
"~Job.status.in_(['done', 'cancelled']))"
)
)
Function.webhook = relationship("Webhook", uselist=False, backref="function")
Function.webhooks = relationship("Webhook", uselist=True, backref="function")
Function.versions = relationship(
"FunctionVersion",
order_by="FunctionVersion.version_number",

View File

@ -48,55 +48,65 @@ def handle_function_service_expiration(ctx, engine):
delta = timedelta(seconds=CONF.engine.function_service_expiration)
expiry_time = datetime.utcnow() - delta
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
latest_version=0
)
for func_db in results:
if not etcd_util.get_service_url(func_db.id, 0):
continue
LOG.info(
'Deleting service mapping and workers for function %s(version 0)',
func_db.id
with db_api.transaction():
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
latest_version=0
)
# Delete resources related to the function
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id, 0)
for func_db in results:
if not etcd_util.get_service_url(func_db.id, 0):
continue
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
LOG.info(
'Deleting service mapping and workers for function '
'%s(version 0)',
func_db.id
)
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
# Delete resources related to the function
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id, 0)
LOG.info(
'Deleting service mapping and workers for function %s(version %s)',
v.function_id, v.version_number
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
LOG.info(
'Deleting service mapping and workers for function '
'%s(version %s)',
v.function_id, v.version_number
)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
@periodics.periodic(3)
def handle_job(engine_client):
"""Execute job task with no db transactions."""
for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
job_id = job.id
func_id = job.function_id
func_version = job.function_version
"""Execute job task with no db transactions.
We don't do iterations on jobs_db directly to avoid the potential
'Cursor needed to be reset' error.
"""
jobs_db = db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3))
jobs_dict = [j.to_dict() for j in jobs_db]
for job in jobs_dict:
job_id = job["id"]
func_id = job["function_id"]
func_version = job["function_version"]
LOG.debug("Processing job: %s, function: %s(version %s)", job_id,
func_id, func_version)
@ -106,16 +116,16 @@ def handle_job(engine_client):
try:
# Setup context before schedule job.
ctx = keystone_utils.create_trust_context(
trust_id, job.project_id
trust_id, job["project_id"]
)
context.set_ctx(ctx)
if (job.count is not None and job.count > 0):
job.count -= 1
if (job["count"] is not None and job["count"] > 0):
job["count"] -= 1
# Job delete/update is done using UPDATE ... FROM ... WHERE
# non-locking clause.
if job.count == 0:
if job["count"] == 0:
modified = db_api.conditional_update(
models.Job,
{
@ -130,19 +140,19 @@ def handle_job(engine_client):
)
else:
next_time = jobs.get_next_execution_time(
job.pattern,
job.next_execution_time
job["pattern"],
job["next_execution_time"]
)
modified = db_api.conditional_update(
models.Job,
{
'next_execution_time': next_time,
'count': job.count
'count': job["count"]
},
{
'id': job_id,
'next_execution_time': job.next_execution_time
'next_execution_time': job["next_execution_time"]
},
insecure=True,
)
@ -162,7 +172,7 @@ def handle_job(engine_client):
params = {
'function_id': func_id,
'function_version': func_version,
'input': job.function_input,
'input': job["function_input"],
'sync': False,
'description': constants.EXECUTION_BY_JOB % job_id
}

View File

@ -19,6 +19,7 @@ import tempfile
import mock
from oslo_config import cfg
from qinling.db import api as db_api
from qinling import status
from qinling.tests.unit.api import base
from qinling.tests.unit import base as unit_base
@ -277,6 +278,64 @@ class TestFunctionController(base.APITest):
self.assertEqual(403, resp.status_int)
@mock.patch('qinling.utils.etcd_util.delete_function')
@mock.patch('qinling.rpc.EngineClient.delete_function')
@mock.patch('qinling.storage.file_system.FileSystemStorage.delete')
def test_delete_with_versions(self, mock_package_delete,
mock_engine_delete, mock_etcd_delete):
db_func = self.create_function(runtime_id=self.runtime_id)
func_id = db_func.id
# Create two versions for the function
db_api.increase_function_version(func_id, 0)
db_api.increase_function_version(func_id, 1)
resp = self.app.delete('/v1/functions/%s' % func_id)
self.assertEqual(204, resp.status_int)
self.assertEqual(3, mock_package_delete.call_count)
self.assertEqual(3, mock_engine_delete.call_count)
self.assertEqual(3, mock_etcd_delete.call_count)
mock_package_delete.assert_has_calls(
[
mock.call(unit_base.DEFAULT_PROJECT_ID, func_id, None,
version=1),
mock.call(unit_base.DEFAULT_PROJECT_ID, func_id, None,
version=2),
mock.call(unit_base.DEFAULT_PROJECT_ID, func_id, "fake_md5")
]
)
mock_engine_delete.assert_has_calls(
[
mock.call(func_id, version=1),
mock.call(func_id, version=2),
mock.call(func_id)
]
)
mock_etcd_delete.assert_has_calls(
[
mock.call(func_id, version=1),
mock.call(func_id, version=2),
mock.call(func_id)
]
)
def test_delete_with_version_associate_webhook(self):
db_func = self.create_function(runtime_id=self.runtime_id)
func_id = db_func.id
db_api.increase_function_version(func_id, 0)
self.create_webhook(func_id, function_version=1)
resp = self.app.delete(
'/v1/functions/%s' % func_id,
expect_errors=True
)
self.assertEqual(403, resp.status_int)
@mock.patch('qinling.rpc.EngineClient.scaleup_function')
def test_scale_up(self, scaleup_function_mock):
db_func = self.create_function(runtime_id=self.runtime_id)