
After a discussion with Scott, I agree that checking node status (instead of pod readiness) should be the way to determine whether the cluster-join process has completed. This patch set adds an operator to address that in the short term. The long-term solution will be to add a Promenade API endpoint that checks cluster-join status, which Airflow will query instead. This patch set also updates some of the comments/wording used in the 'check_k8s_pod_status' operator. Change-Id: I2e5390f1f64c7d8ce374b2537e6d14e65dffecd6
105 lines
4.0 KiB
Python
105 lines
4.0 KiB
Python
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
import time
|
|
|
|
from airflow.exceptions import AirflowException
|
|
from kubernetes import client, config
|
|
|
|
|
|
def _ready_condition(node):
    """Return the condition of type 'Ready' for *node*, or None if absent."""
    # 'conditions' can be None for a node that has only just registered,
    # and the API does not guarantee that 'Ready' is the last entry, so
    # look the condition up by type instead of by position.
    for condition in node.status.conditions or []:
        if condition.type == 'Ready':
            return condition
    return None


def check_node_status(time_out, interval):
    """Poll the Kubernetes API until every node reports the Ready condition.

    The nodes present in the cluster when the function is called are
    tracked. The node list is then queried repeatedly; a tracked node is
    dropped from the pending set as soon as its 'Ready' condition has
    status 'True'. This can be used to check the state of the cluster
    join process (drydock/promenade) and determine if all the bare metal
    nodes have successfully joined the Kubernetes cluster.

    :param time_out: Node should be in Ready state before Time Out
                     (seconds, converted with ``int()``)
    :param interval: Time interval in which we query node state
                     (seconds, converted with ``int()``, must be > 0)
    :raises AirflowException: if the arguments are out of range, or if at
                              least one node is still not Ready at the
                              final check

    Example::

        from check_k8s_node_status import check_node_status

        # Calls function to check that all nodes are in Ready State
        # Time out in this case is set to 15 mins, the time interval
        # has been set to 60 seconds
        check_node_status(900, 60)
    """
    time_out = int(time_out)
    interval = int(interval)

    # A non-positive interval or negative time out would either divide by
    # zero or produce an empty polling range, which would make the check
    # "succeed" without ever looking at a node.
    if interval <= 0 or time_out < 0:
        raise AirflowException("'interval' must be greater than 0 and "
                               "'time_out' must not be negative")

    # Note that we are using 'in_cluster_config'
    config.load_incluster_config()
    v1 = client.CoreV1Api()

    # Log the initial state of all nodes and remember their names; every
    # node is considered pending until it is seen Ready in the loop below.
    pending_nodes = set()
    logging.info("Current state of nodes in Cluster is")

    for node in v1.list_node(watch=False).items:
        condition = _ready_condition(node)
        logging.info("%s\t%s\t%s", node.metadata.name,
                     condition.status if condition else 'Unknown',
                     'Ready')
        pending_nodes.add(node.metadata.name)

    # Number of polling rounds after the first one, rounded to the
    # nearest whole number (float() keeps true division on Python 2).
    end_range = int(round(time_out / float(interval)))

    for attempt in range(end_range + 1):
        # Assume the cluster is ready until a pending node says otherwise
        cluster_ready = True

        # Get updated snapshot view of Cluster for each iteration
        for node in v1.list_node(watch=False).items:
            name = node.metadata.name

            # Only nodes that were not Ready in a previous round matter
            if name not in pending_nodes:
                continue

            condition = _ready_condition(node)

            if condition is not None and condition.status == 'True':
                pending_nodes.discard(name)
                logging.info("Node %s is in Ready state", name)
            else:
                cluster_ready = False
                logging.info("Node %s is not Ready", name)
                logging.debug("Current status of %s is %s",
                              name,
                              condition.message if condition else None)

        # Exit as soon as the Cluster is in Ready state
        if cluster_ready:
            logging.info("All nodes are in Ready state")
            return

        # That was the final check: give up instead of sleeping again
        if attempt == end_range:
            raise AirflowException("Timed Out! One or more Nodes fail to "
                                   "get into Ready State!")

        # Back off and check again in next iteration
        logging.info("Wait for %d seconds...", interval)
        time.sleep(interval)
|