Add delayed NOTIFY

Based on specs in https://review.openstack.org/#/c/246515
Add database column zones.delayed_notify and migration script
Add Zone Manager Task to perform delayed NOTIFY
Add Admin API method to count zones pending notify

closes-bug: #1498462
partial-bug: #1436210
Change-Id: Ic5c2745bcd2cc057a42cb8edf21ed4c1a3ddc952
Depends-On: If15594099eb7cf74f1c534a05884c2d2501e57e6
This commit is contained in:
Federico Ceratto 2015-11-19 15:21:11 +00:00
parent d00328bac3
commit ecda910c2b
15 changed files with 424 additions and 17 deletions

View File

@ -527,10 +527,16 @@ class Service(service.RPCService, service.Service):
raise exceptions.InvalidTTL('TTL is below the minimum: %s'
% min_ttl)
def _increment_zone_serial(self, context, zone):
def _increment_zone_serial(self, context, zone, set_delayed_notify=False):
"""Update the zone serial and the SOA record
Optionally set delayed_notify to have PM issue delayed notify
"""
# Increment the serial number
zone.serial = utils.increment_serial(zone.serial)
if set_delayed_notify:
zone.delayed_notify = True
zone = self.storage.update_zone(context, zone)
# Update SOA record
@ -634,7 +640,8 @@ class Service(service.RPCService, service.Service):
ns_recordset.records.append(
objects.Record(data=ns_record, managed=True))
self._update_recordset_in_storage(context, zone, ns_recordset)
self._update_recordset_in_storage(context, zone, ns_recordset,
set_delayed_notify=True)
def _delete_ns(self, context, zone, ns_record):
ns_recordset = self.find_recordset(
@ -644,7 +651,8 @@ class Service(service.RPCService, service.Service):
if record.data == ns_record:
ns_recordset.records.remove(record)
self._update_recordset_in_storage(context, zone, ns_recordset)
self._update_recordset_in_storage(context, zone, ns_recordset,
set_delayed_notify=True)
# Quota Enforcement Methods
def _enforce_zone_quota(self, context, tenant_id):
@ -1047,14 +1055,15 @@ class Service(service.RPCService, service.Service):
@transaction
def _update_zone_in_storage(self, context, zone,
increment_serial=True):
increment_serial=True, set_delayed_notify=False):
zone.action = 'UPDATE'
zone.status = 'PENDING'
if increment_serial:
# _increment_zone_serial increments and updates the zone
zone = self._increment_zone_serial(context, zone)
zone = self._increment_zone_serial(
context, zone, set_delayed_notify=set_delayed_notify)
else:
zone = self.storage.update_zone(context, zone)
@ -1175,12 +1184,21 @@ class Service(service.RPCService, service.Service):
reports.append({'zones': self.count_zones(context),
'records': self.count_records(context),
'tenants': self.count_tenants(context)})
elif criterion == 'zones':
reports.append({'zones': self.count_zones(context)})
elif criterion == 'zones_delayed_notify':
num_zones = self.count_zones(context, criterion=dict(
delayed_notify=True))
reports.append({'zones_delayed_notify': num_zones})
elif criterion == 'records':
reports.append({'records': self.count_records(context)})
elif criterion == 'tenants':
reports.append({'tenants': self.count_tenants(context)})
else:
raise exceptions.ReportNotFound()
@ -1373,7 +1391,7 @@ class Service(service.RPCService, service.Service):
@transaction
def _update_recordset_in_storage(self, context, zone, recordset,
increment_serial=True):
increment_serial=True, set_delayed_notify=False):
changes = recordset.obj_get_changes()
@ -1393,7 +1411,8 @@ class Service(service.RPCService, service.Service):
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
context, zone, increment_serial,
set_delayed_notify=set_delayed_notify)
if recordset.records:
for record in recordset.records:

View File

