[zm] Periodic Secondary zone refresh task

Change-Id: Ia619153e63da27f4555fb9a719214e54dfb5a26a
Closes-Bug: 1432850
This commit is contained in:
Endre Karlson 2015-07-12 22:12:11 +02:00
parent af5fa2382d
commit 516360e769
8 changed files with 227 additions and 54 deletions

View File

@ -1081,12 +1081,25 @@ class Service(service.RPCService, service.Service):
policy.check('xfr_domain', context, target) policy.check('xfr_domain', context, target)
if domain.type == 'SECONDARY': if domain.type != 'SECONDARY':
self.mdns_api.perform_zone_xfr(context, domain)
else:
msg = "Can't XFR a non Secondary zone." msg = "Can't XFR a non Secondary zone."
raise exceptions.BadRequest(msg) raise exceptions.BadRequest(msg)
# Ensure the format of the servers are correct, then poll the
# serial
srv = random.choice(domain.masters)
status, serial, retries = self.mdns_api.get_serial_number(
context, domain, srv.host, srv.port, 3, 1, 3, 0)
# Perform XFR if serial's are not equal
if serial > domain.serial:
msg = _LI(
"Serial %(srv_serial)d is not equal to zone's %(serial)d,"
" performing AXFR")
LOG.info(
msg % {"srv_serial": serial, "serial": domain.serial})
self.mdns_api.perform_zone_xfr(context, domain)
def count_domains(self, context, criterion=None): def count_domains(self, context, criterion=None):
if criterion is None: if criterion is None:
criterion = {} criterion = {}

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock
from mock import patch from mock import patch
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging as messaging import oslo_messaging as messaging
@ -20,6 +21,7 @@ from oslo_log import log as logging
from designate import exceptions from designate import exceptions
from designate.central import service as central_service from designate.central import service as central_service
from designate.mdns import rpcapi as mdns_api
from designate.tests.test_api.test_v2 import ApiV2TestCase from designate.tests.test_api.test_v2 import ApiV2TestCase
@ -554,17 +556,23 @@ class ApiV2ZonesTest(ApiV2TestCase):
# Create a zone # Create a zone
zone = self.create_domain(**fixture) zone = self.create_domain(**fixture)
response = self.client.post_json( mdns = mock.Mock()
'/zones/%s/tasks/xfr' % zone['id'], with mock.patch.object(mdns_api.MdnsAPI, 'get_instance') as get_mdns:
None, status=202) get_mdns.return_value = mdns
mdns.get_serial_number.return_value = ('SUCCESS', 10, 1, )
response = self.client.post_json(
'/zones/%s/tasks/xfr' % zone['id'],
None, status=202)
self.assertTrue(mdns.perform_zone_xfr.called)
# Check the headers are what we expect # Check the headers are what we expect
self.assertEqual(202, response.status_int) self.assertEqual(202, response.status_int)
self.assertEqual('application/json', response.content_type) self.assertEqual('application/json', response.content_type)
self.assertEqual('""', response.body)
def test_invalid_xfr_request(self): def test_invalid_xfr_request(self):
# Create a zone
# Create a zone # Create a zone
zone = self.create_domain() zone = self.create_domain()

View File

@ -30,6 +30,7 @@ from oslo_messaging.notify import notifier
from designate import exceptions from designate import exceptions
from designate import objects from designate import objects
from designate.mdns import rpcapi as mdns_api
from designate.tests.test_central import CentralTestCase from designate.tests.test_central import CentralTestCase
from designate.storage.impl_sqlalchemy import tables from designate.storage.impl_sqlalchemy import tables
@ -1279,7 +1280,49 @@ class CentralServiceTest(CentralTestCase):
# Create a zone # Create a zone
secondary = self.create_domain(**fixture) secondary = self.create_domain(**fixture)
self.central_service.xfr_domain(self.admin_context, secondary.id) mdns = mock.Mock()
with mock.patch.object(mdns_api.MdnsAPI, 'get_instance') as get_mdns:
get_mdns.return_value = mdns
mdns.get_serial_number.return_value = ('SUCCESS', 10, 1, )
self.central_service.xfr_domain(self.admin_context, secondary.id)
self.assertTrue(mdns.perform_zone_xfr.called)
def test_xfr_domain_same_serial(self):
# Create a domain
fixture = self.get_domain_fixture('SECONDARY', 0)
fixture['email'] = cfg.CONF['service:central'].managed_resource_email
fixture['attributes'] = [{"key": "master", "value": "10.0.0.10"}]
# Create a zone
secondary = self.create_domain(**fixture)
mdns = mock.Mock()
with mock.patch.object(mdns_api.MdnsAPI, 'get_instance') as get_mdns:
get_mdns.return_value = mdns
mdns.get_serial_number.return_value = ('SUCCESS', 1, 1, )
self.central_service.xfr_domain(self.admin_context, secondary.id)
self.assertFalse(mdns.perform_zone_xfr.called)
def test_xfr_domain_lower_serial(self):
# Create a domain
fixture = self.get_domain_fixture('SECONDARY', 0)
fixture['email'] = cfg.CONF['service:central'].managed_resource_email
fixture['attributes'] = [{"key": "master", "value": "10.0.0.10"}]
fixture['serial'] = 10
# Create a zone
secondary = self.create_domain(**fixture)
secondary.serial
mdns = mock.Mock()
with mock.patch.object(mdns_api.MdnsAPI, 'get_instance') as get_mdns:
get_mdns.return_value = mdns
mdns.get_serial_number.return_value = ('SUCCESS', 0, 1, )
self.central_service.xfr_domain(self.admin_context, secondary.id)
self.assertFalse(mdns.perform_zone_xfr.called)
def test_xfr_domain_invalid_type(self): def test_xfr_domain_invalid_type(self):
domain = self.create_domain() domain = self.create_domain()

