Browse Source

Scale down function

This is admin only operation. The load monitoring of function execution
depends on the monitoring solution of underlying orchestrator.

Implements: blueprint qingling-autoscaling
Change-Id: I2eff62a45a718d230b51f18c5a49e0abea5f3164
changes/76/499976/6
Lingxian Kong 5 years ago
parent
commit
ac9d477112
  1. 39
      qinling/api/controllers/v1/function.py
  2. 4
      qinling/api/controllers/v1/resources.py
  3. 5
      qinling/config.py
  4. 8
      qinling/db/api.py
  5. 18
      qinling/db/sqlalchemy/api.py
  6. 3
      qinling/db/sqlalchemy/models.py
  7. 22
      qinling/engine/default_engine.py
  8. 4
      qinling/orchestrator/base.py
  9. 13
      qinling/orchestrator/kubernetes/manager.py
  10. 12
      qinling/rpc.py
  11. 3
      qinling/services/periodics.py
  12. 8
      qinling_tempest_plugin/pre_test_hook.sh
  13. 3
      setup.cfg
  14. 3
      tools/gate/funcs/common.sh
  15. 2
      tools/gate/setup_gate.sh

39
qinling/api/controllers/v1/function.py

@ -226,16 +226,45 @@ class FunctionsController(rest.RestController):
@wsme_pecan.wsexpose(
None,
types.uuid,
body=resources.ScaleInfo,
status_code=202
)
def scale_up(self, id):
def scale_up(self, id, scale):
"""Scale up the containers for function execution.
This is admin only operation. The number of added containers is defined
in config file.
This is admin only operation. The load monitoring of function execution
depends on the monitoring solution of underlying orchestrator.
"""
func_db = db_api.get_function(id)
params = scale.to_dict()
LOG.info('Starting to scale up function %s', id)
LOG.info('Starting to scale up function %s, params: %s', id, params)
self.engine_client.scaleup_function(id, runtime_id=func_db.runtime_id)
self.engine_client.scaleup_function(
id,
runtime_id=func_db.runtime_id,
count=params['count']
)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
None,
types.uuid,
body=resources.ScaleInfo,
status_code=202
)
def scale_down(self, id, scale):
"""Scale down the containers for function execution.
This is admin only operation. The load monitoring of function execution
depends on the monitoring solution of underlying orchestrator.
"""
func_db = db_api.get_function(id)
params = scale.to_dict()
if len(func_db.workers) <= 1:
LOG.info('No need to scale down function %s', id)
return
LOG.info('Starting to scale down function %s, params: %s', id, params)
self.engine_client.scaledown_function(id, count=params['count'])

4
qinling/api/controllers/v1/resources.py

@ -358,3 +358,7 @@ class Jobs(ResourceList):
)
return sample
class ScaleInfo(Resource):
count = wtypes.IntegerType(minimum=1)

5
qinling/config.py

@ -135,11 +135,6 @@ kubernetes_opts = [
default='127.0.0.1',
help='Qinling API service ip address.'
),
cfg.IntOpt(
'scale_step',
default=1,
help='Number of pods for function scale up.'
),
]
CONF = cfg.CONF

8
qinling/db/api.py

@ -166,8 +166,12 @@ def create_function_worker(values):
return IMPL.create_function_worker(values)
def delete_function_workers(id):
return IMPL.delete_function_workers(id)
def delete_function_worker(name):
return IMPL.delete_function_worker(name)
def delete_function_workers(function_id):
return IMPL.delete_function_workers(function_id)
def get_function_workers(function_id):

18
qinling/db/sqlalchemy/api.py

@ -420,8 +420,22 @@ def get_function_workers(function_id, session=None):
@db_base.session_aware()
def delete_function_workers(id, session=None):
workers = get_function_workers(id)
def delete_function_worker(worker_name, session=None):
worker = db_base.model_query(
models.FunctionWorkers
).filter_by(worker_name=worker_name).first()
if not worker:
raise exc.DBEntityNotFoundError(
"FunctionWorker not found [worker_name=%s]" % worker_name
)
session.delete(worker)
@db_base.session_aware()
def delete_function_workers(function_id, session=None):
workers = get_function_workers(function_id)
for worker in workers:
session.delete(worker)

3
qinling/db/sqlalchemy/models.py

@ -119,11 +119,14 @@ class Job(model_base.QinlingSecureModelBase):
Function.service = relationship(
"FunctionServiceMapping",
uselist=False,
lazy='subquery',
cascade="all, delete-orphan"
)
# Delete workers automatically when deleting function.
Function.workers = relationship(
"FunctionWorkers",
order_by="FunctionWorkers.created_at",
lazy='subquery',
cascade="all, delete-orphan"
)

22
qinling/engine/default_engine.py

