Implement Pool Targets

We separate out the idea of targets (backends you write to), to places you read
from (DNS servers you query). This allows for backends like Akamai/Dynect/Agent
to work correctly with multiple DNS servers behind one target.

Change-Id: If1060ccd83bce8201f52e9927789db21fc2675c9
This commit is contained in:
Kiall Mac Innes 2015-03-25 13:17:16 +00:00
parent 4970298aa7
commit 3386d9539c
44 changed files with 986 additions and 1756 deletions

View File

@ -42,8 +42,9 @@ DESIGNATE_APIPASTE_CONF=$DESIGNATE_CONF_DIR/api-paste.ini
# Set up default options
DESIGNATE_BACKEND_DRIVER=${DESIGNATE_BACKEND_DRIVER:=powerdns}
DESIGNATE_POOL_MANAGER_CACHE_DRIVER=${DESIGNATE_POOL_MANAGER_CACHE_DRIVER:-noop}
DESIGNATE_POOL_ID=${DESIGNATE_POOL_ID:-794ccc2c-d751-44fe-b57f-8894c9f5c842}
DESIGNATE_SERVER_ID=${DESIGNATE_SERVER_ID:-f26e0b32-736f-4f0a-831b-039a415c481e}
DESIGNATE_TARGET_ID=${DESIGNATE_TARGET_ID:-f26e0b32-736f-4f0a-831b-039a415c481e}
# Public IP/Port Settings
DESIGNATE_SERVICE_PROTOCOL=${DESIGNATE_SERVICE_PROTOCOL:-$SERVICE_PROTOCOL}
@ -110,12 +111,22 @@ function configure_designate {
iniset $DESIGNATE_CONF DEFAULT state_path $DESIGNATE_STATE_PATH
iniset $DESIGNATE_CONF DEFAULT root-helper sudo designate-rootwrap $DESIGNATE_ROOTWRAP_CONF
iniset $DESIGNATE_CONF storage:sqlalchemy connection `database_connection_url designate`
iniset $DESIGNATE_CONF service:pool_manager pool_id $DESIGNATE_POOL_ID
iniset $DESIGNATE_CONF pool_manager_cache:sqlalchemy connection `database_connection_url designate_pool_manager`
iniset $DESIGNATE_CONF service:api enabled_extensions_v1 $DESIGNATE_ENABLED_EXTENSIONS_V1
iniset $DESIGNATE_CONF service:api enabled_extensions_v2 $DESIGNATE_ENABLED_EXTENSIONS_V2
iniset $DESIGNATE_CONF service:api enabled_extensions_admin $DESIGNATE_ENABLED_EXTENSIONS_ADMIN
# Pool Manager
iniset $DESIGNATE_CONF service:pool_manager pool_id $DESIGNATE_POOL_ID
iniset $DESIGNATE_CONF service:pool_manager cache_driver $DESIGNATE_POOL_MANAGER_CACHE_DRIVER
# Pool Manager Cache
if [ "$DESIGNATE_POOL_MANAGER_CACHE_DRIVER" == "sqlalchemy" ]; then
iniset $DESIGNATE_CONF pool_manager_cache:sqlalchemy connection `database_connection_url designate_pool_manager`
fi
# Pool Options
iniset $DESIGNATE_CONF pool:$DESIGNATE_POOL_ID targets $DESIGNATE_TARGET_ID
sudo cp $DESIGNATE_DIR/etc/designate/rootwrap.conf.sample $DESIGNATE_ROOTWRAP_CONF
iniset $DESIGNATE_ROOTWRAP_CONF DEFAULT filters_path $DESIGNATE_DIR/etc/designate/rootwrap.d root-helper
@ -207,11 +218,13 @@ function init_designate {
# Init and migrate designate database
designate-manage database sync
if [ "$DESIGNATE_POOL_MANAGER_CACHE_DRIVER" == "sqlalchemy" ]; then
# (Re)create designate_pool_manager cache
recreate_database designate_pool_manager utf8
# Init and migrate designate pool-manager-cache
designate-manage pool-manager-cache sync
fi
init_designate_backend
}

View File

@ -75,17 +75,15 @@ EOF
# configure_designate_backend - make configuration changes, including those to other services
function configure_designate_backend {
iniset $DESIGNATE_CONF service:pool_manager backends bind9
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID type bind9
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID masters $DESIGNATE_SERVICE_HOST:$DESIGNATE_SERVICE_PORT_MDNS
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID options "rndc_host: $DESIGNATE_SERVICE_HOST, rndc_port: $DESIGNATE_SERVICE_PORT_RNDC, rndc_config_file: $BIND_CFG_DIR/rndc.conf, rndc_key_file: $BIND_CFG_DIR/rndc.key"
iniset $DESIGNATE_CONF backend:bind9 masters $DESIGNATE_SERVICE_HOST:$DESIGNATE_SERVICE_PORT_MDNS
iniset $DESIGNATE_CONF backend:bind9 server_ids $DESIGNATE_SERVER_ID
iniset $DESIGNATE_CONF backend:bind9 rndc_port $DESIGNATE_SERVICE_PORT_RNDC
iniset $DESIGNATE_CONF backend:bind9 rndc_host $DESIGNATE_SERVICE_HOST
iniset $DESIGNATE_CONF backend:bind9 rndc_config_file "$BIND_CFG_DIR/rndc.conf"
iniset $DESIGNATE_CONF backend:bind9 rndc_key_file "$BIND_CFG_DIR/rndc.key"
iniset $DESIGNATE_CONF backend:bind9:$DESIGNATE_SERVER_ID host $DESIGNATE_SERVICE_HOST
iniset $DESIGNATE_CONF backend:bind9:$DESIGNATE_SERVER_ID port $DESIGNATE_SERVICE_PORT_DNS
# DevStack Managed BIND NameServer
local nameserver_id=`uuidgen`
iniset $DESIGNATE_CONF pool:$DESIGNATE_POOL_ID nameservers $nameserver_id
iniset $DESIGNATE_CONF pool_nameserver:$nameserver_id host $DESIGNATE_SERVICE_HOST
iniset $DESIGNATE_CONF pool_nameserver:$nameserver_id port $DESIGNATE_SERVICE_PORT_DNS
sudo chown $STACK_USER $BIND_CFG_DIR

View File

@ -46,14 +46,15 @@ function install_designate_backend {
# configure_designate_backend - make configuration changes, including those to other services
function configure_designate_backend {
iniset $DESIGNATE_CONF service:pool_manager backends powerdns
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID type powerdns
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID masters $DESIGNATE_SERVICE_HOST:$DESIGNATE_SERVICE_PORT_MDNS
iniset $DESIGNATE_CONF pool_target:$DESIGNATE_TARGET_ID options "connection: `database_connection_url designate_pdns`"
iniset $DESIGNATE_CONF backend:powerdns server_ids $DESIGNATE_SERVER_ID
iniset $DESIGNATE_CONF backend:powerdns connection `database_connection_url designate_pdns`
iniset $DESIGNATE_CONF backend:powerdns masters "$DESIGNATE_SERVICE_HOST:$DESIGNATE_SERVICE_PORT_MDNS"
iniset $DESIGNATE_CONF backend:powerdns:$DESIGNATE_SERVER_ID host $DESIGNATE_SERVICE_HOST
iniset $DESIGNATE_CONF backend:powerdns:$DESIGNATE_SERVER_ID port $DESIGNATE_SERVICE_PORT_DNS
# DevStack Managed PDNS NameServer
local nameserver_id=`uuidgen`
iniset $DESIGNATE_CONF pool:$DESIGNATE_POOL_ID nameservers $nameserver_id
iniset $DESIGNATE_CONF pool_nameserver:$nameserver_id host $DESIGNATE_SERVICE_HOST
iniset $DESIGNATE_CONF pool_nameserver:$nameserver_id port $DESIGNATE_SERVICE_PORT_DNS
sudo tee $POWERDNS_CFG_DIR/pdns.conf > /dev/null <<EOF
# General Config
@ -98,7 +99,7 @@ function init_designate_backend {
recreate_database designate_pdns utf8
# Init and migrate designate_pdns database
designate-manage powerdns sync
designate-manage powerdns sync $DESIGNATE_TARGET_ID
}
# start_designate_backend - start any external services

View File

@ -31,6 +31,9 @@ ENABLED_SERVICES+=,designate,designate-central,designate-api,designate-pool-mana
# setup.cfg)
#DESIGNATE_BACKEND_DRIVER=powerdns
# Pool Manager Cache Driver (e.g. noop, memcache, sqlalchemy. See
# designate.backend section of setup.cfg)
#DESIGNATE_POOL_MANAGER_CACHE_DRIVER=noop
# Other Devstack Config
# =====================

View File

@ -20,15 +20,10 @@ from designate.backend.base import Backend
LOG = logging.getLogger(__name__)
def get_backend(backend_driver, backend_options):
LOG.debug("Loading backend driver: %s" % backend_driver)
cls = Backend.get_driver(backend_driver)
def get_backend(type_, target):
# TODO(kiall): Type is attached to the target, use it.
LOG.debug("Loading backend driver: %s" % type_)
return cls(backend_options)
cls = Backend.get_driver(type_)
def get_server_object(backend_driver, server_id):
LOG.debug("Loading backend driver: %s" % backend_driver)
cls = Backend.get_driver(backend_driver)
return cls.get_server_object(backend_driver, server_id)
return cls(target)

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import copy
from oslo.config import cfg
from oslo_log import log as logging
@ -22,10 +21,13 @@ from oslo_log import log as logging
from designate.i18n import _LI
from designate.context import DesignateContext
from designate.plugin import DriverPlugin
from designate import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('pool_id', 'designate.pool_manager',
group='service:pool_manager')
class Backend(DriverPlugin):
@ -33,10 +35,14 @@ class Backend(DriverPlugin):
__plugin_type__ = 'backend'
__plugin_ns__ = 'designate.backend'
def __init__(self, backend_options):
def __init__(self, target):
super(Backend, self).__init__()
self.backend_options = backend_options
self.target = target
self.options = target.options
self.masters = target.masters
# TODO(kiall): Context's should never be shared accross requests.
self.admin_context = DesignateContext.get_admin_context()
self.admin_context.all_tenants = True
@ -46,145 +52,6 @@ class Backend(DriverPlugin):
def stop(self):
LOG.info(_LI('Stopped %s backend'), self.get_canonical_name())
# Config Methods
@classmethod
def get_cfg_opts(cls):
group = cfg.OptGroup(
name=cls.get_canonical_name(),
title='Backend options for %s Backend' % cls.get_plugin_name()
)
opts = [
cfg.ListOpt('server_ids', default=[]),
cfg.ListOpt('masters', default=['127.0.0.1:5354'],
help='Comma-separated list of master DNS servers, in '
'<ip-address>:<port> format. If <port> is '
'omitted, the default 5354 is used. These are '
'mdns servers.'),
]
opts.extend(cls._get_common_cfg_opts())
opts.extend(cls._get_global_cfg_opts())
return [(group, opts)]
@classmethod
def get_extra_cfg_opts(cls):
# Common options for all backends
opts = [
cfg.ListOpt('masters'),
cfg.StrOpt('host', default='127.0.0.1', help='Server Host'),
cfg.IntOpt('port', default=53, help='Server Port'),
cfg.StrOpt('tsig-key', help='Server TSIG Key'),
]
# Backend specific common options
common_cfg_opts = copy.deepcopy(cls._get_common_cfg_opts())
# Ensure the default value for all common options is reset to None
for opt in common_cfg_opts:
opt.default = None
opts.extend(common_cfg_opts)
# Add any server only config options
opts.extend(cls._get_server_cfg_opts())
result = []
global_group = cls.get_canonical_name()
for server_id in cfg.CONF[global_group].server_ids:
group = cfg.OptGroup(name='%s:%s' % (global_group, server_id))
result.append((group, opts))
return result
@classmethod
def _get_common_cfg_opts(cls):
return []
@classmethod
def _get_global_cfg_opts(cls):
return []
@classmethod
def _get_server_cfg_opts(cls):
return []
def get_backend_option(self, key):
"""
Get the backend option value using the backend option key.
"""
for backend_option in self.backend_options:
if backend_option['key'] == key:
return backend_option['value']
# Pool Mgr Object Creation
@classmethod
def _create_server_object(cls, backend, server_id, backend_options,
server_section_name):
"""
Create the server object.
"""
server_values = {
'id': server_id,
'host': cfg.CONF[server_section_name].host,
'port': cfg.CONF[server_section_name].port,
'backend': backend,
'backend_options': backend_options,
'tsig_key': cfg.CONF[server_section_name].tsig_key
}
return objects.PoolServer(**server_values)
@classmethod
def _create_backend_option_objects(cls, global_section_name,
server_section_name):
"""
Create the backend_option object list.
"""
backend_options = []
for key in cfg.CONF[global_section_name].keys():
backend_option = cls._create_backend_option_object(
key, global_section_name, server_section_name)
backend_options.append(backend_option)
return backend_options
@classmethod
def _create_backend_option_object(cls, key, global_section_name,
server_section_name):
"""
Create the backend_option object. If a server specific backend option
value exists, use it. Otherwise use the global backend option value.
"""
value = None
try:
value = cfg.CONF[server_section_name].get(key)
except cfg.NoSuchOptError:
pass
if value is None:
value = cfg.CONF[global_section_name].get(key)
backend_option_values = {
'key': key,
'value': value
}
return objects.BackendOption(**backend_option_values)
@classmethod
def get_server_object(cls, backend, server_id):
"""
Get the server object from the backend driver for the server_id.
"""
global_section_name = 'backend:%s' % (backend,)
server_section_name = 'backend:%s:%s' % (backend, server_id)
backend_options = cls._create_backend_option_objects(
global_section_name, server_section_name)
return cls._create_server_object(
backend, server_id, backend_options, server_section_name)
# Core Backend Interface
@abc.abstractmethod
def create_domain(self, context, domain):
@ -196,7 +63,12 @@ class Backend(DriverPlugin):
"""
def update_domain(self, context, domain):
pass
"""
Update a DNS domain.
:param context: Security context information.
:param domain: the DNS domain.
"""
@abc.abstractmethod
def delete_domain(self, context, domain):

View File

@ -13,9 +13,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
from oslo.config import cfg
from oslo_log import log as logging
from designate import exceptions
@ -30,55 +27,21 @@ DEFAULT_MASTER_PORT = 5354
class Bind9Backend(base.Backend):
__plugin_name__ = 'bind9'
@classmethod
def _get_common_cfg_opts(cls):
return [
cfg.StrOpt('rndc-host', default='127.0.0.1', help='RNDC Host'),
cfg.IntOpt('rndc-port', default=953, help='RNDC Port'),
cfg.StrOpt('rndc-config-file', default=None,
help='RNDC Config File'),
cfg.StrOpt('rndc-key-file', default=None, help='RNDC Key File'),
]
def __init__(self, target):
super(Bind9Backend, self).__init__(target)
def __init__(self, backend_options):
super(Bind9Backend, self).__init__(backend_options)
self.masters = [self._parse_master(master)
for master in self.get_backend_option('masters')]
self.rndc_host = self.get_backend_option('rndc_host')
self.rndc_port = self.get_backend_option('rndc_port')
self.rndc_config_file = self.get_backend_option('rndc_config_file')
self.rndc_key_file = self.get_backend_option('rndc_key_file')
@staticmethod
def _parse_master(master):
try:
(ip_address, port) = utils.split_host_port(master)
except ValueError:
ip_address = str(master)
port = DEFAULT_MASTER_PORT
try:
port = int(port)
except ValueError:
raise exceptions.ConfigurationError(
'Invalid port "%s" in masters option.' % port)
if port < 0 or port > 65535:
raise exceptions.ConfigurationError(
'Port "%s" is not between 0 and 65535 in masters option.' %
port)
try:
socket.inet_pton(socket.AF_INET, ip_address)
except socket.error:
raise exceptions.ConfigurationError(
'Invalid IP address "%s" in masters option.' % ip_address)
return {'ip-address': ip_address, 'port': port}
self.rndc_host = self.options.get('rndc_host', '127.0.0.1')
self.rndc_port = int(self.options.get('rndc_port', 953))
self.rndc_config_file = self.options.get('rndc_config_file')
self.rndc_key_file = self.options.get('rndc_key_file')
def create_domain(self, context, domain):
LOG.debug('Create Domain')
masters = []
for master in self.masters:
ip_address = master['ip-address']
host = master['host']
port = master['port']
masters.append('%s port %s' % (ip_address, port))
masters.append('%s port %s' % (host, port))
rndc_op = [
'addzone',
'%s { type slave; masters { %s;}; file "slave.%s%s"; };' %

View File

@ -16,6 +16,7 @@
import copy
import threading
from oslo_config import cfg
from oslo_db import options
from oslo_log import log as logging
from oslo_utils import excutils
@ -38,22 +39,26 @@ class PowerDNSBackend(base.Backend):
__plugin_name__ = 'powerdns'
@classmethod
def _get_common_cfg_opts(cls):
def get_cfg_opts(cls):
group = cfg.OptGroup('backend:powerdns')
opts = copy.deepcopy(options.database_opts)
# Overide the default DB connection path in order to avoid name
# conflicts between the Designate and PowerDNS databases.
for opt in opts:
if opt.name == 'connection':
opt.default = 'sqlite:///$state_path/powerdns.sqlite'
# Strip connection options
discard_opts = ('sqlite_db', 'connection', 'slave_connection')
opts = [opt for opt in opts if opt.name not in discard_opts]
return opts
return [(group, opts,)]
def __init__(self, backend_options):
super(PowerDNSBackend, self).__init__(backend_options)
def __init__(self, target):
super(PowerDNSBackend, self).__init__(target)
self.local_store = threading.local()
self.masters = [m for m in self.get_backend_option('masters')]
default_connection = 'sqlite:///%(state_path)s/powerdns.sqlite' % {
'state_path': cfg.CONF.state_path
}
self.connection = self.options.get('connection', default_connection)
@property
def session(self):
@ -61,10 +66,9 @@ class PowerDNSBackend(base.Backend):
# have it's own session stored correctly. Without this, each
# greenthread may end up using a single global session, which
# leads to bad things happening.
global LOCAL_STORE
if not hasattr(self.local_store, 'session'):
self.local_store.session = session.get_session(self.name)
self.local_store.session = session.get_session(
self.name, self.connection, self.target.id)
return self.local_store.session
@ -114,10 +118,14 @@ class PowerDNSBackend(base.Backend):
try:
self.session.begin()
def _parse_master(master):
return '%s:%d' % (master.host, master.port)
masters = map(_parse_master, self.masters)
domain_values = {
'designate_id': domain['id'],
'name': domain['name'].rstrip('.'),
'master': ','.join(self.masters),
'master': ','.join(masters),
'type': 'SLAVE',
'account': context.tenant
}

View File

@ -0,0 +1,41 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_log import log as logging
from sqlalchemy import MetaData, Table
from designate.i18n import _LW
from designate.i18n import _LE
LOG = logging.getLogger(__name__)
meta = MetaData()
def upgrade(migrate_engine):
meta.bind = migrate_engine
LOG.warn(_LW('It will not be possible to downgrade from schema #11'))
records_table = Table('records', meta, autoload=True)
records_table.c.designate_id.drop()
records_table.c.designate_recordset_id.drop()
def downgrade(migrate_engine):
LOG.error(_LE('It is not possible to downgrade from schema #11'))
sys.exit(1)

View File

@ -83,9 +83,9 @@ class NoServersConfigured(ConfigurationError):
error_type = 'no_servers_configured'
class NoPoolServersConfigured(ConfigurationError):
class NoPoolTargetsConfigured(ConfigurationError):
error_code = 500
error_type = 'no_pool_servers_configured'
error_type = 'no_pool_targets_configured'
class OverQuota(Base):

View File

@ -1,6 +1,6 @@
# Copyright 2014 eBay Inc.
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,20 +13,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
import pprint
from oslo.config import cfg
from oslo_log import log as logging
from designate.manage import base
from designate import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# TODO(Ron): replace the Server object with this object.
class PoolServer(base.DictObjectMixin, base.DesignateObject):
FIELDS = {
'id': {},
'host': {},
'port': {},
'backend': {},
'backend_options': {},
'tsig_key': {}
}
class PoolServerList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolServer
class PoolCommands(base.Commands):
def show_config(self):
print('*' * 100)
print('Pool Configuration:')
print('*' * 100)
pprint.pprint(objects.Pool.from_config(CONF).to_dict())

View File

@ -32,26 +32,33 @@ CONF = cfg.CONF
utils.register_plugin_opts()
def get_manager():
def get_manager(pool_target_id):
pool_target_options = CONF['pool_target:%s' % pool_target_id].options
connection = pool_target_options['connection']
migration_config = {
'migration_repo_path': REPOSITORY,
'db_url': CONF['backend:powerdns'].connection}
'db_url': connection}
return migration_manager.MigrationManager(migration_config)
class DatabaseCommands(base.Commands):
def version(self):
current = get_manager().version()
@base.args('pool-target-id', help="Pool Target to Migrate", type=str)
def version(self, pool_target_id):
current = get_manager(pool_target_id).version()
latest = versioning_api.version(repository=REPOSITORY).value
print("Current: %s Latest: %s" % (current, latest))
def sync(self):
get_manager().upgrade(None)
@base.args('pool-target-id', help="Pool Target to Migrate", type=str)
def sync(self, pool_target_id):
get_manager(pool_target_id).upgrade(None)
@base.args('pool-target-id', help="Pool Target to Migrate", type=str)
@base.args('revision', nargs='?')
def upgrade(self, revision):
get_manager().upgrade(revision)
def upgrade(self, pool_target_id, revision):
get_manager(pool_target_id).upgrade(revision)
@base.args('pool-target-id', help="Pool Target to Migrate", type=str)
@base.args('revision', nargs='?')
def downgrade(self, revision):
get_manager().downgrade(revision)
def downgrade(self, pool_target_id, revision):
get_manager(pool_target_id).downgrade(revision)

View File

@ -40,13 +40,13 @@ class NotifyEndpoint(base.BaseEndpoint):
RPC_API_VERSION = '1.1'
RPC_API_NAMESPACE = 'notify'
def notify_zone_changed(self, context, domain, server, timeout,
def notify_zone_changed(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
"""
:param context: The user context.
:param domain: The designate domain object. This contains the domain
name.
:param server: A notify is sent to server.host:server.port.
:param nameserver: A notify is sent to nameserver.host:nameserver.port.
:param timeout: The time (in seconds) to wait for a NOTIFY response
from server.
:param retry_interval: The time (in seconds) between retries.
@ -60,18 +60,19 @@ class NotifyEndpoint(base.BaseEndpoint):
"""
time.sleep(delay)
return self._make_and_send_dns_message(
domain, server, timeout, retry_interval, max_retries, notify=True)
domain, nameserver, timeout, retry_interval, max_retries,
notify=True)
def poll_for_serial_number(self, context, domain, server, timeout,
def poll_for_serial_number(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
"""
:param context: The user context.
:param domain: The designate domain object. This contains the domain
name. domain.serial = expected_serial
:param server: server.host:server.port is checked for an updated serial
number.
:param nameserver: nameserver.host:nameserver.port is checked for an
updated serial number.
:param timeout: The time (in seconds) to wait for a SOA response from
server.
nameserver.
:param retry_interval: The time (in seconds) between retries.
:param max_retries: The maximum number of retries mindns would do for
an expected serial number. After this many retries, mindns returns
@ -80,21 +81,21 @@ class NotifyEndpoint(base.BaseEndpoint):
:return: The pool manager is informed of the status with update_status.
"""
(status, actual_serial, retries) = self.get_serial_number(
context, domain, server, timeout, retry_interval, max_retries,
context, domain, nameserver, timeout, retry_interval, max_retries,
delay)
self.pool_manager_api.update_status(
context, domain, server, status, actual_serial)
context, domain, nameserver, status, actual_serial)
def get_serial_number(self, context, domain, server, timeout,
def get_serial_number(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
"""
:param context: The user context.
:param domain: The designate domain object. This contains the domain
name. domain.serial = expected_serial
:param server: server.host:server.port is checked for an updated serial
number.
:param nameserver: nameserver.host:nameserver.port is checked for an
updated serial number.
:param timeout: The time (in seconds) to wait for a SOA response from
server.
nameserver.
:param retry_interval: The time (in seconds) between retries.
:param max_retries: The maximum number of retries mindns would do for
an expected serial number. After this many retries, mindns returns
@ -103,7 +104,7 @@ class NotifyEndpoint(base.BaseEndpoint):
:return: a tuple of (status, actual_serial, retries)
status is either "SUCCESS" or "ERROR".
actual_serial is either the serial number returned in the SOA
message from the server or None.
message from the nameserver or None.
retries is the number of retries left.
The return value is just used for testing and not by pool manager.
The pool manager is informed of the status with update_status.
@ -114,7 +115,7 @@ class NotifyEndpoint(base.BaseEndpoint):
time.sleep(delay)
while (True):
(response, retry) = self._make_and_send_dns_message(
domain, server, timeout, retry_interval, retries)
domain, nameserver, timeout, retry_interval, retries)
if response and response.rcode() in (
dns.rcode.NXDOMAIN, dns.rcode.REFUSED, dns.rcode.SERVFAIL):
status = 'NO_DOMAIN'
@ -132,8 +133,8 @@ class NotifyEndpoint(base.BaseEndpoint):
LOG.warn(_LW("Got lower serial for '%(zone)s' to '%(host)s:"
"%(port)s'. Expected:'%(es)d'. Got:'%(as)s'."
"Retries left='%(retries)d'") %
{'zone': domain.name, 'host': server.host,
'port': server.port, 'es': domain.serial,
{'zone': domain.name, 'host': nameserver.host,
'port': nameserver.port, 'es': domain.serial,
'as': actual_serial, 'retries': retries})
if retries > 0:
# retry again
@ -149,7 +150,7 @@ class NotifyEndpoint(base.BaseEndpoint):
# Return retries for testing purposes.
return (status, actual_serial, retries)
def _make_and_send_dns_message(self, domain, server, timeout,
def _make_and_send_dns_message(self, domain, nameserver, timeout,
retry_interval, max_retries, notify=False):
"""
:param domain: The designate domain object. This contains the domain
@ -167,8 +168,8 @@ class NotifyEndpoint(base.BaseEndpoint):
response is the response on success or None on failure.
current_retry is the current retry number
"""
dest_ip = server.host
dest_port = server.port
dest_ip = nameserver.host
dest_port = nameserver.port
dns_message = self._make_dns_message(domain.name, notify=notify)

View File

@ -68,44 +68,49 @@ class MdnsAPI(object):
MDNS_API = cls()
return MDNS_API
def notify_zone_changed(self, context, domain, server, timeout,
def notify_zone_changed(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
LOG.info(_LI("notify_zone_changed: Calling mdns for zone '%(zone)s', "
"serial '%(serial)s' to server '%(host)s:%(port)s'") %
"serial '%(serial)s' to nameserver '%(host)s:%(port)s'") %
{'zone': domain.name, 'serial': domain.serial,
'host': server.host, 'port': server.port})
'host': nameserver.host, 'port': nameserver.port})
# The notify_zone_changed method is a cast rather than a call since the
# caller need not wait for the notify to complete.
return self.notify_client.cast(
context, 'notify_zone_changed', domain=domain,
server=server, timeout=timeout, retry_interval=retry_interval,
max_retries=max_retries, delay=delay)
nameserver=nameserver, timeout=timeout,
retry_interval=retry_interval, max_retries=max_retries,
delay=delay)
def poll_for_serial_number(self, context, domain, server, timeout,
def poll_for_serial_number(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
LOG.info(_LI("poll_for_serial_number: Calling mdns for zone '%(zone)s'"
", serial '%(serial)s' to server '%(host)s:%(port)s'") %
LOG.info(
_LI("poll_for_serial_number: Calling mdns for zone '%(zone)s', "
"serial '%(serial)s' on nameserver '%(host)s:%(port)s'") %
{'zone': domain.name, 'serial': domain.serial,
'host': server.host, 'port': server.port})
'host': nameserver.host, 'port': nameserver.port})
# The poll_for_serial_number method is a cast rather than a call since
# the caller need not wait for the poll to complete. Mdns informs pool
# manager of the return value using update_status
return self.notify_client.cast(
context, 'poll_for_serial_number', domain=domain,
server=server, timeout=timeout, retry_interval=retry_interval,
max_retries=max_retries, delay=delay)
nameserver=nameserver, timeout=timeout,
retry_interval=retry_interval, max_retries=max_retries,
delay=delay)
def get_serial_number(self, context, domain, server, timeout,
def get_serial_number(self, context, domain, nameserver, timeout,
retry_interval, max_retries, delay):
LOG.info(_LI("get_serial_number: Calling mdns for zone '%(zone)s'"
", serial '%(serial)s' to server '%(host)s:%(port)s'") %
LOG.info(
_LI("get_serial_number: Calling mdns for zone '%(zone)s', serial "
"%(serial)s' on nameserver '%(host)s:%(port)s'") %
{'zone': domain.name, 'serial': domain.serial,
'host': server.host, 'port': server.port})
'host': nameserver.host, 'port': nameserver.port})
cctxt = self.notify_client.prepare(version='1.1')
return cctxt.call(
context, 'get_serial_number', domain=domain,
server=server, timeout=timeout, retry_interval=retry_interval,
max_retries=max_retries, delay=delay)
nameserver=nameserver, timeout=timeout,
retry_interval=retry_interval, max_retries=max_retries,
delay=delay)
def perform_zone_xfr(self, context, domain):
LOG.info(_LI("perform_zone_xfr: Calling mdns for zone %(zone)s") %

View File

@ -18,16 +18,18 @@ from designate.objects.base import DesignateObject # noqa
from designate.objects.base import DictObjectMixin # noqa
from designate.objects.base import ListObjectMixin # noqa
from designate.objects.base import PagedListObjectMixin # noqa
from designate.objects.backend_option import BackendOption, BackendOptionList # noqa
from designate.objects.blacklist import Blacklist, BlacklistList # noqa
from designate.objects.domain import Domain, DomainList # noqa
from designate.objects.domain_attribute import DomainAttribute, DomainAttributeList # noqa
from designate.objects.floating_ip import FloatingIP, FloatingIPList # noqa
from designate.objects.pool_manager_status import PoolManagerStatus, PoolManagerStatusList # noqa
from designate.objects.pool_server import PoolServer, PoolServerList # noqa
from designate.objects.pool import Pool, PoolList # noqa
from designate.objects.pool_attribute import PoolAttribute, PoolAttributeList # noqa
from designate.objects.pool_ns_record import PoolNsRecord, PoolNsRecordList # noqa
from designate.objects.pool_nameserver import PoolNameserver, PoolNameserverList # noqa
from designate.objects.pool_target import PoolTarget, PoolTargetList # noqa
from designate.objects.pool_target_master import PoolTargetMaster, PoolTargetMasterList # noqa
from designate.objects.pool_target_option import PoolTargetOption, PoolTargetOptionList # noqa
from designate.objects.quota import Quota, QuotaList # noqa
from designate.objects.rrdata_a import RRData_A # noqa
from designate.objects.rrdata_aaaa import RRData_AAAA # noqa

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate import utils
from designate.objects import base
@ -59,8 +60,73 @@ class Pool(base.DictObjectMixin, base.PersistentObjectMixin,
'relation_cls': 'PoolNsRecordList',
'required': True
},
'nameservers': {
'relation': True,
'relation_cls': 'PoolNameserverList'
},
'targets': {
'relation': True,
'relation_cls': 'PoolTargetList'
},
}
@classmethod
def from_config(cls, CONF):
pool_id = CONF['service:pool_manager'].pool_id
pool_target_ids = CONF['pool:%s' % pool_id].targets
pool_nameserver_ids = CONF['pool:%s' % pool_id].nameservers
# Build Base Pool
pool = {
'id': pool_id,
'description': 'Pool built from configuration on %s' % CONF.host,
'targets': [],
'nameservers': [],
}
# Build Pool Targets
for pool_target_id in pool_target_ids:
pool_target_group = 'pool_target:%s' % pool_target_id
pool_target = {
'id': pool_target_id,
'type': CONF[pool_target_group].type,
'masters': [],
'options': [],
}
# Build Pool Target Masters
for pool_target_master in CONF[pool_target_group].masters:
host, port = utils.split_host_port(pool_target_master)
pool_target['masters'].append({
'host': host,
'port': port,
})
# Build Pool Target Options
for k, v in CONF[pool_target_group].options.items():
pool_target['options'].append({
'key': k,
'value': v,
})
pool['targets'].append(pool_target)
# Build Pool Nameservers
for pool_nameserver_id in pool_nameserver_ids:
pool_nameserver_group = 'pool_nameserver:%s' % pool_nameserver_id
pool_nameserver = {
'id': pool_nameserver_id,
'host': CONF[pool_nameserver_group].host,
'port': CONF[pool_nameserver_group].port,
}
pool['nameservers'].append(pool_nameserver)
return cls.from_dict(pool)
class PoolList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = Pool

View File

@ -19,7 +19,7 @@ from designate.objects import base
class PoolManagerStatus(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'server_id': {
'nameserver_id': {
'schema': {
'type': 'string',
'format': 'uuid',

View File

@ -0,0 +1,28 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolNameserver(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_id': {},
'host': {},
'port': {},
}
class PoolNameserverList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolNameserver

View File

@ -0,0 +1,38 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolTarget(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_id': {},
'type': {},
'tsigkey_id': {},
'description': {},
'masters': {
'relation': True,
'relation_cls': 'PoolTargetMasterList'
},
'options': {
'relation': True,
'relation_cls': 'PoolTargetOptionList'
},
}
class PoolTargetList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolTarget

View File

@ -0,0 +1,28 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolTargetMaster(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_target_id': {},
'host': {},
'port': {}
}
class PoolTargetMasterList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolTargetMaster

View File

@ -0,0 +1,35 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolTargetOption(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_target_id': {},
'key': {},
'value': {},
}
class PoolTargetOptionList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolTargetOption
def get(self, key, default=None):
for obj in self.objects:
if obj.key == key:
return obj.value
return default

View File

@ -15,13 +15,13 @@
# under the License.
from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup(
CONF = cfg.CONF
CONF.register_group(cfg.OptGroup(
name='service:pool_manager', title="Configuration for Pool Manager Service"
))
OPTS = [
cfg.ListOpt('backends', default=[],
help='List of enabled backend drivers'),
cfg.IntOpt('workers', default=None,
help='Number of Pool Manager worker processes to spawn'),
cfg.StrOpt('pool-id', default='794ccc2c-d751-44fe-b57f-8894c9f5c842',
@ -56,4 +56,56 @@ OPTS = [
help='The cache driver to use'),
]
cfg.CONF.register_opts(OPTS, group='service:pool_manager')
CONF.register_opts(OPTS, group='service:pool_manager')
def register_dynamic_pool_options():
# Pool Options Registration Pass One
# Find the Current Pool ID
pool_id = CONF['service:pool_manager'].pool_id
# Build the [pool:<id>] config section
pool_group = cfg.OptGroup('pool:%s' % pool_id)
pool_opts = [
cfg.ListOpt('targets', default=[]),
cfg.ListOpt('nameservers', default=[]),
]
CONF.register_group(pool_group)
CONF.register_opts(pool_opts, group=pool_group)
# Pool Options Registration Pass Two
# Find the Current Pools Target ID's
pool_target_ids = CONF['pool:%s' % pool_id].targets
# Build the [pool_target:<id>] config sections
pool_target_opts = [
cfg.StrOpt('type'),
cfg.ListOpt('masters', default=[]),
cfg.DictOpt('options', default={}),
]
for pool_target_id in pool_target_ids:
pool_target_group = cfg.OptGroup('pool_target:%s' % pool_target_id)
CONF.register_group(pool_target_group)
CONF.register_opts(pool_target_opts, group=pool_target_group)
# Find the Current Pools Nameserver ID's
pool_nameserver_ids = CONF['pool:%s' % pool_id].nameservers
# Build the [pool_nameserver:<id>] config sections
pool_nameserver_opts = [
cfg.StrOpt('host'),
cfg.IntOpt('port'),
]
for pool_nameserver_id in pool_nameserver_ids:
pool_nameserver_group = cfg.OptGroup(
'pool_nameserver:%s' % pool_nameserver_id)
CONF.register_group(pool_nameserver_group)
CONF.register_opts(pool_nameserver_opts, group=pool_nameserver_group)

View File

@ -49,13 +49,14 @@ class PoolManagerCache(DriverPlugin):
"""
@abc.abstractmethod
def retrieve(self, context, server_id, domain_id, action):
def retrieve(self, context, nameserver_id, domain_id, action):
"""
Retrieve the pool manager status object.
:param context: Security context information
:param server_id: the server ID of the pool manager status object
:param nameserver_id: the nameserver ID of the pool manager status
object
:param domain_id: the domain ID of the pool manger status object
:param action: the action of the pool manager status object
:return: the pool manager status object

View File

@ -74,9 +74,9 @@ class MemcachePoolManagerCache(cache_base.PoolManagerCache):
serial_number_key, pool_manager_status.serial_number,
self.expiration)
def retrieve(self, context, server_id, domain_id, action):
def retrieve(self, context, nameserver_id, domain_id, action):
values = {
'server_id': server_id,
'nameserver_id': nameserver_id,
'domain_id': domain_id,
'action': action,
}
@ -102,8 +102,8 @@ class MemcachePoolManagerCache(cache_base.PoolManagerCache):
@staticmethod
def _status_key(pool_manager_status, tail):
key = '{server}-{domain}-{action}-{tail}'.format(
server=pool_manager_status.server_id,
key = '{nameserver}-{domain}-{action}-{tail}'.format(
nameserver=pool_manager_status.nameserver_id,
domain=pool_manager_status.domain_id,
action=pool_manager_status.action,
tail=tail

View File

@ -32,5 +32,5 @@ class NoopPoolManagerCache(cache_base.PoolManagerCache):
def store(self, context, pool_manager_status):
pass
def retrieve(self, context, server_id, domain_id, action):
def retrieve(self, context, nameserver_id, domain_id, action):
raise exceptions.PoolManagerStatusNotFound

View File

@ -49,7 +49,7 @@ class SQLAlchemyPoolManagerCache(sqlalchemy_base.SQLAlchemy,
# If there is no id retrieve the relevant pool manager status
if not pool_manager_status.id:
pool_manager_status = self.retrieve(
context, pool_manager_status.server_id,
context, pool_manager_status.nameserver_id,
pool_manager_status.domain_id, pool_manager_status.action)
self._delete(
context, tables.pool_manager_statuses, pool_manager_status,
@ -66,9 +66,9 @@ class SQLAlchemyPoolManagerCache(sqlalchemy_base.SQLAlchemy,
tables.pool_manager_statuses, pool_manager_status,
exceptions.DuplicatePoolManagerStatus)
def retrieve(self, context, server_id, domain_id, action):
def retrieve(self, context, nameserver_id, domain_id, action):
criterion = {
'server_id': server_id,
'nameserver_id': nameserver_id,
'domain_id': domain_id,
'action': action
}

View File

@ -13,15 +13,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
from sqlalchemy.schema import Table, MetaData
class BackendOption(base.DictObjectMixin, base.DesignateObject):
FIELDS = {
'key': {},
'value': {}
}
meta = MetaData()
class BackendOptionList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = BackendOption
def upgrade(migrate_engine):
meta.bind = migrate_engine
pms_table = Table('pool_manager_statuses', meta, autoload=True)
pms_table.c.server_id.alter(name='nameserver_id')
def downgrade(migrate_engine):
meta.bind = migrate_engine
pms_table = Table('pool_manager_statuses', meta, autoload=True)
pms_table.c.nameserver_id.alter(name='server_id')

View File

@ -32,7 +32,7 @@ pool_manager_statuses = Table(
Column('version', Integer(), default=1, nullable=False),
Column('created_at', DateTime, default=lambda: timeutils.utcnow()),
Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()),
Column('server_id', UUID, nullable=False),
Column('nameserver_id', UUID, nullable=False),
Column('domain_id', UUID, nullable=False),
Column('action', Enum(name='update_actions', *UPDATE_ACTIONS),
nullable=False),
@ -41,7 +41,7 @@ pool_manager_statuses = Table(
Column('serial_number', Integer, nullable=False),
UniqueConstraint('server_id', 'domain_id', 'action',
UniqueConstraint('nameserver_id', 'domain_id', 'action',
name='unique_pool_manager_status'),
ForeignKeyConstraint(['domain_id'], ['domains.id']),

View File

@ -92,17 +92,18 @@ class PoolManagerAPI(object):
return cctxt.cast(
context, 'update_domain', domain=domain)
def update_status(self, context, domain, server, status, actual_serial):
def update_status(self, context, domain, nameserver, status,
actual_serial):
LOG.info(_LI("update_status: Calling pool manager for %(domain)s : "
"%(action)s : %(status)s : %(serial)s on server "
"%(action)s : %(status)s : %(serial)s on nameserver "
"'%(host)s:%(port)s'") %
{'domain': domain.name, 'action': domain.action,
'status': status, 'serial': actual_serial,
'host': server.host, 'port': server.port})
'host': nameserver.host, 'port': nameserver.port})
# Modifying the topic so it is pool manager instance specific.
topic = '%s.%s' % (self.topic, domain.pool_id)
cctxt = self.client.prepare(topic=topic)
return cctxt.cast(
context, 'update_status', domain=domain, server=server,
context, 'update_status', domain=domain, nameserver=nameserver,
status=status, actual_serial=actual_serial)

View File

@ -36,6 +36,7 @@ from designate.pool_manager import cache
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SUCCESS_STATUS = 'SUCCESS'
ERROR_STATUS = 'ERROR'
@ -74,47 +75,36 @@ class Service(service.RPCService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
# Build the Pool (and related) Object from Config
self.pool = objects.Pool.from_config(CONF)
# Get a pool manager cache connection.
cache_driver = cfg.CONF['service:pool_manager'].cache_driver
self.cache = cache.get_pool_manager_cache(cache_driver)
self.cache = cache.get_pool_manager_cache(
CONF['service:pool_manager'].cache_driver)
self.threshold = cfg.CONF['service:pool_manager'].threshold_percentage
self.timeout = cfg.CONF['service:pool_manager'].poll_timeout
self.retry_interval = \
cfg.CONF['service:pool_manager'].poll_retry_interval
self.max_retries = cfg.CONF['service:pool_manager'].poll_max_retries
self.delay = cfg.CONF['service:pool_manager'].poll_delay
# Store some settings for quick access later
self.threshold = CONF['service:pool_manager'].threshold_percentage
self.timeout = CONF['service:pool_manager'].poll_timeout
self.retry_interval = CONF['service:pool_manager'].poll_retry_interval
self.max_retries = CONF['service:pool_manager'].poll_max_retries
self.delay = CONF['service:pool_manager'].poll_delay
self.server_backends = []
# Create the necessary Backend instances for each target
self._setup_target_backends()
sections = []
for backend_name in cfg.CONF['service:pool_manager'].backends:
server_ids = cfg.CONF['backend:%s' % backend_name].server_ids
def _setup_target_backends(self):
self.target_backends = {}
for server_id in server_ids:
sections.append({"backend": backend_name,
"server_id": server_id})
for target in self.pool.targets:
# Fetch an instance of the Backend class, passing in the options
# and masters
self.target_backends[target.id] = backend.get_backend(
target.type, target)
for section in sections:
backend_driver = section['backend']
server_id = section['server_id']
server = backend.get_server_object(backend_driver, server_id)
LOG.info(_LI('%d targets setup'), len(self.pool.targets))
backend_instance = backend.get_backend(
backend_driver, server.backend_options)
server_backend = {
'server': server,
'backend_instance': backend_instance
}
self.server_backends.append(server_backend)
if not self.server_backends:
raise exceptions.NoPoolServersConfigured()
self.enable_recovery_timer = \
cfg.CONF['service:pool_manager'].enable_recovery_timer
self.enable_sync_timer = \
cfg.CONF['service:pool_manager'].enable_sync_timer
if not self.target_backends:
raise exceptions.NoPoolTargetsConfigured()
@property
def service_name(self):
@ -125,37 +115,37 @@ class Service(service.RPCService, service.Service):
# Modify the default topic so it's pool manager instance specific.
topic = super(Service, self)._rpc_topic
topic = '%s.%s' % (topic, cfg.CONF['service:pool_manager'].pool_id)
topic = '%s.%s' % (topic, CONF['service:pool_manager'].pool_id)
LOG.info(_LI('Using topic %(topic)s for this pool manager instance.')
% {'topic': topic})
return topic
def start(self):
for server_backend in self.server_backends:
backend_instance = server_backend['backend_instance']
backend_instance.start()
for target in self.pool.targets:
self.target_backends[target.id].start()
super(Service, self).start()
if self.enable_recovery_timer:
LOG.info(_LI('Starting periodic recovery timer.'))
if CONF['service:pool_manager'].enable_recovery_timer:
LOG.info(_LI('Starting periodic recovery timer'))
self.tg.add_timer(
cfg.CONF['service:pool_manager'].periodic_recovery_interval,
self.periodic_recovery)
CONF['service:pool_manager'].periodic_recovery_interval,
self.periodic_recovery,
CONF['service:pool_manager'].periodic_recovery_interval)
if self.enable_sync_timer:
LOG.info(_LI('Starting periodic sync timer.'))
if CONF['service:pool_manager'].enable_sync_timer:
LOG.info(_LI('Starting periodic synchronization timer'))
self.tg.add_timer(
cfg.CONF['service:pool_manager'].periodic_sync_interval,
self.periodic_sync)
CONF['service:pool_manager'].periodic_sync_interval,
self.periodic_sync,
CONF['service:pool_manager'].periodic_sync_interval)
def stop(self):
super(Service, self).stop()
for target in self.pool.targets:
self.target_backends[target.id].stop()
for server_backend in self.server_backends:
backend_instance = server_backend['backend_instance']
backend_instance.stop()
super(Service, self).stop()
@property
def central_api(self):
@ -165,76 +155,270 @@ class Service(service.RPCService, service.Service):
def mdns_api(self):
return mdns_api.MdnsAPI.get_instance()
# Periodioc Tasks
def periodic_recovery(self):
"""
:return:
"""
context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Recovery")
try:
# Handle Deletion Failures
domains = self._get_failed_domains(context, DELETE_ACTION)
for domain in domains:
self.delete_domain(context, domain)
# Handle Creation Failures
domains = self._get_failed_domains(context, CREATE_ACTION)
for domain in domains:
self.create_domain(context, domain)
# Handle Update Failures
domains = self._get_failed_domains(context, UPDATE_ACTION)
for domain in domains:
self.update_domain(context, domain)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic recovery '
'occurred'))
def periodic_sync(self):
"""
:return: None
"""
context = DesignateContext.get_admin_context(all_tenants=True) # noqa
LOG.debug("Starting Periodic Synchronization")
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'status': '!%s' % ERROR_STATUS
}
periodic_sync_seconds = \
CONF['service:pool_manager'].periodic_sync_seconds
if periodic_sync_seconds is not None:
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
domains = self.central_api.find_domains(context, criterion)
try:
for domain in domains:
# TODO(kiall): If the domain was created within the last
# periodic_sync_seconds, attempt to recreate to
# fill in targets which may have failed.
self.update_domain(context, domain)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic '
'synchronization occurred.'))
# Standard Create/Update/Delete Methods
def create_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:param domain: Domain to be created
:return: None
"""
LOG.debug("Calling create_domain for %s" % domain.name)
LOG.info(_LI("Creating new domain %s"), domain.name)
for server_backend in self.server_backends:
server = server_backend['server']
create_status = self._build_status_object(
server, domain, CREATE_ACTION)
self._create_domain_on_server(
context, create_status, domain, server_backend)
results = []
# Create the domain on each of the Pool Targets
for target in self.pool.targets:
results.append(
self._create_domain_on_target(context, target, domain))
if self._exceed_or_meet_threshold(results.count(True)):
LOG.debug('Consensus reached for creating domain %(domain)s '
'on pool targets' % {'domain': domain.name})
else:
LOG.warn(_LW('Consensus not reached for creating domain %(domain)s'
' on pool targets') % {'domain': domain.name})
# ERROR status is updated right away, but success is updated when we
# hear back from mdns
if self._is_consensus(context, domain, CREATE_ACTION, ERROR_STATUS):
LOG.warn(_LW('Consensus not reached '
'for creating domain %(domain)s') %
{'domain': domain.name})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, domain.serial)
def delete_domain(self, context, domain):
return
# Send a NOTIFY to each nameserver
for nameserver in self.pool.nameservers:
create_status = self._build_status_object(
nameserver, domain, CREATE_ACTION)
self.cache.store(context, create_status)
self._update_domain_on_nameserver(context, nameserver, domain)
def _create_domain_on_target(self, context, target, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:return: None
:param target: Target to create Domain on
:param domain: Domain to be created
:return: True/False
"""
LOG.debug("Calling delete_domain for %s" % domain.name)
LOG.debug("Creating domain %s on target %s", domain.name, target.id)
for server_backend in self.server_backends:
server = server_backend['server']
delete_status = self._build_status_object(
server, domain, DELETE_ACTION)
self._delete_domain_on_server(
context, delete_status, domain, server_backend)
backend = self.target_backends[target.id]
self._check_delete_status(context, domain)
try:
backend.create_domain(context, domain)
return True
except Exception:
LOG.exception(_LE("Failed to create domain %(domain)s on target "
"%(target)s"),
{'domain': domain.name, 'target': target.id})
return False
def update_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: The designate domain object.
:param domain: Domain to be updated
:return: None
"""
LOG.debug("Calling update_domain for %s" % domain.name)
LOG.info(_LI("Updating domain %s"), domain.name)
for server_backend in self.server_backends:
server = server_backend['server']
results = []
# Update the domain on each of the Pool Targets
for target in self.pool.targets:
results.append(
self._update_domain_on_target(context, target, domain))
if self._exceed_or_meet_threshold(results.count(True)):
LOG.debug('Consensus reached for updating domain %(domain)s '
'on pool targets' % {'domain': domain.name})
else:
LOG.warn(_LW('Consensus not reached for updating domain %(domain)s'
' on pool targets') % {'domain': domain.name})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, domain.serial)
return
# Send a NOTIFY to each nameserver
for nameserver in self.pool.nameservers:
# See if there is already another update in progress
try:
update_status = self.cache.retrieve(
context, server.id, domain.id, UPDATE_ACTION)
context, nameserver.id, domain.id, UPDATE_ACTION)
except exceptions.PoolManagerStatusNotFound:
update_status = self._build_status_object(
server, domain, UPDATE_ACTION)
nameserver, domain, UPDATE_ACTION)
self.cache.store(context, update_status)
self._update_domain_on_server(context, domain, server_backend)
self._update_domain_on_nameserver(context, nameserver, domain)
def update_status(self, context, domain, server, status, actual_serial):
def _update_domain_on_target(self, context, target, domain):
"""
:param context: Security context information.
:param target: Target to update Domain on
:param domain: Domain to be updated
:return: True/False
"""
LOG.debug("Updating domain %s on target %s", domain.name, target.id)
backend = self.target_backends[target.id]
try:
backend.update_domain(context, domain)
return True
except Exception:
LOG.exception(_LE("Failed to update domain %(domain)s on target "
"%(target)s"),
{'domain': domain.name, 'target': target.id})
return False
def _update_domain_on_nameserver(self, context, nameserver, domain):
LOG.info(_LI('Updating domain %(domain)s on nameserver %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(nameserver)})
self.mdns_api.notify_zone_changed(
context, domain, nameserver, self.timeout, self.retry_interval,
self.max_retries, 0)
self.mdns_api.poll_for_serial_number(
context, domain, nameserver, self.timeout, self.retry_interval,
self.max_retries, self.delay)
def delete_domain(self, context, domain):
"""
:param context: Security context information.
:param domain: Domain to be deleted
:return: None
"""
LOG.info(_LI("Deleting domain %s"), domain.name)
results = []
# Delete the domain on each of the Pool Targets
for target in self.pool.targets:
results.append(
self._delete_domain_on_target(context, target, domain))
# TODO(kiall): We should monitor that the Domain is actually deleted
# correctly on each of the nameservers, rather than
# assuming a sucessful delete-on-target is OK as we have
# in the past.
if self._exceed_or_meet_threshold(
results.count(True), MAXIMUM_THRESHOLD):
LOG.debug('Consensus reached for deleting domain %(domain)s '
'on pool targets' % {'domain': domain.name})
self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, domain.serial)
else:
LOG.warn(_LW('Consensus not reached for deleting domain %(domain)s'
' on pool targets') % {'domain': domain.name})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, domain.serial)
def _delete_domain_on_target(self, context, target, domain):
"""
:param context: Security context information.
:param target: Target to delete Domain from
:param domain: Domain to be deleted
:return: True/False
"""
LOG.debug("Deleting domain %s on target %s", domain.name, target.id)
backend = self.target_backends[target.id]
try:
backend.delete_domain(context, domain)
return True
except Exception:
LOG.exception(_LE("Failed to delete domain %(domain)s on target "
"%(target)s"),
{'domain': domain.name, 'target': target.id})
return False
def update_status(self, context, domain, nameserver, status,
actual_serial):
"""
update_status is called by mdns for creates and updates.
deletes are handled by the backend entirely and status is determined
at the time of delete itself.
:param context: Security context information.
:param domain: The designate domain object.
:param server: The server for which a status update is being sent.
:param nameserver: The nameserver for which a status update is being
sent.
:param status: The status, 'SUCCESS' or 'ERROR'.
:param actual_serial: The actual serial number received from the name
server for the domain.
@ -247,17 +431,17 @@ class Service(service.RPCService, service.Service):
with lockutils.lock('update-status-%s' % domain.id):
try:
current_status = self.cache.retrieve(
context, server.id, domain.id, action)
context, nameserver.id, domain.id, action)
except exceptions.PoolManagerStatusNotFound:
current_status = self._build_status_object(
server, domain, action)
nameserver, domain, action)
self.cache.store(context, current_status)
cache_serial = current_status.serial_number
LOG.debug('For domain %s : %s on server %s the cache serial is %s '
'and the actual serial is %s.' %
LOG.debug('For domain %s : %s on nameserver %s the cache serial '
'is %s and the actual serial is %s.' %
(domain.name, action,
self._get_destination(server),
self._get_destination(nameserver),
cache_serial, actual_serial))
if actual_serial and cache_serial <= actual_serial:
current_status.status = status
@ -295,200 +479,28 @@ class Service(service.RPCService, service.Service):
MAXIMUM_THRESHOLD):
self._clear_cache(context, domain, action)
def periodic_recovery(self):
"""
:return:
"""
LOG.debug("Calling periodic_recovery.")
context = DesignateContext.get_admin_context(all_tenants=True)
try:
self._periodic_delete_domains_that_failed(context)
self._periodic_create_domains_that_failed(context)
self._periodic_update_domains_that_failed(context)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic recovery '
'occurred. This should never happen!'))
def periodic_sync(self):
"""
:return: None
"""
LOG.debug("Calling periodic_sync.")
context = DesignateContext.get_admin_context(all_tenants=True)
criterion = {
'pool_id': cfg.CONF['service:pool_manager'].pool_id,
'status': '%s%s' % ('!', ERROR_STATUS)
}
periodic_sync_seconds = \
cfg.CONF['service:pool_manager'].periodic_sync_seconds
if periodic_sync_seconds is not None:
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
domains = self.central_api.find_domains(context, criterion)
try:
for domain in domains:
self.update_domain(context, domain)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic sync '
'occurred. This should never happen!'))
def _create_domain_on_server(self, context, create_status, domain,
server_backend):
server = server_backend['server']
backend_instance = server_backend['backend_instance']
try:
with wrap_backend_call():
backend_instance.create_domain(context, domain)
# The status will be updated when we hear back the serial number
# from minidns
self.cache.store(context, create_status)
LOG.info(_LI('Created domain %(domain)s on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
# PowerDNS needs to explicitly send a NOTIFY for the AXFR to
# happen whereas BIND9 does an AXFR implicitly after the domain
# is created. Sending a NOTIFY for all cases.
self._update_domain_on_server(context, domain, server_backend)
except exceptions.Backend:
create_status.status = ERROR_STATUS
self.cache.store(context, create_status)
LOG.warn(_LW('Failed to create domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
def _periodic_create_domains_that_failed(self, context):
domains = self._get_failed_domains(context, CREATE_ACTION)
for domain in domains:
create_statuses = self._retrieve_statuses(
context, domain, CREATE_ACTION)
for create_status in create_statuses:
server_backend = self._get_server_backend(
create_status.server_id)
self._create_domain_on_server(
context, create_status, domain, server_backend)
def _delete_domain_on_server(self, context, delete_status, domain,
server_backend):
server = server_backend['server']
backend_instance = server_backend['backend_instance']
try:
with wrap_backend_call():
backend_instance.delete_domain(context, domain)
delete_status.status = SUCCESS_STATUS
delete_status.serial_number = domain.serial
self.cache.store(context, delete_status)
LOG.info(_LI('Deleted domain %(domain)s from server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
except exceptions.Backend:
delete_status.status = ERROR_STATUS
self.cache.store(context, delete_status)
LOG.warn(_LW('Failed to delete domain %(domain)s '
'from server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
def _check_delete_status(self, context, domain):
if self._is_consensus(context, domain, DELETE_ACTION, SUCCESS_STATUS):
LOG.info(_LI('Consensus reached for deleting domain %(domain)s') %
{'domain': domain.name})
self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, domain.serial)
else:
LOG.warn(_LW('Consensus not reached for deleting domain '
'%(domain)s') % {'domain': domain.name})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, domain.serial)
if self._is_consensus(context, domain, DELETE_ACTION, SUCCESS_STATUS,
MAXIMUM_THRESHOLD):
# Clear all the entries from cache
self._clear_cache(context, domain)
def _periodic_delete_domains_that_failed(self, context):
domains = self._get_failed_domains(context, DELETE_ACTION)
for domain in domains:
delete_statuses = self._retrieve_statuses(
context, domain, DELETE_ACTION)
for delete_status in delete_statuses:
server_backend = self._get_server_backend(
delete_status.server_id)
self._delete_domain_on_server(
context, delete_status, domain, server_backend)
self._check_delete_status(context, domain)
def _update_domain_on_server(self, context, domain, server_backend):
server = server_backend['server']
self.mdns_api.notify_zone_changed(
context, domain, server, self.timeout, self.retry_interval,
self.max_retries, 0)
self.mdns_api.poll_for_serial_number(
context, domain, server, self.timeout, self.retry_interval,
self.max_retries, self.delay)
LOG.info(_LI('Updating domain %(domain)s on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
def _periodic_update_domains_that_failed(self, context):
domains = self._get_failed_domains(context, UPDATE_ACTION)
for domain in domains:
update_statuses = self._retrieve_statuses(
context, domain, UPDATE_ACTION)
for update_status in update_statuses:
server_backend = self._get_server_backend(
update_status.server_id)
self._update_domain_on_server(context, domain, server_backend)
# Utility Methods
def _get_failed_domains(self, context, action):
criterion = {
'pool_id': cfg.CONF['service:pool_manager'].pool_id,
'pool_id': CONF['service:pool_manager'].pool_id,
'action': action,
'status': 'ERROR'
}
return self.central_api.find_domains(context, criterion)
def _get_server_backend(self, server_id):
for server_backend in self.server_backends:
server = server_backend['server']
if server.id == server_id:
return server_backend
@staticmethod
def _get_destination(server):
return '%s:%s' % (server.host, server.port)
def _get_destination(nameserver):
return '%s:%s' % (nameserver.host, nameserver.port)
@staticmethod
def _percentage(count, total_count):
return (Decimal(count) / Decimal(total_count)) * Decimal(100)
def _exceed_or_meet_threshold(self, count, threshold):
def _exceed_or_meet_threshold(self, count, threshold=None):
threshold = threshold or self.threshold
return self._percentage(
count, len(self.server_backends)) >= Decimal(threshold)
count, len(self.pool.targets)) >= Decimal(threshold)
@staticmethod
def _get_sorted_serials(pool_manager_statuses, descending=False):
@ -542,12 +554,12 @@ class Service(service.RPCService, service.Service):
break
return error_serial
# When we hear back from the server, the serial_number is set to the value
# the server
# When we hear back from the nameserver, the serial_number is set to the
# value the nameserver
@staticmethod
def _build_status_object(server, domain, action):
def _build_status_object(nameserver, domain, action):
values = {
'server_id': server.id,
'nameserver_id': nameserver.id,
'domain_id': domain.id,
'status': None,
'serial_number': 0,
@ -557,17 +569,19 @@ class Service(service.RPCService, service.Service):
# Methods for manipulating the cache.
def _clear_cache(self, context, domain, action=None):
LOG.debug('Clearing cache for domain %s with action %s.' %
(domain.name, action))
pool_manager_statuses = []
if action:
actions = [action]
else:
actions = [CREATE_ACTION, UPDATE_ACTION, DELETE_ACTION]
for server_backend in self.server_backends:
server = server_backend['server']
for nameserver in self.pool.nameservers:
for action in actions:
pool_manager_status = self._build_status_object(
server, domain, action)
nameserver, domain, action)
pool_manager_statuses.append(pool_manager_status)
for pool_manager_status in pool_manager_statuses:
@ -576,23 +590,23 @@ class Service(service.RPCService, service.Service):
self.cache.clear(context, pool_manager_status)
except exceptions.PoolManagerStatusNotFound:
pass
LOG.debug('Cleared cache for domain %s with action %s.' %
(domain.name, action))
def _retrieve_from_mdns(self, context, server, domain, action):
def _retrieve_from_mdns(self, context, nameserver, domain, action):
try:
(status, actual_serial, retries) = \
self.mdns_api.get_serial_number(
context, domain, server, self.timeout, self.retry_interval,
self.max_retries, self.delay)
context, domain, nameserver, self.timeout,
self.retry_interval, self.max_retries, self.delay)
except messaging.MessagingException as msg_ex:
LOG.debug('Could not retrieve status and serial for domain %s on '
'server %s with action %s from the server. %s:%s' %
(domain.name, self._get_destination(server), action,
'nameserver %s with action %s (%s: %s)' %
(domain.name, self._get_destination(nameserver), action,
type(msg_ex), str(msg_ex)))
return None
pool_manager_status = self._build_status_object(server, domain, action)
pool_manager_status = self._build_status_object(
nameserver, domain, action)
if status == NO_DOMAIN_STATUS:
if action == CREATE_ACTION:
pool_manager_status.status = 'ERROR'
@ -606,34 +620,36 @@ class Service(service.RPCService, service.Service):
pool_manager_status.serial_number = actual_serial \
if actual_serial is not None else 0
LOG.debug('Retrieved status %s and serial %s for domain %s '
'on server %s with action %s from mdns.' %
'on nameserver %s with action %s from mdns.' %
(pool_manager_status.status,
pool_manager_status.serial_number,
domain.name, self._get_destination(server), action))
domain.name, self._get_destination(nameserver), action))
self.cache.store(context, pool_manager_status)
return pool_manager_status
def _retrieve_statuses(self, context, domain, action):
pool_manager_statuses = []
for server_backend in self.server_backends:
server = server_backend['server']
for nameserver in self.pool.nameservers:
try:
pool_manager_status = self.cache.retrieve(
context, server.id, domain.id, action)
context, nameserver.id, domain.id, action)
LOG.debug('Cache hit! Retrieved status %s and serial %s '
'for domain %s on server %s with action %s from '
'for domain %s on nameserver %s with action %s from '
'the cache.' %
(pool_manager_status.status,
pool_manager_status.serial_number,
domain.name, self._get_destination(server), action))
domain.name,
self._get_destination(nameserver), action))
except exceptions.PoolManagerStatusNotFound:
LOG.debug('Cache miss! Did not retrieve status and serial '
'for domain %s on server %s with action %s from '
'for domain %s on nameserver %s with action %s from '
'the cache. Getting it from the server.' %
(domain.name, self._get_destination(server), action))
(domain.name,
self._get_destination(nameserver),
action))
pool_manager_status = self._retrieve_from_mdns(
context, server, domain, action)
context, nameserver, domain, action)
if pool_manager_status is not None:
pool_manager_statuses.append(pool_manager_status)

