def _get_admin_context_all_tenants(self):
    """Return an admin context created with ``all_tenants=True``.

    Thin wrapper around ``DesignateContext.get_admin_context`` used by the
    periodic tasks so they build their context in one place.
    """
    admin_context = DesignateContext.get_admin_context(all_tenants=True)
    return admin_context
LOG.debug("Starting Periodic Synchronization") - - criterion = { - 'pool_id': CONF['service:pool_manager'].pool_id, - 'status': '!%s' % ERROR_STATUS - } - - periodic_sync_seconds = \ - CONF['service:pool_manager'].periodic_sync_seconds - - if periodic_sync_seconds is not None: - # Generate the current serial, will provide a UTC Unix TS. - current = utils.increment_serial() - criterion['serial'] = ">%s" % (current - periodic_sync_seconds) - - zones = self.central_api.find_zones(context, criterion) + context = self._get_admin_context_all_tenants() + zones = self._fetch_healthy_zones(context) try: for zone in zones: @@ -265,6 +255,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, if self._exceed_or_meet_threshold(results.count(True)): LOG.debug('Consensus reached for creating zone %(zone)s ' 'on pool targets' % {'zone': zone.name}) + # The zone status will be updated asyncronously by MiniDNS else: @@ -272,7 +263,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, ' on pool targets') % {'zone': zone.name}) self.central_api.update_status( - context, zone.id, ERROR_STATUS, zone.serial) + context, zone.id, ERROR_STATUS, zone.serial) return @@ -309,13 +300,14 @@ class Service(service.RPCService, coordination.CoordinationMixin, return True except Exception: retries += 1 - LOG.exception(_LE("Failed to create zone %(zone)s on " - "target %(target)s on attempt %(attempt)d"), + LOG.exception(_LE( + "Failed to create zone %(zone)s on " + "target %(target)s on attempt %(attempt)d"), { - 'zone': zone.name, - 'target': target.id, - 'attempt': retries - }) + 'zone': zone.name, + 'target': target.id, + 'attempt': retries + }) # noqa time.sleep(self.retry_interval) return False @@ -346,6 +338,8 @@ class Service(service.RPCService, coordination.CoordinationMixin, LOG.debug('Consensus reached for updating zone %(zone)s ' 'on pool targets' % {'zone': zone.name}) + # The zone status will be updated asyncronously by MiniDNS + # Send a NOTIFY to each 
def _fetch_healthy_zones(self, context):
    """Fetch this pool's zones whose status is not ERROR.

    When ``periodic_sync_seconds`` is configured (not ``None``), the query
    is narrowed to zones whose serial is greater than the current serial
    minus that many seconds.

    :param context: context handed through to ``central_api.find_zones``
    :return: the result of ``central_api.find_zones`` (described by the
        original docstring as a :class:`ZoneList`)
    """
    pool_manager_conf = CONF['service:pool_manager']
    sync_window = pool_manager_conf.periodic_sync_seconds

    filters = {
        'pool_id': pool_manager_conf.pool_id,
        'status': '!%s' % ERROR_STATUS
    }

    if sync_window is not None:
        # Per the original comment, increment_serial() provides the current
        # serial as a UTC Unix timestamp, so the subtraction yields the
        # oldest serial that is still inside the sync window.
        now_serial = utils.increment_serial()
        filters['serial'] = ">%s" % (now_serial - sync_window)

    return self.central_api.find_zones(context, filters)
# NOTE: test_periodic_recovery is a method of PoolManagerServiceNoopTest.  The
# class header and the outer @patch.object(mdns_rpcapi.MdnsAPI,
# 'notify_zone_changed') decorator sit immediately before this span.
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_periodic_recovery(self, mock_cent_update_status, *mocks):
    """periodic_recovery must replay every failed zone through its action.

    The innermost decorator (``update_status``) supplies the first mock
    argument; mocks from any outer decorators are collected in ``*mocks``.
    """

    def mock_get_failed_zones(ctx, action):
        # A different zone count per action makes a mix-up between the
        # delete/create/update loops visible in the assertions below.
        if action == pm_module.DELETE_ACTION:
            return self._build_zones(3, 'DELETE', 'ERROR')
        if action == pm_module.CREATE_ACTION:
            return self._build_zones(4, 'CREATE', 'ERROR')
        if action == pm_module.UPDATE_ACTION:
            return self._build_zones(5, 'UPDATE', 'ERROR')

    self.service._get_failed_zones = mock_get_failed_zones
    self.service.delete_zone = Mock()
    self.service.create_zone = Mock()
    self.service.update_zone = Mock()

    self.service.periodic_recovery()

    self.assertEqual(3, self.service.delete_zone.call_count)
    self.assertEqual(4, self.service.create_zone.call_count)
    self.assertEqual(5, self.service.update_zone.call_count)


