From 992c90e044f99fba5fe88282d71c991f3af1c037 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Mon, 26 Jun 2017 13:51:28 +1200 Subject: [PATCH] Support update runtime If update image, will send asynchronous request to engine. If some error happened during update, engine will rollback runtime automatically. Change-Id: I1598b1c04427dfdb7d573bbff8dc017f113c2fd2 --- qinling/api/controllers/v1/execution.py | 9 ++- qinling/api/controllers/v1/runtime.py | 64 +++++++++++++++++++++- qinling/db/sqlalchemy/api.py | 8 +++ qinling/engine/default_engine.py | 51 +++++++++-------- qinling/exceptions.py | 5 ++ qinling/orchestrator/base.py | 4 ++ qinling/orchestrator/kubernetes/manager.py | 56 ++++++++++++++++++- qinling/rpc.py | 10 ++++ qinling/status.py | 20 +++++++ 9 files changed, 201 insertions(+), 26 deletions(-) create mode 100644 qinling/status.py diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index d9afbd8d..3a535e4f 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -22,6 +22,7 @@ from qinling.api.controllers.v1 import types from qinling.db import api as db_api from qinling import exceptions as exc from qinling import rpc +from qinling import status from qinling.utils import rest_utils LOG = logging.getLogger(__name__) @@ -72,7 +73,13 @@ class ExecutionsController(rest.RestController): ) else: runtime_id = func_db.runtime_id - params.update({'status': 'running'}) + runtime_db = db_api.get_runtime(runtime_id) + if runtime_db.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % runtime_id + ) + + params.update({'status': status.RUNNING}) db_model = db_api.create_execution(params) diff --git a/qinling/api/controllers/v1/runtime.py b/qinling/api/controllers/v1/runtime.py index b5f5bc6c..034ac54d 100644 --- a/qinling/api/controllers/v1/runtime.py +++ b/qinling/api/controllers/v1/runtime.py @@ -21,10 +21,13 @@ from qinling.api.controllers.v1 import types from qinling.db import api as db_api from qinling import exceptions as exc from qinling import rpc +from qinling import status from qinling.utils import rest_utils LOG = logging.getLogger(__name__) +UPDATE_ALLOWED = set(['name', 'description', 'image']) + class RuntimesController(rest.RestController): def __init__(self, *args, **kwargs): @@ -62,7 +65,7 @@ class RuntimesController(rest.RestController): LOG.info("Creating runtime. [runtime=%s]", params) - params.update({'status': 'creating'}) + params.update({'status': status.CREATING}) db_model = db_api.create_runtime(params) self.engine_client.create_runtime(db_model.id) @@ -86,7 +89,64 @@ class RuntimesController(rest.RestController): 'Runtime %s is still in use.' % id ) - runtime_db.status = 'deleting' + runtime_db.status = status.DELETING # Clean related resources asynchronously self.engine_client.delete_runtime(id) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + resources.Runtime, + types.uuid, + body=resources.Runtime + ) + def put(self, id, runtime): + """Update runtime. + + Currently, we only support update name, description, image. When + updating image, send message to engine for asynchronous handling. + """ + values = {} + for key in UPDATE_ALLOWED: + if key in runtime.to_dict(): + values.update({key: runtime.to_dict().get(key)}) + + LOG.info('Update runtime [id=%s, values=%s]' % (id, values)) + + with db_api.transaction(): + if 'image' in values: + pre_runtime = db_api.get_runtime(id) + if pre_runtime.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % id + ) + + pre_image = pre_runtime.image + if pre_image != values['image']: + # Ensure there is no function running in the runtime. + db_funcs = db_api.get_functions( + insecure=True, fields=['id'], runtime_id=id + ) + func_ids = [func.id for func in db_funcs] + + mappings = db_api.get_function_service_mappings( + insecure=True, function_id={'in': func_ids} + ) + if mappings: + raise exc.NotAllowedException( + 'Runtime %s is still in use by functions.' % id + ) + + values['status'] = status.UPGRADING + + self.engine_client.update_runtime( + id, + image=values['image'], + pre_image=pre_image + ) + else: + values.pop('image') + + runtime_db = db_api.update_runtime(id, values) + + return resources.Runtime.from_dict(runtime_db.to_dict()) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 5b493cdb..6ff478db 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -264,6 +264,14 @@ def delete_runtime(id, session=None): session.delete(runtime) +@db_base.session_aware() +def update_runtime(id, values, session=None): + runtime = get_runtime(id) + runtime.update(values.copy()) + + return runtime + + @db_base.session_aware() def create_execution(values, session=None): execution = models.Execution() diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 867a61d0..36675499 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -15,6 +15,7 @@ from oslo_log import log as logging from qinling.db import api as db_api +from qinling import status from qinling.utils import common LOG = logging.getLogger(__name__) @@ -29,39 +30,50 @@ class DefaultEngine(object): with db_api.transaction(): runtime = db_api.get_runtime(runtime_id) - identifier = '%s-%s' % (runtime_id, runtime.name) - labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + labels = {'runtime_id': runtime_id} try: self.orchestrator.create_pool( - identifier, + runtime_id, runtime.image, labels=labels, ) - - runtime.status = 'available' + runtime.status = status.AVAILABLE except Exception as e: LOG.exception( 'Failed to create pool for runtime %s. Error: %s', runtime_id, str(e) ) - - runtime.status = 'error' + runtime.status = status.ERROR def delete_runtime(self, ctx, runtime_id): LOG.info('Start to delete runtime, id=%s', runtime_id) - with db_api.transaction(): - runtime = db_api.get_runtime(runtime_id) - identifier = '%s-%s' % (runtime_id, runtime.name) - labels = {'runtime_name': runtime.name, 'runtime_id': runtime_id} + labels = {'runtime_id': runtime_id} + self.orchestrator.delete_pool(runtime_id, labels=labels) + db_api.delete_runtime(runtime_id) - self.orchestrator.delete_pool(identifier, labels=labels) + LOG.info('Runtime %s deleted.', runtime_id) - db_api.delete_runtime(runtime_id) + def update_runtime(self, ctx, runtime_id, image=None, pre_image=None): + LOG.info('Start to update runtime, id=%s, image=%s', runtime_id, image) - LOG.info('Runtime %s deleted.', runtime_id) + labels = {'runtime_id': runtime_id} + ret = self.orchestrator.update_pool( + runtime_id, labels=labels, image=image + ) + + if ret: + values = {'status': status.AVAILABLE} + db_api.update_runtime(runtime_id, values) + + LOG.info('Runtime %s updated.', runtime_id) + else: + values = {'status': status.AVAILABLE, 'image': pre_image} + db_api.update_runtime(runtime_id, values) + + LOG.info('Runtime %s rollbacked.', runtime_id) def create_execution(self, ctx, execution_id, function_id, runtime_id, input=None): @@ -86,15 +98,10 @@ class DefaultEngine(object): (common.generate_unicode_uuid(dashed=False), function_id) )[:63] - labels = { - 'function_name': function.name, 'function_id': function_id - } + labels = {'function_id': function_id} else: - runtime = db_api.get_runtime(runtime_id) - identifier = ('%s-%s' % (runtime_id, runtime.name))[:63] - labels = { - 'runtime_name': runtime.name, 'runtime_id': runtime_id - } + identifier = runtime_id + labels = {'runtime_id': runtime_id} service_url = self.orchestrator.prepare_execution( function_id, diff --git a/qinling/exceptions.py b/qinling/exceptions.py index 1009b815..ded88bc8 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -64,6 +64,11 @@ class NotAllowedException(QinlingException): message = "Operation not allowed" +class RuntimeNotAvailableException(QinlingException): + http_code = 403 + message = "Runtime not available" + + class DBError(QinlingException): http_code = 400 diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index d5146e59..bcd81df6 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -34,6 +34,10 @@ class OrchestratorBase(object): def delete_pool(self, name, **kwargs): raise NotImplementedError + @abc.abstractmethod + def update_pool(self, name, **kwargs): + raise NotImplementedError + @abc.abstractmethod def prepare_execution(self, function_id, **kwargs): raise NotImplementedError diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 357540e1..11d9ed83 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -50,7 +50,6 @@ class KubernetesManager(base.OrchestratorBase): loader=template_loader, autoescape=True, trim_blocks=True, lstrip_blocks=True ) - self.deployment_template = jinja_env.get_template('deployment.j2') self.service_template = jinja_env.get_template('service.j2') self.pod_template = jinja_env.get_template('pod.j2') @@ -141,6 +140,61 @@ class KubernetesManager(base.OrchestratorBase): LOG.info("Pods in deployment %s deleted.", name) LOG.info("Deployment %s deleted.", name) + def update_pool(self, name, labels=None, image=None): + """Deployment rolling-update. + + Return True if successful, otherwise return False after rolling back. + """ + LOG.info('Start to do rolling-update deployment %s', name) + + body = { + 'spec': { + 'template': { + 'spec': { + 'containers': [ + { + # TODO(kong): Make the name configurable. + 'name': 'worker', + 'image': image + } + ] + } + } + } + } + self.v1extention.patch_namespaced_deployment( + name, self.conf.kubernetes.namespace, body + ) + + unavailable_replicas = 1 + # TODO(kong): Make this configurable + retry = 5 + while unavailable_replicas != 0 and retry > 0: + time.sleep(5) + retry = retry - 1 + + deploy = self.v1extention.read_namespaced_deployment_status( + name, + self.conf.kubernetes.namespace + ) + unavailable_replicas = deploy.status.unavailable_replicas + + # Handle failure of rolling-update. + if unavailable_replicas > 0: + body = { + "name": name, + "rollbackTo": { + "revision": 0 + } + } + self.v1extention.create_namespaced_deployment_rollback_rollback( + name, self.conf.kubernetes.namespace, body + ) + + return False + + return True + def _choose_available_pod(self, labels): selector = common.convert_dict_to_string(labels) diff --git a/qinling/rpc.py b/qinling/rpc.py index 54a72ecf..4d7dbcec 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -145,6 +145,16 @@ class EngineClient(object): runtime_id=id ) + @wrap_messaging_exception + def update_runtime(self, id, image=None, pre_image=None): + return self._client.prepare(topic=self.topic, server=None).cast( + ctx.get_ctx(), + 'update_runtime', + runtime_id=id, + image=image, + pre_image=pre_image + ) + @wrap_messaging_exception def create_execution(self, execution_id, function_id, runtime_id, input=None, is_sync=True): diff --git a/qinling/status.py b/qinling/status.py new file mode 100644 index 00000000..4dfac928 --- /dev/null +++ b/qinling/status.py @@ -0,0 +1,20 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CREATING = 'creating' +AVAILABLE = 'available' +UPGRADING = 'upgrading' +ERROR = 'error' +DELETING = 'deleting' +RUNNING = 'running'