orchestrator: fix several issues in the Kubernetes manager

- avoid passing None labels to the selector generation method
- correct the name of create_namespaced_deployment_rollback
- update _choose_available_pods to take the 'count' parameter into
  account
- fix potential errors in _create_pod, as its input may be a string
  rather than a dict (see the sketch below)
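
For reference, a minimal standalone sketch of the new input handling.
The helper name normalize_input is hypothetical and used only for
illustration; the real logic lives inline in
KubernetesManager._create_pod:

    import json

    def normalize_input(input):
        # 'input' may be None, a dict carrying '__function_input',
        # or a JSON-encoded string.
        if not input:
            return []
        if isinstance(input, dict) and input.get('__function_input'):
            # CLI-style arguments: split on whitespace.
            return input.get('__function_input').split()
        # Otherwise assume a JSON string and decode it into a list.
        return list(json.loads(input))

    # normalize_input({'__function_input': 'a b'})  -> ['a', 'b']
    # normalize_input('["a", "b"]')                 -> ['a', 'b']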

Change-Id: Iadc05caec871e37c7eeb9cfd9d358aec0b280315
Hunt Xu 2018-03-02 20:00:06 +08:00
parent bfab9c508d
commit 803bb7baa6
3 changed files with 36 additions and 38 deletions

@@ -38,13 +38,11 @@ class DefaultEngine(object):
         with db_api.transaction():
             runtime = db_api.get_runtime(runtime_id)
-            labels = {'runtime_id': runtime_id}
             try:
                 self.orchestrator.create_pool(
                     runtime_id,
-                    runtime.image,
-                    labels=labels,
+                    runtime.image
                 )
                 runtime.status = status.AVAILABLE
                 LOG.info('Runtime %s created.', runtime_id)
@@ -59,8 +57,7 @@ class DefaultEngine(object):
     def delete_runtime(self, ctx, runtime_id):
         LOG.info('Start to delete runtime %s.', runtime_id)
-        labels = {'runtime_id': runtime_id}
-        self.orchestrator.delete_pool(runtime_id, labels=labels)
+        self.orchestrator.delete_pool(runtime_id)
         db_api.delete_runtime(runtime_id)
         LOG.info('Deleted runtime %s.', runtime_id)
@@ -71,9 +68,8 @@ class DefaultEngine(object):
             runtime_id, image, pre_image
         )
-        labels = {'runtime_id': runtime_id}
         ret = self.orchestrator.update_pool(
-            runtime_id, labels=labels, image=image
+            runtime_id, image=image
         )
         if ret:
@@ -171,7 +167,6 @@ class DefaultEngine(object):
                 (common.generate_unicode_uuid(dashed=False),
                  function_id)
             )[:63]
-            labels = {'function_id': function_id}
         else:
             identifier = runtime_id
             labels = {'runtime_id': runtime_id}
@@ -221,8 +216,7 @@ class DefaultEngine(object):
         """Deletes underlying resources allocated for function."""
         LOG.info('Start to delete function %s.', function_id)
-        labels = {'function_id': function_id}
-        self.orchestrator.delete_function(function_id, labels=labels)
+        self.orchestrator.delete_function(function_id)
         LOG.info('Deleted function %s.', function_id)

@@ -101,11 +101,11 @@ class KubernetesManager(base.OrchestratorBase):
         return ret.status.replicas == ret.status.available_replicas
 
-    def create_pool(self, name, image, labels=None):
+    def create_pool(self, name, image):
         deployment_body = self.deployment_template.render(
             {
                 "name": name,
-                "labels": labels if labels else {},
+                "labels": {'runtime_id': name},
                 "replicas": self.conf.kubernetes.replicas,
                 "container_name": 'worker',
                 "image": image,
@@ -125,10 +125,11 @@ class KubernetesManager(base.OrchestratorBase):
         LOG.info("Deployment for runtime %s created.", name)
 
-    def delete_pool(self, name, labels=None):
+    def delete_pool(self, name):
         """Delete all resources belong to the deployment."""
         LOG.info("Deleting deployment %s", name)
 
+        labels = {'runtime_id': name}
         selector = common.convert_dict_to_string(labels)
 
         self.v1extention.delete_collection_namespaced_replica_set(
@@ -162,7 +163,7 @@ class KubernetesManager(base.OrchestratorBase):
         LOG.info("Pods in deployment %s deleted.", name)
         LOG.info("Deployment %s deleted.", name)
 
-    def update_pool(self, name, labels=None, image=None):
+    def update_pool(self, name, image=None):
         """Deployment rolling-update.
 
         Return True if successful, otherwise return False after rolling back.
@@ -209,7 +210,7 @@ class KubernetesManager(base.OrchestratorBase):
                     "revision": 0
                 }
             }
-            self.v1extention.create_namespaced_deployment_rollback_rollback(
+            self.v1extention.create_namespaced_deployment_rollback(
                 name, self.conf.kubernetes.namespace, body
             )
@@ -217,28 +218,28 @@ class KubernetesManager(base.OrchestratorBase):
         return True
 
-    def _choose_available_pod(self, labels, count=1, function_id=None):
-        selector = common.convert_dict_to_string(labels)
-
+    def _choose_available_pods(self, labels, count=1, function_id=None):
         # If there is already a pod for function, reuse it.
         if function_id:
             ret = self.v1.list_namespaced_pod(
                 self.conf.kubernetes.namespace,
                 label_selector='function_id=%s' % function_id
             )
-            if len(ret.items) > 0:
+            if len(ret.items) >= count:
                 LOG.debug(
-                    "Function %s already associates to a pod.", function_id
+                    "Function %s already associates to a pod with at least "
+                    "%d worker(s). ", function_id, count
                 )
                 return ret.items[:count]
 
+        selector = common.convert_dict_to_string(labels)
         ret = self.v1.list_namespaced_pod(
             self.conf.kubernetes.namespace,
             label_selector='!function_id,%s' % selector
         )
-        if len(ret.items) == 0:
-            return None
+        if len(ret.items) < count:
+            return []
 
         return ret.items[-count:]
@@ -315,10 +316,10 @@ class KubernetesManager(base.OrchestratorBase):
     def _create_pod(self, image, pod_name, labels, input):
         if not input:
             input_list = []
-        elif input.get('__function_input'):
+        elif isinstance(input, dict) and input.get('__function_input'):
             input_list = input.get('__function_input').split()
         else:
-            input_list = [json.dumps(input)]
+            input_list = list(json.loads(input))
 
         pod_body = self.pod_template.render(
             {
@@ -338,7 +339,7 @@ class KubernetesManager(base.OrchestratorBase):
             body=yaml.safe_load(pod_body),
         )
 
-    def _update_pod_label(self, pod, new_label=None):
+    def _update_pod_label(self, pod, new_label):
         name = pod.metadata.name
 
         pod_labels = copy.deepcopy(pod.metadata.labels) or {}
@@ -366,21 +367,23 @@ class KubernetesManager(base.OrchestratorBase):
         For normal function, choose a pod from the pool and expose a service,
         return the service URL.
         """
-        pod = None
+        pods = None
+        labels = labels or {'function_id': function_id}
 
         if image:
             self._create_pod(image, identifier, labels, input)
             return identifier, None
         else:
-            pod = self._choose_available_pod(labels, function_id=function_id)
+            pods = self._choose_available_pods(labels, function_id=function_id)
 
-        if not pod:
+        if not pods:
             LOG.critical('No worker available.')
             raise exc.OrchestratorException('Execution preparation failed.')
 
         try:
             pod_name, url = self._prepare_pod(
-                pod[0], identifier, function_id, labels
+                pods[0], identifier, function_id, labels
             )
             return pod_name, url
         except Exception:
@@ -435,6 +438,7 @@ class KubernetesManager(base.OrchestratorBase):
         return True, output
 
     def delete_function(self, function_id, labels=None):
+        labels = labels or {'function_id': function_id}
         selector = common.convert_dict_to_string(labels)
 
         ret = self.v1.list_namespaced_service(
@@ -455,7 +459,7 @@ class KubernetesManager(base.OrchestratorBase):
     def scaleup_function(self, function_id, identifier=None, count=1):
         pod_names = []
         labels = {'runtime_id': identifier}
-        pods = self._choose_available_pod(labels, count=count)
+        pods = self._choose_available_pods(labels, count=count)
         if not pods:
             raise exc.OrchestratorException('Not enough workers available.')

@@ -45,7 +45,7 @@ class TestDefaultEngine(base.DbTestCase):
         self.default_engine.create_runtime(mock.Mock(), runtime_id)
 
         self.orchestrator.create_pool.assert_called_once_with(
-            runtime_id, runtime.image, labels={'runtime_id': runtime_id})
+            runtime_id, runtime.image)
         runtime = db_api.get_runtime(runtime_id)
         self.assertEqual(status.AVAILABLE, runtime.status)
@@ -59,7 +59,7 @@ class TestDefaultEngine(base.DbTestCase):
         self.default_engine.create_runtime(mock.Mock(), runtime_id)
 
         self.orchestrator.create_pool.assert_called_once_with(
-            runtime_id, runtime.image, labels={'runtime_id': runtime_id})
+            runtime_id, runtime.image)
         runtime = db_api.get_runtime(runtime_id)
         self.assertEqual(status.ERROR, runtime.status)
@@ -70,7 +70,7 @@ class TestDefaultEngine(base.DbTestCase):
         self.default_engine.delete_runtime(mock.Mock(), runtime_id)
 
         self.orchestrator.delete_pool.assert_called_once_with(
-            runtime_id, labels={'runtime_id': runtime_id})
+            runtime_id)
         self.assertRaisesRegexp(
             exc.DBEntityNotFoundError,
             "^Runtime not found \[id=%s\]$" % runtime_id,
@@ -89,7 +89,7 @@ class TestDefaultEngine(base.DbTestCase):
             mock.Mock(), runtime_id, image, pre_image)
 
         self.orchestrator.update_pool.assert_called_once_with(
-            runtime_id, labels={'runtime_id': runtime_id}, image=image)
+            runtime_id, image=image)
         runtime = db_api.get_runtime(runtime_id)
         self.assertEqual(runtime.status, status.AVAILABLE)
@@ -106,7 +106,7 @@ class TestDefaultEngine(base.DbTestCase):
             mock.Mock(), runtime_id, image, pre_image)
 
         self.orchestrator.update_pool.assert_called_once_with(
-            runtime_id, labels={'runtime_id': runtime_id}, image=image)
+            runtime_id, image=image)
         runtime = db_api.get_runtime(runtime_id)
         self.assertEqual(runtime.image, pre_image)
         self.assertEqual(runtime.status, status.AVAILABLE)
@@ -262,12 +262,12 @@ class TestDefaultEngine(base.DbTestCase):
             mock.call(function_id,
                       image=function.code['image'],
                       identifier=mock.ANY,
-                      labels={'function_id': function_id},
+                      labels=None,
                       input=None),
             mock.call(function_id,
                       image=function.code['image'],
                       identifier=mock.ANY,
-                      labels={'function_id': function_id},
+                      labels=None,
                       input='input')]
         self.orchestrator.prepare_execution.assert_has_calls(prepare_calls)
         self.assertEqual(2, self.orchestrator.prepare_execution.call_count)
@@ -382,7 +382,7 @@ class TestDefaultEngine(base.DbTestCase):
         self.default_engine.delete_function(mock.Mock(), function_id)
 
         self.orchestrator.delete_function.assert_called_once_with(
-            function_id, labels={'function_id': function_id})
+            function_id)
 
     @mock.patch('qinling.utils.etcd_util.create_service_url')
     @mock.patch('qinling.utils.etcd_util.create_worker')