middleware that allows for cname lookups on unknown hostnames

This commit is contained in:
John Dickinson 2010-11-05 19:52:33 +00:00 committed by Tarmac
commit 15413a4344
4 changed files with 290 additions and 0 deletions

View File

@ -85,3 +85,8 @@ use = egg:swift#domain_remap
[filter:catch_errors]
use = egg:swift#catch_errors
[filter:cname_lookup]
# Note: this middleware requires python-dnspython
use = egg:swift#cname_lookup
# storage_domain = example.com
# lookup_depth = 1

View File

@ -93,6 +93,7 @@ setup(
'healthcheck=swift.common.middleware.healthcheck:filter_factory',
'memcache=swift.common.middleware.memcache:filter_factory',
'ratelimit=swift.common.middleware.ratelimit:filter_factory',
'cname_lookup=swift.common.middleware.cname_lookup:filter_factory',
'catch_errors=swift.common.middleware.catch_errors:filter_factory',
'domain_remap=swift.common.middleware.domain_remap:filter_factory',
],

View File

@ -0,0 +1,120 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from webob import Request
from webob.exc import HTTPBadRequest
import dns.resolver
from dns.exception import DNSException
from swift.common.utils import cache_from_env, get_logger
def lookup_cname(domain): # pragma: no cover
"""
Given a domain, returns it's DNS CNAME mapping and DNS ttl.
:param domain: domain to query on
:returns: (ttl, result)
"""
try:
answer = dns.resolver.query(domain, 'CNAME').rrset
ttl = answer.ttl
result = answer.items[0].to_text()
result = result.rstrip('.')
return ttl, result
except DNSException:
return 0, None
class CNAMELookupMiddleware(object):
"""
Middleware that translates a unknown domain in the host header to
something that ends with the configured storage_domain by looking up
the given domain's CNAME record in DNS.
"""
def __init__(self, app, conf):
self.app = app
self.storage_domain = conf.get('storage_domain', 'example.com')
if self.storage_domain and self.storage_domain[0] != '.':
self.storage_domain = '.' + self.storage_domain
self.lookup_depth = int(conf.get('lookup_depth', '1'))
self.memcache = None
self.logger = get_logger(conf)
def __call__(self, env, start_response):
if not self.storage_domain:
return self.app(env, start_response)
given_domain = env['HTTP_HOST']
port = ''
if ':' in given_domain:
given_domain, port = given_domain.rsplit(':', 1)
if given_domain == self.storage_domain[1:]: # strip initial '.'
return self.app(env, start_response)
a_domain = given_domain
if not a_domain.endswith(self.storage_domain):
if self.memcache is None:
self.memcache = cache_from_env(env)
error = True
for tries in xrange(self.lookup_depth):
found_domain = None
if self.memcache:
memcache_key = ''.join(['cname-', a_domain])
found_domain = self.memcache.get(memcache_key)
if not found_domain:
ttl, found_domain = lookup_cname(a_domain)
if self.memcache:
memcache_key = ''.join(['cname-', given_domain])
self.memcache.set(memcache_key, found_domain,
timeout=ttl)
if found_domain is None or found_domain == a_domain:
# no CNAME records or we're at the last lookup
error = True
found_domain = None
break
elif found_domain.endswith(self.storage_domain):
# Found it!
self.logger.info('Mapped %s to %s' % (given_domain,
found_domain))
if port:
env['HTTP_HOST'] = ':'.join([found_domain, port])
else:
env['HTTP_HOST'] = found_domain
error = False
break
else:
# try one more deep in the chain
self.logger.debug('Following CNAME chain for %s to %s' %
(given_domain, found_domain))
a_domain = found_domain
if error:
if found_domain:
msg = 'CNAME lookup failed after %d tries' % \
self.lookup_depth
else:
msg = 'CNAME lookup failed to resolve to a valid domain'
resp = HTTPBadRequest(request=Request(env), body=msg,
content_type='text/plain')
return resp(env, start_response)
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf): # pragma: no cover
conf = global_conf.copy()
conf.update(local_conf)
def cname_filter(app):
return CNAMELookupMiddleware(app, conf)
return cname_filter

View File

@ -0,0 +1,164 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from nose import SkipTest
from webob import Request
try:
# this test requires the dnspython package to be installed
from swift.common.middleware import cname_lookup
skip = False
except ImportError:
skip = True
class FakeApp(object):
def __call__(self, env, start_response):
return "FAKE APP"
def start_response(*args):
pass
class TestCNAMELookup(unittest.TestCase):
def setUp(self):
if skip:
raise SkipTest
self.app = cname_lookup.CNAMELookupMiddleware(FakeApp(),
{'lookup_depth': 2})
def test_passthrough(self):
def my_lookup(d):
return 0, d
cname_lookup.lookup_cname = my_lookup
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'foo.example.com'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'foo.example.com:8080'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
def test_good_lookup(self):
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'mysite.com'})
def my_lookup(d):
return 0, '%s.example.com' % d
cname_lookup.lookup_cname = my_lookup
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'mysite.com:8080'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
def test_lookup_chain_too_long(self):
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'mysite.com'})
def my_lookup(d):
if d == 'mysite.com':
site = 'level1.foo.com'
elif d == 'level1.foo.com':
site = 'level2.foo.com'
elif d == 'level2.foo.com':
site = 'bar.example.com'
return 0, site
cname_lookup.lookup_cname = my_lookup
resp = self.app(req.environ, start_response)
self.assertEquals(resp, ['CNAME lookup failed after 2 tries'])
def test_lookup_chain_bad_target(self):
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'mysite.com'})
def my_lookup(d):
return 0, 'some.invalid.site.com'
cname_lookup.lookup_cname = my_lookup
resp = self.app(req.environ, start_response)
self.assertEquals(resp,
['CNAME lookup failed to resolve to a valid domain'])
def test_something_weird(self):
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'mysite.com'})
def my_lookup(d):
return 0, None
cname_lookup.lookup_cname = my_lookup
resp = self.app(req.environ, start_response)
self.assertEquals(resp,
['CNAME lookup failed to resolve to a valid domain'])
def test_with_memcache(self):
def my_lookup(d):
return 0, '%s.example.com' % d
cname_lookup.lookup_cname = my_lookup
class memcache_stub(object):
def __init__(self):
self.cache = {}
def get(self, key):
return self.cache.get(key, None)
def set(self, key, value, *a, **kw):
self.cache[key] = value
memcache = memcache_stub()
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET',
'swift.cache': memcache},
headers={'Host': 'mysite.com'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET',
'swift.cache': memcache},
headers={'Host': 'mysite.com'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')
def test_cname_matching_ending_not_domain(self):
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'foo.com'})
def my_lookup(d):
return 0, 'c.aexample.com'
cname_lookup.lookup_cname = my_lookup
resp = self.app(req.environ, start_response)
self.assertEquals(resp,
['CNAME lookup failed to resolve to a valid domain'])
def test_cname_configured_with_empty_storage_domain(self):
app = cname_lookup.CNAMELookupMiddleware(FakeApp(),
{'storage_domain': '',
'lookup_depth': 2})
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'c.a.example.com'})
def my_lookup(d):
return 0, None
cname_lookup.lookup_cname = my_lookup
resp = app(req.environ, start_response)
self.assertEquals(resp, 'FAKE APP')