Support to get pool information for the runtime

Add an administrative operation for getting the pool information for the
runtime, so that the admin user can check the capacity of the runtime and
scale up or scale down the pool accordingly.

Change-Id: Iec4536396c1c31a9e545b09c5f46b46dc6d79ae6
Story: 2002969
Task: 22975
This commit is contained in:
Lingxian Kong 2018-07-13 23:12:58 +12:00
parent bcdbfb61be
commit 01d769fa62
12 changed files with 171 additions and 46 deletions

View File

@ -7,6 +7,7 @@
"runtime:create": "rule:context_is_admin", "runtime:create": "rule:context_is_admin",
"runtime:update": "rule:context_is_admin", "runtime:update": "rule:context_is_admin",
"runtime:delete": "rule:context_is_admin", "runtime:delete": "rule:context_is_admin",
"runtime_pool:get_all": "rule:context_is_admin",
"function:get_all:all_projects": "rule:context_is_admin", "function:get_all:all_projects": "rule:context_is_admin",
"function_worker:get_all": "rule:context_is_admin", "function_worker:get_all": "rule:context_is_admin",

View File

@ -282,6 +282,16 @@ class Runtimes(ResourceList):
return sample return sample
class RuntimePoolCapacity(Resource):
total = wsme.wsattr(int, readonly=True)
available = wsme.wsattr(int, readonly=True)
class RuntimePool(Resource):
name = wsme.wsattr(wtypes.text, readonly=True)
capacity = wsme.wsattr(RuntimePoolCapacity, readonly=True)
class Execution(Resource): class Execution(Resource):
id = types.uuid id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True) function_id = wsme.wsattr(types.uuid, mandatory=True)

View File

@ -34,6 +34,8 @@ UPDATE_ALLOWED = set(['name', 'description', 'image'])
class RuntimesController(rest.RestController): class RuntimesController(rest.RestController):
_custom_actions = {'pool': ['GET']}
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.engine_client = rpc.get_engine_client() self.engine_client = rpc.get_engine_client()
self.type = 'runtime' self.type = 'runtime'
@ -162,3 +164,24 @@ class RuntimesController(rest.RestController):
runtime_db = db_api.update_runtime(id, values) runtime_db = db_api.update_runtime(id, values)
return resources.Runtime.from_db_obj(runtime_db) return resources.Runtime.from_db_obj(runtime_db)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.RuntimePool, types.uuid)
def pool(self, id):
"""Get the pool information for the runtime.
This operation should be admin only.
We don't check the runtime existence, because this function
also helps us to check the underlying pool even after the runtime
is already deleted.
"""
acl.enforce('runtime_pool:get_all', context.get_ctx())
LOG.info("Getting pool information for runtime %s.", id)
capacity = self.engine_client.get_runtime_pool(id)
pool_capacity = resources.RuntimePoolCapacity.from_dict(capacity)
return resources.RuntimePool.from_dict(
{"name": id, "capacity": pool_capacity}
)

View File

