From 7225cc76cbf3239b06b6fcc12ebba51d748e1e95 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Thu, 7 Dec 2017 08:37:57 +0000 Subject: [PATCH] Add Operator to Check Node Status Had a discussion with Scott and I agree that checking node status (instead of pod readiness) should be the way to determine if the cluster-join process is completed. This P.S. adds the operator to address that in the short run. The long term solution will be to add a promenade API endpoint that checks cluster-join and Airflow will query that endpoint instead. This P.S. also updates some of the comments/wordings used in the 'check_k8s_pod_status' operator. Change-Id: I2e5390f1f64c7d8ce374b2537e6d14e65dffecd6 --- .../plugins/check_k8s_node_status.py | 104 ++++++++++++++++++ .../plugins/check_k8s_pod_status.py | 15 ++- 2 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 shipyard_airflow/plugins/check_k8s_node_status.py diff --git a/shipyard_airflow/plugins/check_k8s_node_status.py b/shipyard_airflow/plugins/check_k8s_node_status.py new file mode 100644 index 00000000..b4d2359c --- /dev/null +++ b/shipyard_airflow/plugins/check_k8s_node_status.py @@ -0,0 +1,104 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.exceptions import AirflowException +from kubernetes import client, config + + +def check_node_status(time_out, interval): + """This function retrieves the current state of the nodes in the + Kubernetes cluster. We can use it to check the state of the + cluster join process (drydock/promenade) and determine if all + the bare metal nodes have successfully joined the Kubernetes + cluster. + + :param time_out: Node should be in Ready state before Time Out + :param interval: Time interval in which we query node state + + Example:: + + from check_k8s_node_status import check_node_status + + # Calls function to check that all nodes are in Ready State + # Time out in this case is set to 15 mins, the time interval + # has been set to 60 seconds + check_node_status(900, 60) + """ + # Initialize Variable + not_ready_node_list = [] + + # Note that we are using 'in_cluster_config' + config.load_incluster_config() + v1 = client.CoreV1Api() + + # Logs initial state of all nodes in the cluster + ret_init = v1.list_node(watch=False) + + for i in ret_init.items: + logging.info("Current state of nodes in Cluster is") + logging.info("%s\t%s\t%s", i.metadata.name, + i.status.conditions[-1].status, + i.status.conditions[-1].type) + + # Populates the list of nodes in the Cluster + not_ready_node_list.append(i.metadata.name) + + # Calculate number of times to execute the 'for' loop + # Ensure that 'time_out' and 'interval' is passed in as integer + # The result from the division will be a floating number which + # We will round off to nearest whole number + end_range = round(int(time_out) / int(interval)) + + for i in range(0, end_range + 1): + # Reset node_ready to True for each iteration + cluster_ready = True + + # Get updated snapshot view of Cluster for each iteration + ret = v1.list_node(watch=False) + + # Check the current state of nodes that are not in Ready state + # from the previous iteration + for j in ret.items: + if j.metadata.name in not_ready_node_list: + if j.status.conditions[-1].status != 'True': + # Set cluster_ready to False + cluster_ready = False + + # Print current state of node + logging.info("Node %s is not Ready", j.metadata.name) + logging.debug("Current status of %s is %s", + j.metadata.name, + j.status.conditions[-1].message) + else: + # Remove 'Ready' node from list + not_ready_node_list.remove(j.metadata.name) + + logging.info("Node %s is in Ready state", j.metadata.name) + + # Raise Time Out Exception + if not cluster_ready and i == end_range: + raise AirflowException("Timed Out! One or more Nodes fail to " + "get into Ready State!") + + # Exit loop if Cluster is in Ready state + if cluster_ready: + logging.info("All nodes are in Ready state") + break + else: + # Back off and check again in next iteration + logging.info("Wait for %d seconds...", int(interval)) + time.sleep(int(interval)) diff --git a/shipyard_airflow/plugins/check_k8s_pod_status.py b/shipyard_airflow/plugins/check_k8s_pod_status.py index b5eb3e58..cdf94cda 100644 --- a/shipyard_airflow/plugins/check_k8s_pod_status.py +++ b/shipyard_airflow/plugins/check_k8s_pod_status.py @@ -21,10 +21,9 @@ from kubernetes import client, config def check_pods_status(required_pods): """This function retrieves the current state of pods in the - Kubernetes cluster. We can use it to check the state of the - cluster join process (drydock/promenade) and determine if all - the bare metal nodes have successfully joined the Kubernetes - cluster. + Kubernetes cluster. We can use it to check that all the pods + that are of interest to us are up in the cluster. Function + returns boolean True/False and queries every 30 seconds. :param required_pods: List of pods that are of interest to us @@ -32,12 +31,12 @@ def check_pods_status(required_pods): from check_k8s_pod_status import check_pods_status - join_complete = False + pods_ready = False required_pods = ['ceph-', 'calico-', 'coredns-', 'kubernetes-'] # Time out can be set as required - while not join_complete: - join_complete = check_pods_status(required_pods) + while not pods_ready: + pods_ready = check_pods_status(required_pods) """ # Note that we are using 'in_cluster_config' config.load_incluster_config() @@ -52,7 +51,7 @@ def check_pods_status(required_pods): # Return Boolean 'False' if required pods are not in # 'Succeeded' or 'Running' state if i.status.phase not in ['Succeeded', 'Running']: - logging.info("Cluster is not in ready state...") + logging.info("Pods are not in ready state...") logging.info("%s is in %s state", i.metadata.name, i.status.phase) logging.info("Wait for 30 seconds...")