@ -161,6 +161,11 @@ class Zone(base.DictObjectMixin, base.SoftDeleteObjectMixin,
},
'read_only': True
},
'delayed_notify': {
'schema': {
'type': 'boolean',
},
},
}
STRING_KEYS = [

View File

@ -0,0 +1,37 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# See https://blueprints.launchpad.net/nova/+spec/backportable-db-migrations
# http://lists.openstack.org/pipermail/openstack-dev/2013-March/006827.html
from oslo_log import log as logging
from sqlalchemy import Boolean
from sqlalchemy.schema import Column, MetaData, Table, Index
from designate.i18n import _LI
LOG = logging.getLogger(__name__)
meta = MetaData()
def upgrade(migrate_engine):
    """Add the boolean 'delayed_notify' column to the 'zones' table.

    The column flags zones that are awaiting a delayed NOTIFY; it is
    indexed because the zone-manager task filters on it on every run.
    """
    LOG.info(_LI("Adding boolean column delayed_notify to table 'zones'"))
    meta.bind = migrate_engine
    zones_table = Table('zones', meta, autoload=True)
    # NOTE(review): default=False is a client-side SQLAlchemy default only;
    # pre-existing rows presumably end up NULL (falsy) since no
    # server_default is set — confirm this is acceptable to the task query.
    col = Column('delayed_notify', Boolean(), default=False)
    col.create(zones_table)
    index = Index('delayed_notify', zones_table.c.delayed_notify)
    index.create(migrate_engine)

View File

@ -109,6 +109,7 @@ zones = Table('zones', metadata,
default='CREATE', server_default='CREATE', nullable=False),
Column('pool_id', UUID, default=None, nullable=True),
Column('reverse_name', String(255), nullable=False),
Column('delayed_notify', Boolean, default=False),
UniqueConstraint('name', 'deleted', 'pool_id', name='unique_zone_name'),
ForeignKeyConstraint(['parent_zone_id'],

View File

@ -75,6 +75,28 @@ class AdminApiReportsTest(AdminApiTestCase):
self.assertEqual(2, response.json['counts'][0]['zones'])
def test_get_counts_zones_delayed_notify(self):
# Count zones that are pending a NOTIFY transaction
self.policy({'count_zones_delayed_notify': '@'})
response = self.client.get('/reports/counts/zones_delayed_notify')
self.assertEqual(200, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertIn('counts', response.json)
self.assertIn('zones_delayed_notify', response.json['counts'][0])
self.assertEqual(0, response.json['counts'][0]['zones_delayed_notify'])
# Create 2 zones in pending notify and 1 with delayed_notify=False
self.create_zone(fixture=0, delayed_notify=True)
self.create_zone(fixture=1, delayed_notify=True)
self.create_zone(fixture=2)
response = self.client.get('/reports/counts/zones')
self.assertEqual(3, response.json['counts'][0]['zones'])
response = self.client.get('/reports/counts/zones_delayed_notify')
self.assertEqual(2, response.json['counts'][0]['zones_delayed_notify'])
def test_get_counts_records(self):
self.policy({'count_records': '@'})
response = self.client.get('/reports/counts/records')

View File

@ -2881,9 +2881,11 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(u"New Comment", pool.description)
def test_update_pool_add_ns_record(self):
# Create a server pool and zone
# Create a server pool and 3 zones
pool = self.create_pool(fixture=0)
zone = self.create_zone(pool_id=pool.id)
self.create_zone(fixture=1, pool_id=pool.id)
self.create_zone(fixture=2, pool_id=pool.id)
ns_record_count = len(pool.ns_records)
new_ns_record = objects.PoolNsRecord(
@ -2907,10 +2909,17 @@ class CentralServiceTest(CentralTestCase):
self.admin_context,
criterion={'zone_id': zone.id, 'type': "NS"})
# Verify that the doamins NS records ware updated correctly
# Verify that the domains' NS records were updated correctly
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
# Verify that the 3 zones are in the database and that
# the delayed_notify flag is set
zones = self._fetch_all_zones()
self.assertEqual(3, len(zones))
for z in zones:
self.assertTrue(z.delayed_notify)
def test_update_pool_add_ns_record_without_priority(self):
pool = self.create_pool(fixture=0)
self.create_zone(pool_id=pool.id)
@ -2948,6 +2957,10 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
zones = self._fetch_all_zones()
self.assertEqual(1, len(zones))
self.assertTrue(zones[0].delayed_notify)
def test_delete_pool(self):
# Create a server pool
pool = self.create_pool()

View File

@ -70,10 +70,11 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase):
"rrset_type_domainid": "CREATE INDEX rrset_type_domainid ON recordsets (type, zone_id)" # noqa
},
"zones": {
"delayed_notify": "CREATE INDEX delayed_notify ON zones (delayed_notify)", # noqa
"reverse_name_deleted": "CREATE INDEX reverse_name_deleted ON zones (reverse_name, deleted)", # noqa
"zone_created_at": "CREATE INDEX zone_created_at ON zones (created_at)", # noqa
"zone_deleted": "CREATE INDEX zone_deleted ON zones (deleted)",
"zone_tenant_deleted": "CREATE INDEX zone_tenant_deleted ON zones (tenant_id, deleted)" # noqa
"zone_tenant_deleted": "CREATE INDEX zone_tenant_deleted ON zones (tenant_id, deleted)", # noqa
}
}
self.assertDictEqual(expected, indexes)

