Merge "Parameterize k8s health check decorator"

This commit is contained in:
Zuul 2025-03-06 14:56:23 +00:00 committed by Gerrit Code Review
commit d1f6a35476
2 changed files with 74 additions and 12 deletions

View File

@ -11,6 +11,7 @@
from __future__ import absolute_import
from distutils.version import LooseVersion
from functools import wraps
from ipaddress import ip_address
from ipaddress import IPv4Address
import json
@ -223,7 +224,7 @@ def k8s_health_check(tries=20, try_sleep=5, timeout=5,
return rc
def k8s_wait_for_endpoints_health(tries=20, try_sleep=5, timeout=5):
def k8s_wait_for_endpoints_health(tries=20, try_sleep=5, timeout=5, quiet=False):
""" This checks each k8s control-plane endpoint health in parallel
and waits for each endpoint to be up and running.
@ -233,6 +234,8 @@ def k8s_wait_for_endpoints_health(tries=20, try_sleep=5, timeout=5):
:param tries: maximum number of tries
:param try_sleep: sleep interval between tries (seconds)
:param timeout: timeout waiting for response (seconds)
:param quiet: log only failed endpoints if true
log everything if false (default)
:return: True if all endpoints are healthy
False if at least one of the enpoints is unhealthy
@ -247,8 +250,10 @@ def k8s_wait_for_endpoints_health(tries=20, try_sleep=5, timeout=5):
threadpool = greenpool.GreenPool(len(healthz_endpoints))
result = True
if not quiet:
LOG.info("Checking Kubernetes health...")
# Check endpoints in parallel
LOG.info("Checking Kubernetes health...")
for endpoint in healthz_endpoints:
threads[endpoint] = threadpool.spawn(k8s_health_check,
tries,
@ -269,25 +274,82 @@ def k8s_wait_for_endpoints_health(tries=20, try_sleep=5, timeout=5):
if unhealthy:
result = False
LOG.error(f"The following Kubernetes endpoints are unhealthy: {unhealthy}")
else:
elif not quiet:
LOG.info("All Kubernetes endpoints are healthy.")
return result
def test_k8s_health(function):
"""Decorator that checks if k8s endpoints are ready before calling the function.
def test_k8s_health(*dargs, **dkw):
""" Decorator function that instantiates the K8sEndpointsCheck object
param: function: The function to be wrapped.
:param *dargs: positional arguments passed to K8sEndpointsCheck object
:param **dkw: keyword arguments passed to the K8sEndpointsCheck object
Returns: The wrapped function that checks Kubernetes health.
:return: The resulting call to K8sEndpointsTest.call()
"""
def wrapper(*args, **kwargs):
if k8s_wait_for_endpoints_health():
return function(*args, **kwargs)
# support both @test_k8s_health and @test_k8s_health() as valid syntax
if len(dargs) == 1 and callable(dargs[0]):
def wrap_simple(f):
@wraps(f)
def wrapped_f(*args, **kw):
return K8sEndpointsTest().call(f, *args, **kw)
return wrapped_f
return wrap_simple(dargs[0])
else:
def wrap(f):
@wraps(f)
def wrapped_f(*args, **kw):
return K8sEndpointsTest(*dargs, **dkw).call(f, *args, **kw)
return wrapped_f
return wrap
class K8sEndpointsTest(object):
""" Kubernetes endpoint test wrapper
:param tries: maximum number of tries
:param try_sleep: sleep interval between tries (seconds)
:param timeout: timeout waiting for response (seconds)
:param quiet: log only failed endpoints if true
log everything if false (default)
"""
def __init__(self,
tries=20,
try_sleep=5,
timeout=5,
quiet=False):
self.tries = tries
self.try_sleep = try_sleep
self.timeout = timeout
self.quiet = quiet
def call(self, fn, *args, **kwargs):
""" Calls the endpoint health check
:param fn: function to be called if the health check succeeds
:param *dargs: positional arguments to be passed to the fn function
:param **dkw: keyword arguments to be passed to the the fn function
:return: The function that checks Kubernetes health.
"""
if k8s_wait_for_endpoints_health(tries=self.tries,
try_sleep=self.try_sleep,
timeout=self.timeout,
quiet=self.quiet):
return fn(*args, **kwargs)
else:
raise Exception("Kubernetes is not responsive.")
return wrapper
def get_kube_versions():

View File

@ -1734,7 +1734,7 @@ class AppOperator(object):
raise exception.ApplicationApplyFailure(name=app.name)
@kubernetes.test_k8s_health
@kubernetes.test_k8s_health(quiet=True)
def _get_helmrelease_info(release_name, namespace):
""" get helmrelease data from a given chart