View File

@ -29,20 +29,23 @@ CONF = cfg.CONF
_FACADES = {}
def _create_facade_lazily(name):
if name not in _FACADES:
_FACADES[name] = session.EngineFacade(
cfg.CONF[name].connection,
**dict(cfg.CONF[name].iteritems()))
def _create_facade_lazily(cfg_group, connection=None, discriminator=None):
connection = connection or cfg.CONF[cfg_group].connection
cache_name = "%s:%s" % (cfg_group, discriminator)
return _FACADES[name]
if cache_name not in _FACADES:
_FACADES[cache_name] = session.EngineFacade(
connection,
**dict(cfg.CONF[cfg_group].iteritems()))
return _FACADES[cache_name]
def get_engine(name):
facade = _create_facade_lazily(name)
def get_engine(cfg_group):
facade = _create_facade_lazily(cfg_group)
return facade.get_engine()
def get_session(name, **kwargs):
facade = _create_facade_lazily(name)
def get_session(cfg_group, connection=None, discriminator=None, **kwargs):
facade = _create_facade_lazily(cfg_group, connection, discriminator)
return facade.get_session(**kwargs)

View File

@ -28,22 +28,19 @@ class PowerDNSBackendTestCase(BackendTestCase):
def setUp(self):
super(PowerDNSBackendTestCase, self).setUp()
self.masters = [
'127.0.1.1:53',
'127.0.1.2:53',
]
self.domain = objects.Domain(id='e2bed4dc-9d01-11e4-89d3-123b93f75cba',
name='example.com.',
email='example@example.com')
backend_options = [
objects.BackendOption(key="host", value="127.0.0.1"),
objects.BackendOption(key="port", value=5353),
objects.BackendOption(key="masters", value=self.masters),
]
self.target = objects.PoolTarget.from_dict({
'id': '4588652b-50e7-46b9-b688-a9bad40a873e',
'type': 'powerdns',
'masters': [{'host': '192.0.2.1', 'port': 53},
{'host': '192.0.2.2', 'port': 35}],
'options': [{'key': 'connection', 'value': 'memory://'}],
})
self.backend = impl_powerdns.PowerDNSBackend(backend_options)
self.backend = impl_powerdns.PowerDNSBackend(self.target)
# Helper Methpds
def assertSessionTransactionCalls(self, session_mock, begin=0, commit=0,
@ -73,7 +70,7 @@ class PowerDNSBackendTestCase(BackendTestCase):
self.assertDictContainsSubset(
{'type': 'SLAVE',
'designate_id': self.domain.id,
'master': ','.join(self.masters),
'master': '192.0.2.1:53,192.0.2.2:35',
'name': self.domain.name.rstrip('.')},
session_mock.execute.call_args_list[0][0][1])

View File

@ -36,13 +36,11 @@ class MdnsNotifyTest(MdnsTestCase):
def setUp(self):
super(MdnsNotifyTest, self).setUp()
server_values = {
self.nameserver = objects.PoolNameserver.from_dict({
'id': 'f278782a-07dc-4502-9177-b5d85c5f7c7e',
'host': '127.0.0.1',
'port': 65255,
'backend': 'fake'
}
self.server = objects.PoolServer.from_dict(server_values)
'port': 65255
})
self.mock_tg = mock.Mock()
self.notify = notify.NotifyEndpoint(self.mock_tg)
@ -63,7 +61,7 @@ class MdnsNotifyTest(MdnsTestCase):
binascii.a2b_hex(expected_notify_response))):
response, retry = self.notify.notify_zone_changed(
context, objects.Domain.from_dict(self.test_domain),
self.server, 0, 0, 2, 0)
self.nameserver, 0, 0, 2, 0)
self.assertEqual(response, dns.message.from_wire(
binascii.a2b_hex(expected_notify_response)))
self.assertEqual(retry, 1)
@ -85,7 +83,7 @@ class MdnsNotifyTest(MdnsTestCase):
binascii.a2b_hex(non_auth_notify_response))):
response, retry = self.notify.notify_zone_changed(
context, objects.Domain.from_dict(self.test_domain),
self.server, 0, 0, 2, 0)
self.nameserver, 0, 0, 2, 0)
self.assertEqual(response, None)
self.assertEqual(retry, 1)
@ -93,8 +91,8 @@ class MdnsNotifyTest(MdnsTestCase):
def test_send_notify_message_timeout(self, _):
context = self.get_context()
response, retry = self.notify.notify_zone_changed(
context, objects.Domain.from_dict(self.test_domain), self.server,
0, 0, 2, 0)
context, objects.Domain.from_dict(self.test_domain),
self.nameserver, 0, 0, 2, 0)
self.assertEqual(response, None)
self.assertEqual(retry, 2)
@ -102,8 +100,8 @@ class MdnsNotifyTest(MdnsTestCase):
def test_send_notify_message_bad_response(self, _):
context = self.get_context()
response, retry = self.notify.notify_zone_changed(
context, objects.Domain.from_dict(self.test_domain), self.server,
0, 0, 2, 0)
context, objects.Domain.from_dict(self.test_domain),
self.nameserver, 0, 0, 2, 0)
self.assertEqual(response, None)
self.assertEqual(retry, 1)
@ -128,7 +126,7 @@ class MdnsNotifyTest(MdnsTestCase):
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Domain.from_dict(self.test_domain),
self.server, 0, 0, 2, 0)
self.nameserver, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, self.test_domain['serial'])
self.assertEqual(retries, 2)
@ -154,7 +152,7 @@ class MdnsNotifyTest(MdnsTestCase):
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Domain.from_dict(self.test_domain),
self.server, 0, 0, 2, 0)
self.nameserver, 0, 0, 2, 0)
self.assertEqual(status, 'ERROR')
self.assertEqual(serial, 99)
self.assertEqual(retries, 0)
@ -180,7 +178,7 @@ class MdnsNotifyTest(MdnsTestCase):
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Domain.from_dict(self.test_domain),
self.server, 0, 0, 2, 0)
self.nameserver, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, 101)
self.assertEqual(retries, 2)
@ -189,8 +187,8 @@ class MdnsNotifyTest(MdnsTestCase):
def test_poll_for_serial_number_timeout(self, _):
context = self.get_context()
status, serial, retries = self.notify.get_serial_number(
context, objects.Domain.from_dict(self.test_domain), self.server,
0, 0, 2, 0)
context, objects.Domain.from_dict(self.test_domain),
self.nameserver, 0, 0, 2, 0)
self.assertEqual(status, 'ERROR')
self.assertEqual(serial, None)
self.assertEqual(retries, 0)
@ -205,8 +203,8 @@ class MdnsNotifyTest(MdnsTestCase):
context = self.get_context()
test_domain = objects.Domain.from_dict(self.test_domain)
status, serial, retries = self.notify.get_serial_number(
context, test_domain, self.server, 0, 0, 2, 0)
context, test_domain, self.nameserver, 0, 0, 2, 0)
response, retry = self.notify.notify_zone_changed(
context, test_domain, self.server, 0, 0, 2, 0)
context, test_domain, self.nameserver, 0, 0, 2, 0)
assert not udp.called
assert tcp.called

