Browse Source

Function version API: scale_up/scale_down/detach/get_workers

This patch adds some other API for function version:

- scale_up function version workers
- scale_down function version workers
- detach function version from workers
- get all workers for function version

This patch also fixes a unit test bug that job handler should be
disabled for api tests.

Change-Id: Ibd8e195bf8fa0f878cfe0bd8f777c820f4b2f24b
Story: 2001829
Task: 14438
changes/03/564203/13
Lingxian Kong 3 years ago
parent
commit
08ded02123
10 changed files with 292 additions and 87 deletions
  1. +4
    -0
      etc/policy.json.sample
  2. +11
    -5
      qinling/api/controllers/v1/function.py
  3. +98
    -0
      qinling/api/controllers/v1/function_version.py
  4. +2
    -1
      qinling/api/controllers/v1/resources.py
  5. +0
    -6
      qinling/db/sqlalchemy/models.py
  6. +49
    -55
      qinling/services/periodics.py
  7. +4
    -0
      qinling/tests/unit/api/base.py
  8. +72
    -2
      qinling/tests/unit/api/controllers/v1/test_function_version.py
  9. +33
    -0
      qinling/tests/unit/api/controllers/v1/test_function_worker.py
  10. +19
    -18
      qinling/utils/executions.py

+ 4
- 0
etc/policy.json.sample View File

@ -14,6 +14,10 @@
"function:scale_down": "rule:context_is_admin",
"function:detach": "rule:context_is_admin",
"function_version:scale_up": "rule:context_is_admin",
"function_version:scale_down": "rule:context_is_admin",
"function_version:detach": "rule:context_is_admin",
"execution:get_all:all_projects": "rule:context_is_admin",
"webhook:get_all:all_projects": "rule:context_is_admin",


+ 11
- 5
qinling/api/controllers/v1/function.py View File

@ -52,15 +52,21 @@ UPDATE_ALLOWED = set(['name', 'description', 'code', 'package', 'entry',
class FunctionWorkerController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.FunctionWorkers, types.uuid)
def get_all(self, function_id):
@wsme_pecan.wsexpose(resources.FunctionWorkers, types.uuid, int)
def get_all(self, function_id, function_version=0):
acl.enforce('function_worker:get_all', context.get_ctx())
LOG.info("Get workers for function %s.", function_id)
workers = etcd_util.get_workers(function_id)
LOG.info("Getting workers for function %s(version %s).", function_id,
function_version)
workers = etcd_util.get_workers(function_id, version=function_version)
workers = [
resources.FunctionWorker.from_dict(
{'function_id': function_id, 'worker_name': w}
{
'function_id': function_id,
'function_version': function_version,
'worker_name': w
}
) for w in workers
]


+ 98
- 0
qinling/api/controllers/v1/function_version.py View File

@ -29,6 +29,7 @@ from qinling.api.controllers.v1 import types
from qinling import context
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import rpc
from qinling.storage import base as storage_base
from qinling.utils import constants
from qinling.utils import etcd_util
@ -39,9 +40,16 @@ CONF = cfg.CONF
class FunctionVersionsController(rest.RestController):
_custom_actions = {
'scale_up': ['POST'],
'scale_down': ['POST'],
'detach': ['POST'],
}
def __init__(self, *args, **kwargs):
self.type = 'function_version'
self.storage_provider = storage_base.load_storage_provider(CONF)
self.engine_client = rpc.get_engine_client()
super(FunctionVersionsController, self).__init__(*args, **kwargs)
@ -225,3 +233,93 @@ class FunctionVersionsController(rest.RestController):
version_db.function.latest_version = latest_version - 1
LOG.info("Version %s of function %s deleted.", version, function_id)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
None,
types.uuid,
int,
body=resources.ScaleInfo,
status_code=202
)
def scale_up(self, function_id, version, scale):
"""Scale up the workers for function version execution.
This is admin only operation. The load monitoring of execution
depends on the monitoring solution of underlying orchestrator.
"""
acl.enforce('function_version:scale_up', context.get_ctx())
func_db = db_api.get_function(function_id)
# If version=0, it's equivalent to /functions/<funcion-id>/scale_up
if version > 0:
db_api.get_function_version(function_id, version)
params = scale.to_dict()
LOG.info('Starting to scale up function %s(version %s), params: %s',
function_id, version, params)
self.engine_client.scaleup_function(
function_id,
runtime_id=func_db.runtime_id,
version=version,
count=params['count']
)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
None,
types.uuid,
int,
body=resources.ScaleInfo,
status_code=202
)
def scale_down(self, function_id, version, scale):
"""Scale down the workers for function version execution.
This is admin only operation. The load monitoring of execution
depends on the monitoring solution of underlying orchestrator.
"""
acl.enforce('function_version:scale_down', context.get_ctx())
db_api.get_function(function_id)
params = scale.to_dict()
# If version=0, it's equivalent to /functions/<funcion-id>/scale_down
if version > 0:
db_api.get_function_version(function_id, version)
workers = etcd_util.get_workers(function_id, version=version)
if len(workers) <= 1:
LOG.info('No need to scale down function %s(version)', function_id,
version)
return
LOG.info('Starting to scale down function %s(version), params: %s',
function_id, version, params)
self.engine_client.scaledown_function(function_id, version=version,
count=params['count'])
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, int, status_code=202)
def detach(self, function_id, version):
"""Detach the function version from its underlying workers.
This is admin only operation, which gives admin user a safe way to
clean up the underlying resources allocated for the function version.
"""
acl.enforce('function_version:detach', context.get_ctx())
db_api.get_function(function_id)
# If version=0, it's equivalent to /functions/<funcion-id>/detach
if version > 0:
db_api.get_function_version(function_id, version)
LOG.info('Starting to detach function %s(version)', function_id,
version)
# Delete allocated resources in orchestrator and etcd keys.
self.engine_client.delete_function(function_id, version=version)
etcd_util.delete_function(function_id, version=version)

