diff --git a/designate/central/rpcapi.py b/designate/central/rpcapi.py index 43184a88c..ccd92d76f 100644 --- a/designate/central/rpcapi.py +++ b/designate/central/rpcapi.py @@ -62,8 +62,9 @@ class CentralAPI(object): 6.0 - Renamed domains to zones 6.1 - Add ServiceStatus methods 6.2 - Changed 'find_recordsets' method args + 6.3 - Changed 'update_status' method args """ - RPC_API_VERSION = '6.2' + RPC_API_VERSION = '6.3' # This allows us to mark some methods as not logged. # This can be for a few reasons - some methods my not actually call over @@ -76,7 +77,7 @@ class CentralAPI(object): target = messaging.Target(topic=self.topic, version=self.RPC_API_VERSION) - self.client = rpc.get_client(target, version_cap='6.2') + self.client = rpc.get_client(target, version_cap='6.3') @classmethod def get_instance(cls): @@ -351,10 +352,9 @@ class CentralAPI(object): def delete_pool(self, context, pool_id): return self.client.call(context, 'delete_pool', pool_id=pool_id) - # Pool Manager Integration Methods - def update_status(self, context, zone_id, status, serial): + def update_status(self, context, zone_id, status, serial, action=None): self.client.cast(context, 'update_status', zone_id=zone_id, - status=status, serial=serial) + status=status, serial=serial, action=action) # Zone Ownership Transfers def create_zone_transfer_request(self, context, zone_transfer_request): diff --git a/designate/central/service.py b/designate/central/service.py index c8820cd64..4f10773da 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -185,7 +185,7 @@ def notification(notification_type): class Service(service.RPCService): - RPC_API_VERSION = '6.2' + RPC_API_VERSION = '6.3' target = messaging.Target(version=RPC_API_VERSION) @@ -2637,38 +2637,58 @@ class Service(service.RPCService): @notification('dns.zone.update') @transaction @synchronized_zone() - def update_status(self, context, zone_id, status, serial): + def update_status(self, context, zone_id, status, serial, action=None): """ :param context: Security context information. :param zone_id: The ID of the designate zone. :param status: The status, 'SUCCESS' or 'ERROR'. :param serial: The consensus serial number for the zone. - :return: updated zone - """ - # TODO(kiall): If the status is SUCCESS and the zone is already ACTIVE, - # we likely don't need to do anything. - zone = self._update_zone_status(context, zone_id, status, serial) - self._update_record_status(context, zone_id, status, serial) - return zone - - def _update_zone_status(self, context, zone_id, status, serial): - """Update zone status in storage - + :param action: The action, 'CREATE', 'UPDATE', 'DELETE' or 'NONE'. :return: updated zone """ zone = self.storage.get_zone(context, zone_id) + if action is None or zone.action == action: + if zone.action == 'DELETE' and zone.status != 'ERROR': + status = 'NO_ZONE' + zone = self._update_zone_or_record_status( + zone, status, serial + ) + else: + LOG.debug( + 'Updated action different from current action. ' + '%(previous_action)s != %(current_action)s ' + '(%(status)s). Keeping current action %(current_action)s ' + 'for %(zone_id)s', + { + 'previous_action': action, + 'current_action': zone.action, + 'status': zone.status, + 'zone_id': zone.id, + } + ) - zone, deleted = self._update_zone_or_record_status( - zone, status, serial) - - if zone.status != 'DELETED': - LOG.debug('Setting zone %s, serial %s: action %s, status %s', - zone.id, zone.serial, zone.action, zone.status) + if zone.status == 'DELETED': + LOG.debug( + 'Updated Status: Deleting %(zone_id)s', + { + 'zone_id': zone.id, + } + ) + self.storage.delete_zone(context, zone.id) + else: + LOG.debug( + 'Setting Zone: %(zone_id)s action: %(action)s ' + 'status: %(status)s serial: %(serial)s', + { + 'zone_id': zone.id, + 'action': zone.action, + 'status': zone.status, + 'serial': zone.serial, + } + ) self.storage.update_zone(context, zone) - if deleted: - LOG.debug('update_status: deleting %s', zone.name) - self.storage.delete_zone(context, zone.id) + self._update_record_status(context, zone_id, status, serial) return zone @@ -2700,8 +2720,7 @@ class Service(service.RPCService): records = self.storage.find_records(context, criterion=criterion) for record in records: - record, deleted = self._update_zone_or_record_status( - record, status, serial) + record = self._update_zone_or_record_status(record, status, serial) if record.obj_what_changed(): LOG.debug('Setting record %s, serial %s: action %s, ' @@ -2712,7 +2731,7 @@ class Service(service.RPCService): # TODO(Ron): Including this to retain the current logic. # We should NOT be deleting records. The record status should # be used to indicate the record has been deleted. - if deleted: + if record.status == 'DELETED': LOG.debug('Deleting record %s, serial %s: action %s, ' 'status %s', record.id, record.serial, record.action, record.status) @@ -2728,23 +2747,19 @@ class Service(service.RPCService): @staticmethod def _update_zone_or_record_status(zone_or_record, status, serial): - deleted = False if status == 'SUCCESS': - if zone_or_record.action in ['CREATE', 'UPDATE'] \ - and zone_or_record.status in ['PENDING', 'ERROR'] \ - and serial >= zone_or_record.serial: - zone_or_record.action = 'NONE' - zone_or_record.status = 'ACTIVE' - elif zone_or_record.action == 'DELETE' \ - and zone_or_record.status in ['PENDING', 'ERROR'] \ - and serial >= zone_or_record.serial: - zone_or_record.action = 'NONE' - zone_or_record.status = 'DELETED' - deleted = True + if (zone_or_record.status in ['PENDING', 'ERROR'] and + serial >= zone_or_record.serial): + if zone_or_record.action in ['CREATE', 'UPDATE']: + zone_or_record.action = 'NONE' + zone_or_record.status = 'ACTIVE' + elif zone_or_record.action == 'DELETE': + zone_or_record.action = 'NONE' + zone_or_record.status = 'DELETED' elif status == 'ERROR': - if zone_or_record.status == 'PENDING' \ - and (serial >= zone_or_record.serial or serial == 0): + if (zone_or_record.status == 'PENDING' and + (serial >= zone_or_record.serial or serial == 0)): zone_or_record.status = 'ERROR' elif status == 'NO_ZONE': @@ -2754,9 +2769,8 @@ class Service(service.RPCService): elif zone_or_record.action == 'DELETE': zone_or_record.action = 'NONE' zone_or_record.status = 'DELETED' - deleted = True - return zone_or_record, deleted + return zone_or_record # Zone Transfers def _transfer_key_generator(self, size=8): diff --git a/designate/producer/tasks.py b/designate/producer/tasks.py index 9fc841487..986fd6d01 100644 --- a/designate/producer/tasks.py +++ b/designate/producer/tasks.py @@ -249,19 +249,16 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): sort_dir='asc', ) - LOG.debug( - "Performing delayed NOTIFY for %(start)s to %(end)s: %(n)d", - { - 'start': pstart, - 'end': pend, - 'n': len(zones) - } - ) - for zone in zones: self.zone_api.update_zone(ctxt, zone) zone.delayed_notify = False self.central_api.update_zone(ctxt, zone) + LOG.debug( + 'Performed delayed NOTIFY for %(id)s', + { + 'id': zone.id, + } + ) class WorkerPeriodicRecovery(PeriodicTask): diff --git a/designate/tests/test_api/test_v2/test_recordsets.py b/designate/tests/test_api/test_v2/test_recordsets.py index 933eaaa10..50aeec9d8 100644 --- a/designate/tests/test_api/test_v2/test_recordsets.py +++ b/designate/tests/test_api/test_v2/test_recordsets.py @@ -369,6 +369,32 @@ class ApiV2RecordSetsTest(ApiV2TestCase): self._assert_exception('timeout', 504, self.client.get, url) def test_get_deleted_recordsets(self): + zone = self.create_zone(fixture=1) + recordset = self.create_recordset(zone) + url = '/zones/%s/recordsets' % zone['id'] + + response = self.client.get(url) + + # Check the headers are what we expect + self.assertEqual(200, response.status_int) + + # Now delete the recordset + url = '/zones/%s/recordsets/%s' % (zone['id'], recordset.id) + self.client.delete(url, status=202) + + # Simulate the zone having been deleted on the backend + zone_serial = self.central_service.get_zone( + self.admin_context, zone['id']).serial + self.central_service.update_status( + self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE' + ) + + # Try to get the record and ensure that we get a + # recordset_not_found error + self._assert_exception('recordset_not_found', 404, self.client.get, + url) + + def test_get_deleted_recordset_after_deleting_zone(self): zone = self.create_zone(fixture=1) self.create_recordset(zone) url = '/zones/%s/recordsets' % zone['id'] @@ -378,16 +404,18 @@ class ApiV2RecordSetsTest(ApiV2TestCase): # Check the headers are what we expect self.assertEqual(200, response.status_int) - # now delete the zone and get the recordsets + # Now delete the zone self.client.delete('/zones/%s' % zone['id'], status=202) # Simulate the zone having been deleted on the backend zone_serial = self.central_service.get_zone( self.admin_context, zone['id']).serial self.central_service.update_status( - self.admin_context, zone['id'], "SUCCESS", zone_serial) + self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'DELETE' + ) - # Check that we get a zone_not_found error + # Try to get the record and ensure that we get a + # zone_not_found error self._assert_exception('zone_not_found', 404, self.client.get, url) def test_get_recordset(self): diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py index 7c14116ea..9401a47cb 100644 --- a/designate/tests/test_central/test_service.py +++ b/designate/tests/test_central/test_service.py @@ -28,6 +28,7 @@ from oslo_db import exception as db_exception from oslo_log import log as logging from oslo_messaging.notify import notifier from oslo_messaging.rpc import dispatcher as rpc_dispatcher +from oslo_utils import timeutils from oslo_versionedobjects import exception as ovo_exc import testtools from testtools.matchers import GreaterThan @@ -2550,7 +2551,7 @@ class CentralServiceTest(CentralTestCase): zone_serial = self.central_service.get_zone( elevated_a, zone_id).serial self.central_service.update_status( - elevated_a, zone_id, "SUCCESS", zone_serial) + elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') self.network_api.fake.deallocate_floatingip(fip['id']) @@ -2661,7 +2662,7 @@ class CentralServiceTest(CentralTestCase): zone_serial = self.central_service.get_zone( elevated_a, zone_id).serial self.central_service.update_status( - elevated_a, zone_id, "SUCCESS", zone_serial) + elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') count = self.central_service.count_records( elevated_a, {'managed_resource_id': fip['id']}) @@ -2682,7 +2683,7 @@ class CentralServiceTest(CentralTestCase): zone_serial = self.central_service.get_zone( elevated_a, zone_id).serial self.central_service.update_status( - elevated_a, zone_id, "SUCCESS", zone_serial) + elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') count = self.central_service.count_records( elevated_a, {'managed_resource_id': fip['id']}) @@ -3127,7 +3128,7 @@ class CentralServiceTest(CentralTestCase): self.admin_context, zone['id']).serial self.central_service.update_status( - self.admin_context, zone['id'], "SUCCESS", zone_serial) + self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'DELETE') # Fetch the zone again, ensuring an exception is raised exc = self.assertRaises(rpc_dispatcher.ExpectedException, @@ -3151,7 +3152,7 @@ class CentralServiceTest(CentralTestCase): zone_serial = self.central_service.get_zone( self.admin_context, zone['id']).serial self.central_service.update_status( - self.admin_context, zone['id'], "SUCCESS", zone_serial) + self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE') # Fetch the record again, ensuring an exception is raised exc = self.assertRaises(rpc_dispatcher.ExpectedException, @@ -3162,6 +3163,131 @@ class CentralServiceTest(CentralTestCase): self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0]) + def test_update_status_create_zone(self): + zone = self.create_zone() + + updated_zone = self.central_service.get_zone( + self.admin_context, zone['id'] + ) + self.assertEqual('CREATE', updated_zone.action) + self.assertEqual('PENDING', updated_zone.status) + + ns_recordsets = self.central_service.find_recordset( + self.admin_context, + criterion={'zone_id': updated_zone.id, 'type': 'NS'} + ) + + self.assertEqual('CREATE', ns_recordsets.action) + self.assertEqual('PENDING', ns_recordsets.status) + + soa_recordsets = self.central_service.find_recordset( + self.admin_context, + criterion={'zone_id': updated_zone.id, 'type': 'SOA'} + ) + + self.assertEqual('CREATE', soa_recordsets.action) + self.assertEqual('PENDING', soa_recordsets.status) + + self.central_service.update_status( + self.admin_context, zone['id'], 'SUCCESS', + timeutils.utcnow_ts() + 1, 'CREATE', + ) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone['id'] + ) + self.assertEqual('NONE', updated_zone.action) + self.assertEqual('ACTIVE', updated_zone.status) + + ns_recordsets = self.central_service.find_recordset( + self.admin_context, + criterion={'zone_id': updated_zone.id, 'type': 'NS'} + ) + + self.assertEqual('NONE', ns_recordsets.action) + self.assertEqual('ACTIVE', ns_recordsets.status) + + soa_recordsets = self.central_service.find_recordset( + self.admin_context, + criterion={'zone_id': updated_zone.id, 'type': 'SOA'} + ) + + self.assertEqual('NONE', soa_recordsets.action) + self.assertEqual('ACTIVE', soa_recordsets.status) + + def test_update_status_create_record_before_zone_finished(self): + zone = self.create_zone() + self.assertEqual('CREATE', zone.action) + self.assertEqual('PENDING', zone.status) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone['id'] + ) + self.assertEqual('CREATE', updated_zone.action) + self.assertEqual('PENDING', updated_zone.status) + + recordset = objects.RecordSet( + name='www.%s' % zone.name, + type='A', + records=objects.RecordList(objects=[ + objects.Record(data='127.0.0.1'), + objects.Record(data='127.0.0.2'), + ]), + ) + + # Create a recordset before the zone is properly setup. + updated_recordset = self.central_service.create_recordset( + self.admin_context, zone['id'], recordset + ) + self.assertEqual('CREATE', updated_recordset.action) + self.assertEqual('PENDING', updated_recordset.status) + + # Finish setting up the zone. + self.central_service.update_status( + self.admin_context, zone['id'], 'SUCCESS', + timeutils.utcnow_ts() + 1, 'CREATE', + ) + + # Check that we are still waiting for the zone to be updated + # with the new recordset. + # It's likely that the DNS server already knows about the recordset, + # but there is also a chance that the recordset wasn't ready yet + # and is missing from the upstream DNS servers. + updated_zone = self.central_service.get_zone( + self.admin_context, zone['id'] + ) + self.assertEqual('UPDATE', updated_zone.action) + self.assertEqual('PENDING', updated_zone.status) + + updated_recordset = self.central_service.get_recordset( + self.admin_context, zone['id'], recordset['id'] + ) + # The recordset is already marked as ACTIVE, since we cannot + # tell if the record was created / updated successfully or not at this + # stage, also Zone NS and SOA records always needs to be marked ACTIVE. + # Safer to just mark all records as ACTIVE. + self.assertEqual('NONE', updated_recordset.action) + self.assertEqual('ACTIVE', updated_recordset.status) + + # We just got the word from the worker that both the zone and the + # recordset are known to the upstream DNS servers now. + self.central_service.update_status( + self.admin_context, zone['id'], 'SUCCESS', + timeutils.utcnow_ts() + 2, 'UPDATE', + ) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone['id'] + ) + self.assertEqual('NONE', updated_zone.action) + self.assertEqual('ACTIVE', updated_zone.status) + + updated_recordset = self.central_service.get_recordset( + self.admin_context, zone['id'], recordset['id'] + ) + self.assertEqual('NONE', updated_recordset.action) + self.assertEqual('ACTIVE', updated_recordset.status) + @mock.patch.object(notifier.Notifier, "info") def test_update_status_send_notification(self, mock_notifier): @@ -3184,7 +3310,7 @@ class CentralServiceTest(CentralTestCase): # have been sent and the zone is now in ACTIVE status mock_notifier.reset_mock() self.central_service.update_status( - self.admin_context, zone['id'], "SUCCESS", zone.serial) + self.admin_context, zone['id'], 'SUCCESS', zone.serial, 'CREATE') self.assertEqual(2, mock_notifier.call_count) notify_string, notified_zone = mock_notifier.call_args_list[0][0][1:] @@ -3218,7 +3344,7 @@ class CentralServiceTest(CentralTestCase): zone_serial = self.central_service.get_zone( self.admin_context, zone['id']).serial self.central_service.update_status( - self.admin_context, zone['id'], "SUCCESS", zone_serial) + self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE') # Fetch the record again, ensuring an exception is raised exc = self.assertRaises(rpc_dispatcher.ExpectedException, diff --git a/designate/tests/unit/test_central/test_basic.py b/designate/tests/unit/test_central/test_basic.py index 8d11b59df..b52523fb5 100644 --- a/designate/tests/unit/test_central/test_basic.py +++ b/designate/tests/unit/test_central/test_basic.py @@ -2151,18 +2151,39 @@ class CentralZoneExportTests(CentralBasic): class CentralStatusTests(CentralBasic): - def test_update_zone_or_record_status_no_zone(self): zone = RwObject( - action='UPDATE', + id='uuid', + action='CREATE', status='SUCCESS', serial=0, ) - dom, deleted = self.service._update_zone_or_record_status( - zone, 'NO_ZONE', 0) - self.assertEqual(dom.action, 'CREATE') - self.assertEqual(dom.status, 'ERROR') + self.service.storage.get_zone.return_value = zone + self.service.storage.find_records.return_value = [] + + new_zone = self.service.update_status( + self.context, zone.id, 'NO_ZONE', 0, 'CREATE') + + self.assertEqual(new_zone.action, 'CREATE') + self.assertEqual(new_zone.status, 'ERROR') + + def test_update_zone_or_record_status_handle_update_after_create(self): + zone = RwObject( + id='uuid', + action='UPDATE', + status='PENDING', + serial=0, + ) + + self.service.storage.get_zone.return_value = zone + self.service.storage.find_records.return_value = [] + + new_zone = self.service.update_status( + self.context, zone.id, 'PENDING', 0, 'CREATE') + + self.assertEqual(new_zone.action, 'UPDATE') + self.assertEqual(new_zone.status, 'PENDING') class CentralQuotaTest(unittest.TestCase): diff --git a/designate/tests/unit/workers/test_zone_tasks.py b/designate/tests/unit/workers/test_zone_tasks.py index 4d191d79d..33875379a 100644 --- a/designate/tests/unit/workers/test_zone_tasks.py +++ b/designate/tests/unit/workers/test_zone_tasks.py @@ -311,12 +311,6 @@ class TestZoneActor(oslotest.base.BaseTestCase): mock.Mock(action='CREATE'), ) - def test_invalid_action(self): - self.assertRaisesRegex( - exceptions.BadAction, 'Unexpected action: BAD', - self.actor._validate_action, 'BAD' - ) - def test_threshold_from_config(self): actor = zone.ZoneActor( self.executor, self.context, self.pool, mock.Mock(action='CREATE') @@ -598,19 +592,21 @@ class TestUpdateStatus(oslotest.base.BaseTestCase): def test_call_on_delete(self): self.task.zone.action = 'DELETE' - - self.task() - - self.assertEqual('NONE', self.task.zone.action) - self.assertEqual('NO_ZONE', self.task.zone.status) - self.assertTrue(self.task.central_api.update_status.called) - - def test_call_on_success(self): self.task.zone.status = 'SUCCESS' self.task() - self.assertEqual('NONE', self.task.zone.action) + self.assertEqual('DELETE', self.task.zone.action) + self.assertEqual('SUCCESS', self.task.zone.status) + self.assertTrue(self.task.central_api.update_status.called) + + def test_call_on_success(self): + self.task.zone.action = 'UPDATE' + self.task.zone.status = 'SUCCESS' + + self.task() + + self.assertEqual('UPDATE', self.task.zone.action) self.assertTrue(self.task.central_api.update_status.called) def test_call_central_call(self): @@ -623,6 +619,7 @@ class TestUpdateStatus(oslotest.base.BaseTestCase): self.task.zone.id, self.task.zone.status, self.task.zone.serial, + self.task.zone.action, ) def test_call_on_delete_error(self): diff --git a/designate/worker/tasks/zone.py b/designate/worker/tasks/zone.py index c573d2e18..19a864007 100644 --- a/designate/worker/tasks/zone.py +++ b/designate/worker/tasks/zone.py @@ -78,11 +78,11 @@ class ZoneActionOnTarget(base.Task): if self.action == 'CREATE': self.target.backend.create_zone(self.context, self.zone) SendNotify(self.executor, self.zone, self.target)() - elif self.action == 'UPDATE': - self.target.backend.update_zone(self.context, self.zone) - SendNotify(self.executor, self.zone, self.target)() elif self.action == 'DELETE': self.target.backend.delete_zone(self.context, self.zone) + else: + self.target.backend.update_zone(self.context, self.zone) + SendNotify(self.executor, self.zone, self.target)() LOG.debug("Successful %s zone %s on %s", self.action, self.zone.name, self.target) @@ -160,10 +160,6 @@ class ZoneActor(base.Task, ThresholdMixin): self.pool = pool self.zone = zone - def _validate_action(self, action): - if action not in ['CREATE', 'UPDATE', 'DELETE']: - raise exceptions.BadAction('Unexpected action: %s' % action) - def _execute(self): results = self.executor.run([ ZoneActionOnTarget(self.executor, self.context, self.zone, target) @@ -193,7 +189,6 @@ class ZoneActor(base.Task, ThresholdMixin): return True def __call__(self): - self._validate_action(self.zone.action) results = self._execute() return self._threshold_met(results) @@ -496,25 +491,20 @@ class UpdateStatus(base.Task): self.context = context def __call__(self): - # TODO(timsim): Fix this when central's logic is sane - if self.zone.action == 'DELETE' and self.zone.status != 'ERROR': - self.zone.action = 'NONE' - self.zone.status = 'NO_ZONE' - - if self.zone.status == 'SUCCESS': - self.zone.action = 'NONE' - - # This log message will always have action as NONE and then we - # don't use the action in the update_status call. - LOG.debug('Updating status for %(zone)s to %(status)s:%(action)s', - {'zone': self.zone.name, 'status': self.zone.status, - 'action': self.zone.action}) + LOG.debug( + 'Updating status for %(zone)s to %(status)s:%(action)s', { + 'zone': self.zone.name, + 'status': self.zone.status, + 'action': self.zone.action, + } + ) self.central_api.update_status( self.context, self.zone.id, self.zone.status, - self.zone.serial + self.zone.serial, + self.zone.action, ) diff --git a/releasenotes/notes/rpc-version-update-f87b852b361d0aad.yaml b/releasenotes/notes/rpc-version-update-f87b852b361d0aad.yaml new file mode 100644 index 000000000..444ee1f7d --- /dev/null +++ b/releasenotes/notes/rpc-version-update-f87b852b361d0aad.yaml @@ -0,0 +1,7 @@ +--- +upgrade: + - | + The Designate Central RPC API version was updated and may not be backwards + compatible with an older version of the Designate RPC API. When upgrading + a multi-controller deployment make sure that all designate-central services + are upgraded before starting the designate-worker service.