diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index ca29a4122..abfedba33 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -138,18 +138,16 @@ class Service(service.RPCService, coordination.CoordinationMixin, self._pool_election.start() if CONF['service:pool_manager'].enable_recovery_timer: - LOG.info(_LI('Starting periodic recovery timer')) - self.tg.add_timer( - CONF['service:pool_manager'].periodic_recovery_interval, - self.periodic_recovery, - CONF['service:pool_manager'].periodic_recovery_interval) + interval = CONF['service:pool_manager'].periodic_recovery_interval + LOG.info(_LI('Starting periodic recovery timer every' + ' %(interval)s s') % {'interval': interval}) + self.tg.add_timer(interval, self.periodic_recovery, interval) if CONF['service:pool_manager'].enable_sync_timer: - LOG.info(_LI('Starting periodic synchronization timer')) - self.tg.add_timer( - CONF['service:pool_manager'].periodic_sync_interval, - self.periodic_sync, - CONF['service:pool_manager'].periodic_sync_interval) + interval = CONF['service:pool_manager'].periodic_sync_interval + LOG.info(_LI('Starting periodic synchronization timer every' + ' %(interval)s s') % {'interval': interval}) + self.tg.add_timer(interval, self.periodic_sync, interval) def stop(self): self._pool_election.stop() @@ -170,72 +168,81 @@ class Service(service.RPCService, coordination.CoordinationMixin, # Periodioc Tasks def periodic_recovery(self): """ + Runs only on the pool leader :return: None """ # NOTE(kiall): Only run this periodic task on the pool leader - if self._pool_election.is_leader: - context = DesignateContext.get_admin_context(all_tenants=True) + if not self._pool_election.is_leader: + return - LOG.debug("Starting Periodic Recovery") + context = DesignateContext.get_admin_context(all_tenants=True) + LOG.debug("Starting Periodic Recovery") - try: - # Handle Deletion Failures - zones = self._get_failed_zones(context, DELETE_ACTION) + try: + # Handle Deletion Failures + zones = self._get_failed_zones(context, DELETE_ACTION) - for zone in zones: - self.delete_zone(context, zone) + for zone in zones: + self.delete_zone(context, zone) - # Handle Creation Failures - zones = self._get_failed_zones(context, CREATE_ACTION) + # Handle Creation Failures + zones = self._get_failed_zones(context, CREATE_ACTION) - for zone in zones: - self.create_zone(context, zone) + for zone in zones: + self.create_zone(context, zone) - # Handle Update Failures - zones = self._get_failed_zones(context, UPDATE_ACTION) + # Handle Update Failures + zones = self._get_failed_zones(context, UPDATE_ACTION) - for zone in zones: - self.update_zone(context, zone) + for zone in zones: + self.update_zone(context, zone) - except Exception: - LOG.exception(_LE('An unhandled exception in periodic ' - 'recovery occurred')) + except Exception: + LOG.exception(_LE('An unhandled exception in periodic ' + 'recovery occurred')) def periodic_sync(self): - """ + """Periodically sync all the zones that are not in ERROR status + Runs only on the pool leader :return: None """ - # NOTE(kiall): Only run this periodic task on the pool leader - if self._pool_election.is_leader: - context = DesignateContext.get_admin_context(all_tenants=True) + if not self._pool_election.is_leader: + return - LOG.debug("Starting Periodic Synchronization") + context = DesignateContext.get_admin_context(all_tenants=True) - criterion = { - 'pool_id': CONF['service:pool_manager'].pool_id, - 'status': '!%s' % ERROR_STATUS - } + LOG.debug("Starting Periodic Synchronization") - periodic_sync_seconds = \ - CONF['service:pool_manager'].periodic_sync_seconds + criterion = { + 'pool_id': CONF['service:pool_manager'].pool_id, + 'status': '!%s' % ERROR_STATUS + } - if periodic_sync_seconds is not None: - # Generate the current serial, will provide a UTC Unix TS. - current = utils.increment_serial() - criterion['serial'] = ">%s" % (current - periodic_sync_seconds) + periodic_sync_seconds = \ + CONF['service:pool_manager'].periodic_sync_seconds - zones = self.central_api.find_zones(context, criterion) + if periodic_sync_seconds is not None: + # Generate the current serial, will provide a UTC Unix TS. + current = utils.increment_serial() + criterion['serial'] = ">%s" % (current - periodic_sync_seconds) - try: - for zone in zones: - # TODO(kiall): If the zone was created within the last - # periodic_sync_seconds, attempt to recreate - # to fill in targets which may have failed. - self.update_zone(context, zone) + zones = self.central_api.find_zones(context, criterion) - except Exception: - LOG.exception(_LE('An unhandled exception in periodic ' - 'synchronization occurred.')) + try: + for zone in zones: + # TODO(kiall): If the zone was created within the last + # periodic_sync_seconds, attempt to recreate + # to fill in targets which may have failed. + success = self.update_zone(context, zone) + if not success: + self.central_api.update_status(context, zone.id, + ERROR_STATUS, zone.serial) + + except Exception: + LOG.exception(_LE('An unhandled exception in periodic ' + 'synchronization occurred.')) + self.central_api.update_status(context, zone.id, ERROR_STATUS, + zone.serial) # Standard Create/Update/Delete Methods @@ -314,32 +321,30 @@ class Service(service.RPCService, coordination.CoordinationMixin, return False def update_zone(self, context, zone): - """ + """Update a zone across every pool target, check for consensus and + for propagation. :param context: Security context information. :param zone: Zone to be updated - :return: None + :return: consensus reached (bool) """ LOG.info(_LI("Updating zone %s"), zone.name) - results = [] - # Update the zone on each of the Pool Targets + success_count = 0 for target in self.pool.targets: - results.append( - self._update_zone_on_target(context, target, zone)) + ok_status = self._update_zone_on_target(context, target, zone) + if ok_status: + success_count += 1 - if self._exceed_or_meet_threshold(results.count(True)): - LOG.debug('Consensus reached for updating zone %(zone)s ' - 'on pool targets' % {'zone': zone.name}) - - else: + if not self._exceed_or_meet_threshold(success_count): LOG.warn(_LW('Consensus not reached for updating zone %(zone)s' ' on pool targets') % {'zone': zone.name}) + self.central_api.update_status(context, zone.id, ERROR_STATUS, + zone.serial) + return False - self.central_api.update_status( - context, zone.id, ERROR_STATUS, zone.serial) - - return + LOG.debug('Consensus reached for updating zone %(zone)s ' + 'on pool targets' % {'zone': zone.name}) # Send a NOTIFY to each also-notifies for also_notify in self.pool.also_notifies: @@ -360,8 +365,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, context, zone, nameserver, self.timeout, self.retry_interval, self.max_retries, self.delay) + return True + def _update_zone_on_target(self, context, target, zone): - """ + """Instruct the appropriate backend to update a zone on a target :param context: Security context information. :param target: Target to update Zone on :param zone: Zone to be updated @@ -535,7 +542,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, criterion = { 'pool_id': CONF['service:pool_manager'].pool_id, 'action': action, - 'status': 'ERROR' + 'status': ERROR_STATUS } return self.central_api.find_zones(context, criterion) @@ -548,10 +555,12 @@ class Service(service.RPCService, coordination.CoordinationMixin, return (Decimal(count) / Decimal(total_count)) * Decimal(100) def _exceed_or_meet_threshold(self, count, threshold=None): + """Evaluate if count / the number of pool targets >= threshold + Used to implement consensus + """ threshold = threshold or self.threshold - - return self._percentage( - count, len(self.pool.targets)) >= Decimal(threshold) + perc = self._percentage(count, len(self.pool.targets)) + return perc >= Decimal(threshold) @staticmethod def _get_sorted_serials(pool_manager_statuses, descending=False): @@ -568,6 +577,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, return self._get_sorted_serials(pool_manager_statuses, descending=True) def _is_consensus(self, context, zone, action, status, threshold=None): + """Fetch zone status across all nameservers through MiniDNS and compare + it with the expected `status` + :return: consensus reached (bool) + """ status_count = 0 pool_manager_statuses = self._retrieve_statuses( context, zone, action) @@ -609,6 +622,9 @@ class Service(service.RPCService, coordination.CoordinationMixin, # value the nameserver @staticmethod def _build_status_object(nameserver, zone, action): + """ + :return: :class:`objects.PoolManagerStatus` + """ values = { 'nameserver_id': nameserver.id, 'zone_id': zone.id, @@ -643,6 +659,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, pass def _retrieve_from_mdns(self, context, nameserver, zone, action): + """Instruct MiniDNS to get a zone serial number from a nameserver + Set error status if the zone is unexpectedly missing. + :return: :class:`objects.PoolManagerStatus` or None + """ try: (status, actual_serial, retries) = \ self.mdns_api.get_serial_number( @@ -661,16 +681,16 @@ class Service(service.RPCService, coordination.CoordinationMixin, if status == NO_ZONE_STATUS: if action == CREATE_ACTION: - pool_manager_status.status = 'ERROR' + pool_manager_status.status = ERROR_STATUS elif action == DELETE_ACTION: - pool_manager_status.status = 'SUCCESS' + pool_manager_status.status = SUCCESS_STATUS elif action == UPDATE_ACTION: - pool_manager_status.action = 'CREATE' - pool_manager_status.status = 'ERROR' + pool_manager_status.action = CREATE_ACTION + pool_manager_status.status = ERROR_STATUS else: pool_manager_status.status = status - pool_manager_status.serial_number = actual_serial \ - if actual_serial is not None else 0 + + pool_manager_status.serial_number = actual_serial or 0 LOG.debug('Retrieved status %s and serial %s for zone %s ' 'on nameserver %s with action %s from mdns.' % (pool_manager_status.status, @@ -681,6 +701,11 @@ class Service(service.RPCService, coordination.CoordinationMixin, return pool_manager_status def _retrieve_statuses(self, context, zone, action): + """Instruct MiniDNS to get a zone serial number from all nameservers, + unless a cached value is available. + Set error status if the zone is unexpectedly missing. + :return: list of :class:`objects.PoolManagerStatus` + """ pool_manager_statuses = [] for nameserver in self.pool.nameservers: try: diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py index d50ee40f3..86f8ef90c 100644 --- a/designate/tests/test_pool_manager/test_service.py +++ b/designate/tests/test_pool_manager/test_service.py @@ -13,9 +13,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import uuid + import oslo_messaging as messaging from oslo_config import cfg from mock import call +from mock import Mock from mock import patch from designate import exceptions @@ -103,9 +106,10 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): self.cache = self.service.cache @staticmethod - def _build_zone(name, action, status): + def _build_zone(name, action, status, id=None): + zid = id or '75ea1626-eea7-46b5-acb7-41e5897c2d40' values = { - 'id': '75ea1626-eea7-46b5-acb7-41e5897c2d40', + 'id': zid, 'name': name, 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842', 'action': action, @@ -114,6 +118,13 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): } return objects.Zone.from_dict(values) + def _build_zones(self, n, action, status): + return [ + self._build_zone("zone%02X.example" % cnt, action, + status, id=str(uuid.uuid4())) + for cnt in range(n) + ] + @patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number', side_effect=messaging.MessagingException) @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') @@ -419,3 +430,92 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase): mock_update_status.assert_called_once_with( self.admin_context, zone.id, 'ERROR', 0) + + @patch.object(central_rpcapi.CentralAPI, 'find_zones') + def test_periodic_sync_not_leader(self, mock_find_zones): + self.service._update_zone_on_target = Mock(return_value=False) + self.service._pool_election = Mock() + self.service._pool_election.is_leader = False + self.service.update_zone = Mock() + + self.service.periodic_sync() + self.assertFalse(mock_find_zones.called) + + @patch.object(central_rpcapi.CentralAPI, 'update_status') + def test_update_zone_no_consensus(self, mock_cent_update_status): + zone = self._build_zone('example.org.', 'UPDATE', 'PENDING') + self.service._update_zone_on_target = Mock(return_value=True) + self.service._exceed_or_meet_threshold = Mock(return_value=False) + + ret = self.service.update_zone(self.admin_context, zone) + self.assertFalse(ret) + + self.assertEqual(2, self.service._update_zone_on_target.call_count) + self.assertEqual(1, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') + def test_update_zone(self, mock_mdns_poll): + zone = self._build_zone('example.org.', 'UPDATE', 'PENDING') + self.service._update_zone_on_target = Mock(return_value=True) + self.service._update_zone_on_also_notify = Mock() + self.service.pool.also_notifies = ['bogus'] + self.service._exceed_or_meet_threshold = Mock(return_value=True) + + # cache.retrieve will throw exceptions.PoolManagerStatusNotFound + # mdns_api.poll_for_serial_number will be called twice + ret = self.service.update_zone(self.admin_context, zone) + self.assertTrue(ret) + + self.assertEqual(2, self.service._update_zone_on_target.call_count) + self.assertEqual(1, self.service._update_zone_on_also_notify.call_count) # noqa + self.assertEqual(2, mock_mdns_poll.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_zones') + def test_periodic_sync(self, mock_find_zones, + mock_cent_update_status, *a): + self.service.update_zone = Mock() + mock_find_zones.return_value = self._build_zones(2, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_zones.call_count) + criterion = mock_find_zones.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + self.assertEqual(2, self.service.update_zone.call_count) + self.assertEqual(0, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_zones') + def test_periodic_sync_with_failing_update(self, mock_find_zones, + mock_cent_update_status, *a): + self.service.update_zone = Mock(return_value=False) # fail update + mock_find_zones.return_value = self._build_zones(3, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_zones.call_count) + criterion = mock_find_zones.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + # all zones are now in ERROR status + self.assertEqual(3, self.service.update_zone.call_count) + self.assertEqual(3, mock_cent_update_status.call_count) + + @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + @patch.object(central_rpcapi.CentralAPI, 'find_zones') + def test_periodic_sync_with_failing_update_with_exception( + self, mock_find_zones, mock_cent_update_status, *a): + self.service.update_zone = Mock(side_effect=Exception) + mock_find_zones.return_value = self._build_zones(3, 'UPDATE', + 'PENDING') + self.service.periodic_sync() + + self.assertEqual(1, mock_find_zones.call_count) + criterion = mock_find_zones.call_args_list[0][0][1] + self.assertEqual('!ERROR', criterion['status']) + # the first updated zone is now in ERROR status + self.assertEqual(1, self.service.update_zone.call_count) + self.assertEqual(1, mock_cent_update_status.call_count) diff --git a/designate/tests/unit/test_pool_manager/test_service.py b/designate/tests/unit/test_pool_manager/test_service.py new file mode 100644 index 000000000..4432737ec --- /dev/null +++ b/designate/tests/unit/test_pool_manager/test_service.py @@ -0,0 +1,58 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Federico Ceratto +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from mock import Mock +from mock import MagicMock +from mock import patch +from oslotest import base as test + +from designate import exceptions +from designate import objects +from designate.pool_manager.service import Service + + +class PoolManagerTest(test.BaseTestCase): + def __setUp(self): + super(PoolManagerTest, self).setUp() + + def test_init_no_pool_targets(self): + with patch.object(objects.Pool, 'from_config', + return_value=MagicMock()): + self.assertRaises(exceptions.NoPoolTargetsConfigured, Service) + + def test_init(self): + with patch.object(objects.Pool, 'from_config', + return_value=Mock()): + Service._setup_target_backends = Mock() + Service() + + def test_start(self): + with patch.object(objects.Pool, 'from_config', + return_value=Mock()): + Service._setup_target_backends = Mock() + pm = Service() + pm.pool.targets = () + pm.tg.add_timer = Mock() + pm._pool_election = Mock() + with patch("designate.service.RPCService.start"): + pm.start() + + call1 = pm.tg.add_timer.call_args_list[0][0] + self.assertEqual(120, call1[0]) + self.assertEqual(120, call1[-1]) + call2 = pm.tg.add_timer.call_args_list[1][0] + self.assertEqual(1800, call2[0]) + self.assertEqual(1800, call2[-1])