@ -84,6 +84,11 @@ class DefaultEngine(object):
LOG.info('Rollbacked runtime %s.', runtime_id) LOG.info('Rollbacked runtime %s.', runtime_id)
def get_runtime_pool(self, ctx, runtime_id):
LOG.info("Getting pool information for runtime %s", runtime_id)
return self.orchestrator.get_pool(runtime_id)
@tenacity.retry( @tenacity.retry(
wait=tenacity.wait_fixed(1), wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30), stop=tenacity.stop_after_attempt(30),

View File

@ -84,6 +84,11 @@ class DBEntityNotFoundError(DBError):
message = "Object not found" message = "Object not found"
class RuntimeNotFoundException(QinlingException):
http_code = 404
message = "Runtime not found"
class ApplicationContextNotFoundException(QinlingException): class ApplicationContextNotFoundException(QinlingException):
http_code = 400 http_code = 400
message = "Application context not found" message = "Application context not found"

View File

@ -38,6 +38,10 @@ class OrchestratorBase(object):
def update_pool(self, name, **kwargs): def update_pool(self, name, **kwargs):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod
def get_pool(self, name, **kwargs):
raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def prepare_execution(self, function_id, function_version, **kwargs): def prepare_execution(self, function_id, function_version, **kwargs):
raise NotImplementedError raise NotImplementedError

View File

@ -103,6 +103,33 @@ class KubernetesManager(base.OrchestratorBase):
return ret.status.replicas == ret.status.available_replicas return ret.status.replicas == ret.status.available_replicas
def get_pool(self, name):
total = 0
available = 0
try:
ret = self.v1extension.read_namespaced_deployment(
name,
namespace=self.conf.kubernetes.namespace
)
except Exception:
raise exc.RuntimeNotFoundException()
if not ret.status.replicas:
return {"total": total, "available": available}
total = ret.status.replicas
labels = {'runtime_id': name}
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector='!function_id,%s' % selector
)
available = len(ret.items)
return {"total": total, "available": available}
def create_pool(self, name, image): def create_pool(self, name, image):
deployment_body = self.deployment_template.render( deployment_body = self.deployment_template.render(
{ {

View File

@ -152,6 +152,14 @@ class EngineClient(object):
pre_image=pre_image pre_image=pre_image
) )
@wrap_messaging_exception
def get_runtime_pool(self, runtime_id):
return self._client.prepare(topic=self.topic, server=None).call(
ctx.get_ctx(),
'get_runtime_pool',
runtime_id=runtime_id
)
@wrap_messaging_exception @wrap_messaging_exception
def create_execution(self, execution_id, function_id, version, runtime_id, def create_execution(self, execution_id, function_id, version, runtime_id,
input=None, is_sync=True): input=None, is_sync=True):

View File

@ -156,3 +156,20 @@ class TestRuntimeController(base.APITest):
self.assertEqual(403, resp.status_int) self.assertEqual(403, resp.status_int)
mock_update_runtime.assert_not_called() mock_update_runtime.assert_not_called()
mock_etcd_url.assert_called_once_with(function_id) mock_etcd_url.assert_called_once_with(function_id)
@mock.patch('qinling.rpc.EngineClient.get_runtime_pool')
def test_get_runtime_pool(self, mock_get_pool):
mock_get_pool.return_value = {"total": 3, "available": 2}
resp = self.app.get('/v1/runtimes/%s/pool' % self.runtime_id)
expected = {
"capacity": {
"available": 2,
"total": 3
},
"name": self.runtime_id
}
self.assertEqual(200, resp.status_int)
self.assertEqual(expected, resp.json)

View File

@ -562,3 +562,11 @@ class TestDefaultEngine(base.DbTestCase):
] ]
etcd_util_delete_workers_mock.assert_has_calls(expected) etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(3, etcd_util_delete_workers_mock.call_count) self.assertEqual(3, etcd_util_delete_workers_mock.call_count)
def test_get_runtime_pool(self):
runtime = self.create_runtime()
runtime_id = runtime.id
self.default_engine.get_runtime_pool(mock.Mock(), runtime_id)
self.orchestrator.get_pool.assert_called_once_with(runtime_id)

View File