View File

@ -861,9 +861,13 @@ class CentralDomainTestCase(CentralBasic):
self.service.storage.get_domain.return_value = RoObject( self.service.storage.get_domain.return_value = RoObject(
name='example.org.', name='example.org.',
tenant_id='2', tenant_id='2',
type='SECONDARY' type='SECONDARY',
masters=[RoObject(host='10.0.0.1', port=53)],
serial=1,
) )
with fx_mdns_api: with fx_mdns_api:
self.service.mdns_api.get_serial_number.return_value = \
"SUCCESS", 2, 1
self.service.xfr_domain(self.context, '1') self.service.xfr_domain(self.context, '1')
assert self.service.mdns_api.perform_zone_xfr.called assert self.service.mdns_api.perform_zone_xfr.called

View File

@ -13,10 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
import uuid import uuid
import mock import mock
from oslotest import base as test from oslotest import base as test
from oslo_utils import timeutils
import six import six
import testtools import testtools
@ -210,3 +212,66 @@ class PeriodicExistsTest(TaskTest):
data.update(self.period_data) data.update(self.period_data)
self.mock_notifier.info.assert_called_with( self.mock_notifier.info.assert_called_with(
self.ctxt, "dns.domain.exists", data) self.ctxt, "dns.domain.exists", data)
class PeriodicSecondaryRefreshTest(TaskTest):
def setUp(self):
super(PeriodicSecondaryRefreshTest, self).setUp()
opts = {
"zone_manager_task:periodic_secondary_refresh": RoObject({
"per_page": 100
})
}
self.setup_opts(opts)
# Mock a ctxt...
self.ctxt = mock.Mock()
get_admin_ctxt_patcher = mock.patch.object(context.DesignateContext,
'get_admin_context')
get_admin_context = get_admin_ctxt_patcher.start()
get_admin_context.return_value = self.ctxt
# Mock a central...
self.central = mock.Mock()
get_central_patcher = mock.patch.object(central_api.CentralAPI,
'get_instance')
get_central = get_central_patcher.start()
get_central.return_value = self.central
self.task = tasks.PeriodicSecondaryRefreshTask()
self.task.my_partitions = 0, 9
def test_refresh_no_zone(self):
with mock.patch.object(self.task, '_iter') as _iter:
_iter.return_value = []
self.task()
self.assertFalse(self.central.xfr_domain.called)
def test_refresh_zone(self):
transferred = timeutils.utcnow(True) - datetime.timedelta(minutes=62)
zone = RoObject(
id=str(uuid.uuid4()),
transferred_at=datetime.datetime.isoformat(transferred),
refresh=3600)
with mock.patch.object(self.task, '_iter') as _iter:
_iter.return_value = [zone]
self.task()
self.central.xfr_domain.assert_called_once_with(self.ctxt, zone.id)
def test_refresh_zone_not_expired(self):
# Dummy zone object
transferred = timeutils.utcnow(True) - datetime.timedelta(minutes=50)
zone = RoObject(
id=str(uuid.uuid4()),
transferred_at=datetime.datetime.isoformat(transferred),
refresh=3600)
with mock.patch.object(self.task, '_iter') as _iter:
_iter.return_value = [zone]
self.task()
self.assertFalse(self.central.xfr_domain.called)

View File