View File

@ -23,7 +23,7 @@ from designate.pool_manager.cache.base import PoolManagerCache
class PoolManagerCacheTestCase(object):
def create_pool_manager_status(self):
values = {
'server_id': '896aa661-198c-4379-bccd-5d8de7007030',
'nameserver_id': '896aa661-198c-4379-bccd-5d8de7007030',
'domain_id': 'bce45113-4a22-418d-a54d-c9777d056312',
'action': 'CREATE',
'status': 'SUCCESS',
@ -42,12 +42,12 @@ class PoolManagerCacheTestCase(object):
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.retrieve(
self.admin_context, expected.server_id, expected.domain_id,
self.admin_context, expected.nameserver_id, expected.domain_id,
expected.action)
def test_retrieve(self):
expected = self.create_pool_manager_status()
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.retrieve(
self.admin_context, expected.server_id, expected.domain_id,
self.admin_context, expected.nameserver_id, expected.domain_id,
expected.action)

View File

@ -26,7 +26,7 @@ class MemcachePoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
self.cache = cache.get_pool_manager_cache('memcache')
self.mock_status = Mock(
server_id='server_id',
nameserver_id='nameserver_id',
domain_id='domain_id',
action='CREATE',
)
@ -36,10 +36,10 @@ class MemcachePoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
self.cache.store(self.admin_context, expected)
actual = self.cache.retrieve(
self.admin_context, expected.server_id, expected.domain_id,
self.admin_context, expected.nameserver_id, expected.domain_id,
expected.action)
self.assertEqual(expected.server_id, actual.server_id)
self.assertEqual(expected.nameserver_id, actual.nameserver_id)
self.assertEqual(expected.domain_id, actual.domain_id)
self.assertEqual(expected.status, actual.status)
self.assertEqual(expected.serial_number, actual.serial_number)
@ -53,7 +53,7 @@ class MemcachePoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
"""
key = self.cache._build_serial_number_key(self.mock_status)
self.assertIsInstance(key, str)
self.assertEqual(key, 'server_id-domain_id-CREATE-serial_number')
self.assertEqual(key, 'nameserver_id-domain_id-CREATE-serial_number')
def test_status_key_is_a_string(self):
"""Memcache requires keys be strings.
@ -63,4 +63,4 @@ class MemcachePoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
"""
key = self.cache._build_status_key(self.mock_status)
self.assertIsInstance(key, str)
self.assertEqual(key, 'server_id-domain_id-CREATE-status')
self.assertEqual(key, 'nameserver_id-domain_id-CREATE-status')

View File

@ -33,5 +33,5 @@ class NoopPoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.cache.retrieve(
self.admin_context, expected.server_id, expected.domain_id,
self.admin_context, expected.nameserver_id, expected.domain_id,
expected.action)

View File

@ -29,10 +29,10 @@ class SqlalchemyPoolManagerCacheTest(PoolManagerCacheTestCase, TestCase):
self.cache.store(self.admin_context, expected)
actual = self.cache.retrieve(
self.admin_context, expected.server_id, expected.domain_id,
self.admin_context, expected.nameserver_id, expected.domain_id,
expected.action)
self.assertEqual(expected.server_id, actual.server_id)
self.assertEqual(expected.nameserver_id, actual.nameserver_id)
self.assertEqual(expected.domain_id, actual.domain_id)
self.assertEqual(expected.status, actual.status)
self.assertEqual(expected.serial_number, actual.serial_number)

View File

@ -93,12 +93,12 @@ class PoolManagerAPITest(PoolManagerTestCase):
'host': '127.0.0.1',
'port': '53'
}
server = objects.PoolServer.from_dict(values)
nameserver = objects.PoolNameserver.from_dict(values)
PoolManagerAPI.get_instance().update_status(
self.admin_context, domain, server, 'SUCCESS', 1)
self.admin_context, domain, nameserver, 'SUCCESS', 1)
mock_prepare.assert_called_once_with(
topic='pool_manager.%s' % domain.pool_id)
mock_prepare.return_value.cast.assert_called_once_with(
self.admin_context, 'update_status', domain=domain,
server=server, status='SUCCESS', actual_serial=1)
nameserver=nameserver, status='SUCCESS', actual_serial=1)

View File

@ -32,36 +32,70 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
super(PoolManagerServiceNoopTest, self).setUp()
self.config(
backends=['fake'],
threshold_percentage=100,
enable_recovery_timer=False,
enable_sync_timer=False,
cache_driver='noop',
group='service:pool_manager')
# TODO(kiall): Rework all this pool config etc into a fixture..
# Configure the Pool ID
self.config(
server_ids=['f278782a-07dc-4502-9177-b5d85c5f7c7e',
'a38703f2-b71e-4e5b-ab22-30caaed61dfd'],
group='backend:fake')
pool_id='794ccc2c-d751-44fe-b57f-8894c9f5c842',
group='service:pool_manager')
section_name = 'backend:fake:f278782a-07dc-4502-9177-b5d85c5f7c7e'
server_opts = [
cfg.StrOpt('host', default='10.0.0.2'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
# Configure the Pool
section_name = 'pool:794ccc2c-d751-44fe-b57f-8894c9f5c842'
section_opts = [
cfg.ListOpt('targets', default=[
'f278782a-07dc-4502-9177-b5d85c5f7c7e',
'a38703f2-b71e-4e5b-ab22-30caaed61dfd',
]),
cfg.ListOpt('nameservers', default=[
'c5d64303-4cba-425a-9f3c-5d708584dde4',
'c67cdc95-9a9e-4d2a-98ed-dc78cbd85234',
]),
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
cfg.CONF.register_opts(section_opts, group=section_name)
section_name = 'backend:fake:a38703f2-b71e-4e5b-ab22-30caaed61dfd'
server_opts = [
cfg.StrOpt('host', default='10.0.0.3'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
# Configure the Pool Targets
section_name = 'pool_target:f278782a-07dc-4502-9177-b5d85c5f7c7e'
section_opts = [
cfg.StrOpt('type', default='fake'),
cfg.ListOpt('masters', default=['127.0.0.1:5354']),
cfg.DictOpt('options', default={})
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
cfg.CONF.register_opts(section_opts, group=section_name)
section_name = 'pool_target:a38703f2-b71e-4e5b-ab22-30caaed61dfd'
section_opts = [
cfg.StrOpt('type', default='fake'),
cfg.ListOpt('masters', default=['127.0.0.1:5354']),
cfg.DictOpt('options', default={})
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(section_opts, group=section_name)
# Configure the Pool Nameservers
section_name = 'pool_nameserver:c5d64303-4cba-425a-9f3c-5d708584dde4'
section_opts = [
cfg.StrOpt('host', default='127.0.0.1'),
cfg.StrOpt('port', default=5355),
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(section_opts, group=section_name)
section_name = 'pool_nameserver:c67cdc95-9a9e-4d2a-98ed-dc78cbd85234'
section_opts = [
cfg.StrOpt('host', default='127.0.0.1'),
cfg.StrOpt('port', default=5356),
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(section_opts, group=section_name)
# Start the Service
self.service = self.start_service('pool_manager')
self.cache = self.service.cache
@ -97,20 +131,20 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(0, len(create_statuses))
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each backend server.
# was called for each nameserver.
self.assertEqual(2, mock_notify_zone_changed.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0),
self.service.pool.nameservers[0], 30, 2, 3, 0),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
self.service.pool.nameservers[1], 30, 2, 3, 0)],
mock_notify_zone_changed.call_args_list)
self.assertEqual(2, mock_poll_for_serial_number.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1),
self.service.pool.nameservers[0], 30, 2, 3, 1),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
self.service.pool.nameservers[1], 30, 2, 3, 1)],
mock_poll_for_serial_number.call_args_list)
# Pool manager needs to call into mdns to calculate consensus as
@ -123,7 +157,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_both_failure(
def test_create_domain_target_both_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain, _):
@ -142,7 +176,9 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(False, mock_notify_zone_changed.called)
self.assertEqual(False, mock_poll_for_serial_number.called)
self.assertEqual(False, mock_update_status.called)
# Since consensus is not reached this early, we immediatly call
# central's update_status.
self.assertEqual(True, mock_update_status.called)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@ -150,7 +186,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure(
def test_create_domain_target_one_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain, _):
@ -164,14 +200,9 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.admin_context, domain, 'CREATE')
self.assertEqual(0, len(create_statuses))
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
self.assertEqual(False, mock_update_status.called)
# Since consensus is not reached this early, we immediatly call
# central's update_status.
self.assertEqual(True, mock_update_status.called)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@ -179,7 +210,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure_consensus(
def test_create_domain_target_one_failure_consensus(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain, _):
@ -199,37 +230,40 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.admin_context, domain, 'CREATE')
self.assertEqual(0, len(create_statuses))
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each nameserver.
self.assertEqual(2, mock_notify_zone_changed.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.pool.nameservers[0], 30, 2, 3, 0),
call(self.admin_context, domain,
self.service.pool.nameservers[1], 30, 2, 3, 0)],
mock_notify_zone_changed.call_args_list)
self.assertEqual(2, mock_poll_for_serial_number.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.pool.nameservers[0], 30, 2, 3, 1),
call(self.admin_context, domain,
self.service.pool.nameservers[1], 30, 2, 3, 1)],
mock_poll_for_serial_number.call_args_list)
self.assertEqual(False, mock_update_status.called)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain',
side_effect=exceptions.Backend)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_both_failure(
self, mock_update_status, mock_delete_domain, _):
def test_delete_domain_target_both_failure(
self, mock_update_status, mock_delete_domain):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
@ -237,19 +271,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure(
self, mock_update_status, mock_delete_domain, _):
def test_delete_domain_target_one_failure(
self, mock_update_status, mock_delete_domain):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
@ -257,19 +285,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure_consensus(
self, mock_update_status, mock_delete_domain, _):
def test_delete_domain_target_one_failure_consensus(
self, mock_update_status, mock_delete_domain):
self.service.stop()
self.config(
@ -283,10 +305,6 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@ -298,7 +316,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
self.service.pool.nameservers[0],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -309,7 +327,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
self.service.pool.nameservers[1],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -323,11 +341,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_both_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
self.service.pool.nameservers[0],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -341,7 +358,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
self.service.pool.nameservers[1],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -355,11 +372,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_one_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
self.service.pool.nameservers[0],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -370,7 +386,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
self.service.pool.nameservers[1],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -394,7 +410,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
self.service.pool.nameservers[0],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
@ -408,7 +424,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
self.service.pool.nameservers[1],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(

View File

@ -1,454 +0,0 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from oslo.config import cfg
from mock import call
from mock import patch
from designate import exceptions
from designate import objects
from designate.backend import impl_fake
from designate.central import rpcapi as central_rpcapi
from designate.mdns import rpcapi as mdns_rpcapi
from designate.tests.test_pool_manager import PoolManagerTestCase
class PoolManagerServiceMemcacheTest(PoolManagerTestCase):
def setUp(self):
super(PoolManagerServiceMemcacheTest, self).setUp()
self.config(
backends=['fake'],
threshold_percentage=100,
enable_recovery_timer=False,
enable_sync_timer=False,
cache_driver='memcache',
group='service:pool_manager')
self.config(
server_ids=['f278782a-07dc-4502-9177-b5d85c5f7c7e',
'a38703f2-b71e-4e5b-ab22-30caaed61dfd'],
group='backend:fake')
section_name = 'backend:fake:f278782a-07dc-4502-9177-b5d85c5f7c7e'
server_opts = [
cfg.StrOpt('host', default='10.0.0.2'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
section_name = 'backend:fake:a38703f2-b71e-4e5b-ab22-30caaed61dfd'
server_opts = [
cfg.StrOpt('host', default='10.0.0.3'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
self.service = self.start_service('pool_manager')
self.cache = self.service.cache
@staticmethod
def _build_domain(name, action, status):
values = {
'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40',
'name': name,
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842',
'action': action,
'serial': 1422062497,
'status': status
}
return objects.Domain.from_dict(values)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, _):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual(None, create_statuses[0].status)
self.assertEqual(None, create_statuses[1].status)
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each backend server.
self.assertEqual(2, mock_notify_zone_changed.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
mock_notify_zone_changed.call_args_list)
self.assertEqual(2, mock_poll_for_serial_number.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
mock_poll_for_serial_number.call_args_list)
self.assertEqual(False, mock_update_status.called)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_both_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = exceptions.Backend
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual('ERROR', create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
# Ensure notify_zone_changed and poll_for_serial_number
# were never called.
self.assertEqual(False, mock_notify_zone_changed.called)
self.assertEqual(False, mock_poll_for_serial_number.called)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = [None, exceptions.Backend]
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual(None, create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
self.assertEqual(False, mock_update_status.called)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure_consensus(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = [None, exceptions.Backend]
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual(None, create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_both_failure(
self, mock_update_status, mock_delete_domain, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = exceptions.Backend
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('ERROR', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure(
self, mock_update_status, mock_delete_domain, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = [None, exceptions.Backend]
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure_consensus(
self, mock_update_status, mock_delete_domain, _):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = [None, exceptions.Backend]
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
# Ensure update_status was not called.
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(0, len(update_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_both_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', 0)
# Reset the mock call attributes.
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_one_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
# Ensure update_status was not called.
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_one_failure_consensus(self, mock_update_status, _):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
# Reset the mock call attributes.
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)

View File

@ -1,514 +0,0 @@
# Copyright 2014 eBay Inc.
#
# Author: Ron Rickard <rrickard@ebaysf.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslo import messaging
from oslo.config import cfg
from mock import call
from mock import patch
from designate import exceptions
from designate import objects
from designate.backend import impl_fake
from designate.central import rpcapi as central_rpcapi
from designate.mdns import rpcapi as mdns_rpcapi
from designate.tests.test_pool_manager import PoolManagerTestCase
class PoolManagerServiceTest(PoolManagerTestCase):
def setUp(self):
super(PoolManagerServiceTest, self).setUp()
self.config(
backends=['fake'],
threshold_percentage=100,
enable_recovery_timer=False,
enable_sync_timer=False,
cache_driver='sqlalchemy',
group='service:pool_manager')
self.config(
server_ids=['f278782a-07dc-4502-9177-b5d85c5f7c7e',
'a38703f2-b71e-4e5b-ab22-30caaed61dfd'],
group='backend:fake')
section_name = 'backend:fake:f278782a-07dc-4502-9177-b5d85c5f7c7e'
server_opts = [
cfg.StrOpt('host', default='10.0.0.2'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
section_name = 'backend:fake:a38703f2-b71e-4e5b-ab22-30caaed61dfd'
server_opts = [
cfg.StrOpt('host', default='10.0.0.3'),
cfg.IntOpt('port', default=53),
cfg.StrOpt('tsig_key')
]
cfg.CONF.register_group(cfg.OptGroup(name=section_name))
cfg.CONF.register_opts(server_opts, group=section_name)
self.service = self.start_service('pool_manager')
self.cache = self.service.cache
@staticmethod
def _build_domain(name, action, status):
values = {
'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40',
'name': name,
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842',
'action': action,
'serial': 1422062497,
'status': status
}
return objects.Domain.from_dict(values)
def test_stop(self):
# NOTE: Start is already done by the fixture in start_service()
self.service.stop()
def test_pool_instance_topic(self):
self.assertEqual(
'pool_manager.%s' % cfg.CONF['service:pool_manager'].pool_id,
self.service._rpc_topic)
def test_no_pool_servers_configured(self):
self.service.stop()
self.config(
server_ids=[],
group='backend:fake'
)
with testtools.ExpectedException(exceptions.NoPoolServersConfigured):
self.start_service('pool_manager')
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, _):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual(None, create_statuses[0].status)
self.assertEqual(None, create_statuses[1].status)
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each backend server.
self.assertEqual(2, mock_notify_zone_changed.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
mock_notify_zone_changed.call_args_list)
self.assertEqual(2, mock_poll_for_serial_number.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
mock_poll_for_serial_number.call_args_list)
self.assertEqual(False, mock_update_status.called)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_both_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = exceptions.Backend
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual('ERROR', create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
# Ensure notify_zone_changed and poll_for_serial_number
# were never called.
self.assertEqual(False, mock_notify_zone_changed.called)
self.assertEqual(False, mock_poll_for_serial_number.called)
# Pool manager is able to determine error status for both the backends
# without calling mdns, so update_status is called with error
# TODO(vinod): Investigate later whether sending back domain.serial or
# 0 is the right thing to do here.
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = [None, exceptions.Backend]
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
# The status is not updated to 'SUCCESS' until we hear back from mdns
self.assertEqual(None, create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
# Ensure update_status was not called. This is because we want to hear
# back from mdns for one of the backends
self.assertEqual(False, mock_update_status.called)
@patch.object(impl_fake.FakeBackend, 'create_domain')
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_create_domain_backend_one_failure_consensus(
self, mock_update_status, mock_notify_zone_changed,
mock_poll_for_serial_number, mock_create_domain):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'CREATE', 'PENDING')
mock_create_domain.side_effect = [None, exceptions.Backend]
self.service.create_domain(self.admin_context, domain)
create_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'CREATE')
self.assertEqual(2, len(create_statuses))
self.assertEqual(None, create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
mock_poll_for_serial_number.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1)
# The status is updated to 'SUCCESS' only after pool manager verifies
# via mdns even though the backend reports success.
# TODO(vinod): Investigate later whether sending back the status at
# this point is the right thing to do
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_both_failure(
self, mock_update_status, mock_delete_domain, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = exceptions.Backend
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('ERROR', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure(
self, mock_update_status, mock_delete_domain, _):
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = [None, exceptions.Backend]
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure_consensus(
self, mock_update_status, mock_delete_domain, _):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'DELETE', 'PENDING')
mock_delete_domain.side_effect = [None, exceptions.Backend]
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'DELETE')
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
def test_update_domain(
self, mock_notify_zone_changed, mock_poll_for_serial_number):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_domain(self.admin_context, domain)
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each backend server.
self.assertEqual(2, mock_notify_zone_changed.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
mock_notify_zone_changed.call_args_list)
self.assertEqual(2, mock_poll_for_serial_number.call_count)
self.assertEqual(
[call(self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 1),
call(self.admin_context, domain,
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
mock_poll_for_serial_number.call_args_list)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
# Ensure update_status was not called.
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(0, len(update_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_both_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
# update_status is called now as pool manager retrieves statuses from
# mdns as needed, which gets the status of the other backend too
# which indicates an ERROR
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', 0)
# Reset the mock call attributes.
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_one_failure(self, mock_update_status, _):
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
# Ensure update_status was not called.
self.assertEqual(False, mock_update_status.called)
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_status_one_failure_consensus(self, mock_update_status, _):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self._build_domain('example.org.', 'UPDATE', 'PENDING')
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
'SUCCESS', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
# Reset the mock call attributes.
mock_update_status.reset_mock()
self.service.update_status(self.admin_context, domain,
self.service.server_backends[1]['server'],
'ERROR', domain.serial)
update_statuses = self.service._retrieve_statuses(
self.admin_context, domain, 'UPDATE')
self.assertEqual(2, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(2, mock_update_status.call_count)
self.assertEqual(
[call(self.admin_context, domain.id, 'SUCCESS', domain.serial),
call(self.admin_context, domain.id, 'ERROR', 0)],
mock_update_status.call_args_list)

View File

@ -95,6 +95,10 @@ def read_config(prog, argv):
register_plugin_opts()
# Avoid circular dependency imports
from designate import pool_manager
pool_manager.register_dynamic_pool_options()
def register_plugin_opts():
# Avoid circular dependency imports
@ -103,6 +107,8 @@ def register_plugin_opts():
# Register Backend Plugin Config Options
plugin.Plugin.register_cfg_opts('designate.backend')
plugin.Plugin.register_extra_cfg_opts('designate.backend')
# Register Agent Backend Plugin Config Options
plugin.Plugin.register_cfg_opts('designate.backend.agent_backend')
plugin.Plugin.register_extra_cfg_opts('designate.backend.agent_backend')

View File

@ -152,7 +152,6 @@ debug = False
# Pool Manager Service
#-----------------------
[service:pool_manager]
#backends = bind9
#workers = None
#pool_id = 794ccc2c-d751-44fe-b57f-8894c9f5c842
#threshold_percentage = 100
@ -243,30 +242,27 @@ debug = False
#memcached_servers = None
#expiration = 3600
#############################
## Pool Backend Configuration
#############################
#####################
## Pool Configuration
#####################
# This section does not have the defaults filled in but demonstrates an
# example pool / server set up. Different backends will have different options.
#-----------------------
# Global Bind9 Pool Backend
#-----------------------
[backend:bind9]
#server_ids = 6a5032b6-2d96-43ee-b25b-7d784e2bf3b2
#masters = 127.0.0.1:5354
#rndc_host = 127.0.0.1
#rndc_port = 953
#rndc_config_file = /etc/rndc.conf
#rndc_key_file = /etc/rndc.key
#[pool:794ccc2c-d751-44fe-b57f-8894c9f5c842]
#nameservers = 0f66b842-96c2-4189-93fc-1dc95a08b012
#targets = f26e0b32-736f-4f0a-831b-039a415c481e
#-----------------------
# Server Specific Bind9 Pool Backend
#-----------------------
[backend:bind9:6a5032b6-2d96-43ee-b25b-7d784e2bf3b2]
# host = 127.0.0.1
#[pool_nameserver:0f66b842-96c2-4189-93fc-1dc95a08b012]
#port = 53
#host = 192.168.27.100
#[pool_target:f26e0b32-736f-4f0a-831b-039a415c481e]
#options = rndc_host: 192.168.27.100, rndc_port: 953, rndc_config_file: /etc/bind/rndc.conf, rndc_key_file: /etc/bind/rndc.key
#masters = 192.168.27.100:5354
#type = bind9
#port = 53
#host = 192.168.27.100
#############################
## Agent Backend Configuration

View File

@ -96,6 +96,7 @@ designate.quota =
designate.manage =
database = designate.manage.database:DatabaseCommands
pool = designate.manage.pool:PoolCommands
pool-manager-cache = designate.manage.pool_manager_cache:DatabaseCommands
powerdns = designate.manage.powerdns:DatabaseCommands
tlds = designate.manage.tlds:TLDCommands