Support MP cluster unavailable managers

This patch will add support for the case nsxlib is configured with
a cluster (few nsx managers), which change their availability, and
in a specific point in time might all be DOWN.

1) The nsxlib will succeed even if currently all the managers are
unavailable (state DOWN)
2) By configuration of cluster_unavailable_retry=True, when a request
is issues and all managers are DOWN, the endpoint selected will be
retried until one of the managers is back UP, or until max retries
is reached (10 by default)

Change-Id: I2e3d1a9734f37ef82859baf0082b39c11d6ce149
This commit is contained in:
Adit Sarfaty 2018-07-15 13:46:14 +03:00
parent aae966cc49
commit 919909ba51
4 changed files with 105 additions and 10 deletions

View File

@ -260,6 +260,48 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase):
eps[0]._state = cluster.EndpointState.UP
self.assertEqual(_get_schedule(4), [eps[0], eps[2], eps[0], eps[2]])
def test_cluster_select_endpoint(self):
conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13']
api = self.mock_nsx_clustered_api(nsx_api_managers=conf_managers)
api._validate = mock.Mock()
eps = list(api._endpoints.values())
# all up - select the first one
self.assertEqual(api._select_endpoint(), eps[0])
# run again - select the 2nd
self.assertEqual(api._select_endpoint(), eps[1])
# all down - return None
eps[0]._state = cluster.EndpointState.DOWN
eps[1]._state = cluster.EndpointState.DOWN
eps[2]._state = cluster.EndpointState.DOWN
self.assertEqual(api._select_endpoint(), None)
# up till now the validate method should not have been called
self.assertEqual(api._validate.call_count, 0)
# set up the retries flag, and check that validate was called
# until retries have been exhausted
api.nsxlib_config.cluster_unavailable_retry = True
self.assertEqual(api._select_endpoint(), None)
self.assertEqual(api._validate.call_count,
api.nsxlib_config.max_attempts * len(eps))
# simulate the case where 1 endpoint finally goes up
self.validate_count = 0
self.max_validate = 9
def _mock_validate(ep):
if self.validate_count >= self.max_validate:
ep._state = cluster.EndpointState.UP
self.validate_count += 1
api._validate = _mock_validate
self.assertEqual(api._select_endpoint(),
eps[(self.max_validate - 1) % len(eps)])
self.assertEqual(self.validate_count, self.max_validate + 1)
def test_reinitialize_cluster(self):
with mock.patch.object(cluster.TimeoutSession, 'request',
return_value=get_sess_create_resp()):

View File

@ -33,10 +33,12 @@ from requests import adapters
from requests import exceptions as requests_exceptions
import six
import six.moves.urllib.parse as urlparse
import tenacity
from vmware_nsxlib._i18n import _
from vmware_nsxlib.v3 import client as nsx_client
from vmware_nsxlib.v3 import exceptions
from vmware_nsxlib.v3 import utils
LOG = log.getLogger(__name__)
@ -401,7 +403,12 @@ class ClusteredAPI(object):
def _create_conn(p):
def _conn():
# called when a pool needs to create a new connection
return self._http_provider.new_connection(self, p)
try:
return self._http_provider.new_connection(self, p)
except Exception as e:
if self._http_provider.is_conn_open_exception(e):
LOG.warning("Timeout while trying to open a "
"connection with %s", p)
return _conn
@ -484,6 +491,11 @@ class ClusteredAPI(object):
def _validate(self, endpoint):
try:
with endpoint.pool.item() as conn:
if not conn:
LOG.warning("No connection established with endpoint "
"%(ep)s. ", {'ep': endpoint})
endpoint.set_state(EndpointState.DOWN)
return
self._http_provider.validate_connection(self, endpoint, conn)
endpoint.set_state(EndpointState.UP)
except exceptions.ClientCertificateNotTrusted:
@ -505,13 +517,39 @@ class ClusteredAPI(object):
{'ep': endpoint, 'err': e})
def _select_endpoint(self):
# check for UP state until exhausting all endpoints
seen, total = 0, len(self._endpoints.values())
while seen < total:
endpoint = next(self._endpoint_schedule)
if endpoint.state == EndpointState.UP:
return endpoint
seen += 1
"""Return an endpoint in UP state.
Go over all endpoint and return the next one which is UP
If all endpoints are currently DOWN, depending on the configuration
retry it until one is UP (or max retries exceeded)
"""
def _select_endpoint_internal(refresh=False):
# check for UP state until exhausting all endpoints
seen, total = 0, len(self._endpoints.values())
while seen < total:
endpoint = next(self._endpoint_schedule)
if refresh:
self._validate(endpoint)
if endpoint.state == EndpointState.UP:
return endpoint
seen += 1
@utils.retry_upon_none_result(self.nsxlib_config.max_attempts)
def _select_endpoint_internal_with_retry():
# redo endpoint selection with refreshing states
return _select_endpoint_internal(refresh=True)
# First attempt to get an UP endpoint
endpoint = _select_endpoint_internal()
if endpoint or not self.nsxlib_config.cluster_unavailable_retry:
return endpoint
# Retry the selection while refreshing the endpoints state
try:
return _select_endpoint_internal_with_retry()
except tenacity.RetryError:
# exhausted number of retries
return None
def endpoint_for_connection(self, conn):
# check all endpoint pools

View File

@ -73,7 +73,11 @@ class NsxLibConfig(object):
the requests, to allow admin user to update/
delete all entries.
:param rate_limit_retry: If True, the client will retry requests failed on
"Too many requests" error
"Too many requests" error.
:param cluster_unavailable_retry: If True, skip fatal errors when no
endpoint in the NSX management cluster is
available to serve a request, and retry
the request instead.
"""
def __init__(self,
@ -97,7 +101,8 @@ class NsxLibConfig(object):
dns_domain='openstacklocal',
dhcp_profile_uuid=None,
allow_overwrite_header=False,
rate_limit_retry=True):
rate_limit_retry=True,
cluster_unavailable_retry=False):
self.nsx_api_managers = nsx_api_managers
self._username = username
@ -119,6 +124,7 @@ class NsxLibConfig(object):
self.dns_domain = dns_domain
self.allow_overwrite_header = allow_overwrite_header
self.rate_limit_retry = rate_limit_retry
self.cluster_unavailable_retry = cluster_unavailable_retry
if dhcp_profile_uuid:
# this is deprecated, and never used.

View File

@ -175,6 +175,15 @@ def retry_random_upon_exception(exc, delay=0.5, max_delay=5,
before=_log_before_retry, after=_log_after_retry)
def retry_upon_none_result(max_attempts, delay=0.5, max_delay=2):
return tenacity.retry(reraise=True,
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_exponential(
multiplier=delay, max=max_delay),
stop=tenacity.stop_after_attempt(max_attempts),
before=_log_before_retry, after=_log_after_retry)
def list_match(list1, list2):
# Check if list1 and list2 have identical elements, but relaxed on
# dict elements where list1's dict element can be a subset of list2's