@ -188,13 +188,14 @@ class DefaultEngine(object):
LOG.info('Deleted.', resource=resource)
def scaleup_function(self, ctx, function_id, runtime_id):
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
function = db_api.get_function(function_id)
worker_names = self.orchestrator.scaleup_function(
function_id,
identifier=runtime_id,
entry=function.entry
entry=function.entry,
count=count
)
with db_api.transaction():
@ -206,3 +207,20 @@ class DefaultEngine(object):
db_api.create_function_worker(worker)
LOG.info('Finished scaling up function %s.', function_id)
def scaledown_function(self, ctx, function_id, count=1):
func_db = db_api.get_function(function_id)
worker_deleted_num = (
count if len(func_db.workers) > count else len(func_db.workers) - 1
)
workers = func_db.workers[:worker_deleted_num]
with db_api.transaction():
for worker in workers:
LOG.debug('Removing worker %s', worker.worker_name)
self.orchestrator.delete_worker(
worker.worker_name,
)
db_api.delete_function_worker(worker.worker_name)
LOG.info('Finished scaling up function %s.', function_id)

4
qinling/orchestrator/base.py

@ -54,6 +54,10 @@ class OrchestratorBase(object):
def scaleup_function(self, function_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def delete_worker(self, worker_name, **kwargs):
raise NotImplementedError
def load_orchestrator(conf):
global ORCHESTRATOR

13
qinling/orchestrator/kubernetes/manager.py

@ -407,14 +407,14 @@ class KubernetesManager(base.OrchestratorBase):
label_selector=selector
)
LOG.info("Pod for function %s deleted.", function_id)
LOG.info("Pod(s) for function %s deleted.", function_id)
def scaleup_function(self, function_id, identifier=None,
entry='main.main'):
entry='main.main', count=1):
pod_names = []
labels = {'runtime_id': identifier}
pods = self._choose_available_pod(
labels, count=self.conf.kubernetes.scale_step
labels, count=count
)
if not pods:
@ -446,3 +446,10 @@ class KubernetesManager(base.OrchestratorBase):
LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)
return pod_names
def delete_worker(self, worker_name, **kwargs):
self.v1.delete_namespaced_pod(
worker_name,
self.conf.kubernetes.namespace,
{}
)

12
qinling/rpc.py

@ -188,10 +188,20 @@ class EngineClient(object):
)
@wrap_messaging_exception
def scaleup_function(self, id, runtime_id):
def scaleup_function(self, id, runtime_id, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaleup_function',
function_id=id,
runtime_id=runtime_id,
count=count
)
@wrap_messaging_exception
def scaledown_function(self, id, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaledown_function',
function_id=id,
count=count
)

3
qinling/services/periodics.py

@ -50,6 +50,9 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
return
for func_db in results:
if not func_db.service:
continue
with db_api.transaction():
LOG.info(
'Deleting service mapping and workers for function %s',

8
qinling_tempest_plugin/pre_test_hook.sh

@ -20,7 +20,9 @@ export localconf=$BASE/new/devstack/local.conf
export QINLING_CONF=/etc/qinling/qinling.conf
# Install k8s cluster
bash $BASE/new/qinling/tools/gate/setup_gate.sh
pushd $BASE/new/qinling/
bash tools/gate/setup_gate.sh
popd
echo -e '[[post-config|$QINLING_CONF]]\n[kubernetes]\n' >> $localconf
echo -e 'qinling_service_address=${DEFAULT_HOST_IP}\n' >> $localconf
echo -e "[[post-config|$QINLING_CONF]]\n[kubernetes]\n" >> $localconf
echo -e "qinling_service_address=${DEFAULT_HOST_IP}\n" >> $localconf

3
setup.cfg

@ -21,6 +21,7 @@ classifier =
[files]
packages =
qinling
qinling_tempest_plugin
[entry_points]
console_scripts =
@ -37,7 +38,7 @@ oslo.config.opts =
qinling.config = qinling.config:list_opts
tempest.test_plugins =
qinling_test = qinling_tempest_tests.plugin:QinlingTempestPlugin
qinling_test = qinling_tempest_plugin.plugin:QinlingTempestPlugin
[build_sphinx]
all-files = 1

3
tools/gate/funcs/common.sh

@ -20,7 +20,8 @@ function base_install {
iptables \
ipcalc \
nmap \
lshw
lshw \
screen
elif [ "x$HOST_OS" == "xcentos" ]; then
sudo yum install -y \
epel-release

2
tools/gate/setup_gate.sh

@ -39,5 +39,5 @@ bash ${WORK_DIR}/tools/gate/kubeadm_aio.sh
# Starts a proxy to the Kubernetes API server in a screen session
sudo screen -S kube_proxy -X quit || true
sudo screen -dmS kube_proxy && screen -S kube_proxy -X screen -t kube_proxy
sudo screen -dmS kube_proxy && sudo screen -S kube_proxy -X screen -t kube_proxy
sudo screen -S kube_proxy -p kube_proxy -X stuff 'kubectl proxy --accept-hosts=".*" --address="0.0.0.0"\n'

Loading…
Cancel
Save