From 919909ba514bae3cddb78542ea2505d210575c0c Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Sun, 15 Jul 2018 13:46:14 +0300 Subject: [PATCH] Support MP cluster unavailable managers This patch will add support for the case nsxlib is configured with a cluster (few nsx managers), which change their availability, and in a specific point in time might all be DOWN. 1) The nsxlib will succeed even if currently all the managers are unavailable (state DOWN) 2) By configuration of cluster_unavailable_retry=True, when a request is issues and all managers are DOWN, the endpoint selected will be retried until one of the managers is back UP, or until max retries is reached (10 by default) Change-Id: I2e3d1a9734f37ef82859baf0082b39c11d6ce149 --- vmware_nsxlib/tests/unit/v3/test_cluster.py | 42 ++++++++++++++++ vmware_nsxlib/v3/cluster.py | 54 ++++++++++++++++++--- vmware_nsxlib/v3/config.py | 10 +++- vmware_nsxlib/v3/utils.py | 9 ++++ 4 files changed, 105 insertions(+), 10 deletions(-) diff --git a/vmware_nsxlib/tests/unit/v3/test_cluster.py b/vmware_nsxlib/tests/unit/v3/test_cluster.py index a3bc2c83..2a6261da 100644 --- a/vmware_nsxlib/tests/unit/v3/test_cluster.py +++ b/vmware_nsxlib/tests/unit/v3/test_cluster.py @@ -260,6 +260,48 @@ class ClusteredAPITestCase(nsxlib_testcase.NsxClientTestCase): eps[0]._state = cluster.EndpointState.UP self.assertEqual(_get_schedule(4), [eps[0], eps[2], eps[0], eps[2]]) + def test_cluster_select_endpoint(self): + conf_managers = ['8.9.10.11', '9.10.11.12', '10.11.12.13'] + api = self.mock_nsx_clustered_api(nsx_api_managers=conf_managers) + api._validate = mock.Mock() + eps = list(api._endpoints.values()) + + # all up - select the first one + self.assertEqual(api._select_endpoint(), eps[0]) + + # run again - select the 2nd + self.assertEqual(api._select_endpoint(), eps[1]) + + # all down - return None + eps[0]._state = cluster.EndpointState.DOWN + eps[1]._state = cluster.EndpointState.DOWN + eps[2]._state = cluster.EndpointState.DOWN + self.assertEqual(api._select_endpoint(), None) + + # up till now the validate method should not have been called + self.assertEqual(api._validate.call_count, 0) + + # set up the retries flag, and check that validate was called + # until retries have been exhausted + api.nsxlib_config.cluster_unavailable_retry = True + self.assertEqual(api._select_endpoint(), None) + self.assertEqual(api._validate.call_count, + api.nsxlib_config.max_attempts * len(eps)) + + # simulate the case where 1 endpoint finally goes up + self.validate_count = 0 + self.max_validate = 9 + + def _mock_validate(ep): + if self.validate_count >= self.max_validate: + ep._state = cluster.EndpointState.UP + self.validate_count += 1 + + api._validate = _mock_validate + self.assertEqual(api._select_endpoint(), + eps[(self.max_validate - 1) % len(eps)]) + self.assertEqual(self.validate_count, self.max_validate + 1) + def test_reinitialize_cluster(self): with mock.patch.object(cluster.TimeoutSession, 'request', return_value=get_sess_create_resp()): diff --git a/vmware_nsxlib/v3/cluster.py b/vmware_nsxlib/v3/cluster.py index ad11551f..f4343a96 100644 --- a/vmware_nsxlib/v3/cluster.py +++ b/vmware_nsxlib/v3/cluster.py @@ -33,10 +33,12 @@ from requests import adapters from requests import exceptions as requests_exceptions import six import six.moves.urllib.parse as urlparse +import tenacity from vmware_nsxlib._i18n import _ from vmware_nsxlib.v3 import client as nsx_client from vmware_nsxlib.v3 import exceptions +from vmware_nsxlib.v3 import utils LOG = log.getLogger(__name__) @@ -401,7 +403,12 @@ class ClusteredAPI(object): def _create_conn(p): def _conn(): # called when a pool needs to create a new connection - return self._http_provider.new_connection(self, p) + try: + return self._http_provider.new_connection(self, p) + except Exception as e: + if self._http_provider.is_conn_open_exception(e): + LOG.warning("Timeout while trying to open a " + "connection with %s", p) return _conn @@ -484,6 +491,11 @@ class ClusteredAPI(object): def _validate(self, endpoint): try: with endpoint.pool.item() as conn: + if not conn: + LOG.warning("No connection established with endpoint " + "%(ep)s. ", {'ep': endpoint}) + endpoint.set_state(EndpointState.DOWN) + return self._http_provider.validate_connection(self, endpoint, conn) endpoint.set_state(EndpointState.UP) except exceptions.ClientCertificateNotTrusted: @@ -505,13 +517,39 @@ class ClusteredAPI(object): {'ep': endpoint, 'err': e}) def _select_endpoint(self): - # check for UP state until exhausting all endpoints - seen, total = 0, len(self._endpoints.values()) - while seen < total: - endpoint = next(self._endpoint_schedule) - if endpoint.state == EndpointState.UP: - return endpoint - seen += 1 + """Return an endpoint in UP state. + + Go over all endpoint and return the next one which is UP + If all endpoints are currently DOWN, depending on the configuration + retry it until one is UP (or max retries exceeded) + """ + def _select_endpoint_internal(refresh=False): + # check for UP state until exhausting all endpoints + seen, total = 0, len(self._endpoints.values()) + while seen < total: + endpoint = next(self._endpoint_schedule) + if refresh: + self._validate(endpoint) + if endpoint.state == EndpointState.UP: + return endpoint + seen += 1 + + @utils.retry_upon_none_result(self.nsxlib_config.max_attempts) + def _select_endpoint_internal_with_retry(): + # redo endpoint selection with refreshing states + return _select_endpoint_internal(refresh=True) + + # First attempt to get an UP endpoint + endpoint = _select_endpoint_internal() + if endpoint or not self.nsxlib_config.cluster_unavailable_retry: + return endpoint + + # Retry the selection while refreshing the endpoints state + try: + return _select_endpoint_internal_with_retry() + except tenacity.RetryError: + # exhausted number of retries + return None def endpoint_for_connection(self, conn): # check all endpoint pools diff --git a/vmware_nsxlib/v3/config.py b/vmware_nsxlib/v3/config.py index 9cb6efb9..95812ac5 100644 --- a/vmware_nsxlib/v3/config.py +++ b/vmware_nsxlib/v3/config.py @@ -73,7 +73,11 @@ class NsxLibConfig(object): the requests, to allow admin user to update/ delete all entries. :param rate_limit_retry: If True, the client will retry requests failed on - "Too many requests" error + "Too many requests" error. + :param cluster_unavailable_retry: If True, skip fatal errors when no + endpoint in the NSX management cluster is + available to serve a request, and retry + the request instead. """ def __init__(self, @@ -97,7 +101,8 @@ class NsxLibConfig(object): dns_domain='openstacklocal', dhcp_profile_uuid=None, allow_overwrite_header=False, - rate_limit_retry=True): + rate_limit_retry=True, + cluster_unavailable_retry=False): self.nsx_api_managers = nsx_api_managers self._username = username @@ -119,6 +124,7 @@ class NsxLibConfig(object): self.dns_domain = dns_domain self.allow_overwrite_header = allow_overwrite_header self.rate_limit_retry = rate_limit_retry + self.cluster_unavailable_retry = cluster_unavailable_retry if dhcp_profile_uuid: # this is deprecated, and never used. diff --git a/vmware_nsxlib/v3/utils.py b/vmware_nsxlib/v3/utils.py index 59e0450e..3da6c05a 100644 --- a/vmware_nsxlib/v3/utils.py +++ b/vmware_nsxlib/v3/utils.py @@ -175,6 +175,15 @@ def retry_random_upon_exception(exc, delay=0.5, max_delay=5, before=_log_before_retry, after=_log_after_retry) +def retry_upon_none_result(max_attempts, delay=0.5, max_delay=2): + return tenacity.retry(reraise=True, + retry=tenacity.retry_if_result(lambda x: x is None), + wait=tenacity.wait_exponential( + multiplier=delay, max=max_delay), + stop=tenacity.stop_after_attempt(max_attempts), + before=_log_before_retry, after=_log_after_retry) + + def list_match(list1, list2): # Check if list1 and list2 have identical elements, but relaxed on # dict elements where list1's dict element can be a subset of list2's