Added CORS wildcard handling

The CORS specification permits the declaration of '*' as a response
wildcard domain, which explicitly allows _all_ domains to break
the single-origin policy. While we DO NOT recommend this method,
the ability to set a global policy should be included for the
sake of completeness.

Change-Id: Ifcc65ca74fa976dbd322a7ffd4ffba5443d1df5b
This commit is contained in:
Michael Krotscheck 2015-05-22 10:03:14 -07:00
parent 06c44a8710
commit cab38ce307
3 changed files with 258 additions and 57 deletions

View File

@ -6,6 +6,14 @@ This middleware provides a comprehensive, configurable implementation of the
CORS_ (Cross Origin Resource Sharing) specification as oslo-supported python
wsgi middleware.
.. note::
While this middleware supports the use of the `*` wildcard origin in the
specification, this feature is not recommended for security reasons. It
is provided to simplify basic use of CORS, practically meaning "I don't
care how this is used." In an intranet setting, this could lead to leakage
of data beyond the intranet and therefore should be avoided.
Quickstart
----------
First, include the middleware in your application::
@ -46,10 +54,10 @@ legibility, we recommend using a reasonable human-readable string::
# CORS Configuration for horizon, which uses global options.
allowed_origin=https://horizon.example.com:443
[cors.dashboard]
# CORS Configuration for a hypothetical dashboard, which only permits
# HTTP GET requests.
allowed_origin=https://dashboard.example.com:443
[cors.wildcard]
# CORS Configuration for the CORS specified domain wildcard, which only
# permits HTTP GET requests.
allowed_origin=*
allow_methods=GET

View File

@ -154,6 +154,9 @@ class CORS(base.Middleware):
# Is this origin registered? (Section 6.2.2)
origin = request.headers['Origin']
if origin not in self.allowed_origins:
if '*' in self.allowed_origins:
origin = '*'
else:
LOG.debug('CORS request from origin \'%s\' not permitted.'
% (origin,))
return

View File

