From ff38280ac28ad088e527bf08201acf77bc09bd69 Mon Sep 17 00:00:00 2001 From: Jiangyuan Date: Wed, 25 Apr 2018 21:25:19 +0800 Subject: [PATCH] Runtime implementation for image type function Limit image type function resources by using customized cpu/mem saved in function database. And this patch is based on patch [0]. [0]: Change-Id: I7a245f93a445a00c2722238d3f94d3a960f16af4 Story: 2001586 Task: 14415 Change-Id: I1e86d5061e68eb584283b0d2f5482d4931d7e4f2 --- qinling/engine/default_engine.py | 5 +++ qinling/orchestrator/kubernetes/manager.py | 24 +++++++--- .../orchestrator/kubernetes/templates/pod.j2 | 8 ++-- .../tests/unit/engine/test_default_engine.py | 12 ++++- .../orchestrator/kubernetes/test_manager.py | 44 +++++++++++++------ 5 files changed, 68 insertions(+), 25 deletions(-) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 3b4d1a29..ba9d00ef 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -128,6 +128,10 @@ class DefaultEngine(object): function = db_api.get_function(function_id) source = function.code['source'] + rlimit = { + 'cpu': function.cpu, + 'memory_size': function.memory_size + } image = None identifier = None labels = None @@ -186,6 +190,7 @@ class DefaultEngine(object): _, svc_url = self.orchestrator.prepare_execution( function_id, function_version, + rlimit=rlimit, image=image, identifier=identifier, labels=labels, diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index a236c646..a529b0ab 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -324,7 +324,7 @@ class KubernetesManager(base.OrchestratorBase): return pod_name, pod_service_url - def _create_pod(self, image, pod_name, labels, input): + def _create_pod(self, image, rlimit, pod_name, labels, input): """Create pod for image type function.""" if not input: input_list = [] @@ -338,7 +338,11 @@ class KubernetesManager(base.OrchestratorBase): "pod_name": pod_name, "labels": labels, "pod_image": image, - "input": input_list + "input": input_list, + "req_cpu": str(rlimit['cpu']), + "limit_cpu": str(rlimit['cpu']), + "req_memory": str(rlimit['memory_size']), + "limit_memory": str(rlimit['memory_size']) } ) @@ -369,12 +373,14 @@ class KubernetesManager(base.OrchestratorBase): return pod_labels - def prepare_execution(self, function_id, version, image=None, + def prepare_execution(self, function_id, version, rlimit=None, image=None, identifier=None, labels=None, input=None): """Prepare service URL for function version. - For image function, create a single pod with input, so the function - will be executed. + :param rlimit: optional argument passed to limit cpu/mem resources. + + For image function, create a single pod with rlimit and input, so the + function will be executed in the resource limited pod. For normal function, choose a pod from the pool and expose a service, return the service URL. @@ -386,7 +392,13 @@ class KubernetesManager(base.OrchestratorBase): labels = labels or {'function_id': function_id} if image: - self._create_pod(image, identifier, labels, input) + if not rlimit: + LOG.critical('Param rlimit is required for image function.') + raise exc.OrchestratorException( + 'Execution preparation failed.' + ) + + self._create_pod(image, rlimit, identifier, labels, input) return identifier, None else: pods = self._choose_available_pods(labels, function_id=function_id, diff --git a/qinling/orchestrator/kubernetes/templates/pod.j2 b/qinling/orchestrator/kubernetes/templates/pod.j2 index 083f56f6..32a947f3 100644 --- a/qinling/orchestrator/kubernetes/templates/pod.j2 +++ b/qinling/orchestrator/kubernetes/templates/pod.j2 @@ -22,8 +22,8 @@ spec: restartPolicy: Never resources: limits: - cpu: "0.3" - memory: 128Mi + cpu: {{ limit_cpu }}m + memory: {{ limit_memory }} requests: - cpu: "0.1" - memory: 32Mi + cpu: {{ req_cpu }}m + memory: {{ req_memory }} diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index 19110596..ff37c2ad 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -13,6 +13,7 @@ # limitations under the License. import mock +from oslo_config import cfg from qinling.db import api as db_api from qinling.engine import default_engine @@ -31,6 +32,10 @@ class TestDefaultEngine(base.DbTestCase): self.default_engine = default_engine.DefaultEngine( self.orchestrator, self.qinling_endpoint ) + self.rlimit = { + 'cpu': cfg.CONF.resource_limits.default_cpu, + 'memory_size': cfg.CONF.resource_limits.default_memory + } def _create_running_executions(self, function_id, num): for _ in range(num): @@ -237,12 +242,14 @@ class TestDefaultEngine(base.DbTestCase): prepare_calls = [ mock.call(function_id, 0, + rlimit=self.rlimit, image=function.code['image'], identifier=mock.ANY, labels=None, input=None), mock.call(function_id, 0, + rlimit=self.rlimit, image=function.code['image'], identifier=mock.ANY, labels=None, @@ -345,8 +352,9 @@ class TestDefaultEngine(base.DbTestCase): function_id, 0, runtime_id) etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) self.orchestrator.prepare_execution.assert_called_once_with( - function_id, 0, image=None, identifier=runtime_id, - labels={'runtime_id': runtime_id}, input=None) + function_id, 0, rlimit=self.rlimit, image=None, + identifier=runtime_id, labels={'runtime_id': runtime_id}, + input=None) self.orchestrator.run_execution.assert_called_once_with( execution_id, function_id, 0, input=None, identifier=runtime_id, service_url='svc_url', entry=function.entry, diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 62b72355..ecb4667f 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -36,6 +36,10 @@ class TestKubernetesManager(base.DbTestCase): self.conf = CONF self.qinling_endpoint = 'http://127.0.0.1:7070' + self.rlimit = { + 'cpu': cfg.CONF.resource_limits.default_cpu, + 'memory_size': cfg.CONF.resource_limits.default_memory + } self.k8s_v1_api = mock.Mock() self.k8s_v1_ext = mock.Mock() clients = {'v1': self.k8s_v1_api, @@ -327,7 +331,7 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, 0, image=None, identifier=runtime_id, + function_id, 0, rlimit=None, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) self.assertEqual(pod.metadata.name, pod_names) @@ -372,7 +376,8 @@ class TestKubernetesManager(base.DbTestCase): )[:63] pod_name, url = self.manager.prepare_execution( - function_id, 0, image=image, identifier=identifier) + function_id, 0, rlimit=self.rlimit, image=image, + identifier=identifier) self.assertEqual(identifier, pod_name) self.assertIsNone(url) @@ -383,7 +388,11 @@ class TestKubernetesManager(base.DbTestCase): 'pod_name': identifier, 'labels': {'function_id': function_id}, 'pod_image': image, - 'input': [] + 'input': [], + 'req_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'req_memory': str(cfg.CONF.resource_limits.default_memory), + 'limit_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'limit_memory': str(cfg.CONF.resource_limits.default_memory) } ) self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( @@ -399,8 +408,8 @@ class TestKubernetesManager(base.DbTestCase): fake_input = {'__function_input': 'input_item1 input_item2'} pod_name, url = self.manager.prepare_execution( - function_id, 0, image=image, identifier=identifier, - input=fake_input) + function_id, 0, rlimit=self.rlimit, image=image, + identifier=identifier, input=fake_input) # in _create_pod pod_body = self.manager.pod_template.render( @@ -408,7 +417,11 @@ class TestKubernetesManager(base.DbTestCase): 'pod_name': identifier, 'labels': {'function_id': function_id}, 'pod_image': image, - 'input': ['input_item1', 'input_item2'] + 'input': ['input_item1', 'input_item2'], + 'req_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'req_memory': str(cfg.CONF.resource_limits.default_memory), + 'limit_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'limit_memory': str(cfg.CONF.resource_limits.default_memory) } ) self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( @@ -424,8 +437,8 @@ class TestKubernetesManager(base.DbTestCase): fake_input = '["input_item3", "input_item4"]' pod_name, url = self.manager.prepare_execution( - function_id, 0, image=image, identifier=identifier, - input=fake_input) + function_id, 0, rlimit=self.rlimit, image=image, + identifier=identifier, input=fake_input) # in _create_pod pod_body = self.manager.pod_template.render( @@ -433,7 +446,11 @@ class TestKubernetesManager(base.DbTestCase): 'pod_name': identifier, 'labels': {'function_id': function_id}, 'pod_image': image, - 'input': ['input_item3', 'input_item4'] + 'input': ['input_item3', 'input_item4'], + 'req_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'req_memory': str(cfg.CONF.resource_limits.default_memory), + 'limit_cpu': str(cfg.CONF.resource_limits.default_cpu), + 'limit_memory': str(cfg.CONF.resource_limits.default_memory) } ) self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( @@ -451,7 +468,8 @@ class TestKubernetesManager(base.DbTestCase): exc.OrchestratorException, "^Execution preparation failed\.$", self.manager.prepare_execution, - function_id, 0, image=None, identifier=runtime_id, labels=labels) + function_id, 0, rlimit=None, image=None, + identifier=runtime_id, labels=labels) # in _choose_available_pods list_calls = [ @@ -489,7 +507,7 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, 0, image=None, identifier=runtime_id, + function_id, 0, rlimit=None, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) # in _prepare_pod @@ -517,7 +535,7 @@ class TestKubernetesManager(base.DbTestCase): exc.OrchestratorException, '^Execution preparation failed\.$', self.manager.prepare_execution, - function_id, 0, image=None, identifier=runtime_id, + function_id, 0, rlimit=None, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) delete_function_mock.assert_called_once_with( @@ -548,7 +566,7 @@ class TestKubernetesManager(base.DbTestCase): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, 0, image=None, identifier=runtime_id, + function_id, 0, rlimit=None, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) self.assertEqual(pod.metadata.name, pod_names)