@ -90,49 +90,6 @@ class PeriodicTask(plugin.ExtensionPlugin):
return self._iter(self.central_api.find_domains, ctxt, criterion) return self._iter(self.central_api.find_domains, ctxt, criterion)
class PeriodicExistsTask(PeriodicTask):
__plugin_name__ = 'periodic_exists'
__interval__ = 3600
def __init__(self):
super(PeriodicExistsTask, self).__init__()
self.notifier = rpc.get_notifier('zone_manager')
@classmethod
def get_cfg_opts(cls):
group = cfg.OptGroup(cls.get_canonical_name())
options = cls.get_base_opts()
return [(group, options)]
@staticmethod
def _get_period(seconds):
interval = datetime.timedelta(seconds=seconds)
end = timeutils.utcnow()
return end - interval, end
def __call__(self):
pstart, pend = self._my_range()
msg = _LI("Emitting zone exist events for %(start)s to %(end)s")
LOG.info(msg % {"start": pstart, "end": pend})
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
start, end = self._get_period(self.options.interval)
data = {
"audit_period_beginning": str(start),
"audit_period_ending": str(end)
}
for zone in self._iter_zones(ctxt):
zone_data = dict(zone)
zone_data.update(data)
self.notifier.info(ctxt, 'dns.domain.exists', zone_data)
LOG.info(_LI("Finished emitting events."))
class DeletedDomainPurgeTask(PeriodicTask): class DeletedDomainPurgeTask(PeriodicTask):
"""Purge deleted domains that are exceeding the grace period time interval. """Purge deleted domains that are exceeding the grace period time interval.
Deleted domains have values in the deleted_at column. Deleted domains have values in the deleted_at column.
@ -187,3 +144,85 @@ class DeletedDomainPurgeTask(PeriodicTask):
criterion, criterion,
limit=self.options.batch_size, limit=self.options.batch_size,
) )
class PeriodicExistsTask(PeriodicTask):
__plugin_name__ = 'periodic_exists'
__interval__ = 3600
def __init__(self):
super(PeriodicExistsTask, self).__init__()
self.notifier = rpc.get_notifier('zone_manager')
@classmethod
def get_cfg_opts(cls):
group = cfg.OptGroup(cls.get_canonical_name())
options = cls.get_base_opts()
return [(group, options)]
@staticmethod
def _get_period(seconds):
interval = datetime.timedelta(seconds=seconds)
end = timeutils.utcnow()
return end - interval, end
def __call__(self):
pstart, pend = self._my_range()
msg = _LI("Emitting zone exist events for %(start)s to %(end)s")
LOG.info(msg % {"start": pstart, "end": pend})
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
start, end = self._get_period(self.options.interval)
data = {
"audit_period_beginning": str(start),
"audit_period_ending": str(end)
}
for zone in self._iter_zones(ctxt):
zone_data = dict(zone)
zone_data.update(data)
self.notifier.info(ctxt, 'dns.domain.exists', zone_data)
LOG.info(_LI("Finished emitting events."))
class PeriodicSecondaryRefreshTask(PeriodicTask):
__plugin_name__ = 'periodic_secondary_refresh'
__interval__ = 3600
@classmethod
def get_cfg_opts(cls):
group = cfg.OptGroup(cls.get_canonical_name())
options = cls.get_base_opts()
return [(group, options)]
def __call__(self):
pstart, pend = self._my_range()
msg = _LI("Refreshing zones between for %(start)s to %(end)s")
LOG.info(msg % {"start": pstart, "end": pend})
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
# each zone can have a different refresh / expire etc interval defined
# in the SOA at the source / master servers
criterion = {
"type": "SECONDARY"
}
for zone in self._iter_zones(ctxt, criterion):
# NOTE: If the zone isn't transferred yet, ignore it.
if zone.transferred_at is None:
continue
now = timeutils.utcnow(True)
transferred = timeutils.parse_isotime(zone.transferred_at)
seconds = timeutils.delta_seconds(transferred, now)
if seconds > zone.refresh:
msg = "Zone %(id)s has %(seconds)d seconds since last transfer, " \
"executing AXFR"
LOG.debug(msg % {"id": zone.id, "seconds": seconds})
self.central_api.xfr_domain(ctxt, zone.id)

0
functional-tests.log Normal file
View File

View File

@ -111,8 +111,9 @@ designate.manage =
tlds = designate.manage.tlds:TLDCommands tlds = designate.manage.tlds:TLDCommands
designate.zone_manager_tasks = designate.zone_manager_tasks =
periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
domain_purge = designate.zone_manager.tasks:DeletedDomainPurgeTask domain_purge = designate.zone_manager.tasks:DeletedDomainPurgeTask
periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.zone_manager.tasks:PeriodicSecondaryRefreshTask
[build_sphinx] [build_sphinx]
all_files = 1 all_files = 1