Set zone in ERROR status on periodic sync fail

Improve testing
Code style cleanup

Change-Id: I1d65ed13fc185858ffe477e4bca6303e01a12c1d
Closes-Bug: #1416263
This commit is contained in:
Federico Ceratto 2015-12-09 15:45:57 +00:00
parent c2a0af1aa8
commit 937e6dd523
3 changed files with 265 additions and 82 deletions

View File

@ -138,18 +138,16 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer:
LOG.info(_LI('Starting periodic recovery timer'))
self.tg.add_timer(
CONF['service:pool_manager'].periodic_recovery_interval,
self.periodic_recovery,
CONF['service:pool_manager'].periodic_recovery_interval)
interval = CONF['service:pool_manager'].periodic_recovery_interval
LOG.info(_LI('Starting periodic recovery timer every'
' %(interval)s s') % {'interval': interval})
self.tg.add_timer(interval, self.periodic_recovery, interval)
if CONF['service:pool_manager'].enable_sync_timer:
LOG.info(_LI('Starting periodic synchronization timer'))
self.tg.add_timer(
CONF['service:pool_manager'].periodic_sync_interval,
self.periodic_sync,
CONF['service:pool_manager'].periodic_sync_interval)
interval = CONF['service:pool_manager'].periodic_sync_interval
LOG.info(_LI('Starting periodic synchronization timer every'
' %(interval)s s') % {'interval': interval})
self.tg.add_timer(interval, self.periodic_sync, interval)
def stop(self):
self._pool_election.stop()
@ -170,72 +168,81 @@ class Service(service.RPCService, coordination.CoordinationMixin,
# Periodic Tasks
def periodic_recovery(self):
    """Retry the failed action of every zone reported as failed.

    Zones are fetched with ``_get_failed_zones`` once per action and the
    matching operation is re-issued: deletions first, then creations,
    then updates. Runs only on the pool leader.

    :return: None
    """
    # NOTE(kiall): Only run this periodic task on the pool leader
    if not self._pool_election.is_leader:
        return

    context = DesignateContext.get_admin_context(all_tenants=True)

    LOG.debug("Starting Periodic Recovery")

    try:
        # Handle Deletion Failures
        zones = self._get_failed_zones(context, DELETE_ACTION)

        for zone in zones:
            self.delete_zone(context, zone)

        # Handle Creation Failures
        zones = self._get_failed_zones(context, CREATE_ACTION)

        for zone in zones:
            self.create_zone(context, zone)

        # Handle Update Failures
        zones = self._get_failed_zones(context, UPDATE_ACTION)

        for zone in zones:
            self.update_zone(context, zone)

    except Exception:
        # Any failure aborts the remainder of this run; it is logged here
        # instead of being raised out of the periodic task.
        LOG.exception(_LE('An unhandled exception in periodic '
                          'recovery occurred'))
def periodic_sync(self):
    """Periodically sync all the zones that are not in ERROR status.

    Runs only on the pool leader. Each zone returned by the query is
    passed to ``update_zone``; a zone whose update returns a falsy value,
    or raises, has its status set to ERROR. An exception ends the run, so
    zones after the failing one are not processed in this cycle.

    When ``periodic_sync_seconds`` is configured, only zones whose serial
    is greater than (current serial - periodic_sync_seconds) are selected.

    :return: None
    """
    # NOTE(kiall): Only run this periodic task on the pool leader
    if not self._pool_election.is_leader:
        return

    context = DesignateContext.get_admin_context(all_tenants=True)

    LOG.debug("Starting Periodic Synchronization")

    criterion = {
        'pool_id': CONF['service:pool_manager'].pool_id,
        'status': '!%s' % ERROR_STATUS
    }

    periodic_sync_seconds = \
        CONF['service:pool_manager'].periodic_sync_seconds

    if periodic_sync_seconds is not None:
        # Generate the current serial, will provide a UTC Unix TS.
        current = utils.increment_serial()
        criterion['serial'] = ">%s" % (current - periodic_sync_seconds)

    zones = self.central_api.find_zones(context, criterion)

    # Pre-bind the loop variable: the except branch below reads it, and it
    # would otherwise be an unbound name if the failure happens before the
    # first zone is assigned.
    zone = None
    try:
        for zone in zones:
            # TODO(kiall): If the zone was created within the last
            #              periodic_sync_seconds, attempt to recreate
            #              to fill in targets which may have failed.
            success = self.update_zone(context, zone)
            if not success:
                self.central_api.update_status(context, zone.id,
                                               ERROR_STATUS, zone.serial)

    except Exception:
        LOG.exception(_LE('An unhandled exception in periodic '
                          'synchronization occurred.'))
        if zone is not None:
            self.central_api.update_status(context, zone.id, ERROR_STATUS,
                                           zone.serial)
# Standard Create/Update/Delete Methods
@ -314,32 +321,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return False
def update_zone(self, context, zone):
"""
"""Update a zone across every pool target, check for consensus and
for propagation.
:param context: Security context information.
:param zone: Zone to be updated
:return: None
:return: consensus reached (bool)
"""
LOG.info(_LI("Updating zone %s"), zone.name)
results = []
# Update the zone on each of the Pool Targets
success_count = 0
for target in self.pool.targets:
results.append(
self._update_zone_on_target(context, target, zone))
ok_status = self._update_zone_on_target(context, target, zone)
if ok_status:
success_count += 1
if self._exceed_or_meet_threshold(results.count(True)):
LOG.debug('Consensus reached for updating zone %(zone)s '
'on pool targets' % {'zone': zone.name})
else:
if not self._exceed_or_meet_threshold(success_count):
LOG.warn(_LW('Consensus not reached for updating zone %(zone)s'
' on pool targets') % {'zone': zone.name})
self.central_api.update_status(context, zone.id, ERROR_STATUS,
zone.serial)
return False
self.central_api.update_status(
context, zone.id, ERROR_STATUS, zone.serial)
return
LOG.debug('Consensus reached for updating zone %(zone)s '
'on pool targets' % {'zone': zone.name})
# Send a NOTIFY to each also-notifies
for also_notify in self.pool.also_notifies:
@ -360,8 +365,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
context, zone, nameserver, self.timeout,
self.retry_interval, self.max_retries, self.delay)
return True
def _update_zone_on_target(self, context, target, zone):
"""
"""Instruct the appropriate backend to update a zone on a target
:param context: Security context information.
:param target: Target to update Zone on
:param zone: Zone to be updated
@ -535,7 +542,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'action': action,
'status': 'ERROR'
'status': ERROR_STATUS
}
return self.central_api.find_zones(context, criterion)
@ -548,10 +555,12 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return (Decimal(count) / Decimal(total_count)) * Decimal(100)
def _exceed_or_meet_threshold(self, count, threshold=None):
"""Evaluate if count / the number of pool targets >= threshold
Used to implement consensus
"""
threshold = threshold or self.threshold
return self._percentage(
count, len(self.pool.targets)) >= Decimal(threshold)
perc = self._percentage(count, len(self.pool.targets))
return perc >= Decimal(threshold)
@staticmethod
def _get_sorted_serials(pool_manager_statuses, descending=False):
@ -568,6 +577,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return self._get_sorted_serials(pool_manager_statuses, descending=True)
def _is_consensus(self, context, zone, action, status, threshold=None):
"""Fetch zone status across all nameservers through MiniDNS and compare
it with the expected `status`
:return: consensus reached (bool)
"""
status_count = 0
pool_manager_statuses = self._retrieve_statuses(
context, zone, action)
@ -609,6 +622,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
# value the nameserver
@staticmethod
def _build_status_object(nameserver, zone, action):
"""
:return: :class:`objects.PoolManagerStatus`
"""
values = {
'nameserver_id': nameserver.id,
'zone_id': zone.id,
@ -643,6 +659,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
pass
def _retrieve_from_mdns(self, context, nameserver, zone, action):
"""Instruct MiniDNS to get a zone serial number from a nameserver
Set error status if the zone is unexpectedly missing.
:return: :class:`objects.PoolManagerStatus` or None
"""
try:
(status, actual_serial, retries) = \
self.mdns_api.get_serial_number(
@ -661,16 +681,16 @@ class Service(service.RPCService, coordination.CoordinationMixin,
if status == NO_ZONE_STATUS:
if action == CREATE_ACTION:
pool_manager_status.status = 'ERROR'
pool_manager_status.status = ERROR_STATUS
elif action == DELETE_ACTION:
pool_manager_status.status = 'SUCCESS'
pool_manager_status.status = SUCCESS_STATUS
elif action == UPDATE_ACTION:
pool_manager_status.action = 'CREATE'
pool_manager_status.status = 'ERROR'
pool_manager_status.action = CREATE_ACTION
pool_manager_status.status = ERROR_STATUS
else:
pool_manager_status.status = status
pool_manager_status.serial_number = actual_serial \
if actual_serial is not None else 0
pool_manager_status.serial_number = actual_serial or 0
LOG.debug('Retrieved status %s and serial %s for zone %s '
'on nameserver %s with action %s from mdns.' %
(pool_manager_status.status,
@ -681,6 +701,11 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return pool_manager_status
def _retrieve_statuses(self, context, zone, action):
"""Instruct MiniDNS to get a zone serial number from all nameservers,
unless a cached value is available.
Set error status if the zone is unexpectedly missing.
:return: list of :class:`objects.PoolManagerStatus`
"""
pool_manager_statuses = []
for nameserver in self.pool.nameservers:
try:

View File

@ -13,9 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import oslo_messaging as messaging
from oslo_config import cfg
from mock import call
from mock import Mock
from mock import patch
from designate import exceptions
@ -103,9 +106,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.cache = self.service.cache
@staticmethod
def _build_zone(name, action, status):
def _build_zone(name, action, status, id=None):
zid = id or '75ea1626-eea7-46b5-acb7-41e5897c2d40'
values = {
'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40',
'id': zid,
'name': name,
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842',
'action': action,
@ -114,6 +118,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
}
return objects.Zone.from_dict(values)
def _build_zones(self, n, action, status):
return [
self._build_zone("zone%02X.example" % cnt, action,
status, id=str(uuid.uuid4()))
for cnt in range(n)
]
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
side_effect=messaging.MessagingException)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@ -419,3 +430,92 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_update_status.assert_called_once_with(
self.admin_context, zone.id, 'ERROR', 0)
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
def test_periodic_sync_not_leader(self, mock_find_zones):
    """periodic_sync must not query or update zones on a non-leader."""
    self.service._update_zone_on_target = Mock(return_value=False)
    self.service._pool_election = Mock()
    self.service._pool_election.is_leader = False
    self.service.update_zone = Mock()

    self.service.periodic_sync()

    self.assertFalse(mock_find_zones.called)
    # update_zone is stubbed above so that we can verify it stayed unused
    self.assertFalse(self.service.update_zone.called)
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_update_zone_no_consensus(self, mock_cent_update_status):
    """update_zone returns a falsy value and reports status once when
    the threshold check fails.
    """
    target_update = Mock(return_value=True)
    threshold_check = Mock(return_value=False)
    self.service._update_zone_on_target = target_update
    self.service._exceed_or_meet_threshold = threshold_check
    example_zone = self._build_zone('example.org.', 'UPDATE', 'PENDING')

    outcome = self.service.update_zone(self.admin_context, example_zone)

    self.assertFalse(outcome)
    # update_zone calls _update_zone_on_target once per pool target
    self.assertEqual(2, target_update.call_count)
    self.assertEqual(1, mock_cent_update_status.call_count)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
def test_update_zone(self, mock_mdns_poll):
    """update_zone returns a truthy value once the threshold check
    passes, after notifying the also-notifies and polling via MiniDNS.
    """
    target_update = Mock(return_value=True)
    also_notify_update = Mock()
    self.service._update_zone_on_target = target_update
    self.service._update_zone_on_also_notify = also_notify_update
    self.service.pool.also_notifies = ['bogus']
    self.service._exceed_or_meet_threshold = Mock(return_value=True)
    example_zone = self._build_zone('example.org.', 'UPDATE', 'PENDING')

    # cache.retrieve will throw exceptions.PoolManagerStatusNotFound
    # mdns_api.poll_for_serial_number will be called twice
    outcome = self.service.update_zone(self.admin_context, example_zone)

    self.assertTrue(outcome)
    self.assertEqual(2, target_update.call_count)
    self.assertEqual(1, also_notify_update.call_count)
    self.assertEqual(2, mock_mdns_poll.call_count)
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
def test_periodic_sync(self, mock_find_zones,
                       mock_cent_update_status, *a):
    """A sync whose updates all succeed leaves every zone status alone."""
    zone_update = Mock()
    self.service.update_zone = zone_update
    pending_zones = self._build_zones(2, 'UPDATE', 'PENDING')
    mock_find_zones.return_value = pending_zones

    self.service.periodic_sync()

    self.assertEqual(1, mock_find_zones.call_count)
    # find_zones is called as (context, criterion)
    positional_args, _ = mock_find_zones.call_args_list[0]
    self.assertEqual('!ERROR', positional_args[1]['status'])
    self.assertEqual(2, zone_update.call_count)
    self.assertEqual(0, mock_cent_update_status.call_count)
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
def test_periodic_sync_with_failing_update(self, mock_find_zones,
                                           mock_cent_update_status, *a):
    """Every zone whose update returns False gets a status update."""
    failing_update = Mock(return_value=False)  # fail update
    self.service.update_zone = failing_update
    pending_zones = self._build_zones(3, 'UPDATE', 'PENDING')
    mock_find_zones.return_value = pending_zones

    self.service.periodic_sync()

    self.assertEqual(1, mock_find_zones.call_count)
    # find_zones is called as (context, criterion)
    positional_args, _ = mock_find_zones.call_args_list[0]
    self.assertEqual('!ERROR', positional_args[1]['status'])

    # all zones are now in ERROR status
    self.assertEqual(3, failing_update.call_count)
    self.assertEqual(3, mock_cent_update_status.call_count)
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
def test_periodic_sync_with_failing_update_with_exception(
        self, mock_find_zones, mock_cent_update_status, *a):
    """An exception from update_zone stops the sync after one zone."""
    raising_update = Mock(side_effect=Exception)
    self.service.update_zone = raising_update
    pending_zones = self._build_zones(3, 'UPDATE', 'PENDING')
    mock_find_zones.return_value = pending_zones

    self.service.periodic_sync()

    self.assertEqual(1, mock_find_zones.call_count)
    # find_zones is called as (context, criterion)
    positional_args, _ = mock_find_zones.call_args_list[0]
    self.assertEqual('!ERROR', positional_args[1]['status'])

    # the first updated zone is now in ERROR status
    self.assertEqual(1, raising_update.call_count)
    self.assertEqual(1, mock_cent_update_status.call_count)

View File

@ -0,0 +1,58 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import Mock
from mock import MagicMock
from mock import patch
from oslotest import base as test
from designate import exceptions
from designate import objects
from designate.pool_manager.service import Service
class PoolManagerTest(test.BaseTestCase):
    """Unit tests for constructing and starting the pool manager Service."""

    def setUp(self):
        # Was spelled ``__setUp``: that name is mangled to
        # ``_PoolManagerTest__setUp`` and is never run as a fixture.
        super(PoolManagerTest, self).setUp()

    def test_init_no_pool_targets(self):
        """Service() raises when built from the MagicMock pool."""
        with patch.object(objects.Pool, 'from_config',
                          return_value=MagicMock()):
            self.assertRaises(exceptions.NoPoolTargetsConfigured, Service)

    def test_init(self):
        """Service() can be built once backend setup is stubbed out."""
        # _setup_target_backends is patched with a context manager rather
        # than assigned on the class, so the real method is restored
        # afterwards and other tests (e.g. test_init_no_pool_targets)
        # cannot be affected by it.
        with patch.object(objects.Pool, 'from_config',
                          return_value=Mock()), \
                patch.object(Service, '_setup_target_backends'):
            Service()

    def test_start(self):
        """start() registers the recovery timer, then the sync timer."""
        with patch.object(objects.Pool, 'from_config',
                          return_value=Mock()), \
                patch.object(Service, '_setup_target_backends'):
            pm = Service()
            pm.pool.targets = ()
            pm.tg.add_timer = Mock()
            pm._pool_election = Mock()
            with patch("designate.service.RPCService.start"):
                pm.start()

            # Each timer is added as (interval, callback, interval), so
            # the first and last positional arguments must match.
            # NOTE(review): 120 and 1800 are presumably the default
            # periodic_recovery_interval / periodic_sync_interval values;
            # confirm against the option definitions.
            call1 = pm.tg.add_timer.call_args_list[0][0]
            self.assertEqual(120, call1[0])
            self.assertEqual(120, call1[-1])
            call2 = pm.tg.add_timer.call_args_list[1][0]
            self.assertEqual(1800, call2[0])
            self.assertEqual(1800, call2[-1])