Move to a batch model for incrementing serial

This patch moves the responsibility for incrementing the
zone serial from central to the producer. This also means
that NOTIFY is triggered by the producer after the serial
has been incremented. The advantage of this approach is
that we can now batch requests, which means less work for
the DNS servers, and it removes the risk of race conditions
when updating the serial. Finally, the producer is sharded
and easy to scale, so this approach should scale well with
many zones.
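
Condensed, the per-run flow of the new producer task looks
roughly like this (an illustrative sketch of the
PeriodicIncrementSerialTask added below; the helper name and
arguments are invented here, and sharding, paging and error
handling are omitted):

    def run_increment_serial(ctxt, central_api, worker_api,
                             batch_size):
        # Pick up flagged zones, oldest update first.
        zones = central_api.find_zones(
            ctxt,
            criterion={'increment_serial': True},
            limit=batch_size,
            sort_key='updated_at',
            sort_dir='asc',
        )
        for zone in zones:
            if zone.action == 'DELETE':
                # Never bump a zone that is being deleted.
                continue
            # Central increments the serial and refreshes the
            # SOA record in one transaction.
            central_api.increment_zone_serial(ctxt, zone)
            if not zone.delayed_notify:
                # NOTIFY happens here, after the increment.
                # Zones with delayed_notify set are left to
                # the delayed notify task instead.
                if zone.action == 'NONE':
                    zone.action = 'UPDATE'
                    zone.status = 'PENDING'
                worker_api.update_zone(ctxt, zone)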

The disadvantage is that it may take up to 5 seconds longer
for a DNS record to be updated on the DNS servers. This
delay can be reduced by increasing the run frequency of the
task responsible for incrementing the serial.
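
For example, with the new option group added by this patch,
an operator wanting faster convergence could set something
like the following (illustrative designate.conf snippet;
interval is in seconds and defaults to 5):

    [producer_task:increment_serial]
    interval = 1
    batch_size = 100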

Depends-On: https://review.opendev.org/#/c/871266/
Change-Id: I5e9733abaaa40c874e1d80d7b57e563df0f12cee
Erik Olof Gunnar Andersson 2023-01-21 18:13:31 -08:00
parent 5703810a40
commit 3401043fc1
21 changed files with 518 additions and 190 deletions

View File

@ -68,8 +68,9 @@ class CentralAPI(object):
6.4 - Removed unused record and diagnostic methods
6.5 - Removed additional unused methods
6.6 - Add methods for shared zones
6.7 - Add increment_zone_serial
"""
RPC_API_VERSION = '6.6'
RPC_API_VERSION = '6.7'
# This allows us to mark some methods as not logged.
# This can be for a few reasons - some methods may not actually call over
@ -82,7 +83,7 @@ class CentralAPI(object):
target = messaging.Target(topic=self.topic,
version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='6.6')
self.client = rpc.get_client(target, version_cap='6.7')
@classmethod
def get_instance(cls):
@ -141,6 +142,9 @@ class CentralAPI(object):
return self.client.call(context, 'get_tenant', tenant_id=tenant_id)
# Zone Methods
def increment_zone_serial(self, context, zone):
return self.client.call(context, 'increment_zone_serial', zone=zone)
def create_zone(self, context, zone):
return self.client.call(context, 'create_zone', zone=zone)

View File

@ -27,6 +27,7 @@ from dns import zone as dnszone
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import timeutils
from designate.common import constants
from designate.common.decorators import lock
@ -51,7 +52,7 @@ LOG = logging.getLogger(__name__)
class Service(service.RPCService):
RPC_API_VERSION = '6.6'
RPC_API_VERSION = '6.7'
target = messaging.Target(version=RPC_API_VERSION)
@ -349,48 +350,35 @@ class Service(service.RPCService):
"A project ID must be specified when not using a project "
"scoped token.")
def _increment_zone_serial(self, context, zone, set_delayed_notify=False):
"""Update the zone serial and the SOA record
Optionally set delayed_notify to have PM issue delayed notify
"""
# Increment the serial number
zone.serial = utils.increment_serial(zone.serial)
if set_delayed_notify:
zone.delayed_notify = True
zone = self.storage.update_zone(context, zone)
# Update SOA record
self._update_soa(context, zone)
return zone
# SOA Recordset Methods
def _build_soa_record(self, zone, ns_records):
return "%s %s. %d %d %d %d %d" % (ns_records[0]['hostname'],
zone['email'].replace("@", "."),
zone['serial'],
zone['refresh'],
zone['retry'],
zone['expire'],
zone['minimum'])
@staticmethod
def _build_soa_record(zone, ns_records):
return '%s %s. %d %d %d %d %d' % (
ns_records[0]['hostname'],
zone['email'].replace('@', '.'),
zone['serial'],
zone['refresh'],
zone['retry'],
zone['expire'],
zone['minimum']
)
def _create_soa(self, context, zone):
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
soa_values = [self._build_soa_record(zone, pool_ns_records)]
recordlist = objects.RecordList(objects=[
objects.Record(data=r, managed=True) for r in soa_values])
values = {
'name': zone['name'],
'type': "SOA",
'records': recordlist
}
soa, zone = self._create_recordset_in_storage(
context, zone, objects.RecordSet(**values),
increment_serial=False)
return soa
records = objects.RecordList(objects=[
objects.Record(
data=self._build_soa_record(zone, pool_ns_records),
managed=True
)
])
return self._create_recordset_in_storage(
context, zone,
objects.RecordSet(
name=zone['name'],
type='SOA',
records=records
), increment_serial=False
)[0]
def _update_soa(self, context, zone):
# NOTE: We should not be updating SOA records when a zone is SECONDARY.
@ -400,14 +388,18 @@ class Service(service.RPCService):
# Get the pool for its list of ns_records
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
soa = self.find_recordset(context,
criterion={'zone_id': zone['id'],
'type': "SOA"})
soa = self.find_recordset(
context, criterion={
'zone_id': zone['id'],
'type': 'SOA'
}
)
soa.records[0].data = self._build_soa_record(zone, pool_ns_records)
self._update_recordset_in_storage(context, zone, soa,
increment_serial=False)
self._update_recordset_in_storage(
context, zone, soa, increment_serial=False
)
# NS Recordset Methods
def _create_ns(self, context, zone, ns_records):
@ -730,6 +722,14 @@ class Service(service.RPCService):
pool = self.storage.get_pool(elevated_context, pool_id)
return pool.ns_records
@rpc.expected_exceptions()
@transaction
@lock.synchronized_zone()
def increment_zone_serial(self, context, zone):
zone.serial = self.storage.increment_serial(context, zone.id)
self._update_soa(context, zone)
return zone.serial
@rpc.expected_exceptions()
@notification.notify_type('dns.domain.create')
@notification.notify_type('dns.zone.create')
@ -853,7 +853,8 @@ class Service(service.RPCService):
# can be very long-lived.
time.sleep(0)
self._create_recordset_in_storage(
context, zone, rrset, increment_serial=False)
context, zone, rrset, increment_serial=False
)
return zone
@ -992,29 +993,28 @@ class Service(service.RPCService):
"""Update zone
"""
zone = self._update_zone_in_storage(
context, zone, increment_serial=increment_serial)
context, zone, increment_serial=increment_serial
)
# Fire off a XFR
if 'masters' in changes:
self.worker_api.perform_zone_xfr(context, zone)
self.worker_api.update_zone(context, zone)
return zone
@transaction
def _update_zone_in_storage(self, context, zone,
increment_serial=True, set_delayed_notify=False):
increment_serial=True,
set_delayed_notify=False):
zone.action = 'UPDATE'
zone.status = 'PENDING'
if increment_serial:
# _increment_zone_serial increments and updates the zone
zone = self._increment_zone_serial(
context, zone, set_delayed_notify=set_delayed_notify)
else:
zone = self.storage.update_zone(context, zone)
zone.increment_serial = True
if set_delayed_notify:
zone.delayed_notify = True
zone = self.storage.update_zone(context, zone)
return zone
@ -1329,7 +1329,6 @@ class Service(service.RPCService):
# RecordSet Methods
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.create')
@lock.synchronized_zone()
def create_recordset(self, context, zone_id, recordset,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id,
@ -1372,9 +1371,8 @@ class Service(service.RPCService):
context = context.elevated(all_tenants=True)
recordset, zone = self._create_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.worker_api.update_zone(context, zone)
context, zone, recordset, increment_serial=increment_serial
)
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@ -1412,33 +1410,33 @@ class Service(service.RPCService):
@transaction_shallow_copy
def _create_recordset_in_storage(self, context, zone, recordset,
increment_serial=True):
# Ensure the tenant has enough quota to continue
self._enforce_recordset_quota(context, zone)
self._validate_recordset(context, zone, recordset)
if recordset.obj_attr_is_set('records') and len(recordset.records) > 0:
if recordset.obj_attr_is_set('records') and recordset.records:
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, zone, recordset)
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
for record in recordset.records:
record.action = 'CREATE'
record.status = 'PENDING'
record.serial = zone.serial
if not increment_serial:
record.serial = zone.serial
else:
record.serial = timeutils.utcnow_ts()
recordset = self.storage.create_recordset(context, zone.id,
recordset)
new_recordset = self.storage.create_recordset(context, zone.id,
recordset)
if recordset.records and increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial
)
# Return the zone too in case it was updated
return (recordset, zone)
return new_recordset, zone
@rpc.expected_exceptions()
def get_recordset(self, context, zone_id, recordset_id):
@ -1553,7 +1551,6 @@ class Service(service.RPCService):
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.update')
@lock.synchronized_zone()
def update_recordset(self, context, recordset, increment_serial=True):
zone_id = recordset.obj_get_original_value('zone_id')
changes = recordset.obj_get_changes()
@ -1622,41 +1619,44 @@ class Service(service.RPCService):
recordset, zone = self._update_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.worker_api.update_zone(context, zone)
return recordset
@transaction
def _update_recordset_in_storage(self, context, zone, recordset,
increment_serial=True, set_delayed_notify=False):
increment_serial=True,
set_delayed_notify=False):
self._validate_recordset(context, zone, recordset)
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial,
set_delayed_notify=set_delayed_notify)
if recordset.records:
for record in recordset.records:
if record.action != 'DELETE':
record.action = 'UPDATE'
record.status = 'PENDING'
if record.action == 'DELETE':
continue
record.action = 'UPDATE'
record.status = 'PENDING'
if not increment_serial:
record.serial = zone.serial
else:
record.serial = timeutils.utcnow_ts()
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, zone, recordset)
# Update the recordset
recordset = self.storage.update_recordset(context, recordset)
new_recordset = self.storage.update_recordset(context, recordset)
return recordset, zone
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone,
increment_serial=increment_serial,
set_delayed_notify=set_delayed_notify)
return new_recordset, zone
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.delete')
@lock.synchronized_zone()
def delete_recordset(self, context, zone_id, recordset_id,
increment_serial=True):
# apply_tenant_criteria=False here as we will gate this delete
@ -1708,8 +1708,6 @@ class Service(service.RPCService):
recordset, zone = self._delete_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
self.worker_api.update_zone(context, zone)
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@ -1718,23 +1716,26 @@ class Service(service.RPCService):
@transaction
def _delete_recordset_in_storage(self, context, zone, recordset,
increment_serial=True):
if recordset.records:
for record in recordset.records:
record.action = 'DELETE'
record.status = 'PENDING'
if not increment_serial:
record.serial = zone.serial
else:
record.serial = timeutils.utcnow_ts()
# Update the recordset's action/status and then delete it
self.storage.update_recordset(context, recordset)
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
if recordset.records:
for record in recordset.records:
record.action = 'DELETE'
record.status = 'PENDING'
record.serial = zone.serial
new_recordset = self.storage.delete_recordset(context, recordset.id)
# Update the recordset's action/status and then delete it
self.storage.update_recordset(context, recordset)
recordset = self.storage.delete_recordset(context, recordset.id)
return (recordset, zone)
return new_recordset, zone
@rpc.expected_exceptions()
def count_recordsets(self, context, criterion=None):

View File

@ -20,6 +20,11 @@ PRODUCER_GROUP = cfg.OptGroup(
title='Configuration for Producer Service'
)
PRODUCER_TASK_INCREMENT_SERIAL_GROUP = cfg.OptGroup(
name='producer_task:increment_serial',
title='Configuration for Producer Task: Increment Serial'
)
PRODUCER_TASK_DELAYED_NOTIFY_GROUP = cfg.OptGroup(
name='producer_task:delayed_notify',
title='Configuration for Producer Task: Delayed Notify'
@ -62,6 +67,15 @@ PRODUCER_OPTS = [
help='RPC topic name for producer'),
]
PRODUCER_TASK_INCREMENT_SERIAL_OPTS = [
cfg.IntOpt('interval', default=5,
help='Run interval in seconds'),
cfg.IntOpt('per_page', default=100,
help='Default amount of results returned per page'),
cfg.IntOpt('batch_size', default=100,
help='How many zones to increment serial for on each run'),
]
PRODUCER_TASK_DELAYED_NOTIFY_OPTS = [
cfg.IntOpt('interval', default=5,
help='Run interval in seconds'),
@ -111,6 +125,9 @@ def register_opts(conf):
conf.register_group(PRODUCER_TASK_DELAYED_NOTIFY_GROUP)
conf.register_opts(PRODUCER_TASK_DELAYED_NOTIFY_OPTS,
group=PRODUCER_TASK_DELAYED_NOTIFY_GROUP)
conf.register_group(PRODUCER_TASK_INCREMENT_SERIAL_GROUP)
conf.register_opts(PRODUCER_TASK_INCREMENT_SERIAL_OPTS,
group=PRODUCER_TASK_INCREMENT_SERIAL_GROUP)
conf.register_group(PRODUCER_TASK_PERIODIC_EXISTS_GROUP)
conf.register_opts(PRODUCER_TASK_PERIODIC_EXISTS_OPTS,
group=PRODUCER_TASK_PERIODIC_EXISTS_GROUP)

View File

@ -66,6 +66,7 @@ class Zone(base.DesignateObject, base.DictObjectMixin,
),
'transferred_at': fields.DateTimeField(nullable=True, read_only=False),
'delayed_notify': fields.BooleanField(nullable=True),
'increment_serial': fields.BooleanField(nullable=True),
}
STRING_KEYS = [

View File

@ -227,8 +227,6 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
Call Worker to emit NOTIFY transactions,
Reset the flag.
"""
pstart, pend = self._my_range()
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
@ -237,6 +235,7 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
# There's an index on delayed_notify.
criterion = self._filter_between('shard')
criterion['delayed_notify'] = True
criterion['increment_serial'] = False
zones = self.central_api.find_zones(
ctxt,
criterion,
@ -246,6 +245,17 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
)
for zone in zones:
if zone.action == 'NONE':
zone.action = 'UPDATE'
zone.status = 'PENDING'
elif zone.action == 'DELETE':
LOG.debug(
'Skipping delayed NOTIFY for %(id)s being DELETED',
{
'id': zone.id,
}
)
continue
self.worker_api.update_zone(ctxt, zone)
zone.delayed_notify = False
self.central_api.update_zone(ctxt, zone)
@ -257,6 +267,54 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
)
class PeriodicIncrementSerialTask(PeriodicTask):
__plugin_name__ = 'increment_serial'
def __init__(self):
super(PeriodicIncrementSerialTask, self).__init__()
def __call__(self):
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
# Select zones where "increment_serial" is set and starting from the
# oldest "updated_at".
# There's an index on increment_serial.
criterion = self._filter_between('shard')
criterion['increment_serial'] = True
zones = self.central_api.find_zones(
ctxt,
criterion,
limit=CONF[self.name].batch_size,
sort_key='updated_at',
sort_dir='asc',
)
for zone in zones:
if zone.action == 'DELETE':
LOG.debug(
'Skipping increment serial for %(id)s being DELETED',
{
'id': zone.id,
}
)
continue
serial = self.central_api.increment_zone_serial(ctxt, zone)
LOG.debug(
'Incremented serial for %(id)s to %(serial)d',
{
'id': zone.id,
'serial': serial,
}
)
if not zone.delayed_notify:
# Notify the backend.
if zone.action == 'NONE':
zone.action = 'UPDATE'
zone.status = 'PENDING'
self.worker_api.update_zone(ctxt, zone)
class WorkerPeriodicRecovery(PeriodicTask):
__plugin_name__ = 'worker_periodic_recovery'

View File

@ -152,7 +152,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta):
# Ensure the Object is valid
# obj.validate()
values = obj.obj_get_changes()
values = dict(obj)
if skip_values is not None:
for skip_value in skip_values:
@ -166,7 +166,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta):
with sql.get_write_session() as session:
try:
resultproxy = session.execute(query, [dict(values)])
resultproxy = session.execute(query, [values])
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup("Duplicate %s" % obj.obj_name())

View File

@ -744,6 +744,15 @@ class Storage(DriverPlugin, metaclass=abc.ABCMeta):
:param zone_import: Zone Import to update.
"""
@abc.abstractmethod
def increment_serial(self, context, zone_id):
"""
Increment serial of a Zone
:param context: RPC Context.
:param zone_id: ID of the Zone.
"""
@abc.abstractmethod
def delete_zone_import(self, context, zone_import_id):
"""

View File

@ -15,6 +15,7 @@
# under the License.
from oslo_log import log as logging
from oslo_utils.secretutils import md5
from oslo_utils import timeutils
from sqlalchemy import case, select, distinct, func
from sqlalchemy.sql.expression import or_, literal_column
@ -435,6 +436,19 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
return updated_zone
def increment_serial(self, context, zone_id):
"""Increment the zone's serial number.
"""
new_serial = timeutils.utcnow_ts()
query = tables.zones.update().where(
tables.zones.c.id == zone_id).values(
{'serial': new_serial, 'increment_serial': False}
)
with sql.get_write_session() as session:
session.execute(query)
LOG.debug('Incremented zone serial for %s to %d', zone_id, new_serial)
return new_serial
def delete_zone(self, context, zone_id):
"""
"""

View File

@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Add increment serial
Revision ID: a005af3aa38e
Revises: b20189fd288e
Create Date: 2023-01-21 17:39:00.822775
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a005af3aa38e'
down_revision = 'b20189fd288e'
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
'zones',
sa.Column('increment_serial', sa.Boolean, default=False)
)
op.create_index(
'increment_serial', 'zones', ['increment_serial']
)

View File

@ -139,6 +139,7 @@ zones = Table('zones', metadata,
Column('pool_id', UUID, default=None, nullable=True),
Column('reverse_name', String(255), nullable=False),
Column('delayed_notify', Boolean, default=False),
Column('increment_serial', Boolean, default=False),
UniqueConstraint('name', 'deleted', 'pool_id', name='unique_zone_name'),
ForeignKeyConstraint(['parent_zone_id'],

View File

@ -17,6 +17,7 @@ from unittest.mock import patch
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import timeutils
from designate.central import service as central_service
from designate import exceptions
@ -414,10 +415,9 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
self.client.delete(url, status=202)
# Simulate the zone having been deleted on the backend
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE'
self.admin_context, zone['id'], 'SUCCESS', timeutils.utcnow_ts(),
'DELETE'
)
# Try to get the record and ensure that we get a

View File

@ -19,6 +19,7 @@ from collections import namedtuple
from concurrent import futures
import copy
import datetime
import futurist
import random
import unittest
@ -33,7 +34,6 @@ from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_utils import timeutils
from oslo_versionedobjects import exception as ovo_exc
import testtools
from testtools.matchers import GreaterThan
from designate import exceptions
from designate import objects
@ -886,7 +886,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone(self, mock_notifier):
# Create a zone
zone = self.create_zone(email='info@example.org')
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Update the object
zone.email = 'info@example.net'
@ -902,7 +902,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, zone.id)
# Ensure the zone was updated correctly
self.assertGreater(zone.serial, original_serial)
self.assertTrue(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
self.assertEqual(2, mock_notifier.call_count)
@ -927,7 +927,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone_without_incrementing_serial(self):
# Create a zone
zone = self.create_zone(email='info@example.org')
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Update the object
zone.email = 'info@example.net'
@ -940,7 +940,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.central_service.get_zone(self.admin_context, zone.id)
# Ensure the zone was updated correctly
self.assertEqual(original_serial, zone.serial)
self.assertFalse(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
def test_update_zone_name_fail(self):
@ -961,7 +961,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone_deadlock_retry(self):
# Create a zone
zone = self.create_zone(name='example.org.')
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Update the Object
zone.email = 'info@example.net'
@ -988,7 +988,7 @@ class CentralServiceTest(CentralTestCase):
self.assertTrue(i[0])
# Ensure the zone was updated correctly
self.assertGreater(zone.serial, original_serial)
self.assertTrue(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
@mock.patch.object(notifier.Notifier, "info")
@ -1453,7 +1453,7 @@ class CentralServiceTest(CentralTestCase):
# RecordSet Tests
def test_create_recordset(self):
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Create the Object
recordset = objects.RecordSet(name='www.%s' % zone.name, type='A')
@ -1465,7 +1465,6 @@ class CentralServiceTest(CentralTestCase):
# Get the zone again to check if serial increased
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
# Ensure all values have been set correctly
self.assertIsNotNone(recordset.id)
@ -1475,7 +1474,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(recordset.records)
# The serial number does not get updated if there are no records
# in the recordset
self.assertEqual(original_serial, new_serial)
self.assertFalse(updated_zone.increment_serial)
def test_create_recordset_shared_zone(self):
zone = self.create_zone()
@ -1542,15 +1541,15 @@ class CentralServiceTest(CentralTestCase):
def test_create_recordset_with_records(self):
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Create the Object
recordset = objects.RecordSet(
name='www.%s' % zone.name,
type='A',
records=objects.RecordList(objects=[
objects.Record(data='192.3.3.15'),
objects.Record(data='192.3.3.16'),
objects.Record(data='192.0.2.15'),
objects.Record(data='192.0.2.16'),
])
)
@ -1561,14 +1560,13 @@ class CentralServiceTest(CentralTestCase):
# Get updated serial number
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
# Ensure all values have been set correctly
self.assertIsNotNone(recordset.records)
self.assertEqual(2, len(recordset.records))
self.assertIsNotNone(recordset.records[0].id)
self.assertIsNotNone(recordset.records[1].id)
self.assertThat(new_serial, GreaterThan(original_serial))
self.assertTrue(updated_zone.increment_serial)
def test_create_recordset_over_quota(self):
# SOA, NS recordsets exist by default.
@ -1847,7 +1845,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset(self):
# Create a zone
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Create a recordset
recordset = self.create_recordset(zone)
@ -1861,7 +1859,7 @@ class CentralServiceTest(CentralTestCase):
# Get zone again to verify that serial number was updated
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
self.assertTrue(updated_zone.increment_serial)
# Fetch the resource again
recordset = self.central_service.get_recordset(
@ -1869,7 +1867,6 @@ class CentralServiceTest(CentralTestCase):
# Ensure the new value took
self.assertEqual(1800, recordset.ttl)
self.assertThat(new_serial, GreaterThan(original_serial))
@unittest.expectedFailure # FIXME
def test_update_recordset_deadlock_retry(self):
@ -1932,7 +1929,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset_with_record_delete(self):
# Create a zone
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Create a recordset and two records
records = [
@ -1956,12 +1953,11 @@ class CentralServiceTest(CentralTestCase):
# Fetch the Zone again
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
# Ensure two Records are attached to the RecordSet correctly
self.assertEqual(1, len(recordset.records))
self.assertIsNotNone(recordset.records[0].id)
self.assertThat(new_serial, GreaterThan(original_serial))
self.assertTrue(updated_zone.increment_serial)
def test_update_recordset_with_record_update(self):
# Create a zone
@ -2062,7 +2058,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset_shared_zone(self):
# Create a zone
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
context = self.get_context(project_id='1')
self.share_zone(context=self.admin_context, zone_id=zone.id,
@ -2080,7 +2076,9 @@ class CentralServiceTest(CentralTestCase):
# Get zone again to verify that serial number was updated
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
# Ensure that we are incrementing the zone serial
self.assertTrue(updated_zone.increment_serial)
# Fetch the resource again
recordset = self.central_service.get_recordset(
@ -2088,11 +2086,10 @@ class CentralServiceTest(CentralTestCase):
# Ensure the new value took
self.assertEqual(1800, recordset.ttl)
self.assertThat(new_serial, GreaterThan(original_serial))
def test_delete_recordset(self):
zone = self.create_zone()
original_serial = zone.serial
self.assertFalse(zone.increment_serial)
# Create a recordset
recordset = self.create_recordset(zone)
@ -2112,8 +2109,7 @@ class CentralServiceTest(CentralTestCase):
# Fetch the zone again to verify serial number increased
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
new_serial = updated_zone.serial
self.assertThat(new_serial, GreaterThan(original_serial))
self.assertTrue(updated_zone.increment_serial)
def test_delete_recordset_without_incrementing_serial(self):
zone = self.create_zone()
@ -2215,8 +2211,8 @@ class CentralServiceTest(CentralTestCase):
name='www.%s' % zone.name,
type='A',
records=objects.RecordList(objects=[
objects.Record(data='192.3.3.15'),
objects.Record(data='192.3.3.16'),
objects.Record(data='203.0.113.15'),
objects.Record(data='203.0.113.16'),
])
)
@ -2397,7 +2393,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure that the record is still in DB (No invalidation)
self.central_service.find_records(elevated_a, criterion)
# Now give the fip id to tenant 'b' and see that it get's deleted
# Now give the fip id to tenant 'b' and see that it gets deleted
self.network_api.fake.allocate_floatingip(
context_b.project_id, fip['id'])
@ -2407,8 +2403,10 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNone(fip_ptr['ptrdname'])
# Simulate the invalidation on the backend
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
zone = self.central_service.get_zone(
elevated_a, zone_id)
zone_serial = self.central_service.increment_zone_serial(
elevated_a, zone)
self.central_service.update_status(
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
@ -2478,10 +2476,8 @@ class CentralServiceTest(CentralTestCase):
elevated_a, criterion)[0].zone_id
# Simulate the update on the backend
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
self.central_service.update_status(
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE')
self.network_api.fake.deallocate_floatingip(fip['id'])
@ -2491,7 +2487,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure that the record is still in DB (No invalidation)
self.central_service.find_records(elevated_a, criterion)
# Now give the fip id to tenant 'b' and see that it get's deleted
# Now give the fip id to tenant 'b' and see that it gets deleted
self.network_api.fake.allocate_floatingip(
context_b.project_id, fip['id'])
@ -2501,10 +2497,8 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNone(fips[0]['ptrdname'])
# Simulate the invalidation on the backend
zone_serial = self.central_service.get_zone(
elevated_a, zone_id).serial
self.central_service.update_status(
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE')
record = self.central_service.find_records(elevated_a, criterion)[0]
self.assertEqual('NONE', record.action)
@ -3937,3 +3931,91 @@ class CentralServiceTest(CentralTestCase):
retrived_shared_zone.target_project_id)
self.assertEqual(shared_zone.project_id,
retrived_shared_zone.project_id)
def test_batch_increment_serial(self):
zone = self.create_zone()
zone_serial = zone.serial
self.assertFalse(zone.increment_serial)
for index in range(10):
recordset = objects.RecordSet(
name='www.%d.%s' % (index, zone.name),
type='A',
records=objects.RecordList(objects=[
objects.Record(data='192.0.2.%d' % index),
objects.Record(data='198.51.100.%d' % index),
])
)
self.central_service.create_recordset(
self.admin_context, zone.id, recordset=recordset
)
updated_zone = self.central_service.get_zone(
self.admin_context, zone.id
)
recordsets = self.central_service.find_recordsets(
self.admin_context,
criterion={'zone_id': zone.id, 'type': 'A'}
)
# Increment serial hasn't been triggered yet.
self.assertEqual(zone_serial, updated_zone.serial)
self.assertTrue(updated_zone.increment_serial)
self.assertEqual('PENDING', updated_zone.status)
self.assertEqual(10, len(recordsets))
for recordset in recordsets:
self.assertEqual('PENDING', recordset.status)
self.assertEqual(2, len(recordset.records))
for record in recordset.records:
self.assertEqual('PENDING', record.status)
# Increment serial (Producer -> Central) for zone.
with mock.patch.object(timeutils, 'utcnow_ts',
return_value=zone_serial + 5):
self.central_service.increment_zone_serial(
self.admin_context, zone
)
updated_zone = self.central_service.get_zone(
self.admin_context, zone.id
)
recordsets = self.central_service.find_recordsets(
self.admin_context,
criterion={'zone_id': zone.id, 'type': 'A'}
)
# Ensure that serial is now correct.
self.assertEqual(zone_serial + 5, updated_zone.serial)
self.assertFalse(updated_zone.increment_serial)
# But the zone is still in pending status as we haven't notified
# the upstream dns servers yet.
self.assertEqual('PENDING', updated_zone.status)
for recordset in recordsets:
self.assertEqual('PENDING', recordset.status)
for record in recordset.records:
self.assertEqual('PENDING', record.status)
# Trigger update_status (Producer -> Worker -> Central).
# This happens after the upstream DNS servers have been notified
# and updated.
self.central_service.update_status(
self.admin_context, zone.id, 'SUCCESS', updated_zone.serial
)
updated_zone = self.central_service.get_zone(
self.admin_context, zone.id
)
recordsets = self.central_service.find_recordsets(
self.admin_context,
criterion={'zone_id': zone.id, 'type': 'A'}
)
# Validate that the status is now ACTIVE.
self.assertEqual('ACTIVE', updated_zone.status)
self.assertEqual(zone_serial + 5, updated_zone.serial)
for recordset in recordsets:
self.assertEqual('ACTIVE', recordset.status)
for record in recordset.records:
self.assertEqual('ACTIVE', record.status)

View File

@ -15,6 +15,7 @@
# under the License.
import datetime
from unittest import mock
from oslo_log import log as logging
from oslo_utils import timeutils
@ -24,6 +25,7 @@ from designate.storage.impl_sqlalchemy import tables
from designate.storage import sql
from designate.tests import fixtures
from designate.tests import TestCase
from designate.worker import rpcapi as worker_api
LOG = logging.getLogger(__name__)
@ -39,7 +41,7 @@ class DeletedZonePurgeTest(TestCase):
self.config(
time_threshold=self.time_threshold,
batch_size=self.batch_size,
group="producer_task:zone_purge"
group='producer_task:zone_purge'
)
self.purge_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(tasks.DeletedZonePurgeTask)
@ -77,7 +79,7 @@ class DeletedZonePurgeTest(TestCase):
age = index * (self.time_threshold // self.number_of_zones * 2) - 1
delta = datetime.timedelta(seconds=age)
deletion_time = now - delta
name = "example%d.org." % index
name = 'example%d.org.' % index
self._create_deleted_zone(name, deletion_time)
def test_purge_zones(self):
@ -101,9 +103,8 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
super(PeriodicGenerateDelayedNotifyTaskTest, self).setUp()
self.config(quota_zones=self.number_of_zones)
self.config(
interval=1,
batch_size=self.batch_size,
group="producer_task:delayed_notify"
group='producer_task:delayed_notify'
)
self.generate_delayed_notify_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(
@ -123,7 +124,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
def _create_zones(self):
# Create a number of zones; half of them with delayed_notify set.
for index in range(self.number_of_zones):
name = "example%d.org." % index
name = 'example%d.org.' % index
delayed_notify = (index % 2 == 0)
self.create_zone(
name=name,
@ -149,3 +150,43 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
remaining, len(zones),
message='Remaining zones: %s' % zones
)
class PeriodicIncrementSerialTaskTest(TestCase):
number_of_zones = 20
batch_size = 20
def setUp(self):
super(PeriodicIncrementSerialTaskTest, self).setUp()
self.worker_api = mock.Mock()
mock.patch.object(worker_api.WorkerAPI, 'get_instance',
return_value=self.worker_api).start()
self.config(quota_zones=self.number_of_zones)
self.config(
batch_size=self.batch_size,
group='producer_task:increment_serial'
)
self.increment_serial_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(
tasks.PeriodicIncrementSerialTask
)
)
def _create_zones(self):
for index in range(self.number_of_zones):
name = 'example%d.org.' % index
increment_serial = (index % 2 == 0)
delayed_notify = (index % 4 == 0)
self.create_zone(
name=name,
increment_serial=increment_serial,
delayed_notify=delayed_notify,
)
def test_increment_serial(self):
self._create_zones()
self.increment_serial_task_fixture.task()
self.worker_api.update_zone.assert_called()
self.assertEqual(5, self.worker_api.update_zone.call_count)

View File

@ -113,6 +113,7 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase):
},
"zones": {
"delayed_notify": "CREATE INDEX delayed_notify ON zones (delayed_notify)", # noqa
"increment_serial": "CREATE INDEX increment_serial ON zones (increment_serial)", # noqa
"reverse_name_deleted": "CREATE INDEX reverse_name_deleted ON zones (reverse_name, deleted)", # noqa
"zone_created_at": "CREATE INDEX zone_created_at ON zones (created_at)", # noqa
"zone_deleted": "CREATE INDEX zone_deleted ON zones (deleted)",

View File

@ -31,7 +31,9 @@ from designate import context
from designate.producer import tasks
from designate import rpc
from designate.tests.unit import RoObject
from designate.tests.unit import RwObject
from designate.utils import generate_uuid
from designate.worker import rpcapi as worker_api
DUMMY_TASK_GROUP = cfg.OptGroup(
name='producer_task:dummy',
@ -244,3 +246,82 @@ class PeriodicSecondaryRefreshTest(oslotest.base.BaseTestCase):
self.task()
self.assertFalse(self.central.xfr_zone.called)
class PeriodicIncrementSerialTest(oslotest.base.BaseTestCase):
def setUp(self):
super(PeriodicIncrementSerialTest, self).setUp()
self.useFixture(cfg_fixture.Config(CONF))
self.central_api = mock.Mock()
self.context = mock.Mock()
self.worker_api = mock.Mock()
mock.patch.object(worker_api.WorkerAPI, 'get_instance',
return_value=self.worker_api).start()
mock.patch.object(central_api.CentralAPI, 'get_instance',
return_value=self.central_api).start()
mock.patch.object(context.DesignateContext, 'get_admin_context',
return_value=self.context).start()
self.central_api.increment_zone_serial.return_value = 123
self.task = tasks.PeriodicIncrementSerialTask()
self.task.my_partitions = 0, 9
def test_increment_zone(self):
zone = RoObject(
id=generate_uuid(),
action='CREATE',
increment_serial=True,
delayed_notify=False,
)
self.central_api.find_zones.return_value = [zone]
self.task()
self.central_api.increment_zone_serial.assert_called()
self.worker_api.update_zone.assert_called()
def test_increment_zone_with_action_none(self):
zone = RwObject(
id=generate_uuid(),
action='NONE',
status='ACTIVE',
increment_serial=True,
delayed_notify=False,
)
self.central_api.find_zones.return_value = [zone]
self.task()
self.central_api.increment_zone_serial.assert_called()
self.worker_api.update_zone.assert_called()
self.assertEqual('UPDATE', zone.action)
self.assertEqual('PENDING', zone.status)
def test_increment_zone_with_delayed_notify(self):
zone = RoObject(
id=generate_uuid(),
action='CREATE',
increment_serial=True,
delayed_notify=True,
)
self.central_api.find_zones.return_value = [zone]
self.task()
self.central_api.increment_zone_serial.assert_called()
self.worker_api.update_zone.assert_not_called()
def test_increment_zone_skip_deleted(self):
zone = RoObject(
id=generate_uuid(),
action='DELETE',
increment_serial=True,
delayed_notify=False,
)
self.central_api.find_zones.return_value = [zone]
self.task()
self.central_api.increment_zone_serial.assert_not_called()
self.worker_api.update_zone.assert_not_called()

View File

@ -179,6 +179,7 @@ class MockRecordSet(object):
ttl = 1
type = "PRIMARY"
serial = 123
records = []
def obj_attr_is_set(self, n):
if n == 'records':
@ -418,8 +419,9 @@ class CentralServiceTestCase(CentralBasic):
central_service._is_valid_ttl = mock.Mock()
central_service.storage.create_recordset = mock.Mock(return_value='rs')
central_service._update_zone_in_storage = mock.Mock()
central_service._update_zone_in_storage = mock.Mock(
return_value=Mockzone()
)
recordset = mock.Mock(spec=objects.RecordSet)
recordset.obj_attr_is_set.return_value = True
recordset.records = [MockRecord()]
@ -440,7 +442,9 @@ class CentralServiceTestCase(CentralBasic):
self.service._is_valid_ttl = mock.Mock()
self.service.storage.create_recordset = mock.Mock(return_value='rs')
self.service._update_zone_in_storage = mock.Mock()
self.service._update_zone_in_storage = mock.Mock(
return_value=Mockzone()
)
# NOTE(thirose): Since this is a race condition we assume that
# we will hit it if we try to do the operations in a loop 100 times.
@ -1506,8 +1510,6 @@ class CentralZoneTestCase(CentralBasic):
self.service.delete_recordset(self.context,
CentralZoneTestCase.zone__id_2,
CentralZoneTestCase.recordset__id)
self.assertTrue(
self.service.worker_api.update_zone.called)
self.assertTrue(
self.service._delete_recordset_in_storage.called)
@ -1524,6 +1526,7 @@ class CentralZoneTestCase(CentralBasic):
action='',
status='',
serial=0,
increment_serial=False,
)
])
)
@ -1533,7 +1536,7 @@ class CentralZoneTestCase(CentralBasic):
self.assertEqual(1, len(rs.records))
self.assertEqual('DELETE', rs.records[0].action)
self.assertEqual('PENDING', rs.records[0].status)
self.assertEqual(1, rs.records[0].serial)
self.assertTrue(rs.records[0].serial, 1)
def test_delete_recordset_in_storage_no_increment_serial(self):
self.service._update_zone_in_storage = mock.Mock()

View File

@ -16,7 +16,6 @@ import jinja2
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
from oslo_utils import timeutils
import oslotest.base
from designate import exceptions
@ -213,22 +212,6 @@ class TestUtils(oslotest.base.BaseTestCase):
self.assertEqual('Hello World', result)
@mock.patch.object(timeutils, 'utcnow_ts')
def test_increment_serial_lower_than_ts(self, mock_utcnow_ts):
mock_utcnow_ts.return_value = 1561698354
ret_serial = utils.increment_serial(serial=1)
self.assertEqual(1561698354, ret_serial)
@mock.patch.object(timeutils, 'utcnow_ts')
def test_increment_serial_higher_than_ts(self, mock_utcnow_ts):
mock_utcnow_ts.return_value = 1561698354
ret_serial = utils.increment_serial(serial=1561698354 * 2)
self.assertEqual(1561698354 * 2 + 1, ret_serial)
def test_is_uuid_like(self):
self.assertTrue(
utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64')

View File

@ -26,7 +26,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils.netutils import is_valid_ipv6
from oslo_utils import timeutils
from oslo_utils import uuidutils
import pkg_resources
@ -119,16 +118,6 @@ def execute(*cmd, **kw):
root_helper=root_helper, **kw)
def increment_serial(serial=0):
# This provides for *roughly* unix timestamp based serial numbers
new_serial = timeutils.utcnow_ts()
if new_serial <= serial:
new_serial = serial + 1
return new_serial
def deep_dict_merge(a, b):
if not isinstance(b, dict):
return b

View File

@ -26,7 +26,6 @@ from oslo_utils import timeutils
from designate import dnsutils
from designate import exceptions
from designate import objects
from designate import utils
from designate.worker.tasks import base
LOG = logging.getLogger(__name__)
@ -795,11 +794,10 @@ class RecoverShard(base.Task):
# Include things that have been hanging out in PENDING
# status for longer than they should
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
stale_criterion = {
'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard),
'status': 'PENDING',
'serial': "<%s" % (current - self.max_prop_time)
'serial': "<%s" % (timeutils.utcnow_ts() - self.max_prop_time)
}
stale_zones = self.storage.find_zones(self.context, stale_criterion)

View File

@ -0,0 +1,6 @@
---
features:
- |
Moved zone serial updates to a `designate-producer` task called
`increment_serial` to fix race conditions and to reduce the number of
updates to the upstream DNS servers when performing multiple DNS updates.

View File

@ -118,6 +118,7 @@ designate.producer_tasks =
periodic_exists = designate.producer.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.producer.tasks:PeriodicSecondaryRefreshTask
delayed_notify = designate.producer.tasks:PeriodicGenerateDelayedNotifyTask
increment_serial = designate.producer.tasks:PeriodicIncrementSerialTask
worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
designate.heartbeat_emitter =