Merge "Actually poll for zone deletes"
This commit is contained in:
commit
57c29fdb3f
@ -2311,17 +2311,13 @@ class Service(service.RPCService, service.Service):
|
||||
zone, deleted = self._update_zone_or_record_status(
|
||||
zone, status, serial)
|
||||
|
||||
LOG.debug('Setting zone %s, serial %s: action %s, status %s'
|
||||
% (zone.id, zone.serial, zone.action, zone.status))
|
||||
self.storage.update_zone(context, zone)
|
||||
if zone.status != 'DELETED':
|
||||
LOG.debug('Setting zone %s, serial %s: action %s, status %s'
|
||||
% (zone.id, zone.serial, zone.action, zone.status))
|
||||
self.storage.update_zone(context, zone)
|
||||
|
||||
# TODO(Ron): Including this to retain the current logic.
|
||||
# We should NOT be deleting zones. The zone status should be
|
||||
# used to indicate the zone has been deleted and not the deleted
|
||||
# column. The deleted column is needed for unique constraints.
|
||||
if deleted:
|
||||
# TODO(vinod): Pass a zone to delete_zone rather than id so
|
||||
# that the action, status and serial are updated correctly.
|
||||
LOG.debug('update_status: deleting %s' % zone.name)
|
||||
self.storage.delete_zone(context, zone.id)
|
||||
|
||||
return zone
|
||||
|
@ -118,9 +118,14 @@ class NotifyEndpoint(base.BaseEndpoint):
|
||||
while (True):
|
||||
(response, retry) = self._make_and_send_dns_message(
|
||||
zone, host, port, timeout, retry_interval, retries)
|
||||
if response and response.rcode() in (
|
||||
dns.rcode.NXDOMAIN, dns.rcode.REFUSED, dns.rcode.SERVFAIL):
|
||||
|
||||
if response and (response.rcode() in (
|
||||
dns.rcode.NXDOMAIN, dns.rcode.REFUSED, dns.rcode.SERVFAIL)
|
||||
or not bool(response.answer)):
|
||||
status = 'NO_ZONE'
|
||||
if zone.serial == 0 and zone.action in ['DELETE', 'NONE']:
|
||||
return (status, 0, retries)
|
||||
|
||||
elif response and len(response.answer) == 1 \
|
||||
and str(response.answer[0].name) == str(zone.name) \
|
||||
and response.answer[0].rdclass == dns.rdataclass.IN \
|
||||
@ -138,12 +143,14 @@ class NotifyEndpoint(base.BaseEndpoint):
|
||||
{'zone': zone.name, 'host': host,
|
||||
'port': port, 'es': zone.serial,
|
||||
'as': actual_serial, 'retries': retries})
|
||||
|
||||
if retries > 0:
|
||||
# retry again
|
||||
time.sleep(retry_interval)
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
else:
|
||||
# Everything looks good at this point. Return SUCCESS.
|
||||
status = 'SUCCESS'
|
||||
@ -211,8 +218,11 @@ class NotifyEndpoint(base.BaseEndpoint):
|
||||
break
|
||||
# Check that we actually got a NOERROR in the rcode and and an
|
||||
# authoritative answer
|
||||
elif response.rcode() in (dns.rcode.NXDOMAIN, dns.rcode.REFUSED,
|
||||
dns.rcode.SERVFAIL):
|
||||
elif (response.rcode() in
|
||||
(dns.rcode.NXDOMAIN, dns.rcode.REFUSED,
|
||||
dns.rcode.SERVFAIL)) or \
|
||||
(response.rcode() == dns.rcode.NOERROR and
|
||||
not bool(response.answer)):
|
||||
LOG.info(_LI("%(zone)s not found on %(server)s:%(port)d"),
|
||||
{'zone': zone.name, 'server': host,
|
||||
'port': port})
|
||||
|
@ -197,7 +197,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
def _get_admin_context_all_tenants(self):
|
||||
return DesignateContext.get_admin_context(all_tenants=True)
|
||||
|
||||
# Periodioc Tasks
|
||||
# Periodic Tasks
|
||||
def periodic_recovery(self):
|
||||
"""
|
||||
Runs only on the pool leader
|
||||
@ -317,7 +317,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
for also_notify in self.pool.also_notifies:
|
||||
self._update_zone_on_also_notify(context, also_notify, zone)
|
||||
|
||||
# Send a NOTIFY to each nameserver
|
||||
# Ensure the change has propagated to each nameserver
|
||||
for nameserver in self.pool.nameservers:
|
||||
create_status = self._build_status_object(
|
||||
nameserver, zone, CREATE_ACTION)
|
||||
@ -391,7 +391,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
for also_notify in self.pool.also_notifies:
|
||||
self._update_zone_on_also_notify(context, also_notify, zone)
|
||||
|
||||
# Ensure the change has propogated to each nameserver
|
||||
# Ensure the change has propagated to each nameserver
|
||||
for nameserver in self.pool.nameservers:
|
||||
# See if there is already another update in progress
|
||||
try:
|
||||
@ -453,25 +453,29 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
results.append(
|
||||
self._delete_zone_on_target(context, target, zone))
|
||||
|
||||
# TODO(kiall): We should monitor that the Zone is actually deleted
|
||||
# correctly on each of the nameservers, rather than
|
||||
# assuming a successful delete-on-target is OK as we have
|
||||
# in the past.
|
||||
if self._exceed_or_meet_threshold(
|
||||
if not self._exceed_or_meet_threshold(
|
||||
results.count(True), MAXIMUM_THRESHOLD):
|
||||
LOG.debug('Consensus reached for deleting zone %(zone)s '
|
||||
'on pool targets' % {'zone': zone.name})
|
||||
|
||||
self.central_api.update_status(
|
||||
context, zone.id, SUCCESS_STATUS, zone.serial)
|
||||
|
||||
else:
|
||||
LOG.warning(_LW('Consensus not reached for deleting zone %(zone)s'
|
||||
' on pool targets') % {'zone': zone.name})
|
||||
|
||||
' on pool targets') % {'zone': zone.name})
|
||||
self.central_api.update_status(
|
||||
context, zone.id, ERROR_STATUS, zone.serial)
|
||||
|
||||
zone.serial = 0
|
||||
# Ensure the change has propagated to each nameserver
|
||||
for nameserver in self.pool.nameservers:
|
||||
# See if there is already another update in progress
|
||||
try:
|
||||
self.cache.retrieve(context, nameserver.id, zone.id,
|
||||
DELETE_ACTION)
|
||||
except exceptions.PoolManagerStatusNotFound:
|
||||
update_status = self._build_status_object(
|
||||
nameserver, zone, DELETE_ACTION)
|
||||
self.cache.store(context, update_status)
|
||||
|
||||
self.mdns_api.poll_for_serial_number(
|
||||
context, zone, nameserver, self.timeout,
|
||||
self.retry_interval, self.max_retries, self.delay)
|
||||
|
||||
def _delete_zone_on_target(self, context, target, zone):
|
||||
"""
|
||||
:param context: Security context information.
|
||||
@ -542,7 +546,11 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
current_status.serial_number = actual_serial
|
||||
self.cache.store(context, current_status)
|
||||
|
||||
LOG.debug('Attempting to get consensus serial for %s' %
|
||||
zone.name)
|
||||
consensus_serial = self._get_consensus_serial(context, zone)
|
||||
LOG.debug('Consensus serial for %s is %s' %
|
||||
(zone.name, consensus_serial))
|
||||
|
||||
# If there is a valid consensus serial we can still send a success
|
||||
# for that serial.
|
||||
@ -568,11 +576,15 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
||||
self.central_api.update_status(
|
||||
context, zone.id, ERROR_STATUS, error_serial)
|
||||
|
||||
if status == NO_ZONE_STATUS and action != DELETE_ACTION:
|
||||
LOG.warning(_LW('Zone %(zone)s is not present in some '
|
||||
'targets') % {'zone': zone.name})
|
||||
self.central_api.update_status(
|
||||
context, zone.id, NO_ZONE_STATUS, 0)
|
||||
if status == NO_ZONE_STATUS:
|
||||
if action == DELETE_ACTION:
|
||||
self.central_api.update_status(
|
||||
context, zone.id, NO_ZONE_STATUS, 0)
|
||||
else:
|
||||
LOG.warning(_LW('Zone %(zone)s is not present in some '
|
||||
'targets') % {'zone': zone.name})
|
||||
self.central_api.update_status(
|
||||
context, zone.id, NO_ZONE_STATUS, 0)
|
||||
|
||||
if consensus_serial == zone.serial and self._is_consensus(
|
||||
context, zone, action, SUCCESS_STATUS,
|
||||
|
@ -252,65 +252,6 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
|
||||
|
||||
self.assertFalse(mock_update_status.called)
|
||||
|
||||
@patch.object(impl_fake.FakeBackend, 'delete_zone',
|
||||
side_effect=exceptions.Backend)
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_delete_zone(self, mock_update_status, _):
|
||||
zone = self._build_zone('example.org.', 'DELETE', 'PENDING')
|
||||
|
||||
self.service.delete_zone(self.admin_context, zone)
|
||||
|
||||
mock_update_status.assert_called_once_with(
|
||||
self.admin_context, zone.id, 'ERROR', zone.serial)
|
||||
|
||||
@patch.object(impl_fake.FakeBackend, 'delete_zone')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_delete_zone_target_both_failure(
|
||||
self, mock_update_status, mock_delete_zone):
|
||||
|
||||
zone = self._build_zone('example.org.', 'DELETE', 'PENDING')
|
||||
|
||||
mock_delete_zone.side_effect = exceptions.Backend
|
||||
|
||||
self.service.delete_zone(self.admin_context, zone)
|
||||
|
||||
mock_update_status.assert_called_once_with(
|
||||
self.admin_context, zone.id, 'ERROR', zone.serial)
|
||||
|
||||
@patch.object(impl_fake.FakeBackend, 'delete_zone')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_delete_zone_target_one_failure(
|
||||
self, mock_update_status, mock_delete_zone):
|
||||
|
||||
zone = self._build_zone('example.org.', 'DELETE', 'PENDING')
|
||||
|
||||
mock_delete_zone.side_effect = [None, exceptions.Backend]
|
||||
|
||||
self.service.delete_zone(self.admin_context, zone)
|
||||
|
||||
mock_update_status.assert_called_once_with(
|
||||
self.admin_context, zone.id, 'ERROR', zone.serial)
|
||||
|
||||
@patch.object(impl_fake.FakeBackend, 'delete_zone')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_delete_zone_target_one_failure_consensus(
|
||||
self, mock_update_status, mock_delete_zone):
|
||||
|
||||
self.service.stop()
|
||||
self.config(
|
||||
threshold_percentage=50,
|
||||
group='service:pool_manager')
|
||||
self.service = self.start_service('pool_manager')
|
||||
|
||||
zone = self._build_zone('example.org.', 'DELETE', 'PENDING')
|
||||
|
||||
mock_delete_zone.side_effect = [None, exceptions.Backend]
|
||||
|
||||
self.service.delete_zone(self.admin_context, zone)
|
||||
|
||||
mock_update_status.assert_called_once_with(
|
||||
self.admin_context, zone.id, 'ERROR', zone.serial)
|
||||
|
||||
@patch.object(mdns_rpcapi.MdnsAPI, 'get_serial_number',
|
||||
side_effect=messaging.MessagingException)
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
|
@ -55,3 +55,38 @@ class RoObject(object):
|
||||
|
||||
def to_dict(self):
|
||||
return self.__dict__
|
||||
|
||||
|
||||
class RwObject(object):
|
||||
"""Object mock: raise exception on __setitem__ or __setattr__
|
||||
on any item/attr created after initialization.
|
||||
Allows updating existing items/attrs
|
||||
"""
|
||||
def __init__(self, d=None, **kw):
|
||||
if d:
|
||||
kw.update(d)
|
||||
self.__dict__.update(kw)
|
||||
|
||||
def __getitem__(self, k):
|
||||
try:
|
||||
return self.__dict__[k]
|
||||
except KeyError:
|
||||
cn = self.__class__.__name__
|
||||
raise NotImplementedError(
|
||||
"Attempt to perform __getitem__"
|
||||
" %r on %s %r" % (cn, k, self.__dict__)
|
||||
)
|
||||
|
||||
def __setitem__(self, k, v):
|
||||
if k in self.__dict__:
|
||||
self.__dict__.update({k: v})
|
||||
return
|
||||
|
||||
cn = self.__class__.__name__
|
||||
raise NotImplementedError(
|
||||
"Attempt to perform __setitem__ or __setattr__"
|
||||
" %r on %s %r" % (cn, k, self.__dict__)
|
||||
)
|
||||
|
||||
def __setattr__(self, k, v):
|
||||
self.__setitem__(k, v)
|
||||
|
@ -161,6 +161,7 @@ class MdnsNotifyTest(base.BaseTestCase):
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but (flags & dns.flags.AA) gives 0
|
||||
flags=0,
|
||||
answer=['answer'],
|
||||
)
|
||||
self.notify._send_dns_message = Mock(return_value=response)
|
||||
|
||||
@ -176,7 +177,8 @@ class MdnsNotifyTest(base.BaseTestCase):
|
||||
rcode=Mock(return_value=dns.rcode.NOERROR),
|
||||
# rcode is NOERROR but flags are not NOERROR
|
||||
flags=123,
|
||||
ednsflags=321
|
||||
ednsflags=321,
|
||||
answer=['answer'],
|
||||
)
|
||||
self.notify._send_dns_message = Mock(return_value=response)
|
||||
|
||||
|
@ -29,6 +29,7 @@ from designate import exceptions
|
||||
from designate import objects
|
||||
from designate.pool_manager.service import Service
|
||||
from designate.tests.unit import RoObject
|
||||
from designate.tests.unit import RwObject
|
||||
import designate.pool_manager.service as pm_module
|
||||
|
||||
|
||||
@ -180,3 +181,30 @@ class PoolManagerTest(test.BaseTestCase):
|
||||
self.pm.periodic_sync()
|
||||
|
||||
self.assertEqual(3, self.pm.update_zone.call_count)
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
def test_create_zone(self, mock_get_ctx, *mocks):
|
||||
z = RwObject(name='a_zone', serial=1)
|
||||
|
||||
mock_ctx = mock_get_ctx.return_value
|
||||
self.pm._exceed_or_meet_threshold = Mock(return_value=True)
|
||||
|
||||
self.pm.create_zone(mock_ctx, z)
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
def test_update_zone(self, mock_get_ctx, *mocks):
|
||||
z = RwObject(name='a_zone', serial=1)
|
||||
|
||||
mock_ctx = mock_get_ctx.return_value
|
||||
self.pm._exceed_or_meet_threshold = Mock(return_value=True)
|
||||
|
||||
self.pm.update_zone(mock_ctx, z)
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
def test_delete_zone(self, mock_get_ctx, *mocks):
|
||||
z = RwObject(name='a_zone', serial=1)
|
||||
|
||||
mock_ctx = mock_get_ctx.return_value
|
||||
self.pm._exceed_or_meet_threshold = Mock(return_value=True)
|
||||
|
||||
self.pm.delete_zone(mock_ctx, z)
|
||||
|
@ -29,4 +29,4 @@ TEMPEST_DIR=${TEMPEST_DIR:-/opt/stack/new/tempest}
|
||||
|
||||
pushd $DESIGNATE_DIR
|
||||
export TEMPEST_CONFIG=$TEMPEST_DIR/etc/tempest.conf
|
||||
tox -e functional
|
||||
tox -e functional -- --concurrency 4
|
||||
|
@ -87,7 +87,7 @@ def parameterized(data):
|
||||
return wrapped
|
||||
|
||||
|
||||
def wait_for_condition(condition, interval=1, timeout=40):
|
||||
def wait_for_condition(condition, interval=5, timeout=45):
|
||||
end_time = time.time() + timeout
|
||||
while time.time() < end_time:
|
||||
result = condition()
|
||||
|
Loading…
Reference in New Issue
Block a user