Merge "Add retry logic on periodic_sync"
commit bbd4443765
@@ -54,6 +54,10 @@ OPTS = [
     cfg.IntOpt('periodic-sync-seconds', default=21600,
                help='Zones Updated within last N seconds will be syncd.'
                     'Use an empty value to sync all zones.'),
+    cfg.IntOpt('periodic-sync-max-attempts', default=3,
+               help='Number of attempts to update a zone during sync'),
+    cfg.IntOpt('periodic-sync-retry-interval', default=30,
+               help='Interval between zone update attempts during sync'),
     cfg.StrOpt('cache-driver', default='memcache',
                help='The cache driver to use'),
 ]
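As context for the two options added above, here is a minimal stand-alone sketch (not Designate code) of how options declared this way behave under oslo.config. The group name matches the 'service:pool_manager' lookups later in this diff; everything else is illustrative.

# Illustrative only: register the new options and read them back.
from oslo_config import cfg

opts = [
    cfg.IntOpt('periodic-sync-max-attempts', default=3,
               help='Number of attempts to update a zone during sync'),
    cfg.IntOpt('periodic-sync-retry-interval', default=30,
               help='Interval between zone update attempts during sync'),
]

conf = cfg.ConfigOpts()
conf.register_opts(opts, group='service:pool_manager')
conf([])  # parse an empty command line; the defaults apply

# Dash-named options are exposed as underscore attributes on the group,
# which is why the service code below reads periodic_sync_max_attempts.
print(conf['service:pool_manager'].periodic_sync_max_attempts)    # -> 3
print(conf['service:pool_manager'].periodic_sync_retry_interval)  # -> 30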
@@ -62,6 +62,20 @@ def wrap_backend_call():
         raise exceptions.Backend('Unknown backend failure: %r' % e)
 
 
+def _constant_retries(num_attempts, sleep_interval):
+    """Generate a sequence of False terminated by a True
+    Sleep `sleep_interval` between cycles but not at the end.
+    """
+    for cnt in range(num_attempts):
+        if cnt != 0:
+            LOG.debug(_LI("Executing retry n. %d"), cnt)
+        if cnt < num_attempts - 1:
+            yield False
+            time.sleep(sleep_interval)
+        else:
+            yield True
+
+
 class Service(service.RPCService, coordination.CoordinationMixin,
               service.Service):
     """
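For readers unfamiliar with this generator-driven retry idiom, the following self-contained sketch shows how a loop consumes such a generator. It re-declares a simplified _constant_retries (without the _LI translation marker) and uses an invented do_work placeholder, so none of it is part of the patch itself.

import logging
import time

logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)


def _constant_retries(num_attempts, sleep_interval):
    """Yield False for every attempt except the last one, then yield True."""
    for cnt in range(num_attempts):
        if cnt != 0:
            LOG.debug("Executing retry n. %d", cnt)
        if cnt < num_attempts - 1:
            yield False
            time.sleep(sleep_interval)
        else:
            yield True


def do_work():
    """Placeholder for one attempt; pretend it always fails."""
    return False


# The consumer only needs the yielded boolean to know whether this was the
# final attempt; the sleeping between attempts is hidden in the generator.
for is_last_cycle in _constant_retries(num_attempts=3, sleep_interval=1):
    if do_work():
        break
    if is_last_cycle:
        LOG.error("giving up after the final attempt")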
@@ -92,6 +106,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
         self.retry_interval = CONF['service:pool_manager'].poll_retry_interval
         self.max_retries = CONF['service:pool_manager'].poll_max_retries
         self.delay = CONF['service:pool_manager'].poll_delay
+        self._periodic_sync_max_attempts = \
+            CONF['service:pool_manager'].periodic_sync_max_attempts
+        self._periodic_sync_retry_interval = \
+            CONF['service:pool_manager'].periodic_sync_retry_interval
 
         # Create the necessary Backend instances for each target
         self._setup_target_backends()
@@ -217,20 +235,33 @@ class Service(service.RPCService, coordination.CoordinationMixin,
         LOG.debug("Starting Periodic Synchronization")
         context = self._get_admin_context_all_tenants()
         zones = self._fetch_healthy_zones(context)
+        zones = set(zones)
 
-        try:
+        # TODO(kiall): If the zone was created within the last
+        #              periodic_sync_seconds, attempt to recreate
+        #              to fill in targets which may have failed.
+        retry_gen = _constant_retries(
+            self._periodic_sync_max_attempts,
+            self._periodic_sync_retry_interval
+        )
+        for is_last_cycle in retry_gen:
+            zones_in_error = []
             for zone in zones:
-                # TODO(kiall): If the zone was created within the last
-                #              periodic_sync_seconds, attempt to recreate
-                #              to fill in targets which may have failed.
-                success = self.update_zone(context, zone)
-                if not success:
-                    self.central_api.update_status(context, zone.id,
-                                                   ERROR_STATUS, zone.serial)
-        except Exception:
-            LOG.exception(_LE('An unhandled exception in periodic '
-                              'synchronization occurred.'))
+                try:
+                    success = self.update_zone(context, zone)
+                    if not success:
+                        zones_in_error.append(zone)
+                except Exception:
+                    LOG.exception(_LE('An unhandled exception in periodic '
+                                      'synchronization occurred.'))
+                    zones_in_error.append(zone)
+
+            if not zones_in_error:
+                return
+
+            zones = zones_in_error
+
+        for zone in zones_in_error:
+            self.central_api.update_status(context, zone.id, ERROR_STATUS,
+                                           zone.serial)
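For orientation, the new loop amounts to a shrinking-worklist retry: each cycle re-attempts only the zones that failed in the previous cycle, and only zones still failing after the final attempt are pushed to ERROR. Below is a self-contained sketch of that pattern; the function and callback names are invented for illustration (the real code uses update_zone and central_api.update_status), and the sleep between cycles is omitted since _constant_retries handles it above.

def sync_with_retries(zones, attempt_update, mark_error, max_attempts=3):
    """Shrinking-worklist retry: only zones that failed are tried again."""
    pending = list(zones)
    for attempt in range(max_attempts):          # stands in for retry_gen
        still_failing = []
        for zone in pending:
            try:
                if not attempt_update(zone):
                    still_failing.append(zone)
            except Exception:
                still_failing.append(zone)
        if not still_failing:
            return []           # everything synced; nothing marked ERROR
        pending = still_failing
    for zone in still_failing:  # still failing after the final attempt
        mark_error(zone)
    return still_failing


# Example: one permanently broken zone out of three.
failed = sync_with_retries(
    ['a.com.', 'b.com.', 'broken.com.'],
    attempt_update=lambda zone: zone != 'broken.com.',
    mark_error=lambda zone: print('ERROR status for', zone),
)
# 'broken.com.' is attempted max_attempts times and then reported;
# the healthy zones are attempted only once.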
@@ -494,11 +494,12 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
         self.assertEqual(2, self.service.update_zone.call_count)
         self.assertEqual(0, mock_cent_update_status.call_count)
 
+    @patch.object(pm_module.time, 'sleep')
     @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
     @patch.object(central_rpcapi.CentralAPI, 'update_status')
     @patch.object(central_rpcapi.CentralAPI, 'find_zones')
-    def test_periodic_sync_with_failing_update(self, mock_find_zones,
-                                               mock_cent_update_status, *a):
+    def test_periodic_sync_with_failing_update(
+            self, mock_find_zones, mock_cent_update_status, *mocks):
         self.service.update_zone = Mock(return_value=False)  # fail update
         mock_find_zones.return_value = self._build_zones(3, 'UPDATE',
                                                          'PENDING')
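One detail worth noting in the test changes: the added @patch.object(pm_module.time, 'sleep') keeps periodic_sync from actually sleeping between retry cycles, and because stacked @patch decorators inject mocks bottom-up, the extra mock is simply absorbed by the trailing catch-all parameter (renamed from *a to *mocks). A minimal, Designate-independent illustration of that mechanism, using unittest.mock:

import time
from unittest.mock import patch


@patch.object(time, 'sleep')   # outermost decorator -> last mock argument
@patch.object(time, 'time')    # innermost decorator -> first mock argument
def check(mock_time, *mocks):
    # mock_time corresponds to time.time; the mock for time.sleep lands in
    # *mocks, so adding more outer @patch decorators never breaks the
    # function signature.
    time.sleep(30)             # no real delay: sleep is mocked
    mocks[0].assert_called_with(30)
    assert not mock_time.called


check()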
@@ -507,15 +508,19 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
         self.assertEqual(1, mock_find_zones.call_count)
         criterion = mock_find_zones.call_args_list[0][0][1]
         self.assertEqual('!ERROR', criterion['status'])
-        # all zones are now in ERROR status
-        self.assertEqual(3, self.service.update_zone.call_count)
+
+        # 3 zones, all failing, with 3 attempts: 9 calls
+        self.assertEqual(9, self.service.update_zone.call_count)
+
+        # the zones have been put in ERROR status
         self.assertEqual(3, mock_cent_update_status.call_count)
 
+    @patch.object(pm_module.time, 'sleep')
     @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
     @patch.object(central_rpcapi.CentralAPI, 'update_status')
     @patch.object(central_rpcapi.CentralAPI, 'find_zones')
     def test_periodic_sync_with_failing_update_with_exception(
-            self, mock_find_zones, mock_cent_update_status, *a):
+            self, mock_find_zones, mock_cent_update_status, *mocks):
         self.service.update_zone = Mock(side_effect=Exception)
         mock_find_zones.return_value = self._build_zones(3, 'UPDATE',
                                                          'PENDING')
@@ -524,16 +529,20 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
         self.assertEqual(1, mock_find_zones.call_count)
         criterion = mock_find_zones.call_args_list[0][0][1]
         self.assertEqual('!ERROR', criterion['status'])
-        # the first updated zone is now in ERROR status
-        self.assertEqual(1, self.service.update_zone.call_count)
-        self.assertEqual(1, mock_cent_update_status.call_count)
+
+        # 3 zones, all failing, with 3 attempts: 9 calls
+        self.assertEqual(9, self.service.update_zone.call_count)
+
+        # the zones have been put in ERROR status
+        self.assertEqual(3, mock_cent_update_status.call_count)
 
     # Periodic recovery
 
+    @patch.object(pm_module.time, 'sleep')
     @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
     @patch.object(central_rpcapi.CentralAPI, 'update_status')
     def test_periodic_recovery(self, mock_find_zones,
-                               mock_cent_update_status, *a):
+                               mock_cent_update_status, *mocks):
 
         def mock_get_failed_zones(ctx, action):
             if action == pm_module.DELETE_ACTION:
@@ -614,6 +623,7 @@ class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
     def test_periodic_sync_and_recovery(
             self, mock_cent_update_status, *a):
         # Periodic sync + recovery
+        self.service._periodic_sync_retry_interval = 0
 
         # Create healthy zones, run a periodic sync that will fail
         self.create_zone(name='created.example.com.')
@@ -65,6 +65,17 @@ class PoolManagerInitTest(test.BaseTestCase):
         self.assertEqual(1800, call2[0])
         self.assertEqual(1800, call2[-1])
 
+    def test_constant_retries(self):
+        with patch.object(pm_module.time, 'sleep') as mock_zzz:
+            gen = pm_module._constant_retries(5, 2)
+            out = list(gen)
+            self.assertEqual(
+                [False, False, False, False, True],
+                out
+            )
+            self.assertEqual(4, mock_zzz.call_count)
+            mock_zzz.assert_called_with(2)
+
 
 class PoolManagerTest(test.BaseTestCase):
 
@@ -289,6 +289,11 @@ debug = False
 # Zones Updated within last N seconds will be syncd. Use None to sync all zones
 #periodic_sync_seconds = None
 
+# Perform multiple update attempts during periodic_sync
+#periodic_sync_max_attempts = 3
+#periodic_sync_retry_interval = 30
+
+
 # The cache driver to use
 #cache_driver = memcache
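For operators, overriding the new settings would look like the following under the pool manager section of designate.conf; the values here are only an example, not recommended defaults:

[service:pool_manager]
# Try each zone up to five times, 10 seconds apart, during periodic_sync
periodic_sync_max_attempts = 5
periodic_sync_retry_interval = 10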