Browse Source

Runtime implementation for image type function

Limit image type function resources by using customized cpu/mem saved
in function database. And this patch is based on patch [0].

[0]: Change-Id: I7a245f93a445a00c2722238d3f94d3a960f16af4
Story: 2001586
Task: 14415

Change-Id: I1e86d5061e68eb584283b0d2f5482d4931d7e4f2
changes/35/564235/5
Jiangyuan 3 years ago
parent
commit
ff38280ac2
5 changed files with 68 additions and 25 deletions
  1. +5
    -0
      qinling/engine/default_engine.py
  2. +18
    -6
      qinling/orchestrator/kubernetes/manager.py
  3. +4
    -4
      qinling/orchestrator/kubernetes/templates/pod.j2
  4. +10
    -2
      qinling/tests/unit/engine/test_default_engine.py
  5. +31
    -13
      qinling/tests/unit/orchestrator/kubernetes/test_manager.py

+ 5
- 0
qinling/engine/default_engine.py View File

@ -128,6 +128,10 @@ class DefaultEngine(object):
function = db_api.get_function(function_id)
source = function.code['source']
rlimit = {
'cpu': function.cpu,
'memory_size': function.memory_size
}
image = None
identifier = None
labels = None
@ -186,6 +190,7 @@ class DefaultEngine(object):
_, svc_url = self.orchestrator.prepare_execution(
function_id,
function_version,
rlimit=rlimit,
image=image,
identifier=identifier,
labels=labels,


+ 18
- 6
qinling/orchestrator/kubernetes/manager.py View File

@ -324,7 +324,7 @@ class KubernetesManager(base.OrchestratorBase):
return pod_name, pod_service_url
def _create_pod(self, image, pod_name, labels, input):
def _create_pod(self, image, rlimit, pod_name, labels, input):
"""Create pod for image type function."""
if not input:
input_list = []
@ -338,7 +338,11 @@ class KubernetesManager(base.OrchestratorBase):
"pod_name": pod_name,
"labels": labels,
"pod_image": image,
"input": input_list
"input": input_list,
"req_cpu": str(rlimit['cpu']),
"limit_cpu": str(rlimit['cpu']),
"req_memory": str(rlimit['memory_size']),
"limit_memory": str(rlimit['memory_size'])
}
)
@ -369,12 +373,14 @@ class KubernetesManager(base.OrchestratorBase):
return pod_labels
def prepare_execution(self, function_id, version, image=None,
def prepare_execution(self, function_id, version, rlimit=None, image=None,
identifier=None, labels=None, input=None):
"""Prepare service URL for function version.
For image function, create a single pod with input, so the function
will be executed.
:param rlimit: optional argument passed to limit cpu/mem resources.
For image function, create a single pod with rlimit and input, so the
function will be executed in the resource limited pod.
For normal function, choose a pod from the pool and expose a service,
return the service URL.
@ -386,7 +392,13 @@ class KubernetesManager(base.OrchestratorBase):
labels = labels or {'function_id': function_id}
if image:
self._create_pod(image, identifier, labels, input)
if not rlimit:
LOG.critical('Param rlimit is required for image function.')
raise exc.OrchestratorException(
'Execution preparation failed.'
)
self._create_pod(image, rlimit, identifier, labels, input)
return identifier, None
else:
pods = self._choose_available_pods(labels, function_id=function_id,


+ 4
- 4
qinling/orchestrator/kubernetes/templates/pod.j2 View File

@ -22,8 +22,8 @@ spec:
restartPolicy: Never
resources:
limits:
cpu: "0.3"
memory: 128Mi
cpu: {{ limit_cpu }}m
memory: {{ limit_memory }}
requests:
cpu: "0.1"
memory: 32Mi
cpu: {{ req_cpu }}m
memory: {{ req_memory }}

+ 10
- 2
qinling/tests/unit/engine/test_default_engine.py View File

@ -13,6 +13,7 @@
# limitations under the License.
import mock
from oslo_config import cfg
from qinling.db import api as db_api
from qinling.engine import default_engine
@ -31,6 +32,10 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine = default_engine.DefaultEngine(
self.orchestrator, self.qinling_endpoint
)
self.rlimit = {
'cpu': cfg.CONF.resource_limits.default_cpu,
'memory_size': cfg.CONF.resource_limits.default_memory
}
def _create_running_executions(self, function_id, num):
for _ in range(num):
@ -237,12 +242,14 @@ class TestDefaultEngine(base.DbTestCase):
prepare_calls = [
mock.call(function_id,
0,
rlimit=self.rlimit,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
input=None),
mock.call(function_id,
0,
rlimit=self.rlimit,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
@ -345,8 +352,9 @@ class TestDefaultEngine(base.DbTestCase):
function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
self.orchestrator.prepare_execution.assert_called_once_with(
function_id, 0, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id}, input=None)
function_id, 0, rlimit=self.rlimit, image=None,
identifier=runtime_id, labels={'runtime_id': runtime_id},
input=None)
self.orchestrator.run_execution.assert_called_once_with(
execution_id, function_id, 0, input=None, identifier=runtime_id,
service_url='svc_url', entry=function.entry,


+ 31
- 13
qinling/tests/unit/orchestrator/kubernetes/test_manager.py View File

@ -36,6 +36,10 @@ class TestKubernetesManager(base.DbTestCase):
self.conf = CONF
self.qinling_endpoint = 'http://127.0.0.1:7070'
self.rlimit = {
'cpu': cfg.CONF.resource_limits.default_cpu,
'memory_size': cfg.CONF.resource_limits.default_memory
}
self.k8s_v1_api = mock.Mock()
self.k8s_v1_ext = mock.Mock()
clients = {'v1': self.k8s_v1_api,
@ -327,7 +331,7 @@ class TestKubernetesManager(base.DbTestCase):
function_id = common.generate_unicode_uuid()
pod_names, service_url = self.manager.prepare_execution(
function_id, 0, image=None, identifier=runtime_id,
function_id, 0, rlimit=None, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
self.assertEqual(pod.metadata.name, pod_names)
@ -372,7 +376,8 @@ class TestKubernetesManager(base.DbTestCase):
)[:63]
pod_name, url = self.manager.prepare_execution(
function_id, 0, image=image, identifier=identifier)
function_id, 0, rlimit=self.rlimit, image=image,
identifier=identifier)
self.assertEqual(identifier, pod_name)
self.assertIsNone(url)
@ -383,7 +388,11 @@ class TestKubernetesManager(base.DbTestCase):
'pod_name': identifier,
'labels': {'function_id': function_id},
'pod_image': image,
'input': []
'input': [],
'req_cpu': str(cfg.CONF.resource_limits.default_cpu),
'req_memory': str(cfg.CONF.resource_limits.default_memory),
'limit_cpu': str(cfg.CONF.resource_limits.default_cpu),
'limit_memory': str(cfg.CONF.resource_limits.default_memory)
}
)
self.k8s_v1_api.create_namespaced_pod.assert_called_once_with(
@ -399,8 +408,8 @@ class TestKubernetesManager(base.DbTestCase):
fake_input = {'__function_input': 'input_item1 input_item2'}
pod_name, url = self.manager.prepare_execution(
function_id, 0, image=image, identifier=identifier,
input=fake_input)
function_id, 0, rlimit=self.rlimit, image=image,
identifier=identifier, input=fake_input)
# in _create_pod
pod_body = self.manager.pod_template.render(
@ -408,7 +417,11 @@ class TestKubernetesManager(base.DbTestCase):
'pod_name': identifier,
'labels': {'function_id': function_id},
'pod_image': image,
'input': ['input_item1', 'input_item2']
'input': ['input_item1', 'input_item2'],
'req_cpu': str(cfg.CONF.resource_limits.default_cpu),
'req_memory': str(cfg.CONF.resource_limits.default_memory),
'limit_cpu': str(cfg.CONF.resource_limits.default_cpu),
'limit_memory': str(cfg.CONF.resource_limits.default_memory)
}
)
self.k8s_v1_api.create_namespaced_pod.assert_called_once_with(
@ -424,8 +437,8 @@ class TestKubernetesManager(base.DbTestCase):
fake_input = '["input_item3", "input_item4"]'
pod_name, url = self.manager.prepare_execution(
function_id, 0, image=image, identifier=identifier,
input=fake_input)
function_id, 0, rlimit=self.rlimit, image=image,
identifier=identifier, input=fake_input)
# in _create_pod
pod_body = self.manager.pod_template.render(
@ -433,7 +446,11 @@ class TestKubernetesManager(base.DbTestCase):
'pod_name': identifier,
'labels': {'function_id': function_id},
'pod_image': image,
'input': ['input_item3', 'input_item4']
'input': ['input_item3', 'input_item4'],
'req_cpu': str(cfg.CONF.resource_limits.default_cpu),
'req_memory': str(cfg.CONF.resource_limits.default_memory),
'limit_cpu': str(cfg.CONF.resource_limits.default_cpu),
'limit_memory': str(cfg.CONF.resource_limits.default_memory)
}
)
self.k8s_v1_api.create_namespaced_pod.assert_called_once_with(
@ -451,7 +468,8 @@ class TestKubernetesManager(base.DbTestCase):
exc.OrchestratorException,
"^Execution preparation failed\.$",
self.manager.prepare_execution,
function_id, 0, image=None, identifier=runtime_id, labels=labels)
function_id, 0, rlimit=None, image=None,
identifier=runtime_id, labels=labels)
# in _choose_available_pods
list_calls = [
@ -489,7 +507,7 @@ class TestKubernetesManager(base.DbTestCase):
function_id = common.generate_unicode_uuid()
pod_names, service_url = self.manager.prepare_execution(
function_id, 0, image=None, identifier=runtime_id,
function_id, 0, rlimit=None, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
# in _prepare_pod
@ -517,7 +535,7 @@ class TestKubernetesManager(base.DbTestCase):
exc.OrchestratorException,
'^Execution preparation failed\.$',
self.manager.prepare_execution,
function_id, 0, image=None, identifier=runtime_id,
function_id, 0, rlimit=None, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
delete_function_mock.assert_called_once_with(
@ -548,7 +566,7 @@ class TestKubernetesManager(base.DbTestCase):
function_id = common.generate_unicode_uuid()
pod_names, service_url = self.manager.prepare_execution(
function_id, 0, image=None, identifier=runtime_id,
function_id, 0, rlimit=None, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
self.assertEqual(pod.metadata.name, pod_names)


Loading…
Cancel
Save