Create API calls to Manage Blacklisted Domains

Add all API calls to manage blacklisted domains via a database.
Also, change the code to use the blacklisted domains from
the database and not from the config file.

bp/blacklist

Change-Id: I0fa08652cc0f15ab57708cd28413c43200e6d802
This commit is contained in:
betsy luzader 2013-12-19 09:40:35 -06:00
parent a2934600e8
commit e60a585457
19 changed files with 799 additions and 23 deletions

View File

@ -0,0 +1,134 @@
# Copyright 2014 Rackspace
#
# Author: Betsy Luzader <betsy.luzader@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from designate.central import rpcapi as central_rpcapi
from designate.openstack.common import log as logging
from designate import schema
from designate import utils
from designate.api.v2.controllers import rest
from designate.api.v2.views import blacklists as blacklists_view
LOG = logging.getLogger(__name__)
central_api = central_rpcapi.CentralAPI()
class BlacklistsController(rest.RestController):
_view = blacklists_view.BlacklistsView()
_resource_schema = schema.Schema('v2', 'blacklist')
_collection_schema = schema.Schema('v2', 'blacklists')
@pecan.expose(template='json:', content_type='application/json')
def get_one(self, blacklist_id):
""" Get Blacklist """
request = pecan.request
context = request.environ['context']
blacklist = central_api.get_blacklist(context, blacklist_id)
return self._view.show(context, request, blacklist)
@pecan.expose(template='json:', content_type='application/json')
def get_all(self, **params):
""" List all Blacklisted Zones """
request = pecan.request
context = request.environ['context']
# Extract the pagination params
#marker = params.pop('marker', None)
#limit = int(params.pop('limit', 30))
# Extract any filter params
accepted_filters = ('pattern')
criterion = dict((k, params[k]) for k in accepted_filters
if k in params)
blacklist = central_api.find_blacklists(context, criterion)
return self._view.list(context, request, blacklist)
@pecan.expose(template='json:', content_type='application/json')
def post_all(self):
""" Create Blacklisted Zone """
request = pecan.request
response = pecan.response
context = request.environ['context']
body = request.body_dict
# Validate the request conforms to the schema
self._resource_schema.validate(body)
# Convert from APIv2 -> Central format
values = self._view.load(context, request, body)
# Create the blacklist
blacklist = central_api.create_blacklist(context, values)
response.status_int = 201
response.headers['Location'] = self._view._get_resource_href(
request, blacklist)
# Prepare and return the response body
return self._view.show(context, request, blacklist)
@pecan.expose(template='json:', content_type='application/json')
@pecan.expose(template='json:', content_type='application/json-patch+json')
def patch_one(self, blacklist_id):
""" Update Blacklisted Zone """
request = pecan.request
context = request.environ['context']
body = request.body_dict
response = pecan.response
# Fetch the existing blacklisted zone
blacklist = central_api.get_blacklist(context, blacklist_id)
# Convert to APIv2 Format
blacklist = self._view.show(context, request, blacklist)
if request.content_type == 'application/json-patch+json':
raise NotImplemented('json-patch not implemented')
else:
blacklist = utils.deep_dict_merge(blacklist, body)
# Validate the request conforms to the schema
self._resource_schema.validate(blacklist)
values = self._view.load(context, request, body)
blacklist = central_api.update_blacklist(context,
blacklist_id, values)
response.status_int = 200
return self._view.show(context, request, blacklist)
@pecan.expose(template=None, content_type='application/json')
def delete_one(self, blacklist_id):
""" Delete Blacklisted Zone """
request = pecan.request
response = pecan.response
context = request.environ['context']
central_api.delete_blacklist(context, blacklist_id)
response.status_int = 204
# NOTE: This is a hack and a half.. But Pecan needs it.
return ''

View File

@ -19,6 +19,7 @@ from designate.api.v2.controllers import reverse
from designate.api.v2.controllers import schemas
from designate.api.v2.controllers import tlds
from designate.api.v2.controllers import zones
from designate.api.v2.controllers import blacklists
LOG = logging.getLogger(__name__)
@ -33,3 +34,4 @@ class RootController(object):
reverse = reverse.ReverseController()
tlds = tlds.TldsController()
zones = zones.ZonesController()
blacklists = blacklists.BlacklistsController()

View File

