From ac9d477112a8f9eaac5ca6c4a84a53297a4709c7 Mon Sep 17 00:00:00 2001
From: Lingxian Kong
Date: Fri, 1 Sep 2017 22:20:45 +1200
Subject: [PATCH] Scale down function

This is an admin-only operation. The load monitoring of function
execution depends on the monitoring solution of the underlying
orchestrator.

Implements: blueprint qingling-autoscaling
Change-Id: I2eff62a45a718d230b51f18c5a49e0abea5f3164
---
 qinling/api/controllers/v1/function.py     | 39 +++++++++++++++++++---
 qinling/api/controllers/v1/resources.py    |  4 +++
 qinling/config.py                          |  5 ---
 qinling/db/api.py                          |  8 +++--
 qinling/db/sqlalchemy/api.py               | 18 ++++++++--
 qinling/db/sqlalchemy/models.py            |  3 ++
 qinling/engine/default_engine.py           | 22 ++++++++++--
 qinling/orchestrator/base.py               |  4 +++
 qinling/orchestrator/kubernetes/manager.py | 13 ++++++--
 qinling/rpc.py                             | 12 ++++++-
 qinling/services/periodics.py              |  3 ++
 qinling_tempest_plugin/pre_test_hook.sh    |  8 +++--
 setup.cfg                                  |  3 +-
 tools/gate/funcs/common.sh                 |  3 +-
 tools/gate/setup_gate.sh                   |  2 +-
 15 files changed, 121 insertions(+), 26 deletions(-)
 mode change 100644 => 100755 qinling_tempest_plugin/pre_test_hook.sh

diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py
index 920d1247..6449dac5 100644
--- a/qinling/api/controllers/v1/function.py
+++ b/qinling/api/controllers/v1/function.py
@@ -226,16 +226,45 @@ class FunctionsController(rest.RestController):
     @wsme_pecan.wsexpose(
         None,
         types.uuid,
+        body=resources.ScaleInfo,
         status_code=202
     )
-    def scale_up(self, id):
+    def scale_up(self, id, scale):
         """Scale up the containers for function execution.

-        This is admin only operation. The number of added containers is defined
-        in config file.
+        This is an admin-only operation. The load monitoring of function
+        execution depends on the monitoring solution of the underlying orchestrator.
         """
         func_db = db_api.get_function(id)
+        params = scale.to_dict()

-        LOG.info('Starting to scale up function %s', id)
+        LOG.info('Starting to scale up function %s, params: %s', id, params)

-        self.engine_client.scaleup_function(id, runtime_id=func_db.runtime_id)
+        self.engine_client.scaleup_function(
+            id,
+            runtime_id=func_db.runtime_id,
+            count=params['count']
+        )
+
+    @rest_utils.wrap_wsme_controller_exception
+    @wsme_pecan.wsexpose(
+        None,
+        types.uuid,
+        body=resources.ScaleInfo,
+        status_code=202
+    )
+    def scale_down(self, id, scale):
+        """Scale down the containers for function execution.
+
+        This is an admin-only operation. The load monitoring of function
+        execution depends on the monitoring solution of the underlying orchestrator.
+        """
+        func_db = db_api.get_function(id)
+        params = scale.to_dict()
+        if len(func_db.workers) <= 1:
+            LOG.info('No need to scale down function %s', id)
+            return
+
+        LOG.info('Starting to scale down function %s, params: %s', id, params)
+
+        self.engine_client.scaledown_function(id, count=params['count'])
diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py
index 4154b3e5..1d70c5c8 100644
--- a/qinling/api/controllers/v1/resources.py
+++ b/qinling/api/controllers/v1/resources.py
@@ -358,3 +358,7 @@ class Jobs(ResourceList):
         )

         return sample
+
+
+class ScaleInfo(Resource):
+    count = wtypes.IntegerType(minimum=1)
diff --git a/qinling/config.py b/qinling/config.py
index 2ba042cd..b068f12b 100644
--- a/qinling/config.py
+++ b/qinling/config.py
@@ -135,11 +135,6 @@ kubernetes_opts = [
         default='127.0.0.1',
         help='Qinling API service ip address.'
     ),
-    cfg.IntOpt(
-        'scale_step',
-        default=1,
-        help='Number of pods for function scale up.'
-    ),
 ]

 CONF = cfg.CONF
diff --git a/qinling/db/api.py b/qinling/db/api.py
index f55cf828..630717ce 100644
--- a/qinling/db/api.py
+++ b/qinling/db/api.py
@@ -166,8 +166,12 @@ def create_function_worker(values):
     return IMPL.create_function_worker(values)


-def delete_function_workers(id):
-    return IMPL.delete_function_workers(id)
+def delete_function_worker(name):
+    return IMPL.delete_function_worker(name)
+
+
+def delete_function_workers(function_id):
+    return IMPL.delete_function_workers(function_id)


 def get_function_workers(function_id):
diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py
index 3f687c8f..54ac76e8 100644
--- a/qinling/db/sqlalchemy/api.py
+++ b/qinling/db/sqlalchemy/api.py
@@ -420,8 +420,22 @@ def get_function_workers(function_id, session=None):


 @db_base.session_aware()
-def delete_function_workers(id, session=None):
-    workers = get_function_workers(id)
+def delete_function_worker(worker_name, session=None):
+    worker = db_base.model_query(
+        models.FunctionWorkers
+    ).filter_by(worker_name=worker_name).first()
+
+    if not worker:
+        raise exc.DBEntityNotFoundError(
+            "FunctionWorker not found [worker_name=%s]" % worker_name
+        )
+
+    session.delete(worker)
+
+
+@db_base.session_aware()
+def delete_function_workers(function_id, session=None):
+    workers = get_function_workers(function_id)

     for worker in workers:
         session.delete(worker)
diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py
index 5df35be5..0a9b59d5 100644
--- a/qinling/db/sqlalchemy/models.py
+++ b/qinling/db/sqlalchemy/models.py
@@ -119,11 +119,14 @@ class Job(model_base.QinlingSecureModelBase):
 Function.service = relationship(
     "FunctionServiceMapping",
     uselist=False,
+    lazy='subquery',
     cascade="all, delete-orphan"
 )

 # Delete workers automatically when deleting function.
 Function.workers = relationship(
     "FunctionWorkers",
+    order_by="FunctionWorkers.created_at",
+    lazy='subquery',
     cascade="all, delete-orphan"
 )
diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py
index 7c91f436..c7b2ac8d 100644
--- a/qinling/engine/default_engine.py
+++ b/qinling/engine/default_engine.py
@@ -188,13 +188,14 @@ class DefaultEngine(object):

         LOG.info('Deleted.', resource=resource)

-    def scaleup_function(self, ctx, function_id, runtime_id):
+    def scaleup_function(self, ctx, function_id, runtime_id, count=1):
         function = db_api.get_function(function_id)

         worker_names = self.orchestrator.scaleup_function(
             function_id,
             identifier=runtime_id,
-            entry=function.entry
+            entry=function.entry,
+            count=count
         )

         with db_api.transaction():
@@ -206,3 +207,20 @@
             db_api.create_function_worker(worker)

         LOG.info('Finished scaling up function %s.', function_id)
+
+    def scaledown_function(self, ctx, function_id, count=1):
+        func_db = db_api.get_function(function_id)
+        worker_deleted_num = (
+            count if len(func_db.workers) > count else len(func_db.workers) - 1
+        )
+        workers = func_db.workers[:worker_deleted_num]
+
+        with db_api.transaction():
+            for worker in workers:
+                LOG.debug('Removing worker %s', worker.worker_name)
+                self.orchestrator.delete_worker(
+                    worker.worker_name,
+                )
+                db_api.delete_function_worker(worker.worker_name)
+
+        LOG.info('Finished scaling down function %s.', function_id)
diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py
index 7829332e..14d74f89 100644
--- a/qinling/orchestrator/base.py
+++ b/qinling/orchestrator/base.py
@@ -54,6 +54,10 @@ class OrchestratorBase(object):
     def scaleup_function(self, function_id, **kwargs):
         raise NotImplementedError

+    @abc.abstractmethod
+    def delete_worker(self, worker_name, **kwargs):
+        raise NotImplementedError
+

 def load_orchestrator(conf):
     global ORCHESTRATOR
diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py
index 9e47e4d3..0958b24f 100644
--- a/qinling/orchestrator/kubernetes/manager.py
+++ b/qinling/orchestrator/kubernetes/manager.py
@@ -407,14 +407,14 @@
             label_selector=selector
         )

-        LOG.info("Pod for function %s deleted.", function_id)
+        LOG.info("Pod(s) for function %s deleted.", function_id)

     def scaleup_function(self, function_id, identifier=None,
-                         entry='main.main'):
+                         entry='main.main', count=1):
         pod_names = []
         labels = {'runtime_id': identifier}
         pods = self._choose_available_pod(
-            labels, count=self.conf.kubernetes.scale_step
+            labels, count=count
         )

         if not pods:
@@ -446,3 +446,10 @@
         LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)

         return pod_names
+
+    def delete_worker(self, worker_name, **kwargs):
+        self.v1.delete_namespaced_pod(
+            worker_name,
+            self.conf.kubernetes.namespace,
+            {}
+        )
diff --git a/qinling/rpc.py b/qinling/rpc.py
index ea33c9c2..14260487 100644
--- a/qinling/rpc.py
+++ b/qinling/rpc.py
@@ -188,10 +188,20 @@
         )

     @wrap_messaging_exception
-    def scaleup_function(self, id, runtime_id):
+    def scaleup_function(self, id, runtime_id, count=1):
         return self._client.prepare(topic=self.topic, server=None).cast(
             ctx.get_ctx(),
             'scaleup_function',
             function_id=id,
             runtime_id=runtime_id,
+            count=count
+        )
+
+    @wrap_messaging_exception
+    def scaledown_function(self, id, count=1):
+        return self._client.prepare(topic=self.topic, server=None).cast(
+            ctx.get_ctx(),
+            'scaledown_function',
+            function_id=id,
+            count=count
         )
diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py
index 4ec8a3fc..25c01b65 100644
--- a/qinling/services/periodics.py
+++ b/qinling/services/periodics.py
@@ -50,6 +50,9 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
         return

     for func_db in results:
+        if not func_db.service:
+            continue
+
         with db_api.transaction():
             LOG.info(
                 'Deleting service mapping and workers for function %s',
diff --git a/qinling_tempest_plugin/pre_test_hook.sh b/qinling_tempest_plugin/pre_test_hook.sh
old mode 100644
new mode 100755
index ecb1b68f..b3819753
--- a/qinling_tempest_plugin/pre_test_hook.sh
+++ b/qinling_tempest_plugin/pre_test_hook.sh
@@ -20,7 +20,9 @@ export localconf=$BASE/new/devstack/local.conf
 export QINLING_CONF=/etc/qinling/qinling.conf

 # Install k8s cluster
-bash $BASE/new/qinling/tools/gate/setup_gate.sh
+pushd $BASE/new/qinling/
+bash tools/gate/setup_gate.sh
+popd

-echo -e '[[post-config|$QINLING_CONF]]\n[kubernetes]\n' >> $localconf
-echo -e 'qinling_service_address=${DEFAULT_HOST_IP}\n' >> $localconf
+echo -e "[[post-config|$QINLING_CONF]]\n[kubernetes]\n" >> $localconf
+echo -e "qinling_service_address=${DEFAULT_HOST_IP}\n" >> $localconf
diff --git a/setup.cfg b/setup.cfg
index 0252a5bc..b7133124 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -21,6 +21,7 @@ classifier =
 [files]
 packages =
     qinling
+    qinling_tempest_plugin

 [entry_points]
 console_scripts =
@@ -37,7 +38,7 @@ oslo.config.opts =
     qinling.config = qinling.config:list_opts

 tempest.test_plugins =
-    qinling_test = qinling_tempest_tests.plugin:QinlingTempestPlugin
+    qinling_test = qinling_tempest_plugin.plugin:QinlingTempestPlugin

 [build_sphinx]
 all-files = 1
diff --git a/tools/gate/funcs/common.sh b/tools/gate/funcs/common.sh
index 7e1c7501..514cba8d 100644
--- a/tools/gate/funcs/common.sh
+++ b/tools/gate/funcs/common.sh
@@ -20,7 +20,8 @@ function base_install {
         iptables \
         ipcalc \
         nmap \
-        lshw
+        lshw \
+        screen
   elif [ "x$HOST_OS" == "xcentos" ]; then
     sudo yum install -y \
       epel-release
diff --git a/tools/gate/setup_gate.sh b/tools/gate/setup_gate.sh
index 3f638c12..0c2603f9 100755
--- a/tools/gate/setup_gate.sh
+++ b/tools/gate/setup_gate.sh
@@ -39,5 +39,5 @@ bash ${WORK_DIR}/tools/gate/kubeadm_aio.sh

 # Starts a proxy to the Kubernetes API server in a screen session
 sudo screen -S kube_proxy -X quit || true
-sudo screen -dmS kube_proxy && screen -S kube_proxy -X screen -t kube_proxy
+sudo screen -dmS kube_proxy && sudo screen -S kube_proxy -X screen -t kube_proxy
 sudo screen -S kube_proxy -p kube_proxy -X stuff 'kubectl proxy --accept-hosts=".*" --address="0.0.0.0"\n'
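
Reviewer note: the scale-down arithmetic in scaledown_function removes at most "count" workers and always leaves one alive; because Function.workers is now ordered by created_at, the oldest workers are dropped first. A minimal, standalone Python sketch of that rule (the helper name and the worker list are illustrative only, not part of the patch):

    # Stand-in for func_db.workers, which the patch orders by created_at (oldest first).
    def pick_workers_to_delete(workers, count=1):
        """Return the workers to remove: at most 'count', always leaving one behind."""
        if len(workers) <= 1:
            return []  # the API layer also short-circuits this case
        deleted_num = count if len(workers) > count else len(workers) - 1
        return workers[:deleted_num]  # oldest first, matching the order_by

    assert pick_workers_to_delete(['w1', 'w2', 'w3'], count=2) == ['w1', 'w2']
    assert pick_workers_to_delete(['w1', 'w2'], count=5) == ['w1']  # always keeps one
    assert pick_workers_to_delete(['w1'], count=1) == []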
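
For manual verification, the new admin-only endpoints could be exercised roughly as below. This sketch assumes the pecan routes are exposed as POST /v1/functions/<id>/scale_up and /scale_down on the default API port; the endpoint URL, token, and function id are placeholders not defined by this patch, so adjust them to the deployment under test. The JSON body maps onto the new ScaleInfo resource (count must be >= 1):

    import requests

    QINLING_API = 'http://127.0.0.1:7070/v1'           # assumed endpoint
    HEADERS = {'X-Auth-Token': '<admin-token>'}        # admin credentials required
    function_id = '123e4567-e89b-12d3-a456-426655440000'

    # Add two workers for the function.
    requests.post('%s/functions/%s/scale_up' % (QINLING_API, function_id),
                  json={'count': 2}, headers=HEADERS)

    # Remove one worker; the engine always keeps at least one worker running.
    requests.post('%s/functions/%s/scale_down' % (QINLING_API, function_id),
                  json={'count': 1}, headers=HEADERS)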