View File

@ -16,13 +16,18 @@
import datetime
from mock import MagicMock
from oslo_log import log as logging
from oslo_utils import timeutils
from designate.zone_manager import tasks
from designate.tests import TestCase
from designate.pool_manager.rpcapi import PoolManagerAPI
from designate.storage.impl_sqlalchemy import tables
from designate.tests import TestCase
from designate.tests import fixtures
from designate.zone_manager import tasks
from fixtures import MockPatch
LOG = logging.getLogger(__name__)
@ -59,8 +64,7 @@ class DeletedzonePurgeTest(TaskTest):
return zone
def _fetch_all_zones(self):
"""Fetch all zones including deleted ones
"""
# Fetch all zones including deleted ones
query = tables.zones.select()
return self.central_service.storage.session.execute(query).fetchall()
@ -95,8 +99,7 @@ class DeletedzonePurgeTest(TaskTest):
return zones
def test_purge_zones(self):
"""Create 18 zones, run zone_manager, check if 7 zones are remaining
"""
# Create 18 zones, run zone_manager, check if 7 zones are remaining
self.config(quota_zones=1000)
self._create_deleted_zones()
@ -105,3 +108,82 @@ class DeletedzonePurgeTest(TaskTest):
zones = self._fetch_all_zones()
LOG.info("Number of zones: %d", len(zones))
self.assertEqual(7, len(zones))
# Module-level fixture: replaces PoolManagerAPI.get_instance with a strict
# MagicMock (spec_set allows only 'update_zone'), so tests can count NOTIFY
# calls without running a real Pool Manager.
fx_pool_manager = MockPatch(
    'designate.zone_manager.tasks.PoolManagerAPI.get_instance',
    MagicMock(spec_set=[
        'update_zone',
    ])
)
class PeriodicGenerateDelayedNotifyTaskTest(TaskTest):
    """Exercise PeriodicGenerateDelayedNotifyTask: zones flagged with
    delayed_notify must be NOTIFY'd in batches and have the flag cleared.
    """

    def setUp(self):
        super(PeriodicGenerateDelayedNotifyTaskTest, self).setUp()
        # Configure the task's own option group before instantiating it.
        self.config(
            interval=5,
            batch_size=100,
            group="zone_manager_task:delayed_notify"
        )
        self.generate_delayed_notify_task_fixture = self.useFixture(
            fixtures.ZoneManagerTaskFixture(
                tasks.PeriodicGenerateDelayedNotifyTask
            )
        )

    def _fetch_zones(self, query=None):
        # Fetch zones including deleted ones
        if query is None:
            query = tables.zones.select()
        return self.central_service.storage.session.execute(query).fetchall()

    def _create_zones(self):
        # Create a number of zones; half of them with delayed_notify set
        for age in range(20):
            name = "example%d.org." % age
            delayed_notify = (age % 2 == 0)
            self.create_zone(
                name=name,
                delayed_notify=delayed_notify,
            )

    def test_generate_delayed_notify_zones(self):
        # Create zones and set some of them as pending update.
        self.generate_delayed_notify_task_fixture.task()
        self.config(quota_zones=1000)
        # Shrink the batch so two runs are needed to drain the backlog.
        self.config(
            interval=1,
            batch_size=5,
            group="zone_manager_task:delayed_notify"
        )
        self._create_zones()
        zones = self._fetch_zones(tables.zones.select().where(
            tables.zones.c.delayed_notify == True))  # nopep8
        self.assertEqual(10, len(zones))
        # Run the task and check if it reset the delayed_notify flag
        # NOTE(review): a batch of 5 zones yields call_count == 10 —
        # presumably because get_instance is patched on the class itself,
        # so Central's own Pool Manager notification during update_zone
        # also hits the same mock (two calls per zone); confirm.
        with fx_pool_manager:
            pm_api = PoolManagerAPI.get_instance()
            pm_api.update_zone.reset_mock()
            self.generate_delayed_notify_task_fixture.task()
            self.assertEqual(10, pm_api.update_zone.call_count)

        zones = self._fetch_zones(tables.zones.select().where(
            tables.zones.c.delayed_notify == True))  # nopep8
        self.assertEqual(5, len(zones))

        # Run the task and check if it reset the delayed_notify flag
        with fx_pool_manager:
            self.generate_delayed_notify_task_fixture.task()
            pm_api = PoolManagerAPI.get_instance()
            self.assertEqual(20, pm_api.update_zone.call_count)

        zones = self._fetch_zones(tables.zones.select().where(
            tables.zones.c.delayed_notify == True))  # nopep8
        self.assertEqual(0, len(zones))

