Updated status logic to always NOTIFY on change

In this patch we revise the workflow after the zone api has
finished updating the upstream dns servers after a change. The goal
is to fix issues with actions that would overwrite other actions,
potentially causing zone change notifications to not be sent in a
timely manner.

Additional changes.
- Changed update_status method args
- Improved unit test coverage of multiple code paths.

Change-Id: I5d566588be66e9ed0df9484e36504a69b4f4b5a9
This commit is contained in:
Erik Olof Gunnar Andersson 2022-01-05 21:48:15 -08:00 committed by Dr. Jens Harbott
parent 0c7d218ba1
commit 3c495ed76d
9 changed files with 288 additions and 108 deletions

View File

@ -62,8 +62,9 @@ class CentralAPI(object):
6.0 - Renamed domains to zones
6.1 - Add ServiceStatus methods
6.2 - Changed 'find_recordsets' method args
6.3 - Changed 'update_status' method args
"""
RPC_API_VERSION = '6.2'
RPC_API_VERSION = '6.3'
# This allows us to mark some methods as not logged.
# This can be for a few reasons - some methods my not actually call over
@ -76,7 +77,7 @@ class CentralAPI(object):
target = messaging.Target(topic=self.topic,
version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='6.2')
self.client = rpc.get_client(target, version_cap='6.3')
@classmethod
def get_instance(cls):
@ -351,10 +352,9 @@ class CentralAPI(object):
def delete_pool(self, context, pool_id):
return self.client.call(context, 'delete_pool', pool_id=pool_id)
# Pool Manager Integration Methods
def update_status(self, context, zone_id, status, serial):
def update_status(self, context, zone_id, status, serial, action=None):
self.client.cast(context, 'update_status', zone_id=zone_id,
status=status, serial=serial)
status=status, serial=serial, action=action)
# Zone Ownership Transfers
def create_zone_transfer_request(self, context, zone_transfer_request):

View File

@ -185,7 +185,7 @@ def notification(notification_type):
class Service(service.RPCService):
RPC_API_VERSION = '6.2'
RPC_API_VERSION = '6.3'
target = messaging.Target(version=RPC_API_VERSION)
@ -2637,38 +2637,58 @@ class Service(service.RPCService):
@notification('dns.zone.update')
@transaction
@synchronized_zone()
def update_status(self, context, zone_id, status, serial):
def update_status(self, context, zone_id, status, serial, action=None):
"""
:param context: Security context information.
:param zone_id: The ID of the designate zone.
:param status: The status, 'SUCCESS' or 'ERROR'.
:param serial: The consensus serial number for the zone.
:return: updated zone
"""
# TODO(kiall): If the status is SUCCESS and the zone is already ACTIVE,
# we likely don't need to do anything.
zone = self._update_zone_status(context, zone_id, status, serial)
self._update_record_status(context, zone_id, status, serial)
return zone
def _update_zone_status(self, context, zone_id, status, serial):
"""Update zone status in storage
:param action: The action, 'CREATE', 'UPDATE', 'DELETE' or 'NONE'.
:return: updated zone
"""
zone = self.storage.get_zone(context, zone_id)
if action is None or zone.action == action:
if zone.action == 'DELETE' and zone.status != 'ERROR':
status = 'NO_ZONE'
zone = self._update_zone_or_record_status(
zone, status, serial
)
else:
LOG.debug(
'Updated action different from current action. '
'%(previous_action)s != %(current_action)s '
'(%(status)s). Keeping current action %(current_action)s '
'for %(zone_id)s',
{
'previous_action': action,
'current_action': zone.action,
'status': zone.status,
'zone_id': zone.id,
}
)
zone, deleted = self._update_zone_or_record_status(
zone, status, serial)
if zone.status != 'DELETED':
LOG.debug('Setting zone %s, serial %s: action %s, status %s',
zone.id, zone.serial, zone.action, zone.status)
if zone.status == 'DELETED':
LOG.debug(
'Updated Status: Deleting %(zone_id)s',
{
'zone_id': zone.id,
}
)
self.storage.delete_zone(context, zone.id)
else:
LOG.debug(
'Setting Zone: %(zone_id)s action: %(action)s '
'status: %(status)s serial: %(serial)s',
{
'zone_id': zone.id,
'action': zone.action,
'status': zone.status,
'serial': zone.serial,
}
)
self.storage.update_zone(context, zone)
if deleted:
LOG.debug('update_status: deleting %s', zone.name)
self.storage.delete_zone(context, zone.id)
self._update_record_status(context, zone_id, status, serial)
return zone
@ -2700,8 +2720,7 @@ class Service(service.RPCService):
records = self.storage.find_records(context, criterion=criterion)
for record in records:
record, deleted = self._update_zone_or_record_status(
record, status, serial)
record = self._update_zone_or_record_status(record, status, serial)
if record.obj_what_changed():
LOG.debug('Setting record %s, serial %s: action %s, '
@ -2712,7 +2731,7 @@ class Service(service.RPCService):
# TODO(Ron): Including this to retain the current logic.
# We should NOT be deleting records. The record status should
# be used to indicate the record has been deleted.
if deleted:
if record.status == 'DELETED':
LOG.debug('Deleting record %s, serial %s: action %s, '
'status %s', record.id, record.serial,
record.action, record.status)
@ -2728,23 +2747,19 @@ class Service(service.RPCService):
@staticmethod
def _update_zone_or_record_status(zone_or_record, status, serial):
deleted = False
if status == 'SUCCESS':
if zone_or_record.action in ['CREATE', 'UPDATE'] \
and zone_or_record.status in ['PENDING', 'ERROR'] \
and serial >= zone_or_record.serial:
zone_or_record.action = 'NONE'
zone_or_record.status = 'ACTIVE'
elif zone_or_record.action == 'DELETE' \
and zone_or_record.status in ['PENDING', 'ERROR'] \
and serial >= zone_or_record.serial:
zone_or_record.action = 'NONE'
zone_or_record.status = 'DELETED'
deleted = True
if (zone_or_record.status in ['PENDING', 'ERROR'] and
serial >= zone_or_record.serial):
if zone_or_record.action in ['CREATE', 'UPDATE']:
zone_or_record.action = 'NONE'
zone_or_record.status = 'ACTIVE'
elif zone_or_record.action == 'DELETE':
zone_or_record.action = 'NONE'
zone_or_record.status = 'DELETED'
elif status == 'ERROR':
if zone_or_record.status == 'PENDING' \
and (serial >= zone_or_record.serial or serial == 0):
if (zone_or_record.status == 'PENDING' and
(serial >= zone_or_record.serial or serial == 0)):
zone_or_record.status = 'ERROR'
elif status == 'NO_ZONE':
@ -2754,9 +2769,8 @@ class Service(service.RPCService):
elif zone_or_record.action == 'DELETE':
zone_or_record.action = 'NONE'
zone_or_record.status = 'DELETED'
deleted = True
return zone_or_record, deleted
return zone_or_record
# Zone Transfers
def _transfer_key_generator(self, size=8):

View File

@ -249,19 +249,16 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
sort_dir='asc',
)
LOG.debug(
"Performing delayed NOTIFY for %(start)s to %(end)s: %(n)d",
{
'start': pstart,
'end': pend,
'n': len(zones)
}
)
for zone in zones:
self.zone_api.update_zone(ctxt, zone)
zone.delayed_notify = False
self.central_api.update_zone(ctxt, zone)
LOG.debug(
'Performed delayed NOTIFY for %(id)s',
{
'id': zone.id,
}
)
class WorkerPeriodicRecovery(PeriodicTask):

View File

@ -369,6 +369,32 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
self._assert_exception('timeout', 504, self.client.get, url)
def test_get_deleted_recordsets(self):
zone = self.create_zone(fixture=1)
recordset = self.create_recordset(zone)
url = '/zones/%s/recordsets' % zone['id']
response = self.client.get(url)
# Check the headers are what we expect
self.assertEqual(200, response.status_int)
# Now delete the recordset
url = '/zones/%s/recordsets/%s' % (zone['id'], recordset.id)
self.client.delete(url, status=202)
# Simulate the zone having been deleted on the backend
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE'
)
# Try to get the record and ensure that we get a
# recordset_not_found error
self._assert_exception('recordset_not_found', 404, self.client.get,
url)
def test_get_deleted_recordset_after_deleting_zone(self):
zone = self.create_zone(fixture=1)
self.create_recordset(zone)
url = '/zones/%s/recordsets' % zone['id']
@ -378,16 +404,18 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
# Check the headers are what we expect
self.assertEqual(200, response.status_int)
# now delete the zone and get the recordsets
# Now delete the zone
self.client.delete('/zones/%s' % zone['id'], status=202)
# Simulate the zone having been deleted on the backend
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], "SUCCESS", zone_serial)
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'DELETE'
)
# Check that we get a zone_not_found error
# Try to get the record and ensure that we get a
# zone_not_found error
self._assert_exception('zone_not_found', 404, self.client.get, url)
def test_get_recordset(self):

View File

@ -28,6 +28,7 @@ from oslo_db import exception as db_exception
from oslo_log import log as logging
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_utils import timeutils
from oslo_versionedobjects import exception as ovo_exc
import testtools
from testtools.matchers import GreaterThan
@ -2550,7 +2551,7 @@ class CentralServiceTest(CentralTestCase):
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
self.central_service.update_status(
elevated_a, zone_id, "SUCCESS", zone_serial)
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
self.network_api.fake.deallocate_floatingip(fip['id'])
@ -2661,7 +2662,7 @@ class CentralServiceTest(CentralTestCase):
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
self.central_service.update_status(
elevated_a, zone_id, "SUCCESS", zone_serial)
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
count = self.central_service.count_records(
elevated_a, {'managed_resource_id': fip['id']})
@ -2682,7 +2683,7 @@ class CentralServiceTest(CentralTestCase):
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
self.central_service.update_status(
elevated_a, zone_id, "SUCCESS", zone_serial)
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
count = self.central_service.count_records(
elevated_a, {'managed_resource_id': fip['id']})
@ -3127,7 +3128,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], "SUCCESS", zone_serial)
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'DELETE')
# Fetch the zone again, ensuring an exception is raised
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
@ -3151,7 +3152,7 @@ class CentralServiceTest(CentralTestCase):
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], "SUCCESS", zone_serial)
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE')
# Fetch the record again, ensuring an exception is raised
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
@ -3162,6 +3163,131 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0])
def test_update_status_create_zone(self):
zone = self.create_zone()
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id']
)
self.assertEqual('CREATE', updated_zone.action)
self.assertEqual('PENDING', updated_zone.status)
ns_recordsets = self.central_service.find_recordset(
self.admin_context,
criterion={'zone_id': updated_zone.id, 'type': 'NS'}
)
self.assertEqual('CREATE', ns_recordsets.action)
self.assertEqual('PENDING', ns_recordsets.status)
soa_recordsets = self.central_service.find_recordset(
self.admin_context,
criterion={'zone_id': updated_zone.id, 'type': 'SOA'}
)
self.assertEqual('CREATE', soa_recordsets.action)
self.assertEqual('PENDING', soa_recordsets.status)
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS',
timeutils.utcnow_ts() + 1, 'CREATE',
)
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id']
)
self.assertEqual('NONE', updated_zone.action)
self.assertEqual('ACTIVE', updated_zone.status)
ns_recordsets = self.central_service.find_recordset(
self.admin_context,
criterion={'zone_id': updated_zone.id, 'type': 'NS'}
)
self.assertEqual('NONE', ns_recordsets.action)
self.assertEqual('ACTIVE', ns_recordsets.status)
soa_recordsets = self.central_service.find_recordset(
self.admin_context,
criterion={'zone_id': updated_zone.id, 'type': 'SOA'}
)
self.assertEqual('NONE', soa_recordsets.action)
self.assertEqual('ACTIVE', soa_recordsets.status)
def test_update_status_create_record_before_zone_finished(self):
zone = self.create_zone()
self.assertEqual('CREATE', zone.action)
self.assertEqual('PENDING', zone.status)
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id']
)
self.assertEqual('CREATE', updated_zone.action)
self.assertEqual('PENDING', updated_zone.status)
recordset = objects.RecordSet(
name='www.%s' % zone.name,
type='A',
records=objects.RecordList(objects=[
objects.Record(data='127.0.0.1'),
objects.Record(data='127.0.0.2'),
]),
)
# Create a recordset before the zone is properly setup.
updated_recordset = self.central_service.create_recordset(
self.admin_context, zone['id'], recordset
)
self.assertEqual('CREATE', updated_recordset.action)
self.assertEqual('PENDING', updated_recordset.status)
# Finish setting up the zone.
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS',
timeutils.utcnow_ts() + 1, 'CREATE',
)
# Check that we are still waiting for the zone to be updated
# with the new recordset.
# It's likely that the DNS server already knows about the recordset,
# but there is also a chance that the recordset wasn't ready yet
# and is missing from the upstream DNS servers.
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id']
)
self.assertEqual('UPDATE', updated_zone.action)
self.assertEqual('PENDING', updated_zone.status)
updated_recordset = self.central_service.get_recordset(
self.admin_context, zone['id'], recordset['id']
)
# The recordset is already marked as ACTIVE, since we cannot
# tell if the record was created / updated successfully or not at this
# stage, also Zone NS and SOA records always needs to be marked ACTIVE.
# Safer to just mark all records as ACTIVE.
self.assertEqual('NONE', updated_recordset.action)
self.assertEqual('ACTIVE', updated_recordset.status)
# We just got the word from the worker that both the zone and the
# recordset are known to the upstream DNS servers now.
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS',
timeutils.utcnow_ts() + 2, 'UPDATE',
)
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id']
)
self.assertEqual('NONE', updated_zone.action)
self.assertEqual('ACTIVE', updated_zone.status)
updated_recordset = self.central_service.get_recordset(
self.admin_context, zone['id'], recordset['id']
)
self.assertEqual('NONE', updated_recordset.action)
self.assertEqual('ACTIVE', updated_recordset.status)
@mock.patch.object(notifier.Notifier, "info")
def test_update_status_send_notification(self, mock_notifier):
@ -3184,7 +3310,7 @@ class CentralServiceTest(CentralTestCase):
# have been sent and the zone is now in ACTIVE status
mock_notifier.reset_mock()
self.central_service.update_status(
self.admin_context, zone['id'], "SUCCESS", zone.serial)
self.admin_context, zone['id'], 'SUCCESS', zone.serial, 'CREATE')
self.assertEqual(2, mock_notifier.call_count)
notify_string, notified_zone = mock_notifier.call_args_list[0][0][1:]
@ -3218,7 +3344,7 @@ class CentralServiceTest(CentralTestCase):
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], "SUCCESS", zone_serial)
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE')
# Fetch the record again, ensuring an exception is raised
exc = self.assertRaises(rpc_dispatcher.ExpectedException,

View File

@ -2151,18 +2151,39 @@ class CentralZoneExportTests(CentralBasic):
class CentralStatusTests(CentralBasic):
def test_update_zone_or_record_status_no_zone(self):
zone = RwObject(
action='UPDATE',
id='uuid',
action='CREATE',
status='SUCCESS',
serial=0,
)
dom, deleted = self.service._update_zone_or_record_status(
zone, 'NO_ZONE', 0)
self.assertEqual(dom.action, 'CREATE')
self.assertEqual(dom.status, 'ERROR')
self.service.storage.get_zone.return_value = zone
self.service.storage.find_records.return_value = []
new_zone = self.service.update_status(
self.context, zone.id, 'NO_ZONE', 0, 'CREATE')
self.assertEqual(new_zone.action, 'CREATE')
self.assertEqual(new_zone.status, 'ERROR')
def test_update_zone_or_record_status_handle_update_after_create(self):
zone = RwObject(
id='uuid',
action='UPDATE',
status='PENDING',
serial=0,
)
self.service.storage.get_zone.return_value = zone
self.service.storage.find_records.return_value = []
new_zone = self.service.update_status(
self.context, zone.id, 'PENDING', 0, 'CREATE')
self.assertEqual(new_zone.action, 'UPDATE')
self.assertEqual(new_zone.status, 'PENDING')
class CentralQuotaTest(unittest.TestCase):

View File

@ -311,12 +311,6 @@ class TestZoneActor(oslotest.base.BaseTestCase):
mock.Mock(action='CREATE'),
)
def test_invalid_action(self):
self.assertRaisesRegex(
exceptions.BadAction, 'Unexpected action: BAD',
self.actor._validate_action, 'BAD'
)
def test_threshold_from_config(self):
actor = zone.ZoneActor(
self.executor, self.context, self.pool, mock.Mock(action='CREATE')
@ -598,19 +592,21 @@ class TestUpdateStatus(oslotest.base.BaseTestCase):
def test_call_on_delete(self):
self.task.zone.action = 'DELETE'
self.task()
self.assertEqual('NONE', self.task.zone.action)
self.assertEqual('NO_ZONE', self.task.zone.status)
self.assertTrue(self.task.central_api.update_status.called)
def test_call_on_success(self):
self.task.zone.status = 'SUCCESS'
self.task()
self.assertEqual('NONE', self.task.zone.action)
self.assertEqual('DELETE', self.task.zone.action)
self.assertEqual('SUCCESS', self.task.zone.status)
self.assertTrue(self.task.central_api.update_status.called)
def test_call_on_success(self):
self.task.zone.action = 'UPDATE'
self.task.zone.status = 'SUCCESS'
self.task()
self.assertEqual('UPDATE', self.task.zone.action)
self.assertTrue(self.task.central_api.update_status.called)
def test_call_central_call(self):
@ -623,6 +619,7 @@ class TestUpdateStatus(oslotest.base.BaseTestCase):
self.task.zone.id,
self.task.zone.status,
self.task.zone.serial,
self.task.zone.action,
)
def test_call_on_delete_error(self):

View File

@ -78,11 +78,11 @@ class ZoneActionOnTarget(base.Task):
if self.action == 'CREATE':
self.target.backend.create_zone(self.context, self.zone)
SendNotify(self.executor, self.zone, self.target)()
elif self.action == 'UPDATE':
self.target.backend.update_zone(self.context, self.zone)
SendNotify(self.executor, self.zone, self.target)()
elif self.action == 'DELETE':
self.target.backend.delete_zone(self.context, self.zone)
else:
self.target.backend.update_zone(self.context, self.zone)
SendNotify(self.executor, self.zone, self.target)()
LOG.debug("Successful %s zone %s on %s",
self.action, self.zone.name, self.target)
@ -160,10 +160,6 @@ class ZoneActor(base.Task, ThresholdMixin):
self.pool = pool
self.zone = zone
def _validate_action(self, action):
if action not in ['CREATE', 'UPDATE', 'DELETE']:
raise exceptions.BadAction('Unexpected action: %s' % action)
def _execute(self):
results = self.executor.run([
ZoneActionOnTarget(self.executor, self.context, self.zone, target)
@ -193,7 +189,6 @@ class ZoneActor(base.Task, ThresholdMixin):
return True
def __call__(self):
self._validate_action(self.zone.action)
results = self._execute()
return self._threshold_met(results)
@ -496,25 +491,20 @@ class UpdateStatus(base.Task):
self.context = context
def __call__(self):
# TODO(timsim): Fix this when central's logic is sane
if self.zone.action == 'DELETE' and self.zone.status != 'ERROR':
self.zone.action = 'NONE'
self.zone.status = 'NO_ZONE'
if self.zone.status == 'SUCCESS':
self.zone.action = 'NONE'
# This log message will always have action as NONE and then we
# don't use the action in the update_status call.
LOG.debug('Updating status for %(zone)s to %(status)s:%(action)s',
{'zone': self.zone.name, 'status': self.zone.status,
'action': self.zone.action})
LOG.debug(
'Updating status for %(zone)s to %(status)s:%(action)s', {
'zone': self.zone.name,
'status': self.zone.status,
'action': self.zone.action,
}
)
self.central_api.update_status(
self.context,
self.zone.id,
self.zone.status,
self.zone.serial
self.zone.serial,
self.zone.action,
)

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
The Designate Central RPC API version was updated and may not be backwards
compatible with an older version of the Designate RPC API. When upgrading
a multi-controller deployment make sure that all designate-central services
are upgraded before starting the designate-worker service.