@ -20,6 +20,7 @@ import datetime
import eventlet
import itertools
import logging
import OpenSSL
import requests
import six
import six . moves . urllib . parse as urlparse
@ -33,7 +34,6 @@ from requests import exceptions as requests_exceptions
from vmware_nsxlib . _i18n import _ , _LI , _LW
from vmware_nsxlib . v3 import client as nsx_client
from vmware_nsxlib . v3 import exceptions
from vmware_nsxlib . v3 import nsx_constants
LOG = log . getLogger ( __name__ )
@ -88,14 +88,55 @@ class TimeoutSession(requests.Session):
def __init__ ( self , timeout , read_timeout ) :
self . timeout = timeout
self . read_timeout = read_timeout
self . cert_provider = None
super ( TimeoutSession , self ) . __init__ ( )
@property
def cert_provider ( self ) :
return self . _cert_provider
@cert_provider.setter
def cert_provider ( self , value ) :
self . _cert_provider = value
# wrapper timeouts at the session level
# see: https://goo.gl/xNk7aM
def request ( self , * args , * * kwargs ) :
if ' timeout ' not in kwargs :
kwargs [ ' timeout ' ] = ( self . timeout , self . read_timeout )
return super ( TimeoutSession , self ) . request ( * args , * * kwargs )
if not self . _cert_provider :
return super ( TimeoutSession , self ) . request ( * args , * * kwargs )
if self . cert is not None :
# connection should be open (unless server closed it),
# in which case cert is not needed
try :
return super ( TimeoutSession , self ) . request ( * args , * * kwargs )
except OpenSSL . SSL . Error as e :
# This is most probably "client cert not found" error (this
# happens when server closed the connection and requests
# reopen it). Try reloading client cert.
LOG . warning ( _LW ( " SSL error: %s , retrying.. " ) % e )
except OSError as e :
# Lack of client cert file can come in form of OSError,
# in this case filename will appear in the error. Try
# reloading client cert.
if self . _cert_provider . filename ( ) not in str ( e ) :
raise e
# Don't expose cert file name to the logs
LOG . warning ( _LW ( " Reloading client certificate.. " ) )
# The following with statement allows for preparing certificate and
# private key file and dispose it once connections are spawned
# (since PK is sensitive information, immediate disposal is
# important). This is done of first request of the session or when
# above exceptions indicate cert is missing.
with self . _cert_provider :
self . cert = self . _cert_provider . filename ( )
ret = super ( TimeoutSession , self ) . request ( * args , * * kwargs )
return ret
class NSXRequestsHTTPProvider ( AbstractHTTPProvider ) :
@ -122,8 +163,8 @@ class NSXRequestsHTTPProvider(AbstractHTTPProvider):
config = cluster_api . nsxlib_config
session = TimeoutSession ( config . http_timeout ,
config . http_read_timeout )
if provider. client_cert_file :
session . cert = provider . client_cert_file
if config. client_cert_provider :
session . cert _provider = config . client_cert_provider
else :
session . auth = ( provider . username , provider . password )
@ -178,13 +219,11 @@ class Provider(object):
Which has a unique id a connection URL , and the credential details .
"""
def __init__ ( self , provider_id , provider_url ,
username , password , ca_file , client_cert_file = None ) :
def __init__ ( self , provider_id , provider_url , username , password , ca_file ) :
self . id = provider_id
self . url = provider_url
self . username = username
self . password = password
self . client_cert_file = client_cert_file
self . ca_file = ca_file
def __str__ ( self ) :
@ -207,12 +246,6 @@ class Endpoint(object):
self . _state = EndpointState . INITIALIZED
self . _last_updated = datetime . datetime . now ( )
def regenerate_pool ( self ) :
self . pool = pools . Pool ( min_size = self . pool . min_size ,
max_size = self . pool . max_size ,
order_as_stack = True ,
create = self . pool . create )
@property
def last_updated ( self ) :
return self . _last_updated
@ -267,7 +300,6 @@ class ClusteredAPI(object):
self . _http_provider = http_provider
self . _keepalive_interval = keepalive_interval
self . _callbacks = { }
def _init_cluster ( * args , * * kwargs ) :
self . _init_endpoints ( providers ,
@ -288,6 +320,7 @@ class ClusteredAPI(object):
def _conn ( ) :
# called when a pool needs to create a new connection
return self . _http_provider . new_connection ( self , p )
return _conn
self . _endpoints = { }
@ -366,30 +399,11 @@ class ClusteredAPI(object):
if up == len ( self . _endpoints )
else ClusterHealth . ORANGE )
def subscribe ( self , callback , event ) :
if event in self . _callbacks :
self . _callbacks [ event ] . append ( callback )
else :
self . _callbacks [ event ] = [ callback ]
def _notify ( self , event ) :
if event in self . _callbacks :
for callback in self . _callbacks [ event ] :
callback ( )
def _validate ( self , endpoint ) :
try :
with endpoint . pool . item ( ) as conn :
self . _http_provider . validate_connection ( self , endpoint , conn )
endpoint . set_state ( EndpointState . UP )
except exceptions . ClientCertificateNotTrusted :
LOG . warning ( _LW ( " Failed to validate API cluster endpoint "
" ' %(ep)s ' due to untrusted client certificate " ) ,
{ ' ep ' : endpoint } )
# allow nsxlib user to reload certificate that possibly changed
self . _notify ( nsx_constants . ON_CLIENT_CERT_UNTRUSTED )
# regenerate connection pool based on new certificate
endpoint . regenerate_pool ( )
except Exception as e :
endpoint . set_state ( EndpointState . DOWN )
LOG . warning ( _LW ( " Failed to validate API cluster endpoint "
@ -525,6 +539,5 @@ class NSXClusteredAPI(ClusteredAPI):
urlparse . urlunparse ( conf_url ) ,
self . nsxlib_config . username ( provider_index ) ,
self . nsxlib_config . password ( provider_index ) ,
self . nsxlib_config . ca_file ( provider_index ) ,
self . nsxlib_config . client_cert_file ( provider_index ) ) )
self . nsxlib_config . ca_file ( provider_index ) ) )
return providers