@ -48,8 +48,7 @@ class TestKubernetesManager(base.DbTestCase):
'qinling.orchestrator.kubernetes.utils.get_k8s_clients', 'qinling.orchestrator.kubernetes.utils.get_k8s_clients',
return_value=clients return_value=clients
).start() ).start()
self.fake_namespace = self.rand_name('namespace', self.fake_namespace = self.rand_name('namespace', prefix=self.prefix)
prefix='TestKubernetesManager')
self.override_config('namespace', self.fake_namespace, self.override_config('namespace', self.fake_namespace,
config.KUBERNETES_GROUP) config.KUBERNETES_GROUP)
@ -140,9 +139,8 @@ class TestKubernetesManager(base.DbTestCase):
fake_replicas = 5 fake_replicas = 5
self.override_config('replicas', fake_replicas, self.override_config('replicas', fake_replicas,
config.KUBERNETES_GROUP) config.KUBERNETES_GROUP)
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') fake_image = self.rand_name('image', prefix=self.prefix)
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
self.manager.create_pool(fake_deployment_name, fake_image) self.manager.create_pool(fake_deployment_name, fake_image)
@ -174,9 +172,8 @@ class TestKubernetesManager(base.DbTestCase):
self.k8s_v1_ext.read_namespaced_deployment.side_effect = [ self.k8s_v1_ext.read_namespaced_deployment.side_effect = [
ret1, ret2, ret3 ret1, ret2, ret3
] ]
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') fake_image = self.rand_name('image', prefix=self.prefix)
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
self.manager.create_pool(fake_deployment_name, fake_image) self.manager.create_pool(fake_deployment_name, fake_image)
@ -188,9 +185,8 @@ class TestKubernetesManager(base.DbTestCase):
ret = mock.Mock() ret = mock.Mock()
ret.status.replicas = 0 ret.status.replicas = 0
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') fake_image = self.rand_name('image', prefix=self.prefix)
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
self.manager.create_pool(fake_deployment_name, fake_image) self.manager.create_pool(fake_deployment_name, fake_image)
@ -201,16 +197,15 @@ class TestKubernetesManager(base.DbTestCase):
def test_delete_pool(self): def test_delete_pool(self):
# Deleting namespaced service is also tested in this. # Deleting namespaced service is also tested in this.
svc1 = mock.Mock() svc1 = mock.Mock()
svc1_name = self.rand_name('service', prefix='TestKubernetesManager') svc1_name = self.rand_name('service', prefix=self.prefix)
svc1.metadata.name = svc1_name svc1.metadata.name = svc1_name
svc2 = mock.Mock() svc2 = mock.Mock()
svc2_name = self.rand_name('service', prefix='TestKubernetesManager') svc2_name = self.rand_name('service', prefix=self.prefix)
svc2.metadata.name = svc2_name svc2.metadata.name = svc2_name
services = mock.Mock() services = mock.Mock()
services.items = [svc1, svc2] services.items = [svc1, svc2]
self.k8s_v1_api.list_namespaced_service.return_value = services self.k8s_v1_api.list_namespaced_service.return_value = services
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager')
self.manager.delete_pool(fake_deployment_name) self.manager.delete_pool(fake_deployment_name)
@ -237,9 +232,8 @@ class TestKubernetesManager(base.DbTestCase):
label_selector='runtime_id=%s' % fake_deployment_name) label_selector='runtime_id=%s' % fake_deployment_name)
def test_update_pool(self): def test_update_pool(self):
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
image = self.rand_name('image', prefix='TestKubernetesManager')
body = { body = {
'spec': { 'spec': {
'template': { 'template': {
@ -269,9 +263,8 @@ class TestKubernetesManager(base.DbTestCase):
self.fake_namespace) self.fake_namespace)
def test_update_pool_retry(self): def test_update_pool_retry(self):
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
image = self.rand_name('image', prefix='TestKubernetesManager')
ret1 = mock.Mock() ret1 = mock.Mock()
ret1.status.unavailable_replicas = 1 ret1.status.unavailable_replicas = 1
ret2 = mock.Mock() ret2 = mock.Mock()
@ -289,9 +282,8 @@ class TestKubernetesManager(base.DbTestCase):
self.assertEqual(2, read_status.call_count) self.assertEqual(2, read_status.call_count)
def test_update_pool_rollback(self): def test_update_pool_rollback(self):
fake_deployment_name = self.rand_name('deployment', fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
image = self.rand_name('image', prefix='TestKubernetesManager')
ret = mock.Mock() ret = mock.Mock()
ret.status.unavailable_replicas = 1 ret.status.unavailable_replicas = 1
self.k8s_v1_ext.read_namespaced_deployment_status.return_value = ret self.k8s_v1_ext.read_namespaced_deployment_status.return_value = ret
@ -314,10 +306,37 @@ class TestKubernetesManager(base.DbTestCase):
rollback.assert_called_once_with( rollback.assert_called_once_with(
fake_deployment_name, self.fake_namespace, rollback_body) fake_deployment_name, self.fake_namespace, rollback_body)
def test_get_pool(self):
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
ret = mock.Mock()
ret.status.replicas = 3
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
list_pod_ret = mock.Mock()
list_pod_ret.items = [mock.Mock()]
self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret
pool_info = self.manager.get_pool(fake_deployment_name)
expected = {"total": 3, "available": 1}
self.assertEqual(expected, pool_info)
def test_get_pool_not_ready(self):
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
ret = mock.Mock()
ret.status.replicas = None
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
pool_info = self.manager.get_pool(fake_deployment_name)
expected = {"total": 0, "available": 0}
self.assertEqual(expected, pool_info)
def test_prepare_execution_no_image(self): def test_prepare_execution_no_image(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = {'pod1_key1': 'pod1_value1'} pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -371,7 +390,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_with_image(self): def test_prepare_execution_with_image(self):
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
image = self.rand_name('image', prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
identifier = ('%s-%s' % identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False), function_id) (common.generate_unicode_uuid(dashed=False), function_id)
)[:63] )[:63]
@ -401,7 +420,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_with_image_function_input(self): def test_prepare_execution_with_image_function_input(self):
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
image = self.rand_name('image', prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
identifier = ('%s-%s' % ( identifier = ('%s-%s' % (
common.generate_unicode_uuid(dashed=False), common.generate_unicode_uuid(dashed=False),
function_id) function_id)
@ -430,7 +449,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_with_image_json_input(self): def test_prepare_execution_with_image_json_input(self):
function_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid()
image = self.rand_name('image', prefix='TestKubernetesManager') image = self.rand_name('image', prefix=self.prefix)
identifier = ('%s-%s' % ( identifier = ('%s-%s' % (
common.generate_unicode_uuid(dashed=False), common.generate_unicode_uuid(dashed=False),
function_id) function_id)
@ -489,8 +508,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_service_already_exists(self): def test_prepare_execution_service_already_exists(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = {'pod1_key1': 'pod1_value1'} pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -517,8 +535,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_create_service_failed(self): def test_prepare_execution_create_service_failed(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = None pod.metadata.labels = None
ret_pods = mock.Mock() ret_pods = mock.Mock()
ret_pods.items = [pod] ret_pods.items = [pod]
@ -551,8 +568,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_prepare_execution_service_internal_ip(self): def test_prepare_execution_service_internal_ip(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = {'pod1_key1': 'pod1_value1'} pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -660,10 +676,10 @@ class TestKubernetesManager(base.DbTestCase):
def test_delete_function(self): def test_delete_function(self):
# Deleting namespaced service is also tested in this. # Deleting namespaced service is also tested in this.
svc1 = mock.Mock() svc1 = mock.Mock()
svc1_name = self.rand_name('service', prefix='TestKubernetesManager') svc1_name = self.rand_name('service', prefix=self.prefix)
svc1.metadata.name = svc1_name svc1.metadata.name = svc1_name
svc2 = mock.Mock() svc2 = mock.Mock()
svc2_name = self.rand_name('service', prefix='TestKubernetesManager') svc2_name = self.rand_name('service', prefix=self.prefix)
svc2.metadata.name = svc2_name svc2.metadata.name = svc2_name
services = mock.Mock() services = mock.Mock()
services.items = [svc1, svc2] services.items = [svc1, svc2]
@ -724,8 +740,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_scaleup_function(self): def test_scaleup_function(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = {'pod1_key1': 'pod1_value1'} pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -791,8 +806,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_scaleup_function_service_already_exists(self): def test_scaleup_function_service_already_exists(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = {'pod1_key1': 'pod1_value1'} pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -818,8 +832,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_scaleup_function_service_create_failed(self): def test_scaleup_function_service_create_failed(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = None pod.metadata.labels = None
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -837,8 +850,7 @@ class TestKubernetesManager(base.DbTestCase):
def test_scaleup_function_service_internal_ip(self): def test_scaleup_function_service_internal_ip(self):
pod = mock.Mock() pod = mock.Mock()
pod.metadata.name = self.rand_name('pod', pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
prefix='TestKubernetesManager')
pod.metadata.labels = None pod.metadata.labels = None
list_pod_ret = mock.Mock() list_pod_ret = mock.Mock()
list_pod_ret.items = [pod] list_pod_ret.items = [pod]
@ -861,7 +873,7 @@ class TestKubernetesManager(base.DbTestCase):
service_url) service_url)
def test_delete_worker(self): def test_delete_worker(self):
pod_name = self.rand_name('pod', prefix='TestKubernetesManager') pod_name = self.rand_name('pod', prefix=self.prefix)
self.manager.delete_worker(pod_name) self.manager.delete_worker(pod_name)

View File

@ -0,0 +1,5 @@
---
features:
- Add an administrative operation for getting the pool information for the
runtime, so that the admin user can check the capacity of the runtime and
scale up or scale down the pool accordingly.