Add chart API to wait on k8s resource types/labels
This adds a `wait.resources` key to chart documents which allows waiting on a list of k8s type+labels configurations to wait on. Initially supported types are pods, jobs, deployments, daemonsets, and statefulsets. The behavior for controller types is similar to that of `kubectl rollout status`. If `wait.resources` is omitted, it waits on pods and jobs (if any exist) as before. The existing `wait.labels` key still have the same behavior, but if `wait.resources` is also included, the labels are added to each resource wait in that array. Thus they serve to specify base labels that apply to all resources in the release, so as to not have to duplicate them. This may also be useful later for example to use them as labels to wait for when deleting a chart. Controller types additionaly have a `min_ready` field which represents the minimum amount of pods of the controller which must be ready in order for the controller to be considered ready. The value can either be an integer or a percent string e.g. "80%", similar to e.g. `maxUnavailable` in k8s. Default is "100%". This also wraps up moving the rest of the wait code into its own module. Change-Id: If72881af0c74e8f765bbb57ac5ffc8d709cd3c16
This commit is contained in:
parent
d1ecac2f6b
commit
9fad5cff0a
@ -80,7 +80,7 @@ class Apply(api.BaseResource):
|
||||
'enable_chart_cleanup'),
|
||||
dry_run=req.get_param_as_bool('dry_run'),
|
||||
force_wait=req.get_param_as_bool('wait'),
|
||||
timeout=req.get_param_as_int('timeout') or 0,
|
||||
timeout=req.get_param_as_int('timeout'),
|
||||
tiller_host=req.get_param('tiller_host'),
|
||||
tiller_port=req.get_param_as_int('tiller_port') or
|
||||
CONF.tiller_port,
|
||||
|
@ -108,8 +108,7 @@ SHORT_DESC = "Command installs manifest charts."
|
||||
'--timeout',
|
||||
help="Specifies time to wait for each chart to fully "
|
||||
"finish deploying.",
|
||||
type=int,
|
||||
default=0)
|
||||
type=int)
|
||||
@click.option(
|
||||
'--values',
|
||||
'-f',
|
||||
|
@ -68,17 +68,6 @@ class InvalidOverrideValuesYamlException(ArmadaException):
|
||||
super(InvalidValuesYamlException, self).__init__(self._message)
|
||||
|
||||
|
||||
class InvalidWaitTypeException(ArmadaException):
|
||||
'''
|
||||
Exception that occurs when Armada encounters an invalid wait type.
|
||||
'''
|
||||
|
||||
def __init__(self, wait_type):
|
||||
self._message = (
|
||||
'Armada encountered invalid wait type: %s' % wait_type)
|
||||
super(InvalidWaitTypeException, self).__init__(self._message)
|
||||
|
||||
|
||||
class ChartDeployException(ArmadaException):
|
||||
'''
|
||||
Exception that occurs while deploying charts.
|
||||
@ -87,3 +76,13 @@ class ChartDeployException(ArmadaException):
|
||||
def __init__(self, chart_names):
|
||||
self._message = ('Exception deploying charts: %s' % chart_names)
|
||||
super(ChartDeployException, self).__init__(self._message)
|
||||
|
||||
|
||||
class WaitException(ArmadaException):
|
||||
'''
|
||||
Exception that occurs while waiting for resources to become ready.
|
||||
'''
|
||||
|
||||
def __init__(self, message):
|
||||
self._message = message
|
||||
super(WaitException, self).__init__(message)
|
||||
|
@ -48,7 +48,7 @@ class Armada(object):
|
||||
dry_run=False,
|
||||
set_ovr=None,
|
||||
force_wait=False,
|
||||
timeout=0,
|
||||
timeout=None,
|
||||
tiller_host=None,
|
||||
tiller_port=None,
|
||||
tiller_namespace=None,
|
||||
|
@ -16,12 +16,11 @@ from oslo_log import log as logging
|
||||
import time
|
||||
import yaml
|
||||
|
||||
from armada import const
|
||||
from armada.exceptions import armada_exceptions
|
||||
from armada.handlers.chartbuilder import ChartBuilder
|
||||
from armada.handlers.test import test_release_for_success
|
||||
from armada.handlers.release_diff import ReleaseDiff
|
||||
from armada.handlers.wait import get_wait_for
|
||||
from armada.handlers.wait import ChartWait
|
||||
from armada.exceptions import tiller_exceptions
|
||||
from armada.utils.release import release_prefixer
|
||||
|
||||
@ -79,36 +78,23 @@ class ChartDeploy(object):
|
||||
self.tiller.uninstall_release(release_name)
|
||||
result['purge'] = release_name
|
||||
|
||||
wait_values = chart.get('wait', {})
|
||||
wait_labels = wait_values.get('labels', {})
|
||||
chart_wait = ChartWait(
|
||||
self.tiller.k8s,
|
||||
release_name,
|
||||
chart,
|
||||
namespace,
|
||||
k8s_wait_attempts=self.k8s_wait_attempts,
|
||||
k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep,
|
||||
timeout=self.timeout)
|
||||
|
||||
wait_timeout = self.timeout
|
||||
if wait_timeout <= 0:
|
||||
wait_timeout = wait_values.get('timeout', wait_timeout)
|
||||
native_wait_enabled = chart_wait.is_native_enabled()
|
||||
|
||||
# TODO(MarshM): Deprecated, remove `timeout` key.
|
||||
deprecated_timeout = chart.get('timeout', None)
|
||||
if isinstance(deprecated_timeout, int):
|
||||
LOG.warn('The `timeout` key is deprecated and support '
|
||||
'for this will be removed soon. Use '
|
||||
'`wait.timeout` instead.')
|
||||
if wait_timeout <= 0:
|
||||
wait_timeout = deprecated_timeout or wait_timeout
|
||||
|
||||
if wait_timeout <= 0:
|
||||
LOG.info('No Chart timeout specified, using default: %ss',
|
||||
const.DEFAULT_CHART_TIMEOUT)
|
||||
wait_timeout = const.DEFAULT_CHART_TIMEOUT
|
||||
|
||||
native_wait = wait_values.get('native', {})
|
||||
native_wait_enabled = native_wait.get('enabled', True)
|
||||
# Begin Chart timeout deadline
|
||||
deadline = time.time() + chart_wait.get_timeout()
|
||||
|
||||
chartbuilder = ChartBuilder(chart)
|
||||
new_chart = chartbuilder.get_helm_chart()
|
||||
|
||||
# Begin Chart timeout deadline
|
||||
deadline = time.time() + wait_timeout
|
||||
|
||||
# TODO(mark-burnett): It may be more robust to directly call
|
||||
# tiller status to decide whether to install/upgrade rather
|
||||
# than checking for list membership.
|
||||
@ -202,7 +188,7 @@ class ChartDeploy(object):
|
||||
result['install'] = release_name
|
||||
|
||||
timer = int(round(deadline - time.time()))
|
||||
self._wait_until_ready(release_name, wait_labels, namespace, timer)
|
||||
chart_wait.wait(timer)
|
||||
|
||||
test_chart_override = chart.get('test')
|
||||
# Use old default value when not using newer `test` key
|
||||
@ -226,37 +212,6 @@ class ChartDeploy(object):
|
||||
|
||||
return result
|
||||
|
||||
def _wait_until_ready(self, release_name, wait_labels, namespace, timeout):
|
||||
if self.dry_run:
|
||||
LOG.info(
|
||||
'Skipping wait during `dry-run`, would have waited on '
|
||||
'namespace=%s, labels=(%s) for %ss.', namespace, wait_labels,
|
||||
timeout)
|
||||
return
|
||||
|
||||
LOG.info('Waiting for release=%s', release_name)
|
||||
|
||||
waits = [
|
||||
get_wait_for('job', self.tiller.k8s, skip_if_none_found=True),
|
||||
get_wait_for('pod', self.tiller.k8s)
|
||||
]
|
||||
deadline = time.time() + timeout
|
||||
deadline_remaining = timeout
|
||||
for wait in waits:
|
||||
if deadline_remaining <= 0:
|
||||
reason = (
|
||||
'Timeout expired waiting for release=%s' % release_name)
|
||||
LOG.error(reason)
|
||||
raise armada_exceptions.ArmadaTimeoutException(reason)
|
||||
|
||||
wait.wait(
|
||||
labels=wait_labels,
|
||||
namespace=namespace,
|
||||
k8s_wait_attempts=self.k8s_wait_attempts,
|
||||
k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep,
|
||||
timeout=timeout)
|
||||
deadline_remaining = int(round(deadline - time.time()))
|
||||
|
||||
def _test_chart(self, release_name, timeout, cleanup):
|
||||
if self.dry_run:
|
||||
LOG.info(
|
||||
|
@ -13,13 +13,17 @@
|
||||
# limitations under the License.
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
import collections
|
||||
import math
|
||||
import re
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from armada import const
|
||||
from armada.utils.release import label_selectors
|
||||
from armada.const import DEFAULT_K8S_TIMEOUT
|
||||
from armada.exceptions import k8s_exceptions
|
||||
from armada.exceptions import manifest_exceptions
|
||||
from armada.exceptions import armada_exceptions
|
||||
from kubernetes import watch
|
||||
|
||||
@ -28,26 +32,110 @@ LOG = logging.getLogger(__name__)
|
||||
ROLLING_UPDATE_STRATEGY_TYPE = 'RollingUpdate'
|
||||
|
||||
|
||||
def get_wait_for(resource_type, k8s, **kwargs):
|
||||
# TODO: Validate this object up front in armada validate flow.
|
||||
class ChartWait():
|
||||
|
||||
if resource_type == 'pod':
|
||||
return PodWait(resource_type, k8s, **kwargs)
|
||||
elif resource_type == 'job':
|
||||
return JobWait(resource_type, k8s, **kwargs)
|
||||
def __init__(self, k8s, release_name, chart, namespace, k8s_wait_attempts,
|
||||
k8s_wait_attempt_sleep, timeout):
|
||||
self.k8s = k8s
|
||||
self.release_name = release_name
|
||||
self.chart = chart
|
||||
self.wait_config = chart.get('wait', {})
|
||||
self.namespace = namespace
|
||||
self.k8s_wait_attempts = max(k8s_wait_attempts, 1)
|
||||
self.k8s_wait_attempt_sleep = max(k8s_wait_attempt_sleep, 1)
|
||||
|
||||
# TODO: Also validate this up front in armada validate flow.
|
||||
raise armada_exceptions.InvalidWaitTypeException(resource_type)
|
||||
resources = self.wait_config.get('resources')
|
||||
labels = self.wait_config.get('labels', {})
|
||||
|
||||
if resources is not None:
|
||||
waits = []
|
||||
for resource_config in resources:
|
||||
# Initialize labels
|
||||
resource_config.setdefault('labels', {})
|
||||
# Add base labels
|
||||
resource_config['labels'].update(labels)
|
||||
waits.append(self.get_resource_wait(resource_config))
|
||||
else:
|
||||
waits = [
|
||||
JobWait('job', self, labels, skip_if_none_found=True),
|
||||
PodWait('pod', self, labels)
|
||||
]
|
||||
self.waits = waits
|
||||
|
||||
# Calculate timeout
|
||||
wait_timeout = timeout
|
||||
if wait_timeout is None:
|
||||
wait_timeout = self.wait_config.get('timeout')
|
||||
|
||||
# TODO(MarshM): Deprecated, remove `timeout` key.
|
||||
deprecated_timeout = self.chart.get('timeout')
|
||||
if deprecated_timeout is not None:
|
||||
LOG.warn('The `timeout` key is deprecated and support '
|
||||
'for this will be removed soon. Use '
|
||||
'`wait.timeout` instead.')
|
||||
if wait_timeout is None:
|
||||
wait_timeout = deprecated_timeout
|
||||
|
||||
if wait_timeout is None:
|
||||
LOG.info('No Chart timeout specified, using default: %ss',
|
||||
const.DEFAULT_CHART_TIMEOUT)
|
||||
wait_timeout = const.DEFAULT_CHART_TIMEOUT
|
||||
|
||||
self.timeout = wait_timeout
|
||||
|
||||
def get_timeout(self):
|
||||
return self.timeout
|
||||
|
||||
def is_native_enabled(self):
|
||||
native_wait = self.wait_config.get('native', {})
|
||||
return native_wait.get('enabled', True)
|
||||
|
||||
def wait(self, timeout):
|
||||
deadline = time.time() + timeout
|
||||
# TODO(seaneagan): Parallelize waits
|
||||
for wait in self.waits:
|
||||
wait.wait(timeout=timeout)
|
||||
timeout = int(round(deadline - time.time()))
|
||||
|
||||
def get_resource_wait(self, resource_config):
|
||||
|
||||
kwargs = dict(resource_config)
|
||||
resource_type = kwargs.pop('type')
|
||||
labels = kwargs.pop('labels')
|
||||
|
||||
try:
|
||||
if resource_type == 'pod':
|
||||
return PodWait(resource_type, self, labels, **kwargs)
|
||||
elif resource_type == 'job':
|
||||
return JobWait(resource_type, self, labels, **kwargs)
|
||||
if resource_type == 'deployment':
|
||||
return DeploymentWait(resource_type, self, labels, **kwargs)
|
||||
elif resource_type == 'daemonset':
|
||||
return DaemonSetWait(resource_type, self, labels, **kwargs)
|
||||
elif resource_type == 'statefulset':
|
||||
return StatefulSetWait(resource_type, self, labels, **kwargs)
|
||||
except TypeError as e:
|
||||
raise manifest_exceptions.ManifestException(
|
||||
'invalid config for item in `wait.resources`: {}'.format(
|
||||
resource_config))
|
||||
|
||||
raise manifest_exceptions.ManifestException(
|
||||
'invalid `type` for item in `wait.resources`: {}'.format(
|
||||
resource_config['type']))
|
||||
|
||||
|
||||
class Wait(ABC):
|
||||
class ResourceWait(ABC):
|
||||
|
||||
def __init__(self,
|
||||
resource_type,
|
||||
k8s,
|
||||
chart_wait,
|
||||
labels,
|
||||
get_resources,
|
||||
skip_if_none_found=False):
|
||||
self.resource_type = resource_type
|
||||
self.k8s = k8s
|
||||
self.chart_wait = chart_wait
|
||||
self.label_selector = label_selectors(labels)
|
||||
self.get_resources = get_resources
|
||||
self.skip_if_none_found = skip_if_none_found
|
||||
|
||||
@ -55,56 +143,40 @@ class Wait(ABC):
|
||||
def is_resource_ready(self, resource):
|
||||
'''
|
||||
:param resource: resource to check readiness of.
|
||||
:returns: 3-tuple of (status message, ready bool, error message).
|
||||
:returns: 2-tuple of (status message, ready bool).
|
||||
:raises: WaitException
|
||||
'''
|
||||
pass
|
||||
|
||||
def handle_resource(self, resource):
|
||||
resource_name = resource.metadata.name
|
||||
|
||||
message, resource_ready, err = self.is_resource_ready(resource)
|
||||
try:
|
||||
message, resource_ready = self.is_resource_ready(resource)
|
||||
|
||||
if err:
|
||||
# TODO: Handle error
|
||||
pass
|
||||
elif resource_ready:
|
||||
LOG.debug('Resource %s is ready!', resource_name)
|
||||
else:
|
||||
LOG.debug('Resource %s not ready: %s', resource_name, message)
|
||||
if resource_ready:
|
||||
LOG.debug('Resource %s is ready!', resource_name)
|
||||
else:
|
||||
LOG.debug('Resource %s not ready: %s', resource_name, message)
|
||||
|
||||
return resource_ready
|
||||
return resource_ready
|
||||
except armada_exceptions.WaitException as e:
|
||||
LOG.warn('Resource %s unlikely to become ready: %s', resource_name,
|
||||
e)
|
||||
return False
|
||||
|
||||
def wait(self,
|
||||
labels,
|
||||
namespace,
|
||||
timeout=DEFAULT_K8S_TIMEOUT,
|
||||
k8s_wait_attempts=1,
|
||||
k8s_wait_attempt_sleep=1):
|
||||
def wait(self, timeout):
|
||||
'''
|
||||
Wait until all resources become ready given the filters provided by
|
||||
``labels`` and ``namespace``.
|
||||
|
||||
:param namespace: namespace of resources to wait on
|
||||
:param labels: labels of resources to wait on
|
||||
:param timeout: time before disconnecting ``Watch`` stream
|
||||
:param k8s_wait_attempts: number of times to attempt waiting
|
||||
for resources to become ready (minimum 1).
|
||||
:param k8s_wait_attempt_sleep: time in seconds to sleep
|
||||
between attempts (minimum 1).
|
||||
'''
|
||||
|
||||
label_selector = label_selectors(labels) if labels else ''
|
||||
|
||||
wait_attempts = (k8s_wait_attempts if k8s_wait_attempts >= 1 else 1)
|
||||
sleep_time = (k8s_wait_attempt_sleep
|
||||
if k8s_wait_attempt_sleep >= 1 else 1)
|
||||
|
||||
LOG.info(
|
||||
"Waiting for resource type=%s, namespace=%s labels=%s for %ss "
|
||||
"(k8s wait %s times, sleep %ss)", self.resource_type, namespace,
|
||||
label_selector, timeout, wait_attempts, sleep_time)
|
||||
|
||||
if not label_selector:
|
||||
"(k8s wait %s times, sleep %ss)", self.resource_type,
|
||||
self.chart_wait.namespace, self.label_selector, timeout,
|
||||
self.chart_wait.k8s_wait_attempts,
|
||||
self.chart_wait.k8s_wait_attempt_sleep)
|
||||
if not self.label_selector:
|
||||
LOG.warn('"label_selector" not specified, waiting with no labels '
|
||||
'may cause unintended consequences.')
|
||||
|
||||
@ -118,17 +190,16 @@ class Wait(ABC):
|
||||
while True:
|
||||
deadline_remaining = int(round(deadline - time.time()))
|
||||
if deadline_remaining <= 0:
|
||||
LOG.info('Timed out while waiting for resources.')
|
||||
raise k8s_exceptions.KubernetesWatchTimeoutException(
|
||||
'Timed out while waiting on namespace=(%s) labels=(%s)' %
|
||||
(namespace, label_selector))
|
||||
error = (
|
||||
"Timed out waiting for resource type={}, namespace={}, "
|
||||
"labels={}".format(self.resource_type,
|
||||
self.chart_wait.namespace,
|
||||
self.label_selector))
|
||||
LOG.error(error)
|
||||
raise k8s_exceptions.KubernetesWatchTimeoutException(error)
|
||||
|
||||
timed_out, modified, unready, found_resources = (
|
||||
self._watch_resource_completions(
|
||||
namespace=namespace,
|
||||
label_selector=label_selector,
|
||||
timeout=deadline_remaining))
|
||||
|
||||
self._watch_resource_completions(timeout=deadline_remaining))
|
||||
if not found_resources:
|
||||
if self.skip_if_none_found:
|
||||
return
|
||||
@ -136,18 +207,17 @@ class Wait(ABC):
|
||||
LOG.warn(
|
||||
'Saw no resources for '
|
||||
'resource type=%s, namespace=%s, labels=(%s). Are the '
|
||||
'labels correct?', self.resource_type, namespace,
|
||||
label_selector)
|
||||
'labels correct?', self.resource_type,
|
||||
self.chart_wait.namespace, self.label_selector)
|
||||
|
||||
# TODO(seaneagan): Should probably fail here even when resources
|
||||
# were not found, at least once we have an option to ignore
|
||||
# wait timeouts.
|
||||
if timed_out and found_resources:
|
||||
LOG.info('Timed out waiting for resources: %s',
|
||||
sorted(unready))
|
||||
raise k8s_exceptions.KubernetesWatchTimeoutException(
|
||||
'Timed out while waiting on namespace=(%s) labels=(%s)' %
|
||||
(namespace, label_selector))
|
||||
error = "Timed out waiting for resources={}".format(
|
||||
sorted(unready))
|
||||
LOG.error(error)
|
||||
raise k8s_exceptions.KubernetesWatchTimeoutException(error)
|
||||
|
||||
if modified:
|
||||
successes = 0
|
||||
@ -156,19 +226,18 @@ class Wait(ABC):
|
||||
successes += 1
|
||||
LOG.debug('Found no modified resources.')
|
||||
|
||||
if successes >= wait_attempts:
|
||||
if successes >= self.chart_wait.k8s_wait_attempts:
|
||||
break
|
||||
|
||||
LOG.debug(
|
||||
'Continuing to wait: {} consecutive attempts without '
|
||||
'modified resources of {} required.', successes, wait_attempts)
|
||||
'modified resources of {} required.', successes,
|
||||
self.chart_wait.k8s_wait_attempts)
|
||||
time.sleep(self.chart_wait.k8s_wait_attempt_sleep)
|
||||
|
||||
time.sleep(sleep_time)
|
||||
return True
|
||||
|
||||
def _watch_resource_completions(self,
|
||||
namespace,
|
||||
label_selector,
|
||||
timeout=100):
|
||||
def _watch_resource_completions(self, timeout):
|
||||
'''
|
||||
Watch and wait for resource completions.
|
||||
Returns lists of resources in various conditions for the calling
|
||||
@ -176,15 +245,15 @@ class Wait(ABC):
|
||||
'''
|
||||
LOG.debug(
|
||||
'Starting to wait on: namespace=%s, resource type=%s, '
|
||||
'label_selector=(%s), timeout=%s', namespace, self.resource_type,
|
||||
label_selector, timeout)
|
||||
'label_selector=(%s), timeout=%s', self.chart_wait.namespace,
|
||||
self.resource_type, self.label_selector, timeout)
|
||||
ready = {}
|
||||
modified = set()
|
||||
found_resources = False
|
||||
|
||||
kwargs = {
|
||||
'namespace': namespace,
|
||||
'label_selector': label_selector,
|
||||
'namespace': self.chart_wait.namespace,
|
||||
'label_selector': self.label_selector,
|
||||
'timeout_seconds': timeout
|
||||
}
|
||||
|
||||
@ -212,8 +281,8 @@ class Wait(ABC):
|
||||
resource_version = resource.metadata.resource_version
|
||||
msg = ('Watch event: type=%s, name=%s, namespace=%s,'
|
||||
'resource_version=%s')
|
||||
LOG.debug(msg, event_type, resource_name, namespace,
|
||||
resource_version)
|
||||
LOG.debug(msg, event_type, resource_name,
|
||||
self.chart_wait.namespace, resource_version)
|
||||
|
||||
if event_type in {'ADDED', 'MODIFIED'}:
|
||||
found_resources = True
|
||||
@ -254,11 +323,12 @@ class Wait(ABC):
|
||||
return pc
|
||||
|
||||
|
||||
class PodWait(Wait):
|
||||
class PodWait(ResourceWait):
|
||||
|
||||
def __init__(self, resource_type, k8s, **kwargs):
|
||||
super(PodWait, self).__init__(resource_type, k8s,
|
||||
k8s.client.list_namespaced_pod, **kwargs)
|
||||
def __init__(self, resource_type, chart_wait, labels, **kwargs):
|
||||
super(PodWait, self).__init__(
|
||||
resource_type, chart_wait, labels,
|
||||
chart_wait.k8s.client.list_namespaced_pod, **kwargs)
|
||||
|
||||
def is_resource_ready(self, resource):
|
||||
pod = resource
|
||||
@ -268,22 +338,23 @@ class PodWait(Wait):
|
||||
phase = status.phase
|
||||
|
||||
if phase == 'Succeeded':
|
||||
return ("Pod {} succeeded\n".format(name), True, None)
|
||||
return ("Pod {} succeeded".format(name), True)
|
||||
|
||||
if phase == 'Running':
|
||||
cond = self._get_resource_condition(status.conditions, 'Ready')
|
||||
if cond and cond.status == 'True':
|
||||
return ("Pod {} ready\n".format(name), True, None)
|
||||
return ("Pod {} ready".format(name), True)
|
||||
|
||||
msg = "Waiting for pod {} to be ready...\n"
|
||||
return (msg.format(name), False, None)
|
||||
msg = "Waiting for pod {} to be ready..."
|
||||
return (msg.format(name), False)
|
||||
|
||||
|
||||
class JobWait(Wait):
|
||||
class JobWait(ResourceWait):
|
||||
|
||||
def __init__(self, resource_type, k8s, **kwargs):
|
||||
def __init__(self, resource_type, chart_wait, labels, **kwargs):
|
||||
super(JobWait, self).__init__(
|
||||
resource_type, k8s, k8s.batch_api.list_namespaced_job, **kwargs)
|
||||
resource_type, chart_wait, labels,
|
||||
chart_wait.k8s.batch_api.list_namespaced_job, **kwargs)
|
||||
|
||||
def is_resource_ready(self, resource):
|
||||
job = resource
|
||||
@ -293,7 +364,191 @@ class JobWait(Wait):
|
||||
completed = job.status.succeeded
|
||||
|
||||
if expected != completed:
|
||||
msg = "Waiting for job {} to be successfully completed...\n"
|
||||
return (msg.format(name), False, None)
|
||||
msg = "job {} successfully completed\n"
|
||||
return (msg.format(name), True, None)
|
||||
msg = "Waiting for job {} to be successfully completed..."
|
||||
return (msg.format(name), False)
|
||||
msg = "job {} successfully completed"
|
||||
return (msg.format(name), True)
|
||||
|
||||
|
||||
CountOrPercent = collections.namedtuple('CountOrPercent',
|
||||
'number is_percent source')
|
||||
|
||||
# Controller logic (Deployment, DaemonSet, StatefulSet) is adapted from
|
||||
# `kubectl rollout status`:
|
||||
# https://github.com/kubernetes/kubernetes/blob/master/pkg/kubectl/rollout_status.go
|
||||
|
||||
|
||||
class ControllerWait(ResourceWait):
|
||||
|
||||
def __init__(self,
|
||||
resource_type,
|
||||
chart_wait,
|
||||
labels,
|
||||
get_resources,
|
||||
min_ready="100%",
|
||||
**kwargs):
|
||||
super(ControllerWait, self).__init__(resource_type, chart_wait, labels,
|
||||
get_resources, **kwargs)
|
||||
|
||||
if isinstance(min_ready, str):
|
||||
match = re.match('(.*)%$', min_ready)
|
||||
if match:
|
||||
min_ready_percent = int(match.group(1))
|
||||
self.min_ready = CountOrPercent(
|
||||
number=min_ready_percent,
|
||||
is_percent=True,
|
||||
source=min_ready)
|
||||
else:
|
||||
raise manifest_exceptions.ManifestException(
|
||||
"`min_ready` as string must be formatted as a percent "
|
||||
"e.g. '80%'")
|
||||
else:
|
||||
self.min_ready = CountOrPercent(
|
||||
number=min_ready, is_percent=False, source=min_ready)
|
||||
|
||||
def _is_min_ready(self, ready, total):
|
||||
if self.min_ready.is_percent:
|
||||
min_ready = math.ceil(total * (self.min_ready.number / 100))
|
||||
else:
|
||||
min_ready = self.min_ready.number
|
||||
return ready >= min_ready
|
||||
|
||||
|
||||
class DeploymentWait(ControllerWait):
|
||||
|
||||
def __init__(self, resource_type, chart_wait, labels, **kwargs):
|
||||
super(DeploymentWait, self).__init__(
|
||||
resource_type, chart_wait, labels,
|
||||
chart_wait.k8s.apps_v1_api.list_namespaced_deployment, **kwargs)
|
||||
|
||||
def is_resource_ready(self, resource):
|
||||
deployment = resource
|
||||
name = deployment.metadata.name
|
||||
spec = deployment.spec
|
||||
status = deployment.status
|
||||
|
||||
if deployment.metadata.generation <= status.observed_generation:
|
||||
cond = self._get_resource_condition(status.conditions,
|
||||
'Progressing')
|
||||
if cond and cond.reason == 'ProgressDeadlineExceeded':
|
||||
msg = "deployment {} exceeded its progress deadline"
|
||||
return ("", False, msg.format(name))
|
||||
|
||||
if spec.replicas and status.updated_replicas < spec.replicas:
|
||||
msg = ("Waiting for deployment {} rollout to finish: {} out "
|
||||
"of {} new replicas have been updated...")
|
||||
return (msg.format(name, status.updated_replicas,
|
||||
spec.replicas), False)
|
||||
|
||||
if status.replicas > status.updated_replicas:
|
||||
msg = ("Waiting for deployment {} rollout to finish: {} old "
|
||||
"replicas are pending termination...")
|
||||
pending = status.replicas - status.updated_replicas
|
||||
return (msg.format(name, pending), False)
|
||||
|
||||
if not self._is_min_ready(status.available_replicas,
|
||||
status.updated_replicas):
|
||||
msg = ("Waiting for deployment {} rollout to finish: {} of {} "
|
||||
"updated replicas are available, with min_ready={}")
|
||||
return (msg.format(name, status.available_replicas,
|
||||
status.updated_replicas,
|
||||
self.min_ready.source), False, None)
|
||||
msg = "deployment {} successfully rolled out\n"
|
||||
return (msg.format(name), True)
|
||||
|
||||
msg = "Waiting for deployment spec update to be observed..."
|
||||
return (msg.format(), False)
|
||||
|
||||
|
||||
class DaemonSetWait(ControllerWait):
|
||||
|
||||
def __init__(self, resource_type, chart_wait, labels, **kwargs):
|
||||
super(DaemonSetWait, self).__init__(
|
||||
resource_type, chart_wait, labels,
|
||||
chart_wait.k8s.apps_v1_api.list_namespaced_daemon_set, **kwargs)
|
||||
|
||||
def is_resource_ready(self, resource):
|
||||
daemon = resource
|
||||
name = daemon.metadata.name
|
||||
spec = daemon.spec
|
||||
status = daemon.status
|
||||
|
||||
if spec.update_strategy.type != ROLLING_UPDATE_STRATEGY_TYPE:
|
||||
msg = ("Assuming non-readiness for strategy type {}, can only "
|
||||
"determine for {}")
|
||||
raise armada_exceptions.WaitException(
|
||||
msg.format(spec.update_strategy.type,
|
||||
ROLLING_UPDATE_STRATEGY_TYPE))
|
||||
|
||||
if daemon.metadata.generation <= status.observed_generation:
|
||||
if (status.updated_number_scheduled <
|
||||
status.desired_number_scheduled):
|
||||
msg = ("Waiting for daemon set {} rollout to finish: {} out "
|
||||
"of {} new pods have been updated...")
|
||||
return (msg.format(name, status.updated_number_scheduled,
|
||||
status.desired_number_scheduled), False)
|
||||
|
||||
if not self._is_min_ready(status.number_available,
|
||||
status.desired_number_scheduled):
|
||||
msg = ("Waiting for daemon set {} rollout to finish: {} of {} "
|
||||
"updated pods are available, with min_ready={}")
|
||||
return (msg.format(name, status.number_available,
|
||||
status.desired_number_scheduled,
|
||||
self.min_ready.source), False)
|
||||
|
||||
msg = "daemon set {} successfully rolled out"
|
||||
return (msg.format(name), True)
|
||||
|
||||
msg = "Waiting for daemon set spec update to be observed..."
|
||||
return (msg.format(), False)
|
||||
|
||||
|
||||
class StatefulSetWait(ControllerWait):
|
||||
|
||||
def __init__(self, resource_type, chart_wait, labels, **kwargs):
|
||||
super(StatefulSetWait, self).__init__(
|
||||
resource_type, chart_wait, labels,
|
||||
chart_wait.k8s.apps_v1_api.list_namespaced_stateful_set, **kwargs)
|
||||
|
||||
def is_resource_ready(self, resource):
|
||||
sts = resource
|
||||
name = sts.metadata.name
|
||||
spec = sts.spec
|
||||
status = sts.status
|
||||
|
||||
if spec.update_strategy.type != ROLLING_UPDATE_STRATEGY_TYPE:
|
||||
msg = ("Assuming non-readiness for strategy type {}, can only "
|
||||
"determine for {}")
|
||||
|
||||
raise armada_exceptions.WaitException(
|
||||
msg.format(spec.update_strategy.type,
|
||||
ROLLING_UPDATE_STRATEGY_TYPE))
|
||||
|
||||
if (status.observed_generation == 0 or
|
||||
sts.metadata.generation > status.observed_generation):
|
||||
msg = "Waiting for statefulset spec update to be observed..."
|
||||
return (msg, False)
|
||||
|
||||
if spec.replicas and not self._is_min_ready(status.ready_replicas,
|
||||
spec.replicas):
|
||||
msg = ("Waiting for statefulset {} rollout to finish: {} of {} "
|
||||
"pods are ready, with min_ready={}")
|
||||
return (msg.format(name, status.ready_replicas, spec.replicas,
|
||||
self.min_ready.source), False)
|
||||
|
||||
if (spec.update_strategy.type == ROLLING_UPDATE_STRATEGY_TYPE and
|
||||
spec.update_strategy.rolling_update):
|
||||
if spec.replicas and spec.update_strategy.rolling_update.partition:
|
||||
msg = ("Waiting on partitioned rollout not supported, "
|
||||
"assuming non-readiness of statefulset {}")
|
||||
return (msg.format(name), False)
|
||||
|
||||
if status.update_revision != status.current_revision:
|
||||
msg = ("waiting for statefulset rolling update to complete {} "
|
||||
"pods at revision {}...")
|
||||
return (msg.format(status.updated_replicas,
|
||||
status.update_revision), False)
|
||||
|
||||
msg = "statefulset rolling update complete {} pods at revision {}..."
|
||||
return (msg.format(status.current_replicas, status.current_revision),
|
||||
True)
|
||||
|
@ -81,6 +81,21 @@ data:
|
||||
properties:
|
||||
timeout:
|
||||
type: integer
|
||||
resources:
|
||||
type: array
|
||||
items:
|
||||
properties:
|
||||
type:
|
||||
type: string
|
||||
labels:
|
||||
$ref: '#/definitions/labels'
|
||||
min_ready:
|
||||
anyOf:
|
||||
- type: integer
|
||||
- type: string
|
||||
required:
|
||||
- type
|
||||
additionalProperties: false
|
||||
labels:
|
||||
$ref: "#/definitions/labels"
|
||||
# Config for helm's native `--wait` param.
|
||||
|
187
armada/tests/unit/handlers/test_wait.py
Normal file
187
armada/tests/unit/handlers/test_wait.py
Normal file
@ -0,0 +1,187 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from armada import const
|
||||
from armada.exceptions import manifest_exceptions
|
||||
from armada.handlers import wait
|
||||
from armada.tests.unit import base
|
||||
|
||||
test_chart = {'wait': {'timeout': 10, 'native': {'enabled': False}}}
|
||||
|
||||
|
||||
class ChartWaitTestCase(base.ArmadaTestCase):
|
||||
|
||||
def get_unit(self, chart, timeout=None):
|
||||
return wait.ChartWait(
|
||||
k8s=mock.MagicMock(),
|
||||
release_name='test-test',
|
||||
chart=chart,
|
||||
namespace='test',
|
||||
k8s_wait_attempts=1,
|
||||
k8s_wait_attempt_sleep=1,
|
||||
timeout=timeout)
|
||||
|
||||
def test_get_timeout(self):
|
||||
unit = self.get_unit({'timeout': 5, 'wait': {'timeout': 10}})
|
||||
self.assertEquals(unit.get_timeout(), 10)
|
||||
|
||||
def test_get_timeout_default(self):
|
||||
unit = self.get_unit({})
|
||||
self.assertEquals(unit.get_timeout(), const.DEFAULT_CHART_TIMEOUT)
|
||||
|
||||
def test_get_timeout_override(self):
|
||||
unit = self.get_unit(
|
||||
timeout=20, chart={
|
||||
'timeout': 5,
|
||||
'wait': {
|
||||
'timeout': 10
|
||||
}
|
||||
})
|
||||
|
||||
self.assertEquals(unit.get_timeout(), 20)
|
||||
|
||||
def test_get_timeout_deprecated(self):
|
||||
unit = self.get_unit({'timeout': 5})
|
||||
self.assertEquals(unit.get_timeout(), 5)
|
||||
|
||||
def test_is_native_enabled_default_true(self):
|
||||
unit = self.get_unit({})
|
||||
self.assertEquals(unit.is_native_enabled(), True)
|
||||
|
||||
def test_is_native_enabled_true(self):
|
||||
unit = self.get_unit({'wait': {'native': {'enabled': True}}})
|
||||
self.assertEquals(unit.is_native_enabled(), True)
|
||||
|
||||
def test_is_native_enabled_false(self):
|
||||
unit = self.get_unit({'wait': {'native': {'enabled': False}}})
|
||||
self.assertEquals(unit.is_native_enabled(), False)
|
||||
|
||||
def test_waits_init(self):
|
||||
unit = self.get_unit({
|
||||
'wait': {
|
||||
'resources': [{
|
||||
'type': 'pod',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
}
|
||||
}, {
|
||||
'type': 'job',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
}
|
||||
}, {
|
||||
'type': 'daemonset',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
},
|
||||
'min_ready': 5
|
||||
}, {
|
||||
'type': 'deployment',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
},
|
||||
'min_ready': '50%'
|
||||
}, {
|
||||
'type': 'statefulset',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
}
|
||||
}]
|
||||
}
|
||||
}) # yapf: disable
|
||||
|
||||
self.assertEqual(5, len(unit.waits))
|
||||
self.assertIsInstance(unit.waits[0], wait.PodWait)
|
||||
self.assertIsInstance(unit.waits[1], wait.JobWait)
|
||||
self.assertIsInstance(unit.waits[2], wait.DaemonSetWait)
|
||||
self.assertIsInstance(unit.waits[3], wait.DeploymentWait)
|
||||
self.assertIsInstance(unit.waits[4], wait.StatefulSetWait)
|
||||
|
||||
def test_waits_init_min_ready_fails_if_not_controller(self):
|
||||
|
||||
def create_pod_wait_min_ready():
|
||||
self.get_unit({
|
||||
'wait': {
|
||||
'resources': [{
|
||||
'type': 'pod',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
},
|
||||
'min_ready': 5
|
||||
}]
|
||||
}
|
||||
})
|
||||
|
||||
self.assertRaises(manifest_exceptions.ManifestException,
|
||||
create_pod_wait_min_ready)
|
||||
|
||||
def create_job_wait_min_ready():
|
||||
self.get_unit({
|
||||
'wait': {
|
||||
'resources': [{
|
||||
'type': 'job',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
},
|
||||
'min_ready': 5
|
||||
}]
|
||||
}
|
||||
})
|
||||
|
||||
self.assertRaises(manifest_exceptions.ManifestException,
|
||||
create_job_wait_min_ready)
|
||||
|
||||
def test_waits_init_invalid_type(self):
|
||||
|
||||
def create_with_invalid_type():
|
||||
self.get_unit({
|
||||
'wait': {
|
||||
'resources': [{
|
||||
'type': 'invalid',
|
||||
'labels': {
|
||||
'foo': 'bar'
|
||||
},
|
||||
'min_ready': 5
|
||||
}]
|
||||
}
|
||||
})
|
||||
|
||||
self.assertRaises(manifest_exceptions.ManifestException,
|
||||
create_with_invalid_type)
|
||||
|
||||
@mock.patch.object(wait.ChartWait, 'get_resource_wait')
|
||||
def test_wait(self, get_resource_wait):
|
||||
|
||||
def return_mock(*args, **kwargs):
|
||||
return mock.MagicMock()
|
||||
|
||||
get_resource_wait.side_effect = return_mock
|
||||
|
||||
unit = self.get_unit({
|
||||
'wait': {
|
||||
'resources': [{
|
||||
'type': 'foo'
|
||||
}, {
|
||||
'type': 'bar'
|
||||
}]
|
||||
}
|
||||
})
|
||||
|
||||
unit.wait(10)
|
||||
|
||||
self.assertEqual(2, len(unit.waits))
|
||||
for w in unit.waits:
|
||||
w.wait.assert_called_once()
|
@ -26,3 +26,23 @@ Armada Exceptions
|
||||
:members:
|
||||
:show-inheritance:
|
||||
:undoc-members:
|
||||
|
||||
.. autoexception:: armada.exceptions.armada_exceptions.InvalidValuesYamlException
|
||||
:members:
|
||||
:show-inheritance:
|
||||
:undoc-members:
|
||||
|
||||
.. autoexception:: armada.exceptions.armada_exceptions.InvalidOverrideValuesYamlException
|
||||
:members:
|
||||
:show-inheritance:
|
||||
:undoc-members:
|
||||
|
||||
.. autoexception:: armada.exceptions.armada_exceptions.ChartDeployException
|
||||
:members:
|
||||
:show-inheritance:
|
||||
:undoc-members:
|
||||
|
||||
.. autoexception:: armada.exceptions.armada_exceptions.WaitException
|
||||
:members:
|
||||
:show-inheritance:
|
||||
:undoc-members:
|
||||
|
@ -99,7 +99,7 @@ Chart
|
||||
+-----------------+----------+---------------------------------------------------------------------------------------+
|
||||
| namespace | string | namespace of your chart |
|
||||
+-----------------+----------+---------------------------------------------------------------------------------------+
|
||||
| wait | object | See Wait_. |
|
||||
| wait | object | See `Wait`_. |
|
||||
+-----------------+----------+---------------------------------------------------------------------------------------+
|
||||
| protected | object | do not delete FAILED releases when encountered from previous run (provide the |
|
||||
| | | 'continue_processing' bool to continue or halt execution (default: halt)) |
|
||||
@ -125,11 +125,30 @@ Wait
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| keyword | type | action |
|
||||
+=============+==========+====================================================================+
|
||||
| native | object | See `Wait Native`_. |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| timeout | int | time (in seconds) to wait for chart to deploy |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| labels | object | k:v mapping of labels to select Kubernetes resources |
|
||||
| resources | array | Array of `Wait Resource`_ to wait on, with ``labels`` added to each|
|
||||
| | | item. Defaults to pods and jobs (if any exist) matching ``labels``.|
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| labels | object | Base mapping of labels to wait on. They are added to any labels in |
|
||||
| | | each item in the ``resources`` array. |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| native | boolean | See `Wait Native`_. |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
|
||||
Wait Resource
|
||||
^^^^^^^^^^^^^
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| keyword | type | action |
|
||||
+=============+==========+====================================================================+
|
||||
| type | string | k8s resource type, supports: controllers ('deployment', |
|
||||
| | | 'daemonset', 'statefulset'), 'pod', 'job' |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| labels | object | mapping of kubernetes resource labels |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
| min\_ready | int | Only for controller ``type``s. Amount of pods in a controller |
|
||||
| | string | which must be ready. Can be integer or percent string e.g. ``80%``.|
|
||||
| | | Default ``100%``. |
|
||||
+-------------+----------+--------------------------------------------------------------------+
|
||||
|
||||
Wait Native
|
||||
|
Loading…
Reference in New Issue
Block a user