Cleanup Blacklist code, Protect from sub-domain hijacking. Upgrade to pyflakes 0.6.1

Change-Id: I7da11d5b7fe0dbf7b1472777034dbb83fb9225d1
This commit is contained in:
Kiall Mac Innes 2013-01-30 18:20:58 +00:00
parent 3ba2cce5cd
commit 4b76292490
8 changed files with 278 additions and 19 deletions
moniker
central
context.py
storage/impl_sqlalchemy
tests
test_central
test_storage
tox.ini

@ -145,16 +145,36 @@ class Service(rpc_service.Service):
LOG.debug('Found handler for: %s' % event_type)
handler.process_notification(event_type, payload)
def _check_domain_name_blacklist(self, context, domain_name):
def _is_blacklisted_domain_name(self, context, domain_name):
"""
Ensures the provided domain_name is not blacklisted.
"""
blacklists = cfg.CONF['service:central'].domain_name_blacklist
for blacklist in blacklists:
if bool(re.search(blacklist, domain_name)):
policy.check('use_blacklisted_domain', context)
return blacklist
return False
def _is_subdomain(self, context, domain_name):
# Break the name up into it's component labels
labels = domain_name.split(".")
i = 1
# Starting with label #2, search for matching domain's in the database
while (i < len(labels)):
name = '.'.join(labels[i:])
try:
domain = self.storage_conn.find_domain(context, {'name': name})
except exceptions.DomainNotFound:
i += 1
else:
return domain
return False
# Server Methods
def create_server(self, context, values):
@ -206,7 +226,20 @@ class Service(rpc_service.Service):
policy.check('create_domain', context, target)
# Ensure the domain is not blacklisted
self._check_domain_name_blacklist(context, values['name'])
if self._is_blacklisted_domain_name(context, values['name']):
# Raises an exception if the policy check is denied
policy.check('use_blacklisted_domain', context)
# Handle sub-domains appropriately
parent_domain = self._is_subdomain(context, values['name'])
if parent_domain:
if parent_domain['tenant_id'] == values['tenant_id']:
# Record the Parent Domain ID
values['parent_domain_id'] = parent_domain['id']
else:
raise exceptions.Forbidden('Unable to create subdomain in '
'another tenants domain')
# NOTE(kiall): Fetch the servers before creating the domain, this way
# we can prevent domain creation if no servers are
@ -264,8 +297,10 @@ class Service(rpc_service.Service):
policy.check('create_domain', context, target)
if 'name' in values:
# Ensure the domain does not end with a reserved suffix.
self._check_domain_name_blacklist(context, values['name'])
# Ensure the domain is not blacklisted
if self._is_blacklisted_domain_name(context, values['name']):
# Raises an exception if the policy check is denied
policy.check('use_blacklisted_domain', context)
domain = self.storage_conn.update_domain(context, domain_id, values)
servers = self.storage_conn.get_servers(context)

@ -34,9 +34,7 @@ class MonikerContext(context.RequestContext):
show_deleted=show_deleted,
request_id=request_id)
self.user_id = user
self.tenant_id = tenant
self.effective_tenant_id = self.tenant_id
self._effective_tenant_id = None
self.roles = roles
def sudo(self, tenant_id):
@ -66,6 +64,33 @@ class MonikerContext(context.RequestContext):
return d
@property
def user_id(self):
return self.user
@user_id.setter
def user_id(self, value):
self.user = value
@property
def tenant_id(self):
return self.tenant
@tenant_id.setter
def tenant_id(self, value):
self.tenant = value
@property
def effective_tenant_id(self):
if self._effective_tenant_id:
return self._effective_tenant_id
else:
return self.tenant
@effective_tenant_id.setter
def effective_tenant_id(self, value):
self._effective_tenant_id = value
@classmethod
def get_admin_context(cls):
return cls(None, tenant=None, is_admin=True, roles=['admin'])

