Add SSL cert verification regression tests
A security bug (1357430) was introduced which meant that SSL certificate
verification was not occurring.
Add new tests which help prevent the 'requests' part of bug 115260
recurring.
Note: Cherry-picking onto the stable branch -- these tests are required
for proper testing of one of the SSL related fixes.
Change-Id: Iaf56fd8bc34fa8f35c2fd7051f9f8424002352cf
Related-bug: 1357430
(cherry picked from commit 64a1a0fdcc
)
This commit is contained in:
parent
7184759029
commit
e572ec1d1f
|
@ -20,42 +20,102 @@ try:
|
|||
from requests.packages.urllib3 import poolmanager
|
||||
except ImportError:
|
||||
from urllib3 import poolmanager
|
||||
import six
|
||||
import ssl
|
||||
import testtools
|
||||
import threading
|
||||
|
||||
from glanceclient.common import http
|
||||
from glanceclient.common import https
|
||||
|
||||
from glanceclient import Client
|
||||
from glanceclient import exc
|
||||
|
||||
if six.PY3 is True:
|
||||
import socketserver
|
||||
else:
|
||||
import SocketServer as socketserver
|
||||
|
||||
|
||||
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
'var'))
|
||||
|
||||
|
||||
class TestRequestsIntegration(testtools.TestCase):
|
||||
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
def handle(self):
|
||||
self.request.recv(1024)
|
||||
response = b'somebytes'
|
||||
self.request.sendall(response)
|
||||
|
||||
def test_pool_patch(self):
|
||||
client = http.HTTPClient("https://localhost",
|
||||
ssl_compression=True)
|
||||
self.assertNotEqual(https.HTTPSConnectionPool,
|
||||
poolmanager.pool_classes_by_scheme["https"])
|
||||
|
||||
adapter = client.session.adapters.get("https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
def get_request(self):
|
||||
key_file = os.path.join(TEST_VAR_DIR, 'privatekey.key')
|
||||
cert_file = os.path.join(TEST_VAR_DIR, 'certificate.crt')
|
||||
cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')
|
||||
(_sock, addr) = socketserver.TCPServer.get_request(self)
|
||||
sock = ssl.wrap_socket(_sock,
|
||||
certfile=cert_file,
|
||||
keyfile=key_file,
|
||||
ca_certs=cacert,
|
||||
server_side=True,
|
||||
cert_reqs=ssl.CERT_REQUIRED)
|
||||
return sock, addr
|
||||
|
||||
adapter = client.session.adapters.get("glance+https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
|
||||
def test_custom_https_adapter(self):
|
||||
client = http.HTTPClient("https://localhost",
|
||||
ssl_compression=False)
|
||||
self.assertNotEqual(https.HTTPSConnectionPool,
|
||||
poolmanager.pool_classes_by_scheme["https"])
|
||||
class TestHTTPSVerifyCert(testtools.TestCase):
|
||||
"""Check 'requests' based ssl verification occurs
|
||||
|
||||
adapter = client.session.adapters.get("https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
The requests library performs SSL certificate validation,
|
||||
however there is still a need to check that the glance
|
||||
client is properly integrated with requests so that
|
||||
cert validation actually happens.
|
||||
"""
|
||||
def setUp(self):
|
||||
# Rather than spinning up a new process, we create
|
||||
# a thread to perform client/server interaction.
|
||||
# This should run more quickly.
|
||||
super(TestHTTPSVerifyCert, self).setUp()
|
||||
server = ThreadedTCPServer(('127.0.0.1', 0),
|
||||
ThreadedTCPRequestHandler)
|
||||
__, self.port = server.server_address
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
adapter = client.session.adapters.get("glance+https://")
|
||||
self.assertTrue(isinstance(adapter, https.HTTPSAdapter))
|
||||
def test_v1_requests_cert_verification(self):
|
||||
"""v1 regression test for bug 115260."""
|
||||
port = self.port
|
||||
url = 'https://0.0.0.0:%d' % port
|
||||
|
||||
try:
|
||||
client = Client('1', url,
|
||||
insecure=False,
|
||||
ssl_compression=True)
|
||||
client.images.get('image123')
|
||||
self.fail('No SSL exception raised')
|
||||
except exc.CommunicationError as e:
|
||||
if 'certificate verify failed' not in e.message:
|
||||
self.fail('No certificate failure message received')
|
||||
except Exception as e:
|
||||
self.fail('Unexpected exception raised')
|
||||
|
||||
def test_v2_requests_cert_verification(self):
|
||||
"""v2 regression test for bug 115260."""
|
||||
port = self.port
|
||||
url = 'https://0.0.0.0:%d' % port
|
||||
|
||||
try:
|
||||
gc = Client('2', url,
|
||||
insecure=False,
|
||||
ssl_compression=True)
|
||||
gc.images.get('image123')
|
||||
self.fail('No SSL exception raised')
|
||||
except exc.CommunicationError as e:
|
||||
if 'certificate verify failed' not in e.message:
|
||||
self.fail('No certificate failure message received')
|
||||
except Exception as e:
|
||||
self.fail('Unexpected exception raised')
|
||||
|
||||
|
||||
class TestVerifiedHTTPSConnection(testtools.TestCase):
|
||||
|
@ -328,3 +388,30 @@ class TestVerifiedHTTPSConnection(testtools.TestCase):
|
|||
cacert=cacert)
|
||||
except exc.SSLConfigurationError:
|
||||
self.fail('Failed to init VerifiedHTTPSConnection.')
|
||||
|
||||
|
||||
class TestRequestsIntegration(testtools.TestCase):
|
||||
|
||||
def test_pool_patch(self):
|
||||
client = http.HTTPClient("https://localhost",
|
||||
ssl_compression=True)
|
||||
self.assertNotEqual(https.HTTPSConnectionPool,
|
||||
poolmanager.pool_classes_by_scheme["https"])
|
||||
|
||||
adapter = client.session.adapters.get("https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
|
||||
adapter = client.session.adapters.get("glance+https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
|
||||
def test_custom_https_adapter(self):
|
||||
client = http.HTTPClient("https://localhost",
|
||||
ssl_compression=False)
|
||||
self.assertNotEqual(https.HTTPSConnectionPool,
|
||||
poolmanager.pool_classes_by_scheme["https"])
|
||||
|
||||
adapter = client.session.adapters.get("https://")
|
||||
self.assertFalse(isinstance(adapter, https.HTTPSAdapter))
|
||||
|
||||
adapter = client.session.adapters.get("glance+https://")
|
||||
self.assertTrue(isinstance(adapter, https.HTTPSAdapter))
|
||||
|
|
Loading…
Reference in New Issue