View File

@ -782,11 +782,12 @@ class CentralzoneTestCase(CentralBasic):
out = self.service.create_zone(
self.context,
RoObject(
RwObject(
tenant_id='1',
name='example.com.',
ttl=60,
pool_id='2',
refresh=0,
type='PRIMARY'
)
)

View File

@ -20,6 +20,7 @@ from designate import plugin
from designate import rpc
from designate.central import rpcapi
from designate.i18n import _LI
from designate.pool_manager.rpcapi import PoolManagerAPI
from oslo_config import cfg
from oslo_log import log as logging
@ -234,3 +235,65 @@ class PeriodicSecondaryRefreshTask(PeriodicTask):
"executing AXFR"
LOG.debug(msg, {"id": zone.id, "seconds": seconds})
self.central_api.xfr_zone(ctxt, zone.id)
class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
    """Generate delayed NOTIFY transactions.

    Scan the database for zones with the delayed_notify flag set, ask the
    Pool Manager to emit a NOTIFY for each, then clear the flag.
    """
    __plugin_name__ = 'delayed_notify'
    __interval__ = 5

    @classmethod
    def get_cfg_opts(cls):
        """Expose this task's options under its own oslo.config group.

        :returns: list of (OptGroup, [Opt]) pairs for registration.
        """
        group = cfg.OptGroup(cls.get_canonical_name())
        options = cls.get_base_opts() + [
            cfg.IntOpt(
                'interval',
                default=cls.__interval__,
                help='Run interval in seconds'
            ),
            cfg.IntOpt(
                'batch_size',
                default=100,
                help='How many zones to receive NOTIFY on each run'
            ),
        ]
        return [(group, options)]

    def __call__(self):
        """Fetch up to "batch_size" zones with the delayed_notify flag set,
        call Pool Manager to emit NOTIFY transactions and reset the flag.
        """
        pstart, pend = self._my_range()

        ctxt = context.DesignateContext.get_admin_context()
        ctxt.all_tenants = True

        # Select zones where "delayed_notify" is set and starting from the
        # oldest "updated_at".
        # There's an index on delayed_notify.
        criterion = self._filter_between('shard')
        criterion['delayed_notify'] = True
        zones = self.central_api.find_zones(
            ctxt,
            criterion,
            limit=self.options.batch_size,
            sort_key='updated_at',
            sort_dir='asc',
        )

        # Use lazy %-args (oslo.log guideline) instead of eager
        # interpolation, and no _LI() marker on a debug-level message.
        LOG.debug(
            "Performing delayed NOTIFY for %(start)s to %(end)s: %(n)d",
            {'start': pstart, 'end': pend, 'n': len(zones)}
        )

        pm_api = PoolManagerAPI.get_instance()
        for z in zones:
            pm_api.update_zone(ctxt, z)
            # Clear the flag through Central so the zone is not re-selected
            # on the next run.
            z.delayed_notify = False
            self.central_api.update_zone(ctxt, z)

View File

@ -246,6 +246,16 @@ debug = False
# How old deleted records should be (deleted_at) to be purged, in seconds
#time_threshold = 604800 # 7 days
#------------------------
# Delayed zones NOTIFY
#------------------------
[zone_manager_task:delayed_notify]
# How frequently to scan for zones pending NOTIFY, in seconds
#interval = 5
# How many zones to receive NOTIFY on each run
#batch_size = 100
#-----------------------
# Pool Manager Service
#-----------------------

View File

@ -51,6 +51,7 @@
"xfr_zone": "rule:admin_or_owner",
"abandon_zone": "rule:admin",
"count_zones": "rule:admin_or_owner",
"count_zones_pending_notify": "rule:admin_or_owner",
"purge_zones": "rule:admin",
"touch_zone": "rule:admin_or_owner",

View File

@ -113,6 +113,7 @@ designate.zone_manager_tasks =
zone_purge = designate.zone_manager.tasks:DeletedZonePurgeTask
periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.zone_manager.tasks:PeriodicSecondaryRefreshTask
delayed_notify = designate.zone_manager.tasks:PeriodicGenerateDelayedNotifyTask
[build_sphinx]
all_files = 1

View File

@ -0,0 +1,56 @@
A simple benchmark was run on 2015-11-24 to measure the effectiveness of adding an index on pending_notify (the column was subsequently named delayed_notify in the schema)
$ sudo ./runner
Creating DB and table
Populating non-pending rows
Populating pending rows
+-----------+
| COUNT(id) |
+-----------+
| 5000 |
+-----------+
+-----------+
| COUNT(id) |
+-----------+
| 200000 |
+-----------+
Without any index
+----+-------------+-------+------+---------------+------+---------+------+--------+-----------------------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+---------------+------+---------+------+--------+-----------------------------+
| 1 | SIMPLE | zones | ALL | NULL | NULL | NULL | NULL | 204013 | Using where; Using filesort |
+----+-------------+-------+------+---------------+------+---------+------+--------+-----------------------------+
Benchmark
Average number of seconds to run all queries: 0.137 seconds
Minimum number of seconds to run all queries: 0.122 seconds
Maximum number of seconds to run all queries: 0.158 seconds
Number of clients running queries: 3
Average number of queries per client: 1
With pending_notify index
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
| 1 | SIMPLE | zones | ref | pending_notify_idx | pending_notify_idx | 2 | const | 4999 | Using where; Using filesort |
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
Benchmark
Average number of seconds to run all queries: 0.012 seconds
Minimum number of seconds to run all queries: 0.010 seconds
Maximum number of seconds to run all queries: 0.019 seconds
Number of clients running queries: 3
Average number of queries per client: 1
With created_at and pending_notify index
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
| 1 | SIMPLE | zones | ref | pending_notify_idx | pending_notify_idx | 2 | const | 4999 | Using where; Using filesort |
+----+-------------+-------+------+--------------------+--------------------+---------+-------+------+-----------------------------+
Benchmark
Average number of seconds to run all queries: 0.012 seconds
Minimum number of seconds to run all queries: 0.010 seconds
Maximum number of seconds to run all queries: 0.018 seconds
Number of clients running queries: 3
Average number of queries per client: 1

View File

@ -0,0 +1,95 @@
#!/bin/bash
#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0, see LICENSE file
#
# Benchmark: measure how an index on zones.pending_notify affects the
# query used to select zones awaiting a delayed NOTIFY.
# Requires local mysql/mysqlslap with passwordless access.

echo "Creating DB and table"
mysql <<EOF
DROP DATABASE IF EXISTS mysqlslap;
CREATE DATABASE mysqlslap;
USE mysqlslap;
CREATE TABLE zones (
id CHAR(32),
version INT(32),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
name VARCHAR(255),
email VARCHAR(255),
pending_notify BOOLEAN
);
EOF

# populate <pending_num> <not_pending_num>
# Insert <not_pending_num> rows with pending_notify=0 followed by
# <pending_num> rows with pending_notify=1, using stored procedures so the
# inserts run server-side; prints both row counts at the end.
populate() {
pending_num=$1
not_pending_num=$2
end_num=$(($pending_num+$not_pending_num))
mysql mysqlslap <<EOF
DROP PROCEDURE IF EXISTS populate_not_pending;
DELIMITER #
CREATE PROCEDURE populate_not_pending()
BEGIN
DECLARE i INT UNSIGNED DEFAULT 0;
WHILE i < $not_pending_num DO
INSERT INTO zones (id, version, name, pending_notify) SELECT i, 1, concat(i, '.example.com'), 0;
SET i = i + 1;
END WHILE;
COMMIT;
END #
EOF
mysql mysqlslap <<EOF
DROP PROCEDURE IF EXISTS populate_pending;
DELIMITER #
CREATE PROCEDURE populate_pending()
BEGIN
DECLARE i INT UNSIGNED DEFAULT $not_pending_num;
WHILE i < $end_num DO
INSERT INTO zones (id, version, name, pending_notify) SELECT i, 1, concat(i, '.example.com'), 1;
SET i = i + 1;
END WHILE;
COMMIT;
END #
EOF
echo "Populating non-pending rows"
mysql mysqlslap -e "CALL populate_not_pending();"
echo "Populating pending rows"
mysql mysqlslap -e "CALL populate_pending();"
mysql mysqlslap -e "SELECT COUNT(id) FROM zones WHERE pending_notify = True ORDER BY updated_at DESC;"
mysql mysqlslap -e "SELECT COUNT(id) FROM zones WHERE pending_notify = False ORDER BY updated_at DESC;"
}

# run_bench: warm up, EXPLAIN the benchmark query, then time it with
# mysqlslap (100 iterations, 3 concurrent clients).
run_bench() {
query="SELECT * FROM zones WHERE pending_notify = True ORDER BY updated_at ASC LIMIT 500;"
mysqlslap -C --iterations=100 --concurrency 3 --query "$query" > /dev/null # warmup
mysql mysqlslap -e "EXPLAIN $query"
mysqlslap -C --iterations=100 --concurrency 3 --query "$query"
}

# 5000 pending rows among 200000 non-pending ones.
populate 5000 200000
# mysql mysqlslap -e "$query"
echo "Without any index"
run_bench
echo "With pending_notify index"
mysql mysqlslap -e "CREATE INDEX pending_notify_idx ON zones (pending_notify);"
run_bench
echo "With created_at and pending_notify index"
mysql mysqlslap -e "CREATE INDEX created_at_idx ON zones (created_at);"
run_bench
echo "Drop DB"
mysql -e "DROP DATABASE mysqlslap"