@ -155,6 +155,16 @@ class Connection(base.Connection):
return dict(domain)
def find_domain(self, context, criterion):
query = self.session.query(models.Domain)
try:
domain = query.filter_by(**criterion).one()
except (exc.NoResultFound, exc.MultipleResultsFound):
raise exceptions.DomainNotFound()
else:
return dict(domain)
def update_domain(self, context, domain_id, values):
domain = self._get_domain(context, domain_id)
@ -210,6 +220,18 @@ class Connection(base.Connection):
return dict(record)
def find_record(self, context, domain_id, criterion):
domain = self._get_domain(context, domain_id)
query = domain.records
try:
record = query.filter_by(**criterion).one()
except (exc.NoResultFound, exc.MultipleResultsFound):
raise exceptions.RecordNotFound()
else:
return dict(record)
def update_record(self, context, record_id, values):
record = self._get_record(context, record_id)

@ -0,0 +1,45 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import MetaData, Table, Column, ForeignKey
from moniker.sqlalchemy.types import UUID
meta = MetaData()
def upgrade(migrate_engine):
meta.bind = migrate_engine
domains_table = Table('domains', meta, autoload=True)
parent_domain_id = Column('parent_domain_id',
UUID,
ForeignKey('domains.id'),
default=None,
nullable=True)
parent_domain_id.create(domains_table, populate_default=True)
def downgrade(migrate_engine):
meta.bind = migrate_engine
domains_table = Table('domains', meta, autoload=True)
parent_domain_id = Column('parent_domain_id',
UUID,
ForeignKey('domains.id'),
default=None,
nullable=True)
parent_domain_id.drop(domains_table)

@ -57,6 +57,7 @@ class Domain(Base):
__tablename__ = 'domains'
tenant_id = Column(String(36), default=None, nullable=True)
name = Column(String(255), nullable=False, unique=True)
email = Column(String(36), nullable=False)
@ -70,6 +71,9 @@ class Domain(Base):
lazy='dynamic', cascade="all, delete-orphan",
passive_deletes=True)
parent_domain_id = Column(UUID, ForeignKey('domains.id'), default=None,
nullable=True)
@hybrid_property
def serial(self):
# TODO: Terrible terrible hack.. Cleanup ;)