+ 2
- 1
qinling/api/controllers/v1/resources.py View File

@ -219,7 +219,8 @@ class Functions(ResourceList):
class FunctionWorker(Resource):
function_id = wsme.wsattr(types.uuid, readonly=True)
worker_name = wtypes.text
function_version = wsme.wsattr(int, readonly=True)
worker_name = wsme.wsattr(wtypes.text, readonly=True)
class FunctionWorkers(ResourceList):


+ 0
- 6
qinling/db/sqlalchemy/models.py View File

@ -38,10 +38,6 @@ class Function(model_base.QinlingSecureModelBase):
runtime_id = sa.Column(
sa.String(36), sa.ForeignKey(Runtime.id), nullable=True
)
# We want to get runtime info when we query function
runtime = relationship(
'Runtime', back_populates="functions", innerjoin=True, lazy='select'
)
cpu = sa.Column(sa.Integer, default=0)
memory_size = sa.Column(sa.Integer, default=0)
timeout = sa.Column(sa.Integer)
@ -123,8 +119,6 @@ class FunctionVersion(model_base.QinlingSecureModelBase):
count = sa.Column(sa.Integer, default=0)
Runtime.functions = relationship("Function", back_populates="runtime")
# Only get running jobs
Function.jobs = relationship(
"Job",


+ 49
- 55
qinling/services/periodics.py View File

@ -48,65 +48,59 @@ def handle_function_service_expiration(ctx, engine):
delta = timedelta(seconds=CONF.engine.function_service_expiration)
expiry_time = datetime.utcnow() - delta
with db_api.transaction():
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
latest_version=0
)
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
latest_version=0
)
for func_db in results:
if not etcd_util.get_service_url(func_db.id, 0):
continue
for func_db in results:
if not etcd_util.get_service_url(func_db.id, 0):
continue
LOG.info(
'Deleting service mapping and workers for function '
'%s(version 0)',
func_db.id
)
LOG.info(
'Deleting service mapping and workers for function '
'%s(version 0)',
func_db.id
)
# Delete resources related to the function
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id, 0)
# Delete resources related to the function
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id, 0)
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
LOG.info(
'Deleting service mapping and workers for function '
'%s(version %s)',
v.function_id, v.version_number
)
LOG.info(
'Deleting service mapping and workers for function '
'%s(version %s)',
v.function_id, v.version_number
)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
@periodics.periodic(3)
def handle_job(engine_client):
"""Execute job task with no db transactions.
We don't do iterations on jobs_db directly to avoid the potential
'Cursor needed to be reset' error.
"""
"""Execute job task with no db transactions."""
jobs_db = db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3))
jobs_dict = [j.to_dict() for j in jobs_db]
for job in jobs_dict:
job_id = job["id"]
func_id = job["function_id"]
func_version = job["function_version"]
for job in jobs_db:
job_id = job.id
func_id = job.function_id
func_version = job.function_version
LOG.debug("Processing job: %s, function: %s(version %s)", job_id,
func_id, func_version)
@ -116,16 +110,16 @@ def handle_job(engine_client):
try:
# Setup context before schedule job.
ctx = keystone_utils.create_trust_context(
trust_id, job["project_id"]
trust_id, job.project_id
)
context.set_ctx(ctx)
if (job["count"] is not None and job["count"] > 0):
job["count"] -= 1
if (job.count is not None and job.count > 0):
job.count -= 1
# Job delete/update is done using UPDATE ... FROM ... WHERE
# non-locking clause.
if job["count"] == 0:
if job.count == 0:
modified = db_api.conditional_update(
models.Job,
{
@ -140,19 +134,19 @@ def handle_job(engine_client):
)
else:
next_time = jobs.get_next_execution_time(
job["pattern"],
job["next_execution_time"]
job.pattern,
job.next_execution_time
)
modified = db_api.conditional_update(
models.Job,
{
'next_execution_time': next_time,
'count': job["count"]
'count': job.count
},
{
'id': job_id,
'next_execution_time': job["next_execution_time"]
'next_execution_time': job.next_execution_time
},
insecure=True,
)
@ -172,7 +166,7 @@ def handle_job(engine_client):
params = {
'function_id': func_id,
'function_version': func_version,
'input': job["function_input"],
'input': job.function_input,
'sync': False,
'description': constants.EXECUTION_BY_JOB % job_id
}


+ 4
- 0
qinling/tests/unit/api/base.py View File

@ -38,6 +38,10 @@ class APITest(base.DbTestCase):
# Disable authentication by default for API tests.
self.override_config('auth_enable', False, group='pecan')
# Disable job handler. The following pecan app instantiation will
# invoke qinling.api.app:setup_app()
self.override_config('enable_job_handler', False, group='api')
pecan_opts = CONF.pecan
self.app = pecan.testing.load_test_app({
'app': {


+ 72
- 2
qinling/tests/unit/api/controllers/v1/test_function_version.py View File

@ -14,6 +14,7 @@
from datetime import datetime
from datetime import timedelta
import json
import mock
@ -28,8 +29,8 @@ class TestFunctionVersionController(base.APITest):
def setUp(self):
super(TestFunctionVersionController, self).setUp()
db_func = self.create_function()
self.func_id = db_func.id
self.db_func = self.create_function()
self.func_id = self.db_func.id
@mock.patch('qinling.storage.file_system.FileSystemStorage.copy')
@mock.patch('qinling.storage.file_system.FileSystemStorage.changed_since')
@ -168,3 +169,72 @@ class TestFunctionVersionController(base.APITest):
)
self.assertEqual(403, resp.status_int)
@mock.patch('qinling.rpc.EngineClient.scaleup_function')
def test_scale_up(self, scaleup_function_mock):
db_api.increase_function_version(self.func_id, 0)
body = {'count': 1}
resp = self.app.post(
'/v1/functions/%s/versions/1/scale_up' % self.func_id,
params=json.dumps(body),
content_type='application/json'
)
self.assertEqual(202, resp.status_int)
scaleup_function_mock.assert_called_once_with(
self.func_id,
runtime_id=self.db_func.runtime_id,
version=1,
count=1
)
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.rpc.EngineClient.scaledown_function')
def test_scale_down(self, scaledown_function_mock, get_workers_mock):
db_api.increase_function_version(self.func_id, 0)
get_workers_mock.return_value = [mock.Mock(), mock.Mock()]
body = {'count': 1}
resp = self.app.post(
'/v1/functions/%s/versions/1/scale_down' % self.func_id,
params=json.dumps(body),
content_type='application/json'
)
self.assertEqual(202, resp.status_int)
scaledown_function_mock.assert_called_once_with(self.func_id,
version=1, count=1)
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.rpc.EngineClient.scaledown_function')
def test_scale_down_no_need(self, scaledown_function_mock,
get_workers_mock):
db_api.increase_function_version(self.func_id, 0)
get_workers_mock.return_value = [mock.Mock()]
body = {'count': 1}
resp = self.app.post(
'/v1/functions/%s/versions/1/scale_down' % self.func_id,
params=json.dumps(body),
content_type='application/json'
)
self.assertEqual(202, resp.status_int)
scaledown_function_mock.assert_not_called()
@mock.patch('qinling.utils.etcd_util.delete_function')
@mock.patch('qinling.rpc.EngineClient.delete_function')
def test_detach(self, engine_delete_function_mock,
etcd_delete_function_mock):
db_api.increase_function_version(self.func_id, 0)
resp = self.app.post(
'/v1/functions/%s/versions/1/detach' % self.func_id
)
self.assertEqual(202, resp.status_int)
engine_delete_function_mock.assert_called_once_with(self.func_id,
version=1)
etcd_delete_function_mock.assert_called_once_with(self.func_id,
version=1)

+ 33
- 0
qinling/tests/unit/api/controllers/v1/test_function_worker.py View File

@ -35,3 +35,36 @@ class TestFunctionWorkerController(base.APITest):
self._assert_single_item(
resp.json['workers'], worker_name='test_worker1'
)
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_get_all_version_workers(self, mock_get_workers):
function_id = uuidutils.generate_uuid()
mock_get_workers.return_value = ['test_worker0', 'test_worker1']
resp = self.app.get(
'/v1/functions/%s/workers?function_version=1' % function_id
)
self.assertEqual(200, resp.status_int)
mock_get_workers.assert_called_once_with(function_id, version=1)
self._assert_multiple_items(
resp.json['workers'],
2,
function_id=function_id,
function_version=1
)
self._assert_single_item(
resp.json['workers'], worker_name='test_worker0'
)
self._assert_single_item(
resp.json['workers'], worker_name='test_worker1'
)
def test_get_all_version_workers_not_int(self):
function_id = uuidutils.generate_uuid()
resp = self.app.get(
'/v1/functions/%s/workers?function_version=invalid' % function_id,
expect_errors=True
)
self.assertEqual(400, resp.status_int)

+ 19
- 18
qinling/utils/executions.py View File

@ -75,30 +75,31 @@ def create_execution(engine_client, params):
input = params.get('input')
version = params.get('function_version', 0)
with db_api.transaction():
func_db = db_api.get_function(function_id)
runtime_id = func_db.runtime_id
func_db = db_api.get_function(function_id)
runtime_id = func_db.runtime_id
runtime_db = func_db.runtime
# Image type function does not need runtime
if runtime_id:
runtime_db = db_api.get_runtime(runtime_id)
if runtime_db and runtime_db.status != status.AVAILABLE:
raise exc.RuntimeNotAvailableException(
'Runtime %s is not available.' % func_db.runtime_id
)
if version > 0:
if func_db.code['source'] != constants.PACKAGE_FUNCTION:
raise exc.InputException(
"Can not specify version for %s type function." %
constants.PACKAGE_FUNCTION
)
# update version count
version_db = db_api.get_function_version(function_id, version)
pre_version_count = version_db.count
_update_function_version_db(version_db.id, pre_version_count)
else:
pre_count = func_db.count
_update_function_db(function_id, pre_count)
if version > 0:
if func_db.code['source'] != constants.PACKAGE_FUNCTION:
raise exc.InputException(
"Can not specify version for %s type function." %
constants.PACKAGE_FUNCTION
)
# update version count
version_db = db_api.get_function_version(function_id, version)
pre_version_count = version_db.count
_update_function_version_db(version_db.id, pre_version_count)
else:
pre_count = func_db.count
_update_function_db(function_id, pre_count)
# input in params should be a string.
if input:


Loading…
Cancel
Save