Enhance wsgi to listen on ipv6 address

Check if the hostname is ipv6 and set the family appropriately.
Add tests to ensure that IPv6 and IPv6 with SSL works properly.

Change-Id: Ibcf0a9387691d124888c0c0540d4322b0a3b3d67
This commit is contained in:
Davanum Srinivas 2013-01-14 22:37:07 -05:00
parent e1abe0fca3
commit 7a4e3738ea
4 changed files with 118 additions and 7 deletions

View File

@ -20,6 +20,7 @@
"""Utility methods for working with WSGI servers."""
import socket
import sys
import eventlet.wsgi
@ -74,23 +75,35 @@ class Server(object):
{'arg0': sys.argv[0],
'host': self.host,
'port': self.port})
socket = eventlet.listen((self.host, self.port), backlog=backlog)
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(self.host,
self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
_socket = eventlet.listen(info[-1],
family=info[0],
backlog=backlog)
if key:
self.socket_info[key] = socket.getsockname()
self.socket_info[key] = _socket.getsockname()
# SSL is enabled
if self.do_ssl:
if self.cert_required:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
sslsocket = eventlet.wrap_ssl(socket, certfile=self.certfile,
sslsocket = eventlet.wrap_ssl(_socket, certfile=self.certfile,
keyfile=self.keyfile,
server_side=True,
cert_reqs=cert_reqs,
ca_certs=self.ca_certs)
socket = sslsocket
_socket = sslsocket
self.greenthread = self.pool.spawn(self._run, self.application, socket)
self.greenthread = self.pool.spawn(self._run,
self.application,
_socket)
def set_ssl(self, certfile, keyfile=None, ca_certs=None,
cert_required=True):

View File

@ -276,9 +276,9 @@ class TestCase(NoModule, unittest.TestCase):
return deploy.appconfig(self._paste_config(config))
def serveapp(self, config, name=None, cert=None, key=None, ca=None,
cert_required=None):
cert_required=None, host="127.0.0.1", port=0):
app = self.loadapp(config, name=name)
server = wsgi.Server(app, host="127.0.0.1", port=0)
server = wsgi.Server(app, host, port)
if cert is not None and ca is not None and key is not None:
server.set_ssl(certfile=cert, keyfile=key, ca_certs=ca,
cert_required=cert_required)

51
tests/test_ipv6.py Normal file
View File

@ -0,0 +1,51 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httplib
import os
import ssl
from keystone import config
from keystone import test
CONF = config.CONF
class IPv6TestCase(test.TestCase):
def setUp(self):
super(IPv6TestCase, self).setUp()
self.load_backends()
def test_ipv6_ok(self):
"""
Make sure both public and admin API work with ipv6.
"""
self.public_server = self.serveapp('keystone', name='main',
host="::1", port=0)
self.admin_server = self.serveapp('keystone', name='admin',
host="::1", port=0)
# Verify Admin
conn = httplib.HTTPConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = httplib.HTTPConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)

View File

@ -81,6 +81,53 @@ class SSLTestCase(test.TestCase):
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_1way_ssl_with_ipv6_ok(self):
"""
Make sure both public and admin API work with 1-way ipv6 & SSL.
"""
self.public_server = self.serveapp('keystone', name='main',
cert=CERT, key=KEY, ca=CA,
host="::1", port=0)
self.admin_server = self.serveapp('keystone', name='admin',
cert=CERT, key=KEY, ca=CA,
host="::1", port=0)
# Verify Admin
conn = httplib.HTTPSConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = httplib.HTTPSConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_2way_ssl_with_ipv6_ok(self):
"""
Make sure both public and admin API work with 2-way ipv6 & SSL.
Requires client certificate.
"""
self.public_server = self.serveapp(
'keystone', name='main', cert=CERT,
key=KEY, ca=CA, cert_required=True,
host="::1", port=0)
self.admin_server = self.serveapp(
'keystone', name='admin', cert=CERT,
key=KEY, ca=CA, cert_required=True,
host="::1", port=0)
# Verify Admin
conn = httplib.HTTPSConnection(
'::1', CONF.admin_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = httplib.HTTPSConnection(
'::1', CONF.public_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_2way_ssl_fail(self):
"""
Expect to fail when client does not present proper certificate.