@ -34,7 +34,7 @@ class CentralServiceTest(CentralTestCase):
self.central_service.start()
self.central_service.stop()
def test_check_domain_name_blacklist(self):
def test_is_blacklisted_domain_name(self):
self.config(domain_name_blacklist=['^example.org.$', 'net.$'],
group='service:central')
@ -43,18 +43,41 @@ class CentralServiceTest(CentralTestCase):
context = self.get_context()
self.central_service._check_domain_name_blacklist(context, 'org.')
result = self.central_service._is_blacklisted_domain_name(
context, 'org.')
self.assertFalse(result)
self.central_service._check_domain_name_blacklist(
result = self.central_service._is_blacklisted_domain_name(
context, 'www.example.org.')
self.assertFalse(result)
with self.assertRaises(exceptions.Forbidden):
self.central_service._check_domain_name_blacklist(
context, 'example.org.')
result = self.central_service._is_blacklisted_domain_name(
context, 'example.org.')
self.assertTrue(result)
with self.assertRaises(exceptions.Forbidden):
self.central_service._check_domain_name_blacklist(
context, 'example.net.')
result = self.central_service._is_blacklisted_domain_name(
context, 'example.net.')
self.assertTrue(result)
def test_is_subdomain(self):
context = self.get_context()
# Create a domain (using the specified domain name)
self.create_domain(name='example.org.')
result = self.central_service._is_subdomain(context, 'org.')
self.assertFalse(result)
result = self.central_service._is_subdomain(context,
'www.example.net.')
self.assertFalse(result)
result = self.central_service._is_subdomain(context, 'example.org.')
self.assertFalse(result)
result = self.central_service._is_subdomain(context,
'www.example.org.')
self.assertTrue(result)
# Server Tests
def test_create_server(self):
@ -166,6 +189,48 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(domain['name'], values['name'])
self.assertEqual(domain['email'], values['email'])
def test_create_subdomain(self):
context = self.get_admin_context()
# Explicitly set a tenant_id
context.tenant_id = '1'
# Create the Parent Domain using fixture 0
parent_domain = self.create_domain(fixture=0, context=context)
# Prepare values for the subdomain using fixture 1 as a base
values = self.get_domain_fixture(1)
values['name'] = 'www.%s' % parent_domain['name']
# Create the subdomain
domain = self.central_service.create_domain(context, values=values)
# Ensure all values have been set correctly
self.assertIsNotNone(domain['id'])
self.assertEqual(domain['parent_domain_id'], parent_domain['id'])
def test_create_subdomain_failure(self):
context = self.get_admin_context()
# Explicitly set a tenant_id
context.tenant_id = '1'
# Create the Parent Domain using fixture 0
parent_domain = self.create_domain(fixture=0, context=context)
context = self.get_admin_context()
# Explicitly use a different tenant_id
context.tenant_id = '2'
# Prepare values for the subdomain using fixture 1 as a base
values = self.get_domain_fixture(1)
values['name'] = 'www.%s' % parent_domain['name']
# Attempt to create the subdomain
with self.assertRaises(exceptions.Forbidden):
self.central_service.create_domain(context, values=values)
def test_create_blacklisted_domain_success(self):
self.config(domain_name_blacklist=['^blacklisted.com.$'],
group='service:central')

@ -314,6 +314,38 @@ class StorageDriverTestCase(StorageTestCase):
uuid = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
self.storage_conn.get_domain(self.admin_context, uuid)
def test_find_domain_criterion(self):
domain_one = self.create_domain(0)[1]
domain_two = self.create_domain(1)[1]
criterion = dict(
name=domain_one['name']
)
result = self.storage_conn.find_domain(self.admin_context, criterion)
self.assertEqual(result['name'], domain_one['name'])
self.assertEqual(result['email'], domain_one['email'])
criterion = dict(
name=domain_two['name']
)
result = self.storage_conn.find_domain(self.admin_context, criterion)
self.assertEqual(result['name'], domain_two['name'])
self.assertEqual(result['email'], domain_two['email'])
def test_find_domain_criterion_missing(self):
expected = self.create_domain(0)[1]
criterion = dict(
name=expected['name'] + "NOT FOUND"
)
with self.assertRaises(exceptions.DomainNotFound):
self.storage_conn.find_domain(self.admin_context, criterion)
def test_update_domain(self):
# Create a domain
fixture, domain = self.create_domain()
@ -444,6 +476,37 @@ class StorageDriverTestCase(StorageTestCase):
uuid = 'caf771fc-6b05-4891-bee1-c2a48621f57b'
self.storage_conn.get_record(self.admin_context, uuid)
def test_find_record_criterion(self):
domain = self.create_domain(0)[1]
expected = self.create_record(domain)[1]
criterion = dict(
name=expected['name']
)
actual = self.storage_conn.find_record(self.admin_context,
domain['id'],
criterion)
self.assertEqual(actual['name'], expected['name'])
self.assertEqual(actual['type'], expected['type'])
self.assertEqual(actual['data'], expected['data'])
def test_find_record_criterion_missing(self):
domain = self.create_domain(0)[1]
expected = self.create_record(domain)[1]
criterion = dict(
name=expected['name'] + "NOT FOUND"
)
with self.assertRaises(exceptions.RecordNotFound):
self.storage_conn.find_record(self.admin_context,
domain['id'],
criterion)
def test_update_record(self):
domain_fixture, domain = self.create_domain()

@ -33,7 +33,7 @@ commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack m
[testenv:pyflakes]
deps = {[testenv]deps}
pyflakes==0.5.0
pyflakes==0.6.1
commands = pyflakes moniker bin setup.py
[testenv:venv]