Merge "orchestrator: fix several issues of kubernetes manager"
This commit is contained in:
commit
ecf09b281a
@ -38,13 +38,11 @@ class DefaultEngine(object):
|
||||
|
||||
with db_api.transaction():
|
||||
runtime = db_api.get_runtime(runtime_id)
|
||||
labels = {'runtime_id': runtime_id}
|
||||
|
||||
try:
|
||||
self.orchestrator.create_pool(
|
||||
runtime_id,
|
||||
runtime.image,
|
||||
labels=labels,
|
||||
runtime.image
|
||||
)
|
||||
runtime.status = status.AVAILABLE
|
||||
LOG.info('Runtime %s created.', runtime_id)
|
||||
@ -59,8 +57,7 @@ class DefaultEngine(object):
|
||||
def delete_runtime(self, ctx, runtime_id):
|
||||
LOG.info('Start to delete runtime %s.', runtime_id)
|
||||
|
||||
labels = {'runtime_id': runtime_id}
|
||||
self.orchestrator.delete_pool(runtime_id, labels=labels)
|
||||
self.orchestrator.delete_pool(runtime_id)
|
||||
db_api.delete_runtime(runtime_id)
|
||||
|
||||
LOG.info('Deleted runtime %s.', runtime_id)
|
||||
@ -71,9 +68,8 @@ class DefaultEngine(object):
|
||||
runtime_id, image, pre_image
|
||||
)
|
||||
|
||||
labels = {'runtime_id': runtime_id}
|
||||
ret = self.orchestrator.update_pool(
|
||||
runtime_id, labels=labels, image=image
|
||||
runtime_id, image=image
|
||||
)
|
||||
|
||||
if ret:
|
||||
@ -171,7 +167,6 @@ class DefaultEngine(object):
|
||||
(common.generate_unicode_uuid(dashed=False),
|
||||
function_id)
|
||||
)[:63]
|
||||
labels = {'function_id': function_id}
|
||||
else:
|
||||
identifier = runtime_id
|
||||
labels = {'runtime_id': runtime_id}
|
||||
@ -221,8 +216,7 @@ class DefaultEngine(object):
|
||||
"""Deletes underlying resources allocated for function."""
|
||||
LOG.info('Start to delete function %s.', function_id)
|
||||
|
||||
labels = {'function_id': function_id}
|
||||
self.orchestrator.delete_function(function_id, labels=labels)
|
||||
self.orchestrator.delete_function(function_id)
|
||||
|
||||
LOG.info('Deleted function %s.', function_id)
|
||||
|
||||
|
@ -101,11 +101,11 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
|
||||
return ret.status.replicas == ret.status.available_replicas
|
||||
|
||||
def create_pool(self, name, image, labels=None):
|
||||
def create_pool(self, name, image):
|
||||
deployment_body = self.deployment_template.render(
|
||||
{
|
||||
"name": name,
|
||||
"labels": labels if labels else {},
|
||||
"labels": {'runtime_id': name},
|
||||
"replicas": self.conf.kubernetes.replicas,
|
||||
"container_name": 'worker',
|
||||
"image": image,
|
||||
@ -125,10 +125,11 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
|
||||
LOG.info("Deployment for runtime %s created.", name)
|
||||
|
||||
def delete_pool(self, name, labels=None):
|
||||
def delete_pool(self, name):
|
||||
"""Delete all resources belong to the deployment."""
|
||||
LOG.info("Deleting deployment %s", name)
|
||||
|
||||
labels = {'runtime_id': name}
|
||||
selector = common.convert_dict_to_string(labels)
|
||||
|
||||
self.v1extention.delete_collection_namespaced_replica_set(
|
||||
@ -162,7 +163,7 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
LOG.info("Pods in deployment %s deleted.", name)
|
||||
LOG.info("Deployment %s deleted.", name)
|
||||
|
||||
def update_pool(self, name, labels=None, image=None):
|
||||
def update_pool(self, name, image=None):
|
||||
"""Deployment rolling-update.
|
||||
|
||||
Return True if successful, otherwise return False after rolling back.
|
||||
@ -209,7 +210,7 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
"revision": 0
|
||||
}
|
||||
}
|
||||
self.v1extention.create_namespaced_deployment_rollback_rollback(
|
||||
self.v1extention.create_namespaced_deployment_rollback(
|
||||
name, self.conf.kubernetes.namespace, body
|
||||
)
|
||||
|
||||
@ -217,28 +218,28 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
|
||||
return True
|
||||
|
||||
def _choose_available_pod(self, labels, count=1, function_id=None):
|
||||
selector = common.convert_dict_to_string(labels)
|
||||
|
||||
def _choose_available_pods(self, labels, count=1, function_id=None):
|
||||
# If there is already a pod for function, reuse it.
|
||||
if function_id:
|
||||
ret = self.v1.list_namespaced_pod(
|
||||
self.conf.kubernetes.namespace,
|
||||
label_selector='function_id=%s' % function_id
|
||||
)
|
||||
if len(ret.items) > 0:
|
||||
if len(ret.items) >= count:
|
||||
LOG.debug(
|
||||
"Function %s already associates to a pod.", function_id
|
||||
"Function %s already associates to a pod with at least "
|
||||
"%d worker(s). ", function_id, count
|
||||
)
|
||||
return ret.items[:count]
|
||||
|
||||
selector = common.convert_dict_to_string(labels)
|
||||
ret = self.v1.list_namespaced_pod(
|
||||
self.conf.kubernetes.namespace,
|
||||
label_selector='!function_id,%s' % selector
|
||||
)
|
||||
|
||||
if len(ret.items) == 0:
|
||||
return None
|
||||
if len(ret.items) < count:
|
||||
return []
|
||||
|
||||
return ret.items[-count:]
|
||||
|
||||
@ -315,10 +316,10 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
def _create_pod(self, image, pod_name, labels, input):
|
||||
if not input:
|
||||
input_list = []
|
||||
elif input.get('__function_input'):
|
||||
elif isinstance(input, dict) and input.get('__function_input'):
|
||||
input_list = input.get('__function_input').split()
|
||||
else:
|
||||
input_list = [json.dumps(input)]
|
||||
input_list = list(json.loads(input))
|
||||
|
||||
pod_body = self.pod_template.render(
|
||||
{
|
||||
@ -338,7 +339,7 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
body=yaml.safe_load(pod_body),
|
||||
)
|
||||
|
||||
def _update_pod_label(self, pod, new_label=None):
|
||||
def _update_pod_label(self, pod, new_label):
|
||||
name = pod.metadata.name
|
||||
|
||||
pod_labels = copy.deepcopy(pod.metadata.labels) or {}
|
||||
@ -366,21 +367,23 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
For normal function, choose a pod from the pool and expose a service,
|
||||
return the service URL.
|
||||
"""
|
||||
pod = None
|
||||
pods = None
|
||||
|
||||
labels = labels or {'function_id': function_id}
|
||||
|
||||
if image:
|
||||
self._create_pod(image, identifier, labels, input)
|
||||
return identifier, None
|
||||
else:
|
||||
pod = self._choose_available_pod(labels, function_id=function_id)
|
||||
pods = self._choose_available_pods(labels, function_id=function_id)
|
||||
|
||||
if not pod:
|
||||
if not pods:
|
||||
LOG.critical('No worker available.')
|
||||
raise exc.OrchestratorException('Execution preparation failed.')
|
||||
|
||||
try:
|
||||
pod_name, url = self._prepare_pod(
|
||||
pod[0], identifier, function_id, labels
|
||||
pods[0], identifier, function_id, labels
|
||||
)
|
||||
return pod_name, url
|
||||
except Exception:
|
||||
@ -435,6 +438,7 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
return True, output
|
||||
|
||||
def delete_function(self, function_id, labels=None):
|
||||
labels = labels or {'function_id': function_id}
|
||||
selector = common.convert_dict_to_string(labels)
|
||||
|
||||
ret = self.v1.list_namespaced_service(
|
||||
@ -455,7 +459,7 @@ class KubernetesManager(base.OrchestratorBase):
|
||||
def scaleup_function(self, function_id, identifier=None, count=1):
|
||||
pod_names = []
|
||||
labels = {'runtime_id': identifier}
|
||||
pods = self._choose_available_pod(labels, count=count)
|
||||
pods = self._choose_available_pods(labels, count=count)
|
||||
|
||||
if not pods:
|
||||
raise exc.OrchestratorException('Not enough workers available.')
|
||||
|
@ -45,7 +45,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.default_engine.create_runtime(mock.Mock(), runtime_id)
|
||||
|
||||
self.orchestrator.create_pool.assert_called_once_with(
|
||||
runtime_id, runtime.image, labels={'runtime_id': runtime_id})
|
||||
runtime_id, runtime.image)
|
||||
runtime = db_api.get_runtime(runtime_id)
|
||||
self.assertEqual(status.AVAILABLE, runtime.status)
|
||||
|
||||
@ -59,7 +59,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.default_engine.create_runtime(mock.Mock(), runtime_id)
|
||||
|
||||
self.orchestrator.create_pool.assert_called_once_with(
|
||||
runtime_id, runtime.image, labels={'runtime_id': runtime_id})
|
||||
runtime_id, runtime.image)
|
||||
runtime = db_api.get_runtime(runtime_id)
|
||||
self.assertEqual(status.ERROR, runtime.status)
|
||||
|
||||
@ -70,7 +70,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.default_engine.delete_runtime(mock.Mock(), runtime_id)
|
||||
|
||||
self.orchestrator.delete_pool.assert_called_once_with(
|
||||
runtime_id, labels={'runtime_id': runtime_id})
|
||||
runtime_id)
|
||||
self.assertRaisesRegexp(
|
||||
exc.DBEntityNotFoundError,
|
||||
"^Runtime not found \[id=%s\]$" % runtime_id,
|
||||
@ -89,7 +89,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
mock.Mock(), runtime_id, image, pre_image)
|
||||
|
||||
self.orchestrator.update_pool.assert_called_once_with(
|
||||
runtime_id, labels={'runtime_id': runtime_id}, image=image)
|
||||
runtime_id, image=image)
|
||||
runtime = db_api.get_runtime(runtime_id)
|
||||
self.assertEqual(runtime.status, status.AVAILABLE)
|
||||
|
||||
@ -106,7 +106,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
mock.Mock(), runtime_id, image, pre_image)
|
||||
|
||||
self.orchestrator.update_pool.assert_called_once_with(
|
||||
runtime_id, labels={'runtime_id': runtime_id}, image=image)
|
||||
runtime_id, image=image)
|
||||
runtime = db_api.get_runtime(runtime_id)
|
||||
self.assertEqual(runtime.image, pre_image)
|
||||
self.assertEqual(runtime.status, status.AVAILABLE)
|
||||
@ -262,12 +262,12 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
mock.call(function_id,
|
||||
image=function.code['image'],
|
||||
identifier=mock.ANY,
|
||||
labels={'function_id': function_id},
|
||||
labels=None,
|
||||
input=None),
|
||||
mock.call(function_id,
|
||||
image=function.code['image'],
|
||||
identifier=mock.ANY,
|
||||
labels={'function_id': function_id},
|
||||
labels=None,
|
||||
input='input')]
|
||||
self.orchestrator.prepare_execution.assert_has_calls(prepare_calls)
|
||||
self.assertEqual(2, self.orchestrator.prepare_execution.call_count)
|
||||
@ -382,7 +382,7 @@ class TestDefaultEngine(base.DbTestCase):
|
||||
self.default_engine.delete_function(mock.Mock(), function_id)
|
||||
|
||||
self.orchestrator.delete_function.assert_called_once_with(
|
||||
function_id, labels={'function_id': function_id})
|
||||
function_id)
|
||||
|
||||
@mock.patch('qinling.utils.etcd_util.create_service_url')
|
||||
@mock.patch('qinling.utils.etcd_util.create_worker')
|
||||
|
Loading…
x
Reference in New Issue
Block a user