Fix function service expiration handler
Add a unit test as well. Change-Id: I5eaac5891e2c0aba8c14eef896baeb25c7ea9bc4
This commit is contained in:
parent
aa1469da68
commit
789bed3c85
|
@ -33,8 +33,7 @@ class DefaultEngine(object):
|
||||||
self.session = requests.Session()
|
self.session = requests.Session()
|
||||||
|
|
||||||
def create_runtime(self, ctx, runtime_id):
|
def create_runtime(self, ctx, runtime_id):
|
||||||
LOG.info('Start to create.',
|
LOG.info('Start to create runtime %s.', runtime_id)
|
||||||
resource={'type': 'runtime', 'id': runtime_id})
|
|
||||||
|
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
runtime = db_api.get_runtime(runtime_id)
|
runtime = db_api.get_runtime(runtime_id)
|
||||||
|
@ -47,6 +46,7 @@ class DefaultEngine(object):
|
||||||
labels=labels,
|
labels=labels,
|
||||||
)
|
)
|
||||||
runtime.status = status.AVAILABLE
|
runtime.status = status.AVAILABLE
|
||||||
|
LOG.info('Runtime %s created.', runtime_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(
|
LOG.exception(
|
||||||
'Failed to create pool for runtime %s. Error: %s',
|
'Failed to create pool for runtime %s. Error: %s',
|
||||||
|
@ -56,18 +56,19 @@ class DefaultEngine(object):
|
||||||
runtime.status = status.ERROR
|
runtime.status = status.ERROR
|
||||||
|
|
||||||
def delete_runtime(self, ctx, runtime_id):
|
def delete_runtime(self, ctx, runtime_id):
|
||||||
resource = {'type': 'runtime', 'id': runtime_id}
|
LOG.info('Start to delete runtime %s.', runtime_id)
|
||||||
LOG.info('Start to delete.', resource=resource)
|
|
||||||
|
|
||||||
labels = {'runtime_id': runtime_id}
|
labels = {'runtime_id': runtime_id}
|
||||||
self.orchestrator.delete_pool(runtime_id, labels=labels)
|
self.orchestrator.delete_pool(runtime_id, labels=labels)
|
||||||
db_api.delete_runtime(runtime_id)
|
db_api.delete_runtime(runtime_id)
|
||||||
|
|
||||||
LOG.info('Deleted.', resource=resource)
|
LOG.info('Deleted runtime %s.', runtime_id)
|
||||||
|
|
||||||
def update_runtime(self, ctx, runtime_id, image=None, pre_image=None):
|
def update_runtime(self, ctx, runtime_id, image=None, pre_image=None):
|
||||||
resource = {'type': 'runtime', 'id': runtime_id}
|
LOG.info(
|
||||||
LOG.info('Start to update, image=%s', image, resource=resource)
|
'Start to update runtime %s, image: %s, pre_image: %s',
|
||||||
|
runtime_id, image, pre_image
|
||||||
|
)
|
||||||
|
|
||||||
labels = {'runtime_id': runtime_id}
|
labels = {'runtime_id': runtime_id}
|
||||||
ret = self.orchestrator.update_pool(
|
ret = self.orchestrator.update_pool(
|
||||||
|
@ -78,12 +79,12 @@ class DefaultEngine(object):
|
||||||
values = {'status': status.AVAILABLE}
|
values = {'status': status.AVAILABLE}
|
||||||
db_api.update_runtime(runtime_id, values)
|
db_api.update_runtime(runtime_id, values)
|
||||||
|
|
||||||
LOG.info('Updated.', resource=resource)
|
LOG.info('Updated runtime %s.', runtime_id)
|
||||||
else:
|
else:
|
||||||
values = {'status': status.AVAILABLE, 'image': pre_image}
|
values = {'status': status.AVAILABLE, 'image': pre_image}
|
||||||
db_api.update_runtime(runtime_id, values)
|
db_api.update_runtime(runtime_id, values)
|
||||||
|
|
||||||
LOG.info('Rollbacked.', resource=resource)
|
LOG.info('Rollbacked runtime %s.', runtime_id)
|
||||||
|
|
||||||
@tenacity.retry(
|
@tenacity.retry(
|
||||||
wait=tenacity.wait_fixed(1),
|
wait=tenacity.wait_fixed(1),
|
||||||
|
@ -216,13 +217,12 @@ class DefaultEngine(object):
|
||||||
|
|
||||||
def delete_function(self, ctx, function_id):
|
def delete_function(self, ctx, function_id):
|
||||||
"""Deletes underlying resources allocated for function."""
|
"""Deletes underlying resources allocated for function."""
|
||||||
resource = {'type': 'function', 'id': function_id}
|
LOG.info('Start to delete function %s.', function_id)
|
||||||
LOG.info('Start to delete.', resource=resource)
|
|
||||||
|
|
||||||
labels = {'function_id': function_id}
|
labels = {'function_id': function_id}
|
||||||
self.orchestrator.delete_function(function_id, labels=labels)
|
self.orchestrator.delete_function(function_id, labels=labels)
|
||||||
|
|
||||||
LOG.info('Deleted.', resource=resource)
|
LOG.info('Deleted function %s.', function_id)
|
||||||
|
|
||||||
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
|
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
|
||||||
worker_names, service_url = self.orchestrator.scaleup_function(
|
worker_names, service_url = self.orchestrator.scaleup_function(
|
||||||
|
|
|
@ -64,7 +64,7 @@ def handle_function_service_expiration(ctx, engine):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Delete resources related to the function
|
# Delete resources related to the function
|
||||||
engine.delete_function(func_db.id)
|
engine.delete_function(ctx, func_db.id)
|
||||||
# Delete etcd keys
|
# Delete etcd keys
|
||||||
etcd_util.delete_function(func_db.id)
|
etcd_util.delete_function(func_db.id)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
# Copyright 2017 Catalyst IT Limited
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
import time
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from qinling.db import api as db_api
|
||||||
|
from qinling.engine import default_engine
|
||||||
|
from qinling.services import periodics
|
||||||
|
from qinling.tests.unit import base
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
class TestPeriodics(base.DbTestCase):
|
||||||
|
TEST_CASE_NAME = 'TestPeriodics'
|
||||||
|
|
||||||
|
@mock.patch('qinling.utils.etcd_util.delete_function')
|
||||||
|
@mock.patch('qinling.utils.etcd_util.get_service_url')
|
||||||
|
def test_function_service_expiration(self, mock_etcd_url,
|
||||||
|
mock_etcd_delete):
|
||||||
|
db_func = self.create_function(
|
||||||
|
runtime_id=None, prefix=self.TEST_CASE_NAME
|
||||||
|
)
|
||||||
|
function_id = db_func.id
|
||||||
|
# Update function to simulate function execution
|
||||||
|
db_api.update_function(function_id, {'count': 1})
|
||||||
|
time.sleep(1.5)
|
||||||
|
|
||||||
|
mock_k8s = mock.Mock()
|
||||||
|
mock_etcd_url.return_value = 'http://localhost:37718'
|
||||||
|
self.override_config('function_service_expiration', 1, 'engine')
|
||||||
|
engine = default_engine.DefaultEngine(mock_k8s)
|
||||||
|
periodics.handle_function_service_expiration(self.ctx, engine)
|
||||||
|
|
||||||
|
self.assertEqual(1, mock_k8s.delete_function.call_count)
|
||||||
|
args, kwargs = mock_k8s.delete_function.call_args
|
||||||
|
self.assertIn(function_id, args)
|
||||||
|
mock_etcd_delete.assert_called_once_with(function_id)
|
Loading…
Reference in New Issue