Merge "Support to get pool information for the runtime"
This commit is contained in:
commit
48bfa1314b
|
@ -7,6 +7,7 @@
|
|||
"runtime:create": "rule:context_is_admin",
|
||||
"runtime:update": "rule:context_is_admin",
|
||||
"runtime:delete": "rule:context_is_admin",
|
||||
"runtime_pool:get_all": "rule:context_is_admin",
|
||||
|
||||
"function:get_all:all_projects": "rule:context_is_admin",
|
||||
"function_worker:get_all": "rule:context_is_admin",
|
||||
|
|
|
@ -282,6 +282,16 @@ class Runtimes(ResourceList):
|
|||
return sample
|
||||
|
||||
|
||||
class RuntimePoolCapacity(Resource):
|
||||
total = wsme.wsattr(int, readonly=True)
|
||||
available = wsme.wsattr(int, readonly=True)
|
||||
|
||||
|
||||
class RuntimePool(Resource):
|
||||
name = wsme.wsattr(wtypes.text, readonly=True)
|
||||
capacity = wsme.wsattr(RuntimePoolCapacity, readonly=True)
|
||||
|
||||
|
||||
class Execution(Resource):
|
||||
id = types.uuid
|
||||
function_id = wsme.wsattr(types.uuid, mandatory=True)
|
||||
|
|
|
@ -34,6 +34,8 @@ UPDATE_ALLOWED = set(['name', 'description', 'image'])
|
|||
|
||||
|
||||
class RuntimesController(rest.RestController):
|
||||
_custom_actions = {'pool': ['GET']}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.engine_client = rpc.get_engine_client()
|
||||
self.type = 'runtime'
|
||||
|
@ -162,3 +164,24 @@ class RuntimesController(rest.RestController):
|
|||
runtime_db = db_api.update_runtime(id, values)
|
||||
|
||||
return resources.Runtime.from_db_obj(runtime_db)
|
||||
|
||||
@rest_utils.wrap_wsme_controller_exception
|
||||
@wsme_pecan.wsexpose(resources.RuntimePool, types.uuid)
|
||||
def pool(self, id):
|
||||
"""Get the pool information for the runtime.
|
||||
|
||||
This operation should be admin only.
|
||||
|
||||
We don't check the runtime existence, because this function
|
||||
also helps us to check the underlying pool even after the runtime
|
||||
is already deleted.
|
||||
"""
|
||||
acl.enforce('runtime_pool:get_all', context.get_ctx())
|
||||
|
||||
LOG.info("Getting pool information for runtime %s.", id)
|
||||
capacity = self.engine_client.get_runtime_pool(id)
|
||||
pool_capacity = resources.RuntimePoolCapacity.from_dict(capacity)
|
||||
|
||||
return resources.RuntimePool.from_dict(
|
||||
{"name": id, "capacity": pool_capacity}
|
||||
)
|
||||
|
|
|
@ -84,6 +84,11 @@ class DefaultEngine(object):
|
|||
|
||||
LOG.info('Rollbacked runtime %s.', runtime_id)
|
||||
|
||||
def get_runtime_pool(self, ctx, runtime_id):
|
||||
LOG.info("Getting pool information for runtime %s", runtime_id)
|
||||
|
||||
return self.orchestrator.get_pool(runtime_id)
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_fixed(1),
|
||||
stop=tenacity.stop_after_attempt(30),
|
||||
|
|
|
@ -84,6 +84,11 @@ class DBEntityNotFoundError(DBError):
|
|||
message = "Object not found"
|
||||
|
||||
|
||||
class RuntimeNotFoundException(QinlingException):
|
||||
http_code = 404
|
||||
message = "Runtime not found"
|
||||
|
||||
|
||||
class ApplicationContextNotFoundException(QinlingException):
|
||||
http_code = 400
|
||||
message = "Application context not found"
|
||||
|
|
|
@ -38,6 +38,10 @@ class OrchestratorBase(object):
|
|||
def update_pool(self, name, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_pool(self, name, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepare_execution(self, function_id, function_version, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
|
|
@ -103,6 +103,33 @@ class KubernetesManager(base.OrchestratorBase):
|
|||
|
||||
return ret.status.replicas == ret.status.available_replicas
|
||||
|
||||
def get_pool(self, name):
|
||||
total = 0
|
||||
available = 0
|
||||
|
||||
try:
|
||||
ret = self.v1extension.read_namespaced_deployment(
|
||||
name,
|
||||
namespace=self.conf.kubernetes.namespace
|
||||
)
|
||||
except Exception:
|
||||
raise exc.RuntimeNotFoundException()
|
||||
|
||||
if not ret.status.replicas:
|
||||
return {"total": total, "available": available}
|
||||
|
||||
total = ret.status.replicas
|
||||
|
||||
labels = {'runtime_id': name}
|
||||
selector = common.convert_dict_to_string(labels)
|
||||
ret = self.v1.list_namespaced_pod(
|
||||
self.conf.kubernetes.namespace,
|
||||
label_selector='!function_id,%s' % selector
|
||||
)
|
||||
available = len(ret.items)
|
||||
|
||||
return {"total": total, "available": available}
|
||||
|
||||
def create_pool(self, name, image):
|
||||
deployment_body = self.deployment_template.render(
|
||||
{
|
||||
|
|
|
@ -152,6 +152,14 @@ class EngineClient(object):
|
|||
pre_image=pre_image
|
||||
)
|
||||
|
||||
@wrap_messaging_exception
|
||||
def get_runtime_pool(self, runtime_id):
|
||||
return self._client.prepare(topic=self.topic, server=None).call(
|
||||
ctx.get_ctx(),
|
||||
'get_runtime_pool',
|
||||
runtime_id=runtime_id
|
||||
)
|
||||
|
||||
@wrap_messaging_exception
|
||||
def create_execution(self, execution_id, function_id, version, runtime_id,
|
||||
input=None, is_sync=True):
|
||||
|
|
|
@ -156,3 +156,20 @@ class TestRuntimeController(base.APITest):
|
|||
self.assertEqual(403, resp.status_int)
|
||||
mock_update_runtime.assert_not_called()
|
||||
mock_etcd_url.assert_called_once_with(function_id)
|
||||
|
||||
@mock.patch('qinling.rpc.EngineClient.get_runtime_pool')
|
||||
def test_get_runtime_pool(self, mock_get_pool):
|
||||
mock_get_pool.return_value = {"total": 3, "available": 2}
|
||||
|
||||
resp = self.app.get('/v1/runtimes/%s/pool' % self.runtime_id)
|
||||
|
||||
expected = {
|
||||
"capacity": {
|
||||
"available": 2,
|
||||
"total": 3
|
||||
},
|
||||
"name": self.runtime_id
|
||||
}
|
||||
|
||||
self.assertEqual(200, resp.status_int)
|
||||
self.assertEqual(expected, resp.json)
|
||||
|
|
|
@ -562,3 +562,11 @@ class TestDefaultEngine(base.DbTestCase):
|
|||
]
|
||||
etcd_util_delete_workers_mock.assert_has_calls(expected)
|
||||
self.assertEqual(3, etcd_util_delete_workers_mock.call_count)
|
||||
|
||||
def test_get_runtime_pool(self):
|
||||
runtime = self.create_runtime()
|
||||
runtime_id = runtime.id
|
||||
|
||||
self.default_engine.get_runtime_pool(mock.Mock(), runtime_id)
|
||||
|
||||
self.orchestrator.get_pool.assert_called_once_with(runtime_id)
|
||||
|
|
|
@ -48,8 +48,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
'qinling.orchestrator.kubernetes.utils.get_k8s_clients',
|
||||
return_value=clients
|
||||
).start()
|
||||
self.fake_namespace = self.rand_name('namespace',
|
||||
prefix='TestKubernetesManager')
|
||||
self.fake_namespace = self.rand_name('namespace', prefix=self.prefix)
|
||||
self.override_config('namespace', self.fake_namespace,
|
||||
config.KUBERNETES_GROUP)
|
||||
|
||||
|
@ -140,9 +139,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
fake_replicas = 5
|
||||
self.override_config('replicas', fake_replicas,
|
||||
config.KUBERNETES_GROUP)
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
fake_image = self.rand_name('image', prefix=self.prefix)
|
||||
|
||||
self.manager.create_pool(fake_deployment_name, fake_image)
|
||||
|
||||
|
@ -174,9 +172,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
self.k8s_v1_ext.read_namespaced_deployment.side_effect = [
|
||||
ret1, ret2, ret3
|
||||
]
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
fake_image = self.rand_name('image', prefix=self.prefix)
|
||||
|
||||
self.manager.create_pool(fake_deployment_name, fake_image)
|
||||
|
||||
|
@ -188,9 +185,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
ret = mock.Mock()
|
||||
ret.status.replicas = 0
|
||||
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
fake_image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
fake_image = self.rand_name('image', prefix=self.prefix)
|
||||
|
||||
self.manager.create_pool(fake_deployment_name, fake_image)
|
||||
|
||||
|
@ -201,16 +197,15 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
def test_delete_pool(self):
|
||||
# Deleting namespaced service is also tested in this.
|
||||
svc1 = mock.Mock()
|
||||
svc1_name = self.rand_name('service', prefix='TestKubernetesManager')
|
||||
svc1_name = self.rand_name('service', prefix=self.prefix)
|
||||
svc1.metadata.name = svc1_name
|
||||
svc2 = mock.Mock()
|
||||
svc2_name = self.rand_name('service', prefix='TestKubernetesManager')
|
||||
svc2_name = self.rand_name('service', prefix=self.prefix)
|
||||
svc2.metadata.name = svc2_name
|
||||
services = mock.Mock()
|
||||
services.items = [svc1, svc2]
|
||||
self.k8s_v1_api.list_namespaced_service.return_value = services
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
|
||||
self.manager.delete_pool(fake_deployment_name)
|
||||
|
||||
|
@ -237,9 +232,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
label_selector='runtime_id=%s' % fake_deployment_name)
|
||||
|
||||
def test_update_pool(self):
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
body = {
|
||||
'spec': {
|
||||
'template': {
|
||||
|
@ -269,9 +263,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
self.fake_namespace)
|
||||
|
||||
def test_update_pool_retry(self):
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
ret1 = mock.Mock()
|
||||
ret1.status.unavailable_replicas = 1
|
||||
ret2 = mock.Mock()
|
||||
|
@ -289,9 +282,8 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
self.assertEqual(2, read_status.call_count)
|
||||
|
||||
def test_update_pool_rollback(self):
|
||||
fake_deployment_name = self.rand_name('deployment',
|
||||
prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
ret = mock.Mock()
|
||||
ret.status.unavailable_replicas = 1
|
||||
self.k8s_v1_ext.read_namespaced_deployment_status.return_value = ret
|
||||
|
@ -314,10 +306,37 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
rollback.assert_called_once_with(
|
||||
fake_deployment_name, self.fake_namespace, rollback_body)
|
||||
|
||||
def test_get_pool(self):
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
|
||||
ret = mock.Mock()
|
||||
ret.status.replicas = 3
|
||||
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
|
||||
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [mock.Mock()]
|
||||
self.k8s_v1_api.list_namespaced_pod.return_value = list_pod_ret
|
||||
|
||||
pool_info = self.manager.get_pool(fake_deployment_name)
|
||||
|
||||
expected = {"total": 3, "available": 1}
|
||||
self.assertEqual(expected, pool_info)
|
||||
|
||||
def test_get_pool_not_ready(self):
|
||||
fake_deployment_name = self.rand_name('deployment', prefix=self.prefix)
|
||||
|
||||
ret = mock.Mock()
|
||||
ret.status.replicas = None
|
||||
self.k8s_v1_ext.read_namespaced_deployment.return_value = ret
|
||||
|
||||
pool_info = self.manager.get_pool(fake_deployment_name)
|
||||
|
||||
expected = {"total": 0, "available": 0}
|
||||
self.assertEqual(expected, pool_info)
|
||||
|
||||
def test_prepare_execution_no_image(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -371,7 +390,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_with_image(self):
|
||||
function_id = common.generate_unicode_uuid()
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
identifier = ('%s-%s' %
|
||||
(common.generate_unicode_uuid(dashed=False), function_id)
|
||||
)[:63]
|
||||
|
@ -401,7 +420,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_with_image_function_input(self):
|
||||
function_id = common.generate_unicode_uuid()
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
identifier = ('%s-%s' % (
|
||||
common.generate_unicode_uuid(dashed=False),
|
||||
function_id)
|
||||
|
@ -430,7 +449,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_with_image_json_input(self):
|
||||
function_id = common.generate_unicode_uuid()
|
||||
image = self.rand_name('image', prefix='TestKubernetesManager')
|
||||
image = self.rand_name('image', prefix=self.prefix)
|
||||
identifier = ('%s-%s' % (
|
||||
common.generate_unicode_uuid(dashed=False),
|
||||
function_id)
|
||||
|
@ -489,8 +508,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_service_already_exists(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -517,8 +535,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_create_service_failed(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = None
|
||||
ret_pods = mock.Mock()
|
||||
ret_pods.items = [pod]
|
||||
|
@ -551,8 +568,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_prepare_execution_service_internal_ip(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -660,10 +676,10 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
def test_delete_function(self):
|
||||
# Deleting namespaced service is also tested in this.
|
||||
svc1 = mock.Mock()
|
||||
svc1_name = self.rand_name('service', prefix='TestKubernetesManager')
|
||||
svc1_name = self.rand_name('service', prefix=self.prefix)
|
||||
svc1.metadata.name = svc1_name
|
||||
svc2 = mock.Mock()
|
||||
svc2_name = self.rand_name('service', prefix='TestKubernetesManager')
|
||||
svc2_name = self.rand_name('service', prefix=self.prefix)
|
||||
svc2.metadata.name = svc2_name
|
||||
services = mock.Mock()
|
||||
services.items = [svc1, svc2]
|
||||
|
@ -724,8 +740,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_scaleup_function(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -791,8 +806,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_scaleup_function_service_already_exists(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = {'pod1_key1': 'pod1_value1'}
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -818,8 +832,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_scaleup_function_service_create_failed(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = None
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -837,8 +850,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
|
||||
def test_scaleup_function_service_internal_ip(self):
|
||||
pod = mock.Mock()
|
||||
pod.metadata.name = self.rand_name('pod',
|
||||
prefix='TestKubernetesManager')
|
||||
pod.metadata.name = self.rand_name('pod', prefix=self.prefix)
|
||||
pod.metadata.labels = None
|
||||
list_pod_ret = mock.Mock()
|
||||
list_pod_ret.items = [pod]
|
||||
|
@ -861,7 +873,7 @@ class TestKubernetesManager(base.DbTestCase):
|
|||
service_url)
|
||||
|
||||
def test_delete_worker(self):
|
||||
pod_name = self.rand_name('pod', prefix='TestKubernetesManager')
|
||||
pod_name = self.rand_name('pod', prefix=self.prefix)
|
||||
|
||||
self.manager.delete_worker(pod_name)
|
||||
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- Add an administrative operation for getting the pool information for the
|
||||
runtime, so that the admin user can check the capacity of the runtime and
|
||||
scale up or scale down the pool accordingly.
|
Loading…
Reference in New Issue