Wait for jobs to complete
- Wait for jobs to show as completed, instead of relying on pods associated with the job to show healthy, as the pods can go healthy or be removed while the job is still processing. Armada would continue forward as soon as all pods in current scope show as healthy. - Refactor delete pod action a bit, including removing unused code. - Fixed bug in waiting for pods to delete (in tiller handler L274). Bug caused a hung state while deleting pods as a pre-update hook, by passing timeout value in the incorrect position. Change-Id: I2a942f0a6290e8337fd7a43c3e8c9b4c9e350a10
This commit is contained in:
parent
cd0242780e
commit
ad790b98d7
@ -55,15 +55,17 @@ class K8s(object):
|
||||
propagation_policy='Foreground',
|
||||
timeout=DEFAULT_K8S_TIMEOUT):
|
||||
'''
|
||||
Delete a job from a namespace (see _delete_item_action).
|
||||
|
||||
:param name: name of job
|
||||
:param namespace: namespace of job
|
||||
:param namespace: namespace
|
||||
:param propagation_policy: The Kubernetes propagation_policy to apply
|
||||
to the delete. Default 'Foreground' means that child pods to the
|
||||
job will be deleted before the job is marked as deleted.
|
||||
to the delete.
|
||||
:param timeout: The timeout to wait for the delete to complete
|
||||
'''
|
||||
self._delete_job_action(self.batch_api.list_namespaced_job,
|
||||
self.batch_api.delete_namespaced_job, "job",
|
||||
name, namespace, propagation_policy, timeout)
|
||||
self._delete_item_action(self.batch_api.list_namespaced_job,
|
||||
self.batch_api.delete_namespaced_job, "job",
|
||||
name, namespace, propagation_policy, timeout)
|
||||
|
||||
def delete_cron_job_action(self,
|
||||
name,
|
||||
@ -71,30 +73,69 @@ class K8s(object):
|
||||
propagation_policy='Foreground',
|
||||
timeout=DEFAULT_K8S_TIMEOUT):
|
||||
'''
|
||||
Delete a cron job from a namespace (see _delete_item_action).
|
||||
|
||||
:param name: name of cron job
|
||||
:param namespace: namespace of cron job
|
||||
:param namespace: namespace
|
||||
:param propagation_policy: The Kubernetes propagation_policy to apply
|
||||
to the delete. Default 'Foreground' means that child pods of the
|
||||
cron job will be deleted before the cron job is marked as deleted.
|
||||
to the delete.
|
||||
:param timeout: The timeout to wait for the delete to complete
|
||||
'''
|
||||
self._delete_job_action(
|
||||
self._delete_item_action(
|
||||
self.batch_v1beta1_api.list_namespaced_cron_job,
|
||||
self.batch_v1beta1_api.delete_namespaced_cron_job, "cron job",
|
||||
name, namespace, propagation_policy, timeout)
|
||||
|
||||
def _delete_job_action(self,
|
||||
list_func,
|
||||
delete_func,
|
||||
job_type_description,
|
||||
name,
|
||||
namespace="default",
|
||||
propagation_policy='Foreground',
|
||||
timeout=DEFAULT_K8S_TIMEOUT):
|
||||
def delete_pod_action(self,
|
||||
name,
|
||||
namespace="default",
|
||||
propagation_policy='Foreground',
|
||||
timeout=DEFAULT_K8S_TIMEOUT):
|
||||
'''
|
||||
Delete a pod from a namespace (see _delete_item_action).
|
||||
|
||||
:param name: name of pod
|
||||
:param namespace: namespace
|
||||
:param propagation_policy: The Kubernetes propagation_policy to apply
|
||||
to the delete.
|
||||
:param timeout: The timeout to wait for the delete to complete
|
||||
'''
|
||||
self._delete_item_action(self.client.list_namespaced_pod,
|
||||
self.client.delete_namespaced_pod, "pod",
|
||||
name, namespace, propagation_policy, timeout)
|
||||
|
||||
def _delete_item_action(self,
|
||||
list_func,
|
||||
delete_func,
|
||||
object_type_description,
|
||||
name,
|
||||
namespace="default",
|
||||
propagation_policy='Foreground',
|
||||
timeout=DEFAULT_K8S_TIMEOUT):
|
||||
'''
|
||||
This function takes the action to delete an object (job, cronjob, pod)
|
||||
from kubernetes. It will wait for the object to be fully deleted before
|
||||
returning to processing or timing out.
|
||||
|
||||
:param list_func: The callback function to list the specified object
|
||||
type
|
||||
:param delete_func: The callback function to delete the specified
|
||||
object type
|
||||
:param object_type_description: The types of objects to delete,
|
||||
in `job`, `cronjob`, or `pod`
|
||||
:param name: The name of the object to delete
|
||||
:param namespace: The namespace of the object
|
||||
:param propagation_policy: The Kubernetes propagation_policy to apply
|
||||
to the delete. Default 'Foreground' means that child objects
|
||||
will be deleted before the given object is marked as deleted.
|
||||
See: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents # noqa
|
||||
:param timeout: The timeout to wait for the delete to complete
|
||||
'''
|
||||
try:
|
||||
timeout = self._check_timeout(timeout)
|
||||
|
||||
LOG.debug('Watching to delete %s %s, Wait timeout=%s',
|
||||
job_type_description, name, timeout)
|
||||
object_type_description, name, timeout)
|
||||
body = client.V1DeleteOptions()
|
||||
w = watch.Watch()
|
||||
issue_delete = True
|
||||
@ -110,29 +151,29 @@ class K8s(object):
|
||||
issue_delete = False
|
||||
|
||||
event_type = event['type'].upper()
|
||||
job_name = event['object'].metadata.name
|
||||
LOG.debug('Watch event %s on %s', event_type, job_name)
|
||||
item_name = event['object'].metadata.name
|
||||
LOG.debug('Watch event %s on %s', event_type, item_name)
|
||||
|
||||
if job_name == name:
|
||||
if item_name == name:
|
||||
found_events = True
|
||||
if event_type == 'DELETED':
|
||||
LOG.info('Successfully deleted %s %s',
|
||||
job_type_description, job_name)
|
||||
object_type_description, item_name)
|
||||
return
|
||||
|
||||
if not found_events:
|
||||
LOG.warn('Saw no delete events for %s %s in namespace=%s',
|
||||
job_type_description, name, namespace)
|
||||
object_type_description, name, namespace)
|
||||
|
||||
err_msg = ('Reached timeout while waiting to delete %s: '
|
||||
'name=%s, namespace=%s' % (job_type_description, name,
|
||||
namespace))
|
||||
'name=%s, namespace=%s' % (object_type_description,
|
||||
name, namespace))
|
||||
LOG.error(err_msg)
|
||||
raise exceptions.KubernetesWatchTimeoutException(err_msg)
|
||||
|
||||
except ApiException as e:
|
||||
LOG.exception("Exception when deleting %s: name=%s, namespace=%s",
|
||||
job_type_description, name, namespace)
|
||||
object_type_description, name, namespace)
|
||||
raise e
|
||||
|
||||
def get_namespace_job(self, namespace="default", label_selector=''):
|
||||
@ -223,19 +264,6 @@ class K8s(object):
|
||||
return self.extension_api.delete_namespaced_daemon_set(
|
||||
name, namespace, body)
|
||||
|
||||
def delete_namespace_pod(self, name, namespace="default", body=None):
|
||||
'''
|
||||
:param name: name of the Pod
|
||||
:param namespace: namespace of the Pod
|
||||
:param body: V1DeleteOptions
|
||||
|
||||
Deletes pod by name and returns V1Status object
|
||||
'''
|
||||
if body is None:
|
||||
body = client.V1DeleteOptions()
|
||||
|
||||
return self.client.delete_namespaced_pod(name, namespace, body)
|
||||
|
||||
def wait_for_pod_redeployment(self, old_pod_name, namespace):
|
||||
'''
|
||||
:param old_pod_name: name of pods
|
||||
@ -338,8 +366,16 @@ class K8s(object):
|
||||
LOG.warn('"label_selector" not specified, waiting with no labels '
|
||||
'may cause unintended consequences.')
|
||||
|
||||
# Track the overall deadline for timing out during waits
|
||||
deadline = time.time() + timeout
|
||||
|
||||
# First, we should watch for jobs before checking pods, as a job can
|
||||
# still be running even after its current pods look healthy or have
|
||||
# been removed and are pending reschedule
|
||||
found_jobs = self.get_namespace_job(namespace, label_selector)
|
||||
if len(found_jobs.items):
|
||||
self._watch_job_completion(namespace, label_selector, timeout)
|
||||
|
||||
# NOTE(mark-burnett): Attempt to wait multiple times without
|
||||
# modification, in case new pods appear after our watch exits.
|
||||
|
||||
@ -347,9 +383,13 @@ class K8s(object):
|
||||
while successes < wait_attempts:
|
||||
deadline_remaining = int(round(deadline - time.time()))
|
||||
if deadline_remaining <= 0:
|
||||
return False
|
||||
LOG.info('Timed out while waiting for pods.')
|
||||
raise exceptions.KubernetesWatchTimeoutException(
|
||||
'Timed out while waiting on namespace=(%s) labels=(%s)' %
|
||||
(namespace, label_selector))
|
||||
|
||||
timed_out, modified_pods, unready_pods, found_events = (
|
||||
self._wait_one_time(
|
||||
self._watch_pod_completions(
|
||||
namespace=namespace,
|
||||
label_selector=label_selector,
|
||||
timeout=deadline_remaining))
|
||||
@ -357,8 +397,8 @@ class K8s(object):
|
||||
if not found_events:
|
||||
LOG.warn(
|
||||
'Saw no install/update events for release=%s, '
|
||||
'namespace=%s, labels=(%s)', release, namespace,
|
||||
label_selector)
|
||||
'namespace=%s, labels=(%s). Are the labels correct?',
|
||||
release, namespace, label_selector)
|
||||
|
||||
if timed_out:
|
||||
LOG.info('Timed out waiting for pods: %s',
|
||||
@ -366,7 +406,6 @@ class K8s(object):
|
||||
raise exceptions.KubernetesWatchTimeoutException(
|
||||
'Timed out while waiting on namespace=(%s) labels=(%s)' %
|
||||
(namespace, label_selector))
|
||||
return False
|
||||
|
||||
if modified_pods:
|
||||
successes = 0
|
||||
@ -381,9 +420,14 @@ class K8s(object):
|
||||
|
||||
return True
|
||||
|
||||
def _wait_one_time(self, namespace, label_selector, timeout=100):
|
||||
def _watch_pod_completions(self, namespace, label_selector, timeout=100):
|
||||
'''
|
||||
Watch and wait for pod completions.
|
||||
Returns lists of pods in various conditions for the calling function
|
||||
to handle.
|
||||
'''
|
||||
LOG.debug(
|
||||
'Starting to wait: namespace=%s, label_selector=(%s), '
|
||||
'Starting to wait on pods: namespace=%s, label_selector=(%s), '
|
||||
'timeout=%s', namespace, label_selector, timeout)
|
||||
ready_pods = {}
|
||||
modified_pods = set()
|
||||
@ -476,3 +520,56 @@ class K8s(object):
|
||||
'using default %ss.', DEFAULT_K8S_TIMEOUT)
|
||||
timeout = DEFAULT_K8S_TIMEOUT
|
||||
return timeout
|
||||
|
||||
def _watch_job_completion(self, namespace, label_selector, timeout):
|
||||
'''
|
||||
Watch and wait for job completion.
|
||||
Returns when conditions are met, or raises a timeout exception.
|
||||
'''
|
||||
try:
|
||||
timeout = self._check_timeout(timeout)
|
||||
|
||||
ready_jobs = {}
|
||||
w = watch.Watch()
|
||||
for event in w.stream(
|
||||
self.batch_api.list_namespaced_job,
|
||||
namespace=namespace,
|
||||
label_selector=label_selector,
|
||||
timeout_seconds=timeout):
|
||||
|
||||
job_name = event['object'].metadata.name
|
||||
LOG.debug('Watch event %s on job %s', event['type'].upper(),
|
||||
job_name)
|
||||
|
||||
# Track the expected and actual number of completed pods
|
||||
# See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ # noqa
|
||||
expected = event['object'].spec.completions
|
||||
completed = event['object'].status.succeeded
|
||||
|
||||
if expected != completed:
|
||||
ready_jobs[job_name] = False
|
||||
else:
|
||||
ready_jobs[job_name] = True
|
||||
LOG.debug(
|
||||
'Job %s complete (spec.completions=%s, '
|
||||
'status.succeeded=%s)', job_name, expected, completed)
|
||||
|
||||
if all(ready_jobs.values()):
|
||||
return True
|
||||
|
||||
except ApiException as e:
|
||||
LOG.exception(
|
||||
"Exception when watching jobs: namespace=%s, labels=(%s)",
|
||||
namespace, label_selector)
|
||||
raise e
|
||||
|
||||
if not ready_jobs:
|
||||
LOG.warn(
|
||||
'Saw no job events for namespace=%s, labels=(%s). '
|
||||
'Are the labels correct?', namespace, label_selector)
|
||||
return False
|
||||
|
||||
err_msg = ('Reached timeout while waiting for job completions: '
|
||||
'namespace=%s, labels=(%s)' % (namespace, label_selector))
|
||||
LOG.error(err_msg)
|
||||
raise exceptions.KubernetesWatchTimeoutException(err_msg)
|
||||
|
@ -265,8 +265,13 @@ class Tiller(object):
|
||||
action_type = action.get('type')
|
||||
labels = action.get('labels', None)
|
||||
|
||||
self.delete_resources(release_name, name, action_type, labels,
|
||||
namespace, timeout)
|
||||
self.delete_resources(
|
||||
release_name,
|
||||
name,
|
||||
action_type,
|
||||
labels,
|
||||
namespace,
|
||||
timeout=timeout)
|
||||
except Exception:
|
||||
LOG.warn("PRE: Could not delete anything, please check yaml")
|
||||
raise ex.PreUpdateJobDeleteException(name, namespace)
|
||||
@ -660,7 +665,7 @@ class Tiller(object):
|
||||
|
||||
LOG.info("Deleting pod %s in namespace: %s", pod_name,
|
||||
namespace)
|
||||
self.k8s.delete_namespace_pod(pod_name, namespace)
|
||||
self.k8s.delete_pod_action(pod_name, namespace)
|
||||
if wait:
|
||||
self.k8s.wait_for_pod_redeployment(pod_name, namespace)
|
||||
handled = True
|
||||
|
Loading…
Reference in New Issue
Block a user