@ -0,0 +1,51 @@
# Copyright 2014 Rackspace
#
# Author: Betsy Luzader <betsy.luzader@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.api.v2.views import base as base_view
from designate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class BlacklistsView(base_view.BaseView):
""" Model a Blacklist API response as a python dictionary """
_resource_name = 'blacklist'
_collection_name = 'blacklists'
def show_basic(self, context, request, blacklist):
""" Detailed view of a blacklisted zone """
return {
"id": blacklist['id'],
"pattern": blacklist['pattern'],
"description": blacklist['description'],
"created_at": blacklist['created_at'],
"updated_at": blacklist['updated_at'],
"links": self._get_resource_links(request, blacklist)
}
def load(self, context, request, body):
""" Extract a "central" compatible dict from an API call """
result = {}
item = body[self._resource_name]
# Copy keys which need no alterations
for k in ('id', 'pattern', 'description',):
if k in item:
result[k] = item[k]
return result

View File

@ -28,10 +28,6 @@ cfg.CONF.register_opts([
help='The storage driver to use'),
cfg.ListOpt('enabled-notification-handlers', default=[],
help='Enabled Notification Handlers'),
cfg.ListOpt('domain-name-blacklist',
default=['\\.arpa\\.$', '\\.novalocal\\.$', '\\.localhost\\.$',
'\\.localdomain\\.$', '\\.local\\.$'],
help='DNS domain name blacklist'),
cfg.IntOpt('max_domain_name_len', default=255,
help="Maximum domain name length"),
cfg.IntOpt('max_recordset_name_len', default=255,

View File

@ -35,7 +35,7 @@ class CentralAPI(rpc_proxy.RpcProxy):
3.0 - RecordSet Changes
3.1 - Add floating ip ptr methods
3.2 - TLD Api changes
3.3 - Add methods for blacklisted domains
"""
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.central_topic
@ -400,3 +400,41 @@ class CentralAPI(rpc_proxy.RpcProxy):
msg = self.make_msg('update_floatingip', region=region,
floatingip_id=floatingip_id, values=values)
return self.call(context, msg)
# Blacklisted Domain Methods
def create_blacklist(self, context, values):
LOG.info("create_blacklist: Calling central's create_blacklist")
msg = self.make_msg('create_blacklist', values=values)
return self.call(context, msg, version='3.3')
def get_blacklist(self, context, blacklist_id):
LOG.info("get_blacklist: Calling central's get_blacklist.")
msg = self.make_msg('get_blacklist', blacklist_id=blacklist_id)
return self.call(context, msg, version='3.3')
def find_blacklists(self, context, criterion=None):
LOG.info("find_blacklists: Calling central's find_blacklists.")
msg = self.make_msg('find_blacklists', criterion=criterion)
return self.call(context, msg, version='3.3')
def find_blacklist(self, context, criterion):
LOG.info("find_blacklist: Calling central's find_blacklist.")
msg = self.make_msg('find_blacklist', criterion=criterion)
return self.call(context, msg, version='3.3')
def update_blacklist(self, context, blacklist_id, values):
LOG.info("update_blacklist: Calling central's update_blacklist.")
msg = self.make_msg('update_blacklist', blacklist_id=blacklist_id,
values=values)
return self.call(context, msg, version='3.3')
def delete_blacklist(self, context, blacklist_id):
LOG.info("delete_blacklist: Calling central's delete blacklist.")
msg = self.make_msg('delete_blacklist', blacklist_id=blacklist_id)
return self.call(context, msg, version='3.3')

View File

@ -45,7 +45,7 @@ def wrap_backend_call():
class Service(rpc_service.Service):
RPC_API_VERSION = '3.2'
RPC_API_VERSION = '3.3'
def __init__(self, *args, **kwargs):
backend_driver = cfg.CONF['service:central'].backend_driver
@ -203,11 +203,12 @@ class Service(rpc_service.Service):
"""
Ensures the provided domain_name is not blacklisted.
"""
blacklists = cfg.CONF['service:central'].domain_name_blacklist
blacklists = self.storage_api.find_blacklists(context)
for blacklist in blacklists:
if bool(re.search(blacklist, domain_name)):
return blacklist
if bool(re.search(blacklist["pattern"], domain_name)):
return True
return False
@ -1328,3 +1329,56 @@ class Service(rpc_service.Service):
elif isinstance(values['ptrdname'], basestring):
return self._set_floatingip_reverse(
context, region, floatingip_id, values)
# Blacklisted Domains
def create_blacklist(self, context, values):
policy.check('create_blacklist', context)
with self.storage_api.create_blacklist(context, values) as blacklist:
pass # NOTE: No other systems need updating
self.notifier.info(context, 'dns.blacklist.create', blacklist)
return blacklist
def get_blacklist(self, context, blacklist_id):
policy.check('get_blacklist', context)
blacklist = self.storage_api.get_blacklist(context, blacklist_id)
return blacklist
def find_blacklists(self, context, criterion=None):
policy.check('find_blacklists', context)
blacklists = self.storage_api.find_blacklists(context, criterion)
return blacklists
def find_blacklist(self, context, criterion):
policy.check('find_blacklist', context)
blacklist = self.storage_api.find_blacklist(context, criterion)
return blacklist
def update_blacklist(self, context, blacklist_id, values):
policy.check('update_blacklist', context)
with self.storage_api.update_blacklist(context,
blacklist_id,
values) as blacklist:
pass # NOTE: No other systems need updating
self.notifier.info(context, 'dns.blacklist.update', blacklist)
return blacklist
def delete_blacklist(self, context, blacklist_id):
policy.check('delete_blacklist', context)
with self.storage_api.delete_blacklist(context,
blacklist_id) as blacklist:
pass # NOTE: No other systems need updating
self.notifier.info(context, 'dns.blacklist.delete', blacklist)

View File

@ -160,6 +160,10 @@ class DuplicateRecord(Duplicate):
error_type = 'duplicate_record'
class DuplicateBlacklist(Duplicate):
error_type = 'duplicate_blacklist'
class NotFound(Base):
error_code = 404
error_type = 'not_found'
@ -177,6 +181,10 @@ class TsigKeyNotFound(NotFound):
error_type = 'tsigkey_not_found'
class BlacklistNotFound(NotFound):
error_type = 'blacklist_not_found'
class DomainNotFound(NotFound):
error_type = 'domain_not_found'

View File

@ -0,0 +1,63 @@
{
"$schema": "http://json-schema.org/draft-04/hyper-schema",
"id": "blacklist",
"title": "blacklist",
"description": "Blacklisted Zone",
"additionalProperties": false,
"required": ["blacklist"],
"properties": {
"blacklist": {
"type": "object",
"additionalProperties": false,
"required": ["pattern"],
"properties":{
"id": {
"type": "string",
"description": "Blacklisted Zone Identifier",
"pattern": "^([0-9a-fA-F]){8}-([0-9a-fA-F]){4}-([0-9a-fA-F]){4}-([0-9a-fA-F]){4}-([0-9a-fA-F]){12}$",
"readonly": true
},
"pattern": {
"type": "string",
"description": "Regex for blacklisted zone name",
"format": "regex",
"maxLength": 512,
"required": true
},
"created_at": {
"type": "string",
"description": "Date and time of blacklisted zone creation",
"format": "date-time",
"readonly": true
},
"description": {
"type": ["string", "null"],
"description": "Description for the blacklisted zone",
"maxLength": 160
},
"updated_at": {
"type": ["string", "null"],
"description": "Date and time of last blacklisted zone update",
"format": "date-time",
"readonly": true
},
"links": {
"type": "object",
"additionalProperties": false,
"properties": {
"self": {
"type": "string",
"format": "url"
}
}
}
}
}
}
}

View File

@ -0,0 +1,38 @@
{
"$schema": "http://json-schema.org/draft-04/hyper-schema",
"id": "blacklist",
"title": "blacklist",
"description": "Blacklisted Zone",
"additionalProperties": false,
"required": ["blacklists"],
"properties": {
"blacklists": {
"type": "array",
"description": "Blacklist",
"items": {"$ref": "blacklist#/properties/blacklist"}
},
"links": {
"type": "object",
"additionalProperties": false,
"properties": {
"self": {
"type": "string",
"format": "url"
},
"next": {
"type": ["string", "null"],
"format": "url"
},
"previous": {
"type": ["string", "null"],
"format": "url"
}
}
}
}
}

View File

@ -677,6 +677,93 @@ class StorageAPI(object):
"""
return self.storage.count_records(context, criterion)
@contextlib.contextmanager
def create_blacklist(self, context, values):
"""
Create a new Blacklisted Domain.
:param context: RPC Context.
:param values: Values to create the new Blacklist from.
"""
self.storage.begin()
try:
blacklist = self.storage.create_blacklist(context, values)
yield blacklist
except Exception:
with excutils.save_and_reraise_exception():
self.storage.rollback()
else:
self.storage.commit()
def get_blacklist(self, context, blacklist_id):
"""
Get a Blacklist via its ID.
:param context: RPC Context.
:param blacklist_id: ID of the Blacklisted Domain.
"""
return self.storage.get_blacklist(context, blacklist_id)
def find_blacklists(self, context, criterion=None):
"""
Find all Blacklisted Domains
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.find_blacklists(context, criterion)
def find_blacklist(self, context, criterion):
"""
Find a single Blacklisted Domain.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.find_blacklist(context, criterion)
@contextlib.contextmanager
def update_blacklist(self, context, blacklist_id, values):
"""
Update a Blacklisted Domain via ID.
:param context: RPC Context.
:param blacklist_id: Values to update the Blacklist with
:param values: Values to update the Blacklist from.
"""
self.storage.begin()
try:
blacklist = self.storage.update_blacklist(context,
blacklist_id,
values)
yield blacklist
except Exception:
with excutils.save_and_reraise_exception():
self.storage.rollback()
else:
self.storage.commit()
@contextlib.contextmanager
def delete_blacklist(self, context, blacklist_id):
"""
Delete a Blacklisted Domain
:param context: RPC Context.
:param blacklist_id: Blacklist ID to delete.
"""
self.storage.begin()
try:
yield self.storage.get_blacklist(context, blacklist_id)
self.storage.delete_blacklist(context, blacklist_id)
except Exception:
with excutils.save_and_reraise_exception():
self.storage.rollback()
else:
self.storage.commit()
def ping(self, context):
""" Ping the Storage connection """
return self.storage.ping(context)

View File

@ -443,6 +443,61 @@ class Storage(Plugin):
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def create_blacklist(self, context, values):
"""
Create a Blacklist.
:param context: RPC Context.
:param values: Values to create the new Blacklist from.
"""
@abc.abstractmethod
def get_blacklist(self, context, blacklist_id):
"""
Get a Blacklist via ID.
:param context: RPC Context.
:param blacklist_id: Blacklist ID to get.
"""
@abc.abstractmethod
def find_blacklists(self, context, criterion):
"""
Find Blacklists
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def find_blacklist(self, context, criterion):
"""
Find a single Blacklist.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def update_blacklist(self, context, blacklist_id, values):
"""
Update a Blacklist via ID
:param context: RPC Context.
:param blacklist_id: Blacklist ID to update.
:param values: Values to update the Blacklist from
"""
@abc.abstractmethod
def delete_blacklist(self, context, blacklist_id):
"""
Delete a Blacklist via ID.
:param context: RPC Context.
:param blacklist_id: Delete a Blacklist via ID
"""
def ping(self, context):
""" Ping the Storage connection """
return {

View File

@ -546,6 +546,63 @@ class SQLAlchemyStorage(base.Storage):
query = self._apply_criterion(models.Record, query, criterion)
return query.count()
#
# Blacklist Methods
#
def _find_blacklist(self, context, criterion, one=False):
try:
return self._find(models.Blacklists, context, criterion, one)
except exceptions.NotFound:
raise exceptions.BlacklistNotFound()
def create_blacklist(self, context, values):
blacklist = models.Blacklists()
blacklist.update(values)
try:
blacklist.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateBlacklist()
return dict(blacklist)
def find_blacklists(self, context, criterion=None):
blacklists = self._find_blacklist(context, criterion)
return [dict(b) for b in blacklists]
def get_blacklist(self, context, blacklist_id):
blacklist = self._find_blacklist(context,
{'id': blacklist_id}, one=True)
return dict(blacklist)
def find_blacklist(self, context, criterion):
blacklist = self._find_blacklist(context, criterion, one=True)
return dict(blacklist)
def update_blacklist(self, context, blacklist_id, values):
blacklist = self._find_blacklist(context, {'id': blacklist_id},
one=True)
blacklist.update(values)
try:
blacklist.save(self.session)
except exceptions.Duplicate:
raise exceptions.DuplicateBlacklist()
return dict(blacklist)
def delete_blacklist(self, context, blacklist_id):
blacklist = self._find_blacklist(context, {'id': blacklist_id},
one=True)
blacklist.delete(self.session)
# diagnostics
def ping(self, context):
start_time = time.time()

View File

@ -0,0 +1,54 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Author: Betsy Luzader <betsy.luzader@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Integer, String, DateTime
from sqlalchemy.schema import Table, Column, MetaData
from designate.openstack.common import timeutils
from designate.openstack.common.uuidutils import generate_uuid
from designate.sqlalchemy.types import UUID
meta = MetaData()
blacklists = Table(
'blacklists',
meta,
Column('id', UUID(), default=generate_uuid,
primary_key=True),
Column('created_at', DateTime(),
default=timeutils.utcnow),
Column('updated_at', DateTime(),
onupdate=timeutils.utcnow),
Column('version', Integer(), default=1,
nullable=False),
Column('pattern', String(512), nullable=False,
unique=True),
Column('description', String(160),
nullable=True),
mysql_engine='INNODB',
mysql_charset='utf8')
def upgrade(migrate_engine):
meta.bind = migrate_engine
blacklists.create()
def downgrade(migrate_engine):
meta.bind = migrate_engine
blacklists.drop()

View File

@ -188,3 +188,10 @@ class TsigKey(Base):
algorithm = Column(Enum(name='tsig_algorithms', *TSIG_ALGORITHMS),
nullable=False)
secret = Column(String(255), nullable=False)
class Blacklists(Base):
__tablename__ = 'blacklists'
pattern = Column(String(512), nullable=False, unique=True)
description = Column(Unicode(160), nullable=True)

View File

@ -220,6 +220,15 @@ class TestCase(test.BaseTestCase):
{'ptrdname': 'srv1.example.net.'}
]
blacklist_fixtures = [{
'pattern': 'blacklisted.com.',
'description': 'This is a comment',
}, {
'pattern': 'blacklisted.net.'
}, {
'pattern': 'blacklisted.org.'
}]
def setUp(self):
super(TestCase, self).setUp()
@ -384,6 +393,11 @@ class TestCase(test.BaseTestCase):
with open(path) as zonefile:
return zonefile.read()
def get_blacklist_fixture(self, fixture=0, values={}):
_values = copy.copy(self.blacklist_fixtures[fixture])
_values.update(values)
return _values
def create_quota(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
@ -465,6 +479,13 @@ class TestCase(test.BaseTestCase):
recordset['id'],
values=values)
def create_blacklist(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_blacklist_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_blacklist(context, values=values)
def _skip_decorator(func):
@functools.wraps(func)

View File

@ -66,8 +66,11 @@ class CentralServiceTest(CentralTestCase):
context, domain, 'a.example.COM.')
def test_is_blacklisted_domain_name(self):
self.config(domain_name_blacklist=['^example.org.$', 'net.$'],
group='service:central')
# Create blacklisted zones with specific names
self.create_blacklist(pattern='example.org.')
self.create_blacklist(pattern='example.net.')
self.create_blacklist(pattern='^blacklisted.org.$')
self.create_blacklist(pattern='com.$')
# Set the policy to reject the authz
self.policy({'use_blacklisted_domain': '!'})
@ -78,18 +81,28 @@ class CentralServiceTest(CentralTestCase):
context, 'org.')
self.assertFalse(result)
# Subdomains should not be allowed from a blacklisted domain
result = self.central_service._is_blacklisted_domain_name(
context, 'www.example.org.')
self.assertFalse(result)
self.assertTrue(result)
result = self.central_service._is_blacklisted_domain_name(
context, 'example.org.')
self.assertTrue(result)
# Check for blacklisted domains containing regexps
result = self.central_service._is_blacklisted_domain_name(
context, 'example.net.')
self.assertTrue(result)
result = self.central_service._is_blacklisted_domain_name(
context, 'example.com.')
self.assertTrue(result)
result = self.central_service._is_blacklisted_domain_name(
context, 'blacklisted.org.')
self.assertTrue(result)
def test_is_subdomain(self):
context = self.get_context()
@ -508,8 +521,8 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_domain(context, values=values)
def test_create_blacklisted_domain_success(self):
self.config(domain_name_blacklist=['^blacklisted.com.$'],
group='service:central')
# Create blacklisted zone using default values
self.create_blacklist()
# Set the policy to accept the authz
self.policy({'use_blacklisted_domain': '@'})
@ -522,7 +535,7 @@ class CentralServiceTest(CentralTestCase):
# Create a server
self.create_server()
# Create a domain
# Create a zone that is blacklisted
domain = self.central_service.create_domain(
self.admin_context, values=values)
@ -532,8 +545,7 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(domain['email'], values['email'])
def test_create_blacklisted_domain_fail(self):
self.config(domain_name_blacklist=['^blacklisted.com.$'],
group='service:central')
self.create_blacklist()
# Set the policy to reject the authz
self.policy({'use_blacklisted_domain': '!'})
@ -1705,3 +1717,101 @@ class CentralServiceTest(CentralTestCase):
self.central_service.get_floatingip(
context, fip['region'], fip['id'])
# Blacklist Tests
def test_create_blacklist(self):
values = self.get_blacklist_fixture(fixture=0)
blacklist = self.create_blacklist(fixture=0)
# Verify all values have been set correctly
self.assertIsNotNone(blacklist['id'])
self.assertEqual(blacklist['pattern'], values['pattern'])
self.assertEqual(blacklist['description'], values['description'])
def test_get_blacklist(self):
# Create a blacklisted zone
expected = self.create_blacklist(fixture=0)
# Retrieve it, and verify it is the same
blacklist = self.central_service.get_blacklist(
self.admin_context, expected['id'])
self.assertEqual(blacklist['id'], expected['id'])
self.assertEqual(blacklist['pattern'], expected['pattern'])
self.assertEqual(blacklist['description'], expected['description'])
def test_find_blacklists(self):
# Verify there are no blacklisted zones to start with
blacklists = self.central_service.find_blacklists(
self.admin_context)
self.assertEqual(len(blacklists), 0)
# Create a single blacklisted zone
self.create_blacklist()
# Verify we can retrieve the newly created blacklist
blacklists = self.central_service.find_blacklists(
self.admin_context)
values1 = self.get_blacklist_fixture(fixture=0)
self.assertEqual(len(blacklists), 1)
self.assertEqual(blacklists[0]['pattern'], values1['pattern'])
# Create a second blacklisted zone
self.create_blacklist(fixture=1)
# Verify we can retrieve both blacklisted zones
blacklists = self.central_service.find_blacklists(
self.admin_context)
values2 = self.get_blacklist_fixture(fixture=1)
self.assertEqual(len(blacklists), 2)
self.assertEqual(blacklists[0]['pattern'], values1['pattern'])
self.assertEqual(blacklists[1]['pattern'], values2['pattern'])
def test_find_blacklist(self):
#Create a blacklisted zone
expected = self.create_blacklist(fixture=0)
# Retrieve the newly created blacklist
blacklist = self.central_service.find_blacklist(
self.admin_context, {'id': expected['id']})
self.assertEqual(blacklist['pattern'], expected['pattern'])
self.assertEqual(blacklist['description'], expected['description'])
def test_update_blacklist(self):
# Create a blacklisted zone
expected = self.create_blacklist(fixture=0)
new_comment = "This is a different comment."
# Update the blacklist
updated_values = dict(
description=new_comment
)
self.central_service.update_blacklist(self.admin_context,
expected['id'],
updated_values)
# Fetch the blacklist
blacklist = self.central_service.get_blacklist(self.admin_context,
expected['id'])
# Verify that the record was updated correctly
self.assertEqual(blacklist['description'], new_comment)
def test_delete_blacklist(self):
# Create a blacklisted zone
blacklist = self.create_blacklist()
# Delete the blacklist
self.central_service.delete_blacklist(self.admin_context,
blacklist['id'])
# Try to fetch the blacklist to verify an exception is raised
with testtools.ExpectedException(exceptions.BlacklistNotFound):
self.central_service.get_blacklist(self.admin_context,
blacklist['id'])

View File

@ -32,9 +32,6 @@ root_helper = sudo
# Driver used for backend communication (e.g. fake, rpc, bind9, powerdns)
backend_driver = powerdns
# List of blacklist domain name regexes
#domain_name_blacklist = \.arpa\.$, \.novalocal\.$, \.localhost\.$, \.localdomain\.$, \.local\.$
# Maximum domain name length
max_domain_name_len = 255

View File

@ -35,9 +35,6 @@ root_helper = sudo
# Driver used for backend communication (e.g. fake, rpc, bind9, powerdns)
#backend_driver = fake
# List of blacklist domain name regexes
#domain_name_blacklist = \.arpa\.$, \.novalocal\.$, \.localhost\.$, \.localdomain\.$, \.local\.$
# Maximum domain name length
#max_domain_name_len = 255

View File

@ -53,6 +53,13 @@
"count_records": "rule:admin_or_owner",
"use_sudo": "rule:admin",
"create_blacklist": "rule:admin",
"find_blacklist": "rule:admin",
"find_blacklists": "rule:admin",
"get_blacklist": "rule:admin",
"update_blacklist": "rule:admin",
"delete_blacklist": "rule:admin",
"use_blacklisted_domain": "rule:admin",
"diagnostics_ping": "rule:admin",