class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
    """Periodic sync/recovery tests that create zones and inspect their state.

    The fixture is inherited unchanged from PoolManagerServiceNoopTest; no
    ``setUp`` override is needed because the original one only called super.
    """

    def _fetch_all_zones(self):
        """Return every row of the zones table, including deleted zones."""
        query = tables.zones.select()
        return self.storage.session.execute(query).fetchall()

    def _log_all_zones(self, zones, msg=None):
        """Log a one-line summary of each zone at DEBUG level.

        :param zones: iterable of items indexable by column name
        :param msg: optional heading logged before the table
        """
        if msg:
            LOG.debug("--- %s ---", msg)
        cols = ('name', 'status', 'action', 'deleted', 'deleted_at',
                'parent_zone_id')
        tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s"
        # Arguments are passed lazily so the table is only formatted when
        # DEBUG logging is enabled.
        LOG.debug(tpl, *cols)
        for z in zones:
            LOG.debug(tpl, *[z[k] for k in cols])

    def _assert_count_all_zones(self, n):
        """Fail unless the zones table holds exactly *n* rows.

        Deleted zones are counted too.  On mismatch the rows are logged and
        the test is reported as a failure (not an error) with both counts.
        """
        zones = self._fetch_all_zones()
        if len(zones) == n:
            return

        msg = "failed: %d zones expected, %d found" % (n, len(zones))
        self._log_all_zones(zones, msg=msg)
        self.fail("Unexpected number of zones: %d expected, %d found"
                  % (n, len(zones)))

    def _assert_zone_count(self, zones, n, kind):
        """Assert ``len(zones) == n``, logging the zones first on mismatch.

        :param kind: word used in the log message ('failed' or 'healthy')
        """
        if len(zones) != n:
            LOG.error("Expected %d %s zones, got %d", n, kind, len(zones))
            self._log_all_zones(zones, msg='listing zones')
        self.assertEqual(n, len(zones))

    def _assert_num_failed_zones(self, action, n):
        """Assert the service reports *n* failed zones for *action*."""
        zones = self.service._get_failed_zones(
            self.admin_context, action)
        self._assert_zone_count(zones, n, 'failed')

    def _assert_num_healthy_zones(self, action, n):
        """Assert *n* zones of this pool have *action* and a non-ERROR status.
        """
        criterion = {
            'action': action,
            'pool_id': pm_module.CONF['service:pool_manager'].pool_id,
            'status': '!%s' % pm_module.ERROR_STATUS
        }
        zones = self.service.central_api.find_zones(self.admin_context,
                                                    criterion)
        self._assert_zone_count(zones, n, 'healthy')

    @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
    def test_periodic_sync_and_recovery(
            self, mock_notify_zone_changed, *a):
        """A failing periodic sync is repaired by the next periodic recovery.
        """
        # Create healthy zones, run a periodic sync that will fail
        self.create_zone(name='created.example.com.')
        self._assert_num_healthy_zones(pm_module.CREATE_ACTION, 1)

        z = self.create_zone(name='updated.example.net.')
        z.email = 'info@example.net'
        self.service.central_api.update_zone(self.admin_context, z)
        self._assert_num_healthy_zones(pm_module.UPDATE_ACTION, 1)

        # Every target reports failure, so the sync cannot succeed.
        with patch.object(self.service, '_update_zone_on_target',
                          return_value=False):
            self.service.periodic_sync()

        zones = self.service._fetch_healthy_zones(self.admin_context)
        self.assertEqual(0, len(zones))
        self._assert_num_failed_zones(pm_module.CREATE_ACTION, 1)
        self._assert_num_failed_zones(pm_module.UPDATE_ACTION, 1)

        # Now run a periodic_recovery that will fix the zones
        backends = self.service.target_backends
        # .values() works on both Python 2 and 3 (itervalues() is 2-only).
        for backend in backends.values():
            backend.create_zone = Mock()
            backend.update_zone = Mock()
            backend.delete_zone = Mock()

        self.service.periodic_recovery()

        # There are 2 pool targets in use
        for backend in backends.values():
            self.assertEqual(1, backend.create_zone.call_count)
            self.assertEqual(1, backend.update_zone.call_count)
