Reduce retries during cluster health validation
This change reduces retries during cluster health validation. There are multiple retry levels today: * retry at the urllib3 HTTP level * retry while validating cluster health * retry in _proxy_internal This causes a retry storm, which brings significant delays to API calls. This is especially relevant when nsxlib is configured with cluster_unavailable_retry = True (this is always the case with a single endpoint). This change reduces the configurable retry attempts in cluster health validation to a single retry per endpoint. In addition, this change fixes a scenario where a client configures nsxlib with no validation, in which case the cluster should not mark the endpoint as UP in validation-related code. Change-Id: I33b4101a0e0c0f4088e10776e126cc495dabd89c
This commit is contained in:
parent
cddf191084
commit
1e675ae459
@ -175,7 +175,7 @@ class MemoryMockAPIProvider(nsx_cluster.AbstractHTTPProvider):
|
|||||||
return "Memory mock API"
|
return "Memory mock API"
|
||||||
|
|
||||||
def validate_connection(self, cluster_api, endpoint, conn):
|
def validate_connection(self, cluster_api, endpoint, conn):
|
||||||
return
|
return True
|
||||||
|
|
||||||
def new_connection(self, cluster_api, provider):
|
def new_connection(self, cluster_api, provider):
|
||||||
# all callers use the same backing
|
# all callers use the same backing
|
||||||
@ -335,6 +335,7 @@ class NsxClientTestCase(NsxLibTestCase):
|
|||||||
|
|
||||||
def validate_connection(self, cluster_api, endpoint, conn):
|
def validate_connection(self, cluster_api, endpoint, conn):
|
||||||
assert conn is not None
|
assert conn is not None
|
||||||
|
return True
|
||||||
|
|
||||||
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
|
def mock_nsx_clustered_api(self, session_response=None, **kwargs):
|
||||||
orig_request = nsx_cluster.TimeoutSession.request
|
orig_request = nsx_cluster.TimeoutSession.request
|
||||||
|
@ -30,7 +30,7 @@ from vmware_nsxlib.v3 import exceptions as nsxlib_exc
|
|||||||
|
|
||||||
|
|
||||||
def _validate_conn_up(*args, **kwargs):
|
def _validate_conn_up(*args, **kwargs):
|
||||||
return
|
return True
|
||||||
|
|
||||||
|
|
||||||
def _validate_conn_down(*args, **kwargs):
|
def _validate_conn_down(*args, **kwargs):
|
||||||
@ -312,6 +312,8 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||||||
if endpoint.provider.id == '8.9.10.11':
|
if endpoint.provider.id == '8.9.10.11':
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
self._test_health(_validate, cluster.ClusterHealth.ORANGE)
|
self._test_health(_validate, cluster.ClusterHealth.ORANGE)
|
||||||
|
|
||||||
def test_green_health(self):
|
def test_green_health(self):
|
||||||
@ -415,8 +417,7 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
|
|||||||
# until retries have been exhausted
|
# until retries have been exhausted
|
||||||
api.nsxlib_config.cluster_unavailable_retry = True
|
api.nsxlib_config.cluster_unavailable_retry = True
|
||||||
self.assertEqual(api._select_endpoint(), None)
|
self.assertEqual(api._select_endpoint(), None)
|
||||||
self.assertEqual(api._validate.call_count,
|
self.assertEqual(api._validate.call_count, len(eps))
|
||||||
api.nsxlib_config.max_attempts * len(eps))
|
|
||||||
|
|
||||||
# simulate the case where 1 endpoint finally goes up
|
# simulate the case where 1 endpoint finally goes up
|
||||||
self.validate_count = 0
|
self.validate_count = 0
|
||||||
|
@ -215,6 +215,7 @@ class RESTClient(object):
|
|||||||
silent=False, expected_results=None, **kwargs):
|
silent=False, expected_results=None, **kwargs):
|
||||||
request_headers = headers.copy() if headers else {}
|
request_headers = headers.copy() if headers else {}
|
||||||
request_headers.update(self._default_headers)
|
request_headers.update(self._default_headers)
|
||||||
|
|
||||||
if utils.INJECT_HEADERS_CALLBACK:
|
if utils.INJECT_HEADERS_CALLBACK:
|
||||||
inject_headers = utils.INJECT_HEADERS_CALLBACK()
|
inject_headers = utils.INJECT_HEADERS_CALLBACK()
|
||||||
request_headers.update(inject_headers)
|
request_headers.update(inject_headers)
|
||||||
|
@ -33,7 +33,6 @@ from requests import adapters
|
|||||||
from requests import exceptions as requests_exceptions
|
from requests import exceptions as requests_exceptions
|
||||||
import six
|
import six
|
||||||
import six.moves.urllib.parse as urlparse
|
import six.moves.urllib.parse as urlparse
|
||||||
import tenacity
|
|
||||||
import urllib3
|
import urllib3
|
||||||
|
|
||||||
from vmware_nsxlib._i18n import _
|
from vmware_nsxlib._i18n import _
|
||||||
@ -186,22 +185,32 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
|
|||||||
return "%s-%s" % (requests.__title__, requests.__version__)
|
return "%s-%s" % (requests.__title__, requests.__version__)
|
||||||
|
|
||||||
def validate_connection(self, cluster_api, endpoint, conn):
|
def validate_connection(self, cluster_api, endpoint, conn):
|
||||||
|
# We don't need to retry with different endpoint during validation,
|
||||||
|
# thus limit max_attempts to 1
|
||||||
|
# on connection level, validation will be retried according to
|
||||||
|
# nsxlib 'retries' and 'http_timeout' parameters.
|
||||||
client = nsx_client.NSX3Client(
|
client = nsx_client.NSX3Client(
|
||||||
conn, url_prefix=endpoint.provider.url,
|
conn, url_prefix=endpoint.provider.url,
|
||||||
url_path_base=cluster_api.nsxlib_config.url_base,
|
url_path_base=cluster_api.nsxlib_config.url_base,
|
||||||
default_headers=conn.default_headers)
|
default_headers=conn.default_headers,
|
||||||
|
max_attempts=1)
|
||||||
|
|
||||||
|
validation_done = False
|
||||||
# Check the manager state directly
|
# Check the manager state directly
|
||||||
if cluster_api.nsxlib_config.validate_connection_method:
|
if cluster_api.nsxlib_config.validate_connection_method:
|
||||||
cluster_api.nsxlib_config.validate_connection_method(
|
cluster_api.nsxlib_config.validate_connection_method(
|
||||||
client, endpoint.provider.url)
|
client, endpoint.provider.url)
|
||||||
|
validation_done = True
|
||||||
|
|
||||||
# If keeplive section returns a list, it is assumed to be non-empty
|
# If keeplive section returns a list, it is assumed to be non-empty
|
||||||
keepalive_section = cluster_api.nsxlib_config.keepalive_section
|
keepalive_section = cluster_api.nsxlib_config.keepalive_section
|
||||||
# When validate connection also has the effect of keep-alive,
|
# When validate connection also has the effect of keep-alive,
|
||||||
# keepalive_section can be disabled by passing in an empty value
|
# keepalive_section can be disabled by passing in an empty value
|
||||||
if keepalive_section:
|
if keepalive_section:
|
||||||
result = client.get(keepalive_section, silent=True)
|
result = client.get(keepalive_section,
|
||||||
|
silent=True,
|
||||||
|
with_retries=False)
|
||||||
|
validation_done = True
|
||||||
if not result or result.get('result_count', 1) <= 0:
|
if not result or result.get('result_count', 1) <= 0:
|
||||||
msg = _("No %(section)s found "
|
msg = _("No %(section)s found "
|
||||||
"for '%(url)s'") % {'section': keepalive_section,
|
"for '%(url)s'") % {'section': keepalive_section,
|
||||||
@ -210,6 +219,8 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
|
|||||||
raise exceptions.ResourceNotFound(
|
raise exceptions.ResourceNotFound(
|
||||||
manager=endpoint.provider.url, operation=msg)
|
manager=endpoint.provider.url, operation=msg)
|
||||||
|
|
||||||
|
return validation_done
|
||||||
|
|
||||||
def new_connection(self, cluster_api, provider):
|
def new_connection(self, cluster_api, provider):
|
||||||
config = cluster_api.nsxlib_config
|
config = cluster_api.nsxlib_config
|
||||||
session = TimeoutSession(config.http_timeout,
|
session = TimeoutSession(config.http_timeout,
|
||||||
@ -574,8 +585,12 @@ class ClusteredAPI(object):
|
|||||||
def _validate(self, endpoint):
|
def _validate(self, endpoint):
|
||||||
try:
|
try:
|
||||||
with endpoint.pool.item() as conn:
|
with endpoint.pool.item() as conn:
|
||||||
self._http_provider.validate_connection(self, endpoint, conn)
|
# with some configurations, validation will be skipped
|
||||||
endpoint.set_state(EndpointState.UP)
|
result = self._http_provider.validate_connection(self,
|
||||||
|
endpoint,
|
||||||
|
conn)
|
||||||
|
if result:
|
||||||
|
endpoint.set_state(EndpointState.UP)
|
||||||
except exceptions.ClientCertificateNotTrusted:
|
except exceptions.ClientCertificateNotTrusted:
|
||||||
LOG.warning("Failed to validate API cluster endpoint "
|
LOG.warning("Failed to validate API cluster endpoint "
|
||||||
"'%(ep)s' due to untrusted client certificate",
|
"'%(ep)s' due to untrusted client certificate",
|
||||||
@ -612,22 +627,13 @@ class ClusteredAPI(object):
|
|||||||
return endpoint
|
return endpoint
|
||||||
seen += 1
|
seen += 1
|
||||||
|
|
||||||
@utils.retry_upon_none_result(self.nsxlib_config.max_attempts)
|
|
||||||
def _select_endpoint_internal_with_retry():
|
|
||||||
# redo endpoint selection with refreshing states
|
|
||||||
return _select_endpoint_internal(refresh=True)
|
|
||||||
|
|
||||||
# First attempt to get an UP endpoint
|
# First attempt to get an UP endpoint
|
||||||
endpoint = _select_endpoint_internal()
|
endpoint = _select_endpoint_internal()
|
||||||
if endpoint or not self.nsxlib_config.cluster_unavailable_retry:
|
if endpoint or not self.nsxlib_config.cluster_unavailable_retry:
|
||||||
return endpoint
|
return endpoint
|
||||||
|
|
||||||
# Retry the selection while refreshing the endpoints state
|
# Retry the selection while refreshing the endpoints state
|
||||||
try:
|
return _select_endpoint_internal(refresh=True)
|
||||||
return _select_endpoint_internal_with_retry()
|
|
||||||
except tenacity.RetryError:
|
|
||||||
# exhausted number of retries
|
|
||||||
return None
|
|
||||||
|
|
||||||
def endpoint_for_connection(self, conn):
|
def endpoint_for_connection(self, conn):
|
||||||
# check all endpoint pools
|
# check all endpoint pools
|
||||||
|
Loading…
x
Reference in New Issue
Block a user