Rewrites the serveapp method into a fixture

As a fixture we will no longer have to remember to kill the server when
you are done with it. Additionally we are able to use the fixture as a
context manager.

Change-Id: If283e12cde362d3eadc96318b5cf6799a813c14a
Partial-Bug: #1247443
This commit is contained in:
David Stanek 2013-11-03 16:26:36 +00:00
parent 2ab2c62435
commit 8ae6f39f46
6 changed files with 190 additions and 121 deletions

View File

@ -39,14 +39,11 @@ from keystone.openstack.common import gettextutils
# Accept-Language in the request rather than the Keystone server locale.
gettextutils.install('keystone', lazy=True)
from keystone.common import environment
environment.use_eventlet()
from keystone import assignment
from keystone import catalog
from keystone.common import cache
from keystone.common import dependency
from keystone.common import environment
from keystone.common import kvs
from keystone.common import sql
from keystone.common import utils
@ -384,20 +381,6 @@ class TestCase(testtools.TestCase):
def appconfig(self, config):
return deploy.appconfig(self._paste_config(config))
def serveapp(self, config, name=None, cert=None, key=None, ca=None,
cert_required=None, host="127.0.0.1", port=0):
app = self.loadapp(config, name=name)
server = environment.Server(app, host, port)
if cert is not None and ca is not None and key is not None:
server.set_ssl(certfile=cert, keyfile=key, ca_certs=ca,
cert_required=cert_required)
server.start(key='socket')
# Service catalog tests need to know the port we ran on.
port = server.socket_info['socket'][1]
self.opt(public_port=port, admin_port=port)
return server
def client(self, app, *args, **kw):
return TestClient(app, *args, **kw)

0
keystone/tests/fixtures/__init__.py vendored Normal file
View File

83
keystone/tests/fixtures/appserver.py vendored Normal file
View File

@ -0,0 +1,83 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import fixtures
from paste import deploy
from keystone.common import environment
from keystone import config
CONF = config.CONF
environment.use_eventlet()
MAIN = 'main'
ADMIN = 'admin'
class AppServer(fixtures.Fixture):
"""A fixture for managing an application server instance.
"""
def __init__(self, config, name, cert=None, key=None, ca=None,
cert_required=False, host='127.0.0.1', port=0):
super(AppServer, self).__init__()
self.config = config
self.name = name
self.cert = cert
self.key = key
self.ca = ca
self.cert_required = cert_required
self.host = host
self.port = port
def setUp(self):
super(AppServer, self).setUp()
app = deploy.loadapp(self.config, name=self.name)
self.server = environment.Server(app, self.host, self.port)
self._setup_SSL_if_requested()
self.server.start(key='socket')
# some tests need to know the port we ran on.
self.port = self.server.socket_info['socket'][1]
self._update_config_opt()
self.addCleanup(self.server.kill)
def _setup_SSL_if_requested(self):
# TODO(dstanek): fix environment.Server to take a SSLOpts instance
# so that the params are either always set or not
if (self.cert is not None and
self.ca is not None and
self.key is not None):
self.server.set_ssl(certfile=self.cert,
keyfile=self.key,
ca_certs=self.ca,
cert_required=self.cert_required)
def _update_config_opt(self):
"""Updates the config with the actual port used."""
opt_name = self._get_config_option_for_section_name()
CONF.set_override(opt_name, self.port)
def _get_config_option_for_section_name(self):
"""Maps Paster config section names to port option names."""
return {'admin': 'admin_port', 'main': 'public_port'}[self.name]

View File

@ -18,6 +18,7 @@
from keystone.common import environment
from keystone import config
from keystone import tests
from keystone.tests.fixtures import appserver
CONF = config.CONF
@ -32,17 +33,18 @@ class IPv6TestCase(tests.TestCase):
def test_ipv6_ok(self):
"""Make sure both public and admin API work with ipv6."""
self.public_server = self.serveapp('keystone', name='main',
host="::1", port=0)
self.admin_server = self.serveapp('keystone', name='admin',
host="::1", port=0)
paste_conf = self._paste_config('keystone')
# Verify Admin
conn = environment.httplib.HTTPConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.ADMIN, host="::1"):
conn = environment.httplib.HTTPConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = environment.httplib.HTTPConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.MAIN, host="::1"):
conn = environment.httplib.HTTPConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)

View File

@ -22,6 +22,7 @@ from keystone.openstack.common import jsonutils
from keystone.openstack.common import timeutils
from keystone import tests
from keystone.tests import default_fixtures
from keystone.tests.fixtures import appserver
CONF = config.CONF
@ -47,20 +48,16 @@ class CompatTestCase(tests.NoModule, tests.TestCase):
self.tenant_bar['id'],
self.role_admin['id'])
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
conf = self._paste_config('keystone')
fixture = self.useFixture(appserver.AppServer(conf, appserver.MAIN))
self.public_server = fixture.server
fixture = self.useFixture(appserver.AppServer(conf, appserver.ADMIN))
self.admin_server = fixture.server
revdir = tests.checkout_vendor(*self.get_checkout())
self.add_path(revdir)
self.clear_module('keystoneclient')
def tearDown(self):
self.public_server.kill()
self.admin_server.kill()
self.public_server = None
self.admin_server = None
super(CompatTestCase, self).tearDown()
def _public_url(self):
public_port = self.public_server.socket_info['socket'][1]
return "http://localhost:%s/v2.0" % public_port