class RoObject(object):
    """Read-only attribute/item container for unit tests.

    The object is populated once, at construction time.  Afterwards any
    attempt to set or delete an attribute or an item raises
    ``NotImplementedError``, so code under test cannot silently mutate it.

    Values can be read as attributes (``obj.name``) or as items
    (``obj['name']``).  Iteration yields ``(key, value)`` pairs, so
    ``dict(obj)`` gives a plain-dict copy.
    """

    def __init__(self, d=None, **kw):
        """Populate the object.

        :param d: optional mapping of initial values; on a key clash its
            entries win over the keyword arguments
        :param kw: initial values given as keyword arguments
        """
        if d:
            kw.update(d)

        # Write straight into the instance dict: __setattr__ is blocked.
        self.__dict__.update(kw)

    def __getitem__(self, k):
        """Return the value stored under *k*.

        :raises NotImplementedError: if *k* was not set at construction
        """
        try:
            return self.__dict__[k]
        except KeyError:
            raise NotImplementedError(
                "Attempt to perform __getitem__"
                " %r on RoObject %r" % (k, self.__dict__)
            )

    def __setitem__(self, k, v):
        """Always refuse: the object is read-only."""
        raise NotImplementedError(
            "Attempt to perform __setitem__ or __setattr__"
            " %r on RoObject %r" % (k, self.__dict__)
        )

    def __setattr__(self, k, v):
        """Always refuse: the object is read-only."""
        self.__setitem__(k, v)

    def __delitem__(self, k):
        """Always refuse: deleting would mutate a read-only object."""
        raise NotImplementedError(
            "Attempt to perform __delitem__ or __delattr__"
            " %r on RoObject %r" % (k, self.__dict__)
        )

    def __delattr__(self, k):
        """Always refuse: deleting would mutate a read-only object."""
        self.__delitem__(k)

    def __iter__(self):
        """Yield ``(key, value)`` pairs for every stored value."""
        # Iterating the dict directly walks its keys on Python 2 and 3 alike.
        for k in self.__dict__:
            yield k, self.__dict__[k]

    def __repr__(self):
        """Show the stored values, e.g. in mock assertion failure messages."""
        return "RoObject(%r)" % (self.__dict__,)
class PoolManagerTest(test.BaseTestCase):
    """Unit tests for the pool manager ``Service`` with collaborators mocked.
    """

    @patch.object(pm_module.DesignateContext, 'get_admin_context')
    @patch.object(pm_module.central_api.CentralAPI, 'get_instance')
    @patch.object(objects.Pool, 'from_config')
    @patch.object(Service, '_setup_target_backends')
    def setUp(self, *mocks):
        """Build a Service while the decorators above patch its dependencies.
        """
        super(PoolManagerTest, self).setUp()
        service = Service()
        service.pool.targets = ()
        service.tg.add_timer = Mock()
        # A Mock election object has a truthy ``is_leader`` attribute.
        service._pool_election = Mock()
        service.target_backends = {}
        self.pm = service

    def _stub_failed_zones(self, zone, counts):
        """Replace ``_get_failed_zones`` on the service with a stub.

        :param zone: object repeated in every list the stub returns
        :param counts: dict mapping an action to the number of copies of
            *zone* returned for it; any other action yields ``None``
        """
        def fake_get_failed_zones(ctx, action):
            if action in counts:
                return [zone] * counts[action]

        self.pm._get_failed_zones = fake_get_failed_zones

    def test_get_failed_zones(self, *mocks):
        expected_criterion = {
            'action': 'DELETE',
            'status': 'ERROR',
            'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842',
        }

        with patch.object(self.pm.central_api, 'find_zones') as find_zones:
            self.pm._get_failed_zones('ctx', pm_module.DELETE_ACTION)

        find_zones.assert_called_once_with('ctx', expected_criterion)

    @patch.object(pm_module.DesignateContext, 'get_admin_context')
    def test_periodic_recover(self, mock_get_ctx, *mocks):
        zone = RoObject(name='a_zone')
        self._stub_failed_zones(zone, {
            pm_module.DELETE_ACTION: 3,
            pm_module.CREATE_ACTION: 4,
            pm_module.UPDATE_ACTION: 5,
        })
        self.pm.delete_zone = Mock()
        self.pm.create_zone = Mock()
        self.pm.update_zone = Mock()
        expected_ctx = mock_get_ctx.return_value

        self.pm.periodic_recovery()

        # Each action must have been replayed once per failed zone.
        self.pm.delete_zone.assert_called_with(expected_ctx, zone)
        self.assertEqual(3, self.pm.delete_zone.call_count)
        self.pm.create_zone.assert_called_with(expected_ctx, zone)
        self.assertEqual(4, self.pm.create_zone.call_count)
        self.pm.update_zone.assert_called_with(expected_ctx, zone)
        self.assertEqual(5, self.pm.update_zone.call_count)

    @patch.object(pm_module.DesignateContext, 'get_admin_context')
    def test_periodic_recover_exception(self, mock_get_ctx, *mocks):
        zone = RoObject(name='a_zone')
        # Raise an exception half through the recovery
        self._stub_failed_zones(zone, {
            pm_module.DELETE_ACTION: 3,
            pm_module.CREATE_ACTION: 4,
        })
        self.pm.delete_zone = Mock()
        self.pm.create_zone = Mock(side_effect=Exception('oops'))
        self.pm.update_zone = Mock()
        expected_ctx = mock_get_ctx.return_value

        self.pm.periodic_recovery()

        # Deletes complete, the first create raises, updates are never tried.
        self.pm.delete_zone.assert_called_with(expected_ctx, zone)
        self.assertEqual(3, self.pm.delete_zone.call_count)
        self.pm.create_zone.assert_called_with(expected_ctx, zone)
        self.assertEqual(1, self.pm.create_zone.call_count)
        self.assertEqual(0, self.pm.update_zone.call_count)