Add backend validation and re-factored pool manage

- Added better test coverage.
- Cleaned up error handling.

Closes-Bug: #1987993

Change-Id: I4829c74706e125df5eae25336e13af780cfa304a
This commit is contained in:
Erik Olof Gunnar Andersson 2022-08-27 17:42:36 -07:00
parent 3207d71d54
commit 6d61ad555b
6 changed files with 684 additions and 232 deletions

View File

@ -15,7 +15,9 @@
# under the License.
#
# Copied: designate
import sys
import traceback
import eventlet
from oslo_config import cfg
@ -62,8 +64,9 @@ def add_command_parsers(subparsers):
parser.set_defaults(command_object=command_object)
category_subparsers = parser.add_subparsers(dest='action')
category_subparsers.required = True
for (action, action_fn) in methods_of(command_object):
for action, action_fn in methods_of(command_object):
action = getattr(action_fn, '_cmd_name', action)
parser = category_subparsers.add_parser(action)
@ -75,8 +78,8 @@ def add_command_parsers(subparsers):
parser.set_defaults(action_kwargs=action_kwargs)
category_opt = cfg.SubCommandOpt('category', title="Commands",
help="Available Commands",
category_opt = cfg.SubCommandOpt('category', title='Commands',
help='Available Commands',
handler=add_command_parsers)
@ -108,13 +111,15 @@ def fetch_func_args(func):
def main():
CONF.register_cli_opt(category_opt)
utils.read_config('designate', sys.argv)
logging.setup(CONF, 'designate')
gmr.TextGuruMeditation.setup_autorun(version)
fn = CONF.category.action_fn
fn_args = fetch_func_args(fn)
fn(*fn_args)
try:
fn = CONF.category.action_fn
fn_args = fetch_func_args(fn)
fn(*fn_args)
except Exception:
print('An error has occurred:\n%s' % traceback.format_exc())
return 255

View File

@ -39,3 +39,8 @@ class Commands(object):
self.context = context.DesignateContext.get_admin_context(
request_id='designate-manage'
)
self.output_message = ['']
def _print_result(self):
for message in self.output_message:
print(message)

View File

@ -13,13 +13,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import stevedore.exception
import yaml
from designate.backend import base as backend_base
from designate.central import rpcapi as central_rpcapi
from designate import exceptions
from designate.manage import base
@ -27,109 +28,76 @@ from designate import objects
from designate.objects.adapters import DesignateAdapter
from designate import policy
from designate import rpc
from designate import utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class PoolCommands(base.Commands):
def __init__(self):
super(PoolCommands, self).__init__()
self.output_msg = ['']
self.central_api = None
self.dry_run = False
self.skip_verify_drivers = False
# NOTE(jh): Cannot do this earlier because we are still missing the config
# at that point, see bug #1651576
def _startup(self):
def _setup(self, dry_run=False, skip_verify_drivers=False):
self.dry_run = dry_run
self.skip_verify_drivers = skip_verify_drivers
rpc.init(cfg.CONF)
self.central_api = central_rpcapi.CentralAPI()
def _create_pool(self, pool, dry_run):
pool = DesignateAdapter.parse('YAML', pool, objects.Pool())
for ns_record in pool.ns_records:
try:
ns_record.validate()
except exceptions.InvalidObject as e:
LOG.error(e.errors.to_list()[0]['message'])
sys.exit(1)
if dry_run:
self.output_msg.append('Create Pool: %s' % pool)
else:
LOG.info('Creating new pool: %s', pool)
self.central_api.create_pool(self.context, pool)
def _update_zones(self, pool):
LOG.info('Updating zone masters for pool: %s', pool.id)
def __get_masters_from_pool(pool):
masters = []
for target in pool.targets:
for master in target.get('masters', []):
master = {'host': master['host'], 'port': master['port']}
found = False
for existing_master in masters:
if master == existing_master:
found = True
if not found:
masters.append(master)
return masters
policy.init()
self.context.all_tenants = True
zones = self.central_api.find_zones(
self.context,
criterion={'pool_id': pool.id})
for zone in zones:
zone.masters = objects.ZoneMasterList().from_list(
__get_masters_from_pool(pool)
)
self.central_api.update_zone(
self.context, zone
)
@base.args('--file', help='The path to the file the yaml output should be '
'written to',
'written to',
default='/etc/designate/pools.yaml')
def generate_file(self, file):
self._startup()
self._setup()
try:
pools = self.central_api.find_pools(self.context)
data = DesignateAdapter.render('YAML', pools)
self._write_config_to_file(file, data)
except messaging.exceptions.MessagingTimeout:
LOG.critical(
'No response received from designate-central. '
'Check it is running, and retry'
)
sys.exit(1)
with open(file, 'w') as stream:
yaml.dump(
DesignateAdapter.render('YAML', pools),
stream,
default_flow_style=False
)
raise SystemExit(1)
@base.args('--pool_id', help='ID of the pool to be examined',
default=CONF['service:central'].default_pool_id)
def show_config(self, pool_id):
self._startup()
self._setup()
self.output_message.append('Pool Configuration:')
self.output_message.append('-------------------')
try:
if not utils.is_uuid_like(pool_id):
self.output_message.append('Not a valid uuid: %s' % pool_id)
raise SystemExit(1)
pool = self.central_api.find_pool(self.context, {'id': pool_id})
print('Pool Configuration:')
print('-------------------')
print(yaml.dump(DesignateAdapter.render('YAML', pool),
default_flow_style=False))
self.output_message.append(
yaml.dump(
DesignateAdapter.render('YAML', pool),
default_flow_style=False
)
)
except exceptions.PoolNotFound:
self.output_message.append('Pool not found')
raise SystemExit(1)
except messaging.exceptions.MessagingTimeout:
LOG.critical(
'No response received from designate-central. '
'Check it is running, and retry'
)
sys.exit(1)
raise SystemExit(1)
finally:
self._print_result()
@base.args('--file', help='The path to the yaml file describing the pools',
default='/etc/designate/pools.yaml')
@ -144,98 +112,156 @@ class PoolCommands(base.Commands):
help='This will simulate what will happen when you run this command',
action='store_true',
default=False)
def update(self, file, delete, dry_run):
self._startup()
print('Updating Pools Configuration')
print('****************************')
@base.args(
'--skip-verify-drivers',
help='Don\'t verify the designate backend drivers',
action='store_true',
default=False)
def update(self, file, delete, dry_run=False, skip_verify_drivers=False):
self._setup(dry_run, skip_verify_drivers)
with open(file, 'r') as stream:
xpools = yaml.safe_load(stream)
try:
self.output_message.append('Updating Pools Configuration')
self.output_message.append('****************************')
if dry_run:
self.output_msg.append('The following changes will occur:')
self.output_msg.append('*********************************')
pools_data = self._load_config(file)
for xpool in xpools:
try:
if 'id' in xpool:
try:
pool = self.central_api.get_pool(
self.context, xpool['id']
)
except Exception as e:
LOG.critical(
'Bad ID Supplied for pool. pool_id: '
'%(pool)s message: %(res)s',
{
'pool': xpool['id'], 'res': e
}
)
continue
else:
pool = self.central_api.find_pool(
self.context, {'name': xpool['name']}
)
if dry_run:
self.output_message.append('The following changes will occur:')
self.output_message.append('*********************************')
LOG.info('Updating existing pool: %s', pool)
for pool_data in pools_data:
self._create_or_update_pool(pool_data)
# TODO(kiall): Move the below into the pool object
if delete:
pools = self.central_api.find_pools(self.context)
pools_in_db = {pool.name for pool in pools}
pools_in_yaml = {pool_data['name'] for pool_data in pools_data}
pools_to_delete = pools_in_db - pools_in_yaml
for pool_name in pools_to_delete:
self._delete_pool(pool_name)
pool = DesignateAdapter.parse('YAML', xpool, pool)
except exceptions.InvalidObject as e:
self.output_message.append(str(e))
raise SystemExit(1)
except messaging.exceptions.MessagingTimeout:
LOG.critical(
'No response received from designate-central. '
'Check it is running, and retry'
)
raise SystemExit(1)
finally:
self._print_result()
# TODO(graham): We should be doing a full validation, but right
# now there is quirks validating through nested objects.
def _create_or_update_pool(self, pool_data):
try:
pool = self._get_pool(pool_data)
self._update_pool(pool_data, pool)
for ns_record in pool.ns_records:
try:
ns_record.validate()
except exceptions.InvalidObject as e:
LOG.error(e.errors.to_list()[0]['message'])
sys.exit(1)
except exceptions.PoolNotFound:
self._create_pool(pool_data)
if dry_run:
self.output_msg.append('Update Pool: %s' % pool)
else:
pool = self.central_api.update_pool(self.context, pool)
# Bug: Changes in the pool targets should trigger a
# zone masters update LP: #1879798.
self._update_zones(pool)
def _get_pool(self, pool_data):
if 'id' in pool_data:
pool_id = pool_data['id']
if not utils.is_uuid_like(pool_id):
self.output_message.append('Not a valid uuid: %s' % pool_id)
raise SystemExit(1)
except exceptions.PoolNotFound:
self._create_pool(xpool, dry_run)
except messaging.exceptions.MessagingTimeout:
LOG.critical(
'No response received from designate-central. '
'Check it is running, and retry'
)
sys.exit(1)
pool = self.central_api.get_pool(
self.context, pool_id
)
else:
pool = self.central_api.find_pool(
self.context, {'name': pool_data['name']}
)
if delete:
pools = self.central_api.find_pools(self.context)
pools_in_db = {pool.name for pool in pools}
pools_in_yaml = {xpool['name'] for xpool in xpools}
return pool
pools_to_delete = pools_in_db - pools_in_yaml
def _create_pool(self, pool_data):
pool = DesignateAdapter.parse('YAML', pool_data, objects.Pool())
self._validate_pool(pool)
for pool in pools_to_delete:
if self.dry_run:
self.output_message.append('Create Pool: %s' % pool)
else:
LOG.info('Creating new pool: %s', pool)
self.central_api.create_pool(self.context, pool)
return pool
def _update_pool(self, pool_data, pool):
pool = DesignateAdapter.parse('YAML', pool_data, pool)
self._validate_pool(pool)
if self.dry_run:
self.output_message.append('Update Pool: %s' % pool)
else:
pool = self.central_api.update_pool(self.context, pool)
self._update_zones(pool)
def _delete_pool(self, pool_name):
pool = self.central_api.find_pool(
self.context, criterion={'name': pool_name}
)
if self.dry_run:
self.output_message.append('Delete Pool: %s' % pool_name)
else:
LOG.info('Deleting %s', pool_name)
self.central_api.delete_pool(self.context, pool.id)
def _update_zones(self, pool):
LOG.info('Updating zone masters for pool: %s', pool.id)
policy.init()
self.context.all_tenants = True
zones = self.central_api.find_zones(
self.context, criterion={'pool_id': pool.id}
)
for zone in zones:
zone.masters = objects.ZoneMasterList().from_list(
self._get_masters_from_pool(pool)
)
self.central_api.update_zone(self.context, zone)
def _validate_pool(self, pool):
for ns_record in pool.ns_records:
ns_record.validate()
if not self.skip_verify_drivers:
for target in pool.targets:
try:
p = self.central_api.find_pool(
self.context,
criterion={'name': pool})
if dry_run:
self.output_msg.append('Delete Pool: %s' % p)
else:
LOG.info('Deleting %s', p)
self.central_api.delete_pool(self.context, p.id)
except messaging.exceptions.MessagingTimeout:
LOG.critical(
'No response received from designate-central. '
'Check it is running, and retry'
backend_base.Backend.get_driver(target.type)
except stevedore.exception.NoMatches:
self.output_message.append(
'Unable to find designate backend driver type: '
'%s' % target.type
)
sys.exit(1)
if not self.dry_run:
raise SystemExit(1)
for line in self.output_msg:
print(line)
@staticmethod
def _get_masters_from_pool(pool):
masters = []
for target in pool.targets:
for master in target.get('masters', []):
master = {'host': master['host'], 'port': master['port']}
found = False
for existing_master in masters:
if master == existing_master:
found = True
if not found:
masters.append(master)
return masters
@staticmethod
def _load_config(filename):
with open(filename, 'r') as stream:
return yaml.safe_load(stream)
@staticmethod
def _write_config_to_file(filename, data):
with open(filename, 'w') as stream:
yaml.dump(data, stream, default_flow_style=False)

View File

@ -0,0 +1,39 @@
---
- name: bind
description: Default BIND Pool
ns_records:
- hostname: ns1-1.example.org.
priority: 1
- hostname: ns1-2.example.org.
priority: 2
- hostname: ns1-3.example.org.
priority: 3
nameservers:
- host: 192.0.2.2
port: 53
- host: 192.0.2.3
port: 53
targets:
- type: bind9
description: BIND Instance
masters:
- host: 192.0.2.5
port: 5354
- host: 192.0.2.6
port: 5354
- host: 192.0.2.7
port: 5354
options:
host: ::1
port: 5322
rndc_host: ::1
rndc_port: 953
rndc_config_file: /etc/bind/rndc.conf
rndc_key_file: /etc/bind/rndc.key
also_notifies:
- host: 192.0.2.4
port: 53

View File

@ -0,0 +1,421 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from unittest import mock
import yaml
from oslo_log import log as logging
import oslo_messaging
from designate.central import service
from designate.manage import base
from designate.manage import pool
from designate.tests import fixtures
from designate.tests import resources
from designate.tests.test_manage import DesignateManageTestCase
LOG = logging.getLogger(__name__)
def get_pools_path(name='pools.yaml'):
return os.path.join(resources.path, 'pools_yaml', name)
def get_pools(name='pools.yaml'):
with open(get_pools_path(name), 'r') as pool_obj:
return yaml.safe_load(pool_obj)
class ManagePoolTestCase(DesignateManageTestCase):
def setUp(self):
super(DesignateManageTestCase, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
default_pool = self.central_service.find_pool(
self.admin_context, {'name': 'default'}
)
self.central_service.delete_pool(self.admin_context, default_pool.id)
self.command = pool.PoolCommands()
self.print_result = mock.patch.object(
base.Commands, '_print_result').start()
def test_show_config(self):
self.command._setup()
self.command._create_pool(get_pools()[0])
pool_id = self.central_service.find_pool(
self.admin_context, {'name': 'default'}).id
self.command.show_config(pool_id)
self.print_result.assert_called_once()
self.assertIn('Pool Configuration', self.command.output_message[1])
self.assertIn(
'Default PowerDNS 4 Pool', ''.join(self.command.output_message)
)
@mock.patch.object(service.Service, 'find_pool',
side_effect=oslo_messaging.MessagingTimeout())
def test_show_config_rpc_timeout(self, mock_find_pool):
self.assertRaises(
SystemExit,
self.command.show_config, '5421ca70-f1b7-4edc-9e01-b604011a262a'
)
mock_find_pool.assert_called_once()
def test_show_config_pool_not_found(self):
self.assertRaises(
SystemExit,
self.command.show_config, '5421ca70-f1b7-4edc-9e01-b604011a262a'
)
self.assertIn(
'Pool not found', ''.join(self.command.output_message)
)
def test_show_config_invalid_uuid(self):
self.assertRaises(
SystemExit,
self.command.show_config, 'None'
)
self.print_result.assert_called_once()
self.assertIn(
'Not a valid uuid: None', ''.join(self.command.output_message)
)
def test_show_config_empty(self):
self.assertRaises(
SystemExit,
self.command.show_config, 'a36bb018-9584-420c-acc6-2b5cf89714ad'
)
self.print_result.assert_called_once()
self.assertIn('Pool not found', ''.join(self.command.output_message))
def test_update(self):
self.command.update(
get_pools_path('pools.yaml'), delete=False, dry_run=False
)
self.print_result.assert_called_once()
self.assertIn(
'Updating Pools Configuration****************************',
''.join(self.command.output_message)
)
pool = self.central_service.find_pool(self.admin_context, {
'name': 'default'
})
self.assertEqual(1, len(pool.targets))
self.assertEqual('pdns4', pool.targets[0].type)
def test_update_bind9(self):
self.command.update(
get_pools_path('bind9_pools.yaml'), delete=False, dry_run=False
)
self.print_result.assert_called_once()
self.assertIn(
'Updating Pools Configuration****************************',
''.join(self.command.output_message)
)
pool = self.central_service.find_pool(self.admin_context, {
'name': 'bind'
})
self.assertEqual(1, len(pool.targets))
self.assertEqual('bind9', pool.targets[0].type)
def test_update_multiple_pools(self):
self.command.update(
get_pools_path('multiple-pools.yaml'), delete=False, dry_run=False
)
self.print_result.assert_called_once()
self.assertIn(
'Updating Pools Configuration****************************',
''.join(self.command.output_message)
)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(2, len(pools))
@mock.patch.object(service.Service, 'find_pool',
side_effect=oslo_messaging.MessagingTimeout())
def test_update_rpc_timeout(self, mock_find_pool):
self.assertRaises(
SystemExit,
self.command.update,
get_pools_path('pools.yaml'), delete=False, dry_run=False
)
mock_find_pool.assert_called_once()
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_pool_with_invalid_uuid(self, mock_load_config):
mock_load_config.return_value = [{
'name': 'default',
'id': 'invalid',
}]
self.assertRaises(
SystemExit,
self.command.update, 'test.yaml', delete=False, dry_run=False
)
self.assertIn(
'Not a valid uuid: invalid',
''.join(self.command.output_message)
)
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_pool_invalid_ns_record(self, mock_load_config):
mock_load_config.return_value = [{
'name': 'default',
'ns_records': [
{'hostname': 'ns1-1.example.org.', 'priority': None},
],
'targets': [
{
'type': 'powerdns',
}
],
}]
self.assertRaises(
SystemExit,
self.command.update, 'test.yaml', delete=False, dry_run=False
)
self.assertIn(
"Provided object is not valid. Got a ValueError error with "
"message Field `priority' cannot be None",
''.join(self.command.output_message)
)
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_new_backend(self, mock_load_config):
self.command._setup()
self.command._create_pool(get_pools()[0])
self.create_zone(fixture=0)
self.create_zone(fixture=1)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(1, len(pools))
self.assertEqual('pdns4', pools[0].targets[0].type)
new_default = dict(get_pools()[0])
new_default['targets'][0]['type'] = 'bind9'
mock_load_config.return_value = [new_default]
self.command.update('test.yaml', delete=False, dry_run=False)
mock_load_config.assert_called_once_with('test.yaml')
self.print_result.assert_called_once()
self.assertIn(
'Updating Pools Configuration****************************',
''.join(self.command.output_message)
)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(1, len(pools))
self.assertEqual('bind9', pools[0].targets[0].type)
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_pool_unknown_backend(self, mock_load_config):
mock_load_config.return_value = [{
'name': 'default',
'ns_records': [
{'hostname': 'ns1-1.example.org.', 'priority': 1},
],
'targets': [
{
'type': 'powerdns',
}
],
}]
self.assertRaises(
SystemExit,
self.command.update, 'test.yaml', delete=False, dry_run=False
)
self.assertIn(
'Unable to find designate backend driver type: powerdns',
''.join(self.command.output_message)
)
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_pool_unknown_backend_dry_run(self, mock_load_config):
mock_load_config.return_value = [{
'name': 'default',
'ns_records': [
],
'targets': [
{
'type': 'powerdns',
}
],
}]
self.command.update(
'test.yaml', delete=False, dry_run=True
)
self.assertIn(
'Unable to find designate backend driver type: powerdns',
''.join(self.command.output_message)
)
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_pool_unknown_backend_skip_verify(self, mock_load_config):
mock_load_config.return_value = [{
'name': 'default',
'ns_records': [
],
'targets': [
{
'type': 'powerdns',
}
],
}]
self.command.update(
'test.yaml', delete=False, dry_run=False, skip_verify_drivers=True
)
self.assertNotIn(
'Unable to find designate backend driver type: powerdns',
''.join(self.command.output_message)
)
def test_update_with_delete(self):
self.command.update(
get_pools_path('multiple-pools.yaml'), delete=True, dry_run=False
)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(2, len(pools))
self.command.update(
get_pools_path('pools.yaml'), delete=True, dry_run=False
)
self.print_result.assert_called()
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(1, len(pools))
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_with_delete_dry_run(self, mock_load_config):
default_pool = dict(get_pools()[0])
default_pool['id'] = 'a234253f-9fd8-4e1c-996e-5bcb152f43d5'
additional_pool = {
'name': 'second_pool',
'ns_records': [
{'hostname': 'ns1-1.example.org.', 'priority': 1},
],
'targets': [
{
'type': 'pdns4',
}
],
}
mock_load_config.return_value = get_pools()
self.command._setup()
self.command._create_pool(default_pool)
self.command._create_pool(additional_pool)
self.command.update('test.yaml', delete=True, dry_run=True)
mock_load_config.assert_called_once_with('test.yaml')
self.print_result.assert_called_once()
self.assertIn(
"Update Pool: <Pool id:'a234253f-9fd8-4e1c-996e-5bcb152f43d5' "
"name:'default'>",
' '.join(self.command.output_message)
)
self.assertIn(
'Delete Pool: second_pool',
''.join(self.command.output_message)
)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(2, len(pools))
@mock.patch.object(pool.PoolCommands, '_load_config')
def test_update_dry_run(self, mock_load_config):
mock_load_config.return_value = get_pools()
self.command.update('test.yaml', delete=True, dry_run=True)
mock_load_config.assert_called_once_with('test.yaml')
self.print_result.assert_called_once()
self.assertIn(
"Create Pool: <Pool id:'None' name:'default'>",
''.join(self.command.output_message)
)
@mock.patch.object(pool.PoolCommands, '_write_config_to_file')
def test_generate_file(self, mock_write_config_to_file):
self.command._setup()
self.command._create_pool(get_pools()[0])
self.command.generate_file('test.yaml')
mock_write_config_to_file.assert_called_once()
@mock.patch.object(service.Service, 'find_pools',
side_effect=oslo_messaging.MessagingTimeout())
def test_generate_file_rpc_timeout(self, mock_find_pools):
self.assertRaises(
SystemExit,
self.command.generate_file, 'test.yaml'
)
mock_find_pools.assert_called_once()
def test_create_new_pool(self):
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(0, len(pools))
self.command._setup()
self.command._create_pool(get_pools()[0])
new_pool = self.central_service.find_pool(
self.admin_context, {'name': 'default'}
)
self.assertEqual('default', new_pool.name)
self.assertEqual('Default PowerDNS 4 Pool', new_pool.description)
pools = self.central_service.find_pools(self.admin_context, {})
self.assertEqual(1, len(pools))
def test_get_pool_by_id(self):
self.command._setup()
self.command._create_pool(get_pools()[0])
new_pool = self.central_service.find_pool(
self.admin_context, {'name': 'default'}
)
self.assertEqual(
'default',
self.command._get_pool({'id': new_pool.id}).name
)

View File

@ -9,12 +9,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_log import log as logging
from designate import context
from designate.manage.pool import PoolCommands
from designate.manage import base
from designate.manage import pool
from designate import objects
from designate.tests import fixtures
from designate.tests.test_manage import DesignateManageTestCase
@ -22,25 +23,25 @@ from designate.tests.test_manage import DesignateManageTestCase
LOG = logging.getLogger(__name__)
def hydrate_pool_targets(target_masters):
pool_targets = objects.PoolTargetList()
masters = objects.PoolTargetMasterList()
for target_master in target_masters:
masters.append(target_master)
target = objects.PoolTarget(masters=masters)
target.masters = masters
pool_targets.append(target)
return pool_targets
class UpdatePoolTestCase(DesignateManageTestCase):
def setUp(self):
super(DesignateManageTestCase, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.context = context.DesignateContext.get_admin_context(
request_id='designate-manage'
)
def hydrate_pool_targets(self, target_masters):
pool_targets = objects.PoolTargetList()
masters = objects.PoolTargetMasterList()
for target_master in target_masters:
masters.append(target_master)
target = objects.PoolTarget(masters=masters)
target.masters = masters
pool_targets.append(target)
return pool_targets
self.print_result = mock.patch.object(
base.Commands, '_print_result').start()
def test_update_pools_zones(self):
values = dict(
@ -53,23 +54,22 @@ class UpdatePoolTestCase(DesignateManageTestCase):
self.admin_context, zone=objects.Zone.from_dict(values))
# Ensure the correct NS Records are in place
pool = self.central_service.get_pool(
new_pool = self.central_service.get_pool(
self.admin_context, zone.pool_id
)
pool.targets = self.hydrate_pool_targets([objects.PoolTargetMaster(
pool_target_id=pool.id,
new_pool.targets = hydrate_pool_targets([objects.PoolTargetMaster(
pool_target_id=new_pool.id,
host='192.0.2.2',
port='53')]
)
command = PoolCommands()
command.context = self.context
command.central_api = self.central_service
command = pool.PoolCommands()
command._setup()
with mock.patch.object(
self.central_service, 'update_zone') as mock_update_zone:
command._update_zones(pool)
command._update_zones(new_pool)
mock_update_zone.assert_called_once()
def test_update_pools_zones_multiple_masters(self):
@ -83,70 +83,26 @@ class UpdatePoolTestCase(DesignateManageTestCase):
self.admin_context, zone=objects.Zone.from_dict(values))
# Ensure the correct NS Records are in place
pool = self.central_service.get_pool(
new_pool = self.central_service.get_pool(
self.admin_context, zone.pool_id
)
targets1 = self.hydrate_pool_targets([
targets1 = hydrate_pool_targets([
objects.PoolTargetMaster(
pool_target_id=pool.id,
pool_target_id=new_pool.id,
host='192.0.2.3',
port='53')
])
targets2 = self.hydrate_pool_targets([
targets2 = hydrate_pool_targets([
objects.PoolTargetMaster(
pool_target_id=pool.id,
pool_target_id=new_pool.id,
host='192.0.2.4',
port='53')
])
pool.targets = objects.PoolTargetList()
pool.targets.extend(targets1.objects + targets2.objects)
new_pool.targets = objects.PoolTargetList()
new_pool.targets.extend(targets1.objects + targets2.objects)
command = PoolCommands()
command.context = self.context
command.central_api = self.central_service
command = pool.PoolCommands()
command._setup()
command._update_zones(pool)
def test_create_new_pool(self):
pool = {
'name': 'new_pool',
'description': 'New PowerDNS Pool',
'attributes': {},
'ns_records': [
{'hostname': 'ns1-1.example.org.', 'priority': 1},
{'hostname': 'ns1-2.example.org.', 'priority': 2}
],
'nameservers': [
{'host': '192.0.2.2', 'port': 53}
],
'targets': [
{
'type': 'powerdns',
'description': 'PowerDNS Database Cluster',
'masters': [
{'host': '192.0.2.1', 'port': 5354}
],
'options': {
'host': '192.0.2.2', 'port': 53,
'connection': 'connection'
}
}
],
'also_notifies': [
{'host': '192.0.2.4', 'port': 53}
]
}
command = PoolCommands()
command.context = self.context
command.central_api = self.central_service
command._create_pool(pool, dry_run=False)
pool = self.central_service.find_pool(
self.admin_context, {'name': 'new_pool'}
)
self.assertEqual('new_pool', pool.name)
self.assertEqual('New PowerDNS Pool', pool.description)
command._update_zones(new_pool)