View File

@ -21,6 +21,7 @@ import ssl
from keystone.common import environment
from keystone import config
from keystone import tests
from keystone.tests.fixtures import appserver
CONF = config.CONF
@ -40,66 +41,69 @@ class SSLTestCase(tests.TestCase):
def test_1way_ssl_ok(self):
"""Make sure both public and admin API work with 1-way SSL."""
self.public_server = self.serveapp('keystone', name='main',
cert=CERT, key=KEY, ca=CA)
self.admin_server = self.serveapp('keystone', name='admin',
cert=CERT, key=KEY, ca=CA)
paste_conf = self._paste_config('keystone')
ssl_kwargs = dict(cert=CERT, key=KEY, ca=CA)
# Verify Admin
conn = environment.httplib.HTTPSConnection('127.0.0.1',
CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.ADMIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = environment.httplib.HTTPSConnection('127.0.0.1',
CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.MAIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_2way_ssl_ok(self):
"""Make sure both public and admin API work with 2-way SSL.
Requires client certificate.
"""
self.public_server = self.serveapp(
'keystone', name='main', cert=CERT,
key=KEY, ca=CA, cert_required=True)
self.admin_server = self.serveapp(
'keystone', name='admin', cert=CERT,
key=KEY, ca=CA, cert_required=True)
paste_conf = self._paste_config('keystone')
ssl_kwargs = dict(cert=CERT, key=KEY, ca=CA, cert_required=True)
# Verify Admin
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.admin_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.ADMIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.admin_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.public_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.MAIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.public_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_1way_ssl_with_ipv6_ok(self):
"""Make sure both public and admin API work with 1-way ipv6 & SSL."""
self.skip_if_no_ipv6()
self.public_server = self.serveapp('keystone', name='main',
cert=CERT, key=KEY, ca=CA,
host="::1", port=0)
self.admin_server = self.serveapp('keystone', name='admin',
cert=CERT, key=KEY, ca=CA,
host="::1", port=0)
paste_conf = self._paste_config('keystone')
ssl_kwargs = dict(cert=CERT, key=KEY, ca=CA, host="::1")
# Verify Admin
conn = environment.httplib.HTTPSConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.ADMIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection('::1', CONF.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = environment.httplib.HTTPSConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.MAIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection('::1', CONF.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_2way_ssl_with_ipv6_ok(self):
"""Make sure both public and admin API work with 2-way ipv6 & SSL.
@ -107,48 +111,48 @@ class SSLTestCase(tests.TestCase):
Requires client certificate.
"""
self.skip_if_no_ipv6()
self.public_server = self.serveapp(
'keystone', name='main', cert=CERT,
key=KEY, ca=CA, cert_required=True,
host="::1", port=0)
self.admin_server = self.serveapp(
'keystone', name='admin', cert=CERT,
key=KEY, ca=CA, cert_required=True,
host="::1", port=0)
paste_conf = self._paste_config('keystone')
ssl_kwargs = dict(cert=CERT, key=KEY, ca=CA,
cert_required=True, host="::1")
# Verify Admin
conn = environment.httplib.HTTPSConnection(
'::1', CONF.admin_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.ADMIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'::1', CONF.admin_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
# Verify Public
conn = environment.httplib.HTTPSConnection(
'::1', CONF.public_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
with appserver.AppServer(paste_conf, appserver.MAIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'::1', CONF.public_port, CLIENT, CLIENT)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
def test_2way_ssl_fail(self):
"""Expect to fail when client does not present proper certificate."""
self.public_server = self.serveapp(
'keystone', name='main', cert=CERT,
key=KEY, ca=CA, cert_required=True)
self.admin_server = self.serveapp(
'keystone', name='admin', cert=CERT,
key=KEY, ca=CA, cert_required=True)
paste_conf = self._paste_config('keystone')
ssl_kwargs = dict(cert=CERT, key=KEY, ca=CA, cert_required=True)
# Verify Admin
conn = environment.httplib.HTTPSConnection('127.0.0.1',
CONF.admin_port)
try:
conn.request('GET', '/')
self.fail('Admin API shoulda failed with SSL handshake!')
except ssl.SSLError:
pass
with appserver.AppServer(paste_conf, appserver.ADMIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.admin_port)
try:
conn.request('GET', '/')
self.fail('Admin API shoulda failed with SSL handshake!')
except ssl.SSLError:
pass
# Verify Public
conn = environment.httplib.HTTPSConnection('127.0.0.1',
CONF.public_port)
try:
conn.request('GET', '/')
self.fail('Public API shoulda failed with SSL handshake!')
except ssl.SSLError:
pass
with appserver.AppServer(paste_conf, appserver.MAIN, **ssl_kwargs):
conn = environment.httplib.HTTPSConnection(
'127.0.0.1', CONF.public_port)
try:
conn.request('GET', '/')
self.fail('Public API shoulda failed with SSL handshake!')
except ssl.SSLError:
pass