@ -26,46 +26,6 @@ class CORSTestBase(test_base.BaseTestCase):
Sets up applications and helper methods.
"""
def setUp(self):
super(CORSTestBase, self).setUp()
@webob.dec.wsgify
def application(req):
return 'Hello, World!!!'
# Set up the config fixture.
config = self.useFixture(fixture.Config(cfg.CONF))
config.load_raw_values(group='cors',
allowed_origin='http://valid.example.com',
allow_credentials='False',
max_age='',
expose_headers='',
allow_methods='GET',
allow_headers='')
config.load_raw_values(group='cors.credentials',
allowed_origin='http://creds.example.com',
allow_credentials='True')
config.load_raw_values(group='cors.exposed-headers',
allowed_origin='http://headers.example.com',
expose_headers='X-Header-1,X-Header-2',
allow_headers='X-Header-1,X-Header-2')
config.load_raw_values(group='cors.cached',
allowed_origin='http://cached.example.com',
max_age='3600')
config.load_raw_values(group='cors.get-only',
allowed_origin='http://get.example.com',
allow_methods='GET')
config.load_raw_values(group='cors.all-methods',
allowed_origin='http://all.example.com',
allow_methods='GET,PUT,POST,DELETE,HEAD')
# Now that the config is set up, create our application.
self.application = cors.CORS(application, cfg.CONF)
def assertCORSResponse(self, response,
status='200 OK',
@ -128,6 +88,58 @@ class CORSTestBase(test_base.BaseTestCase):
else:
self.assertNotIn(header, response.headers)
class CORSRegularRequestTest(CORSTestBase):
"""CORS Specification Section 6.1
http://www.w3.org/TR/cors/#resource-requests
"""
# List of HTTP methods (other than OPTIONS) to test with.
methods = ['POST', 'PUT', 'DELETE', 'GET', 'TRACE', 'HEAD']
def setUp(self):
"""Setup the tests."""
super(CORSRegularRequestTest, self).setUp()
@webob.dec.wsgify
def application(req):
return 'Hello, World!!!'
# Set up the config fixture.
config = self.useFixture(fixture.Config(cfg.CONF))
config.load_raw_values(group='cors',
allowed_origin='http://valid.example.com',
allow_credentials='False',
max_age='',
expose_headers='',
allow_methods='GET',
allow_headers='')
config.load_raw_values(group='cors.credentials',
allowed_origin='http://creds.example.com',
allow_credentials='True')
config.load_raw_values(group='cors.exposed-headers',
allowed_origin='http://headers.example.com',
expose_headers='X-Header-1,X-Header-2',
allow_headers='X-Header-1,X-Header-2')
config.load_raw_values(group='cors.cached',
allowed_origin='http://cached.example.com',
max_age='3600')
config.load_raw_values(group='cors.get-only',
allowed_origin='http://get.example.com',
allow_methods='GET')
config.load_raw_values(group='cors.all-methods',
allowed_origin='http://all.example.com',
allow_methods='GET,PUT,POST,DELETE,HEAD')
# Now that the config is set up, create our application.
self.application = cors.CORS(application, cfg.CONF)
def test_config_overrides(self):
"""Assert that the configuration options are properly registered."""
@ -186,16 +198,6 @@ class CORSTestBase(test_base.BaseTestCase):
['GET', 'PUT', 'POST', 'DELETE', 'HEAD'])
self.assertEqual(ac.allow_headers, gc.allow_headers)
class CORSRegularRequestTest(CORSTestBase):
"""CORS Specification Section 6.1
http://www.w3.org/TR/cors/#resource-requests
"""
# List of HTTP methods (other than OPTIONS) to test with.
methods = ['POST', 'PUT', 'DELETE', 'GET', 'TRACE', 'HEAD']
def test_no_origin_header(self):
"""CORS Specification Section 6.1.1
@ -339,6 +341,105 @@ class CORSPreflightRequestTest(CORSTestBase):
http://www.w3.org/TR/cors/#resource-preflight-requests
"""
def setUp(self):
super(CORSPreflightRequestTest, self).setUp()
@webob.dec.wsgify
def application(req):
return 'Hello, World!!!'
# Set up the config fixture.
config = self.useFixture(fixture.Config(cfg.CONF))
config.load_raw_values(group='cors',
allowed_origin='http://valid.example.com',
allow_credentials='False',
max_age='',
expose_headers='',
allow_methods='GET',
allow_headers='')
config.load_raw_values(group='cors.credentials',
allowed_origin='http://creds.example.com',
allow_credentials='True')
config.load_raw_values(group='cors.exposed-headers',
allowed_origin='http://headers.example.com',
expose_headers='X-Header-1,X-Header-2',
allow_headers='X-Header-1,X-Header-2')
config.load_raw_values(group='cors.cached',
allowed_origin='http://cached.example.com',
max_age='3600')
config.load_raw_values(group='cors.get-only',
allowed_origin='http://get.example.com',
allow_methods='GET')
config.load_raw_values(group='cors.all-methods',
allowed_origin='http://all.example.com',
allow_methods='GET,PUT,POST,DELETE,HEAD')
# Now that the config is set up, create our application.
self.application = cors.CORS(application, cfg.CONF)
def test_config_overrides(self):
"""Assert that the configuration options are properly registered."""
# Confirm global configuration
gc = cfg.CONF.cors
self.assertEqual(gc.allowed_origin, 'http://valid.example.com')
self.assertEqual(gc.allow_credentials, False)
self.assertEqual(gc.expose_headers, [])
self.assertEqual(gc.max_age, None)
self.assertEqual(gc.allow_methods, ['GET'])
self.assertEqual(gc.allow_headers, [])
# Confirm credentials overrides.
cc = cfg.CONF['cors.credentials']
self.assertEqual(cc.allowed_origin, 'http://creds.example.com')
self.assertEqual(cc.allow_credentials, True)
self.assertEqual(cc.expose_headers, gc.expose_headers)
self.assertEqual(cc.max_age, gc.max_age)
self.assertEqual(cc.allow_methods, gc.allow_methods)
self.assertEqual(cc.allow_headers, gc.allow_headers)
# Confirm exposed-headers overrides.
ec = cfg.CONF['cors.exposed-headers']
self.assertEqual(ec.allowed_origin, 'http://headers.example.com')
self.assertEqual(ec.allow_credentials, gc.allow_credentials)
self.assertEqual(ec.expose_headers, ['X-Header-1', 'X-Header-2'])
self.assertEqual(ec.max_age, gc.max_age)
self.assertEqual(ec.allow_methods, gc.allow_methods)
self.assertEqual(ec.allow_headers, ['X-Header-1', 'X-Header-2'])
# Confirm cached overrides.
chc = cfg.CONF['cors.cached']
self.assertEqual(chc.allowed_origin, 'http://cached.example.com')
self.assertEqual(chc.allow_credentials, gc.allow_credentials)
self.assertEqual(chc.expose_headers, gc.expose_headers)
self.assertEqual(chc.max_age, 3600)
self.assertEqual(chc.allow_methods, gc.allow_methods)
self.assertEqual(chc.allow_headers, gc.allow_headers)
# Confirm get-only overrides.
goc = cfg.CONF['cors.get-only']
self.assertEqual(goc.allowed_origin, 'http://get.example.com')
self.assertEqual(goc.allow_credentials, gc.allow_credentials)
self.assertEqual(goc.expose_headers, gc.expose_headers)
self.assertEqual(goc.max_age, gc.max_age)
self.assertEqual(goc.allow_methods, ['GET'])
self.assertEqual(goc.allow_headers, gc.allow_headers)
# Confirm all-methods overrides.
ac = cfg.CONF['cors.all-methods']
self.assertEqual(ac.allowed_origin, 'http://all.example.com')
self.assertEqual(ac.allow_credentials, gc.allow_credentials)
self.assertEqual(ac.expose_headers, gc.expose_headers)
self.assertEqual(ac.max_age, gc.max_age)
self.assertEqual(ac.allow_methods,
['GET', 'PUT', 'POST', 'DELETE', 'HEAD'])
self.assertEqual(ac.allow_headers, gc.allow_headers)
def test_no_origin_header(self):
"""CORS Specification Section 6.2.1
@ -699,3 +800,92 @@ class CORSPreflightRequestTest(CORSTestBase):
allow_headers=requested_headers,
allow_credentials=None,
expose_headers=None)
class CORSTestWildcard(CORSTestBase):
"""Test the CORS wildcard specification."""
def setUp(self):
super(CORSTestWildcard, self).setUp()
@webob.dec.wsgify
def application(req):
return 'Hello, World!!!'
# Set up the config fixture.
config = self.useFixture(fixture.Config(cfg.CONF))
config.load_raw_values(group='cors',
allowed_origin='http://default.example.com',
allow_credentials='True',
max_age='',
expose_headers='',
allow_methods='GET,PUT,POST,DELETE,HEAD',
allow_headers='')
config.load_raw_values(group='cors.wildcard',
allowed_origin='*',
allow_methods='GET')
# Now that the config is set up, create our application.
self.application = cors.CORS(application, cfg.CONF)
def test_config_overrides(self):
"""Assert that the configuration options are properly registered."""
# Confirm global configuration
gc = cfg.CONF.cors
self.assertEqual(gc.allowed_origin, 'http://default.example.com')
self.assertEqual(gc.allow_credentials, True)
self.assertEqual(gc.expose_headers, [])
self.assertEqual(gc.max_age, None)
self.assertEqual(gc.allow_methods, ['GET', 'PUT', 'POST', 'DELETE',
'HEAD'])
self.assertEqual(gc.allow_headers, [])
# Confirm all-methods overrides.
ac = cfg.CONF['cors.wildcard']
self.assertEqual(ac.allowed_origin, '*')
self.assertEqual(gc.allow_credentials, True)
self.assertEqual(ac.expose_headers, gc.expose_headers)
self.assertEqual(ac.max_age, gc.max_age)
self.assertEqual(ac.allow_methods, ['GET'])
self.assertEqual(ac.allow_headers, gc.allow_headers)
def test_wildcard_domain(self):
"""CORS Specification, Wildcards
If the configuration file specifies CORS settings for the wildcard '*'
domain, it should return those for all origin domains except for the
overrides.
"""
# Test valid domain
request = webob.Request({})
request.method = "OPTIONS"
request.headers['Origin'] = 'http://default.example.com'
request.headers['Access-Control-Request-Method'] = 'GET'
response = request.get_response(self.application)
self.assertCORSResponse(response,
status='200 OK',
allow_origin='http://default.example.com',
max_age=None,
allow_methods='GET',
allow_headers='',
allow_credentials='true',
expose_headers=None)
# Test invalid domain
request = webob.Request({})
request.method = "OPTIONS"
request.headers['Origin'] = 'http://invalid.example.com'
request.headers['Access-Control-Request-Method'] = 'GET'
response = request.get_response(self.application)
self.assertCORSResponse(response,
status='200 OK',
allow_origin='*',
max_age=None,
allow_methods='GET',
allow_headers='',
allow_credentials='true',
expose_headers=None)