# Copyright 2015 Hewlett-Packard Development Company, L.P. # # Author: Federico Ceratto # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import six import unittest from mock import Mock from mock import patch from oslo_config import cfg from oslo_config import fixture as cfg_fixture from oslo_log import log as logging from oslotest import base import fixtures import mock import testtools from designate import exceptions from designate import objects from designate.central.service import Service from designate.tests import TestCase from designate.tests.fixtures import random_seed import designate.central.service LOG = logging.getLogger(__name__) # TODO(Federico): move this def unwrap(f): """Unwrap a decorated function Requires __wrapped_function and __wrapper_name to be set """ for _ in range(42): try: uf = getattr(f, '__wrapped_function') LOG.debug("Unwrapping %s from %s" % (f.func_name, f.__wrapper_name)) f = uf except AttributeError: return f return f class RwObject(object): """Object mock: raise exception on __setitem__ or __setattr__ on any item/attr created after initialization. Allows updating existing items/attrs """ def __init__(self, d=None, **kw): if d: kw.update(d) self.__dict__.update(kw) def __getitem__(self, k): try: return self.__dict__[k] except KeyError: cn = self.__class__.__name__ raise NotImplementedError( "Attempt to perform __getitem__" " %r on %s %r" % (cn, k, self.__dict__) ) def __setitem__(self, k, v): if k in self.__dict__: self.__dict__.update({k: v}) return cn = self.__class__.__name__ raise NotImplementedError( "Attempt to perform __setitem__ or __setattr__" " %r on %s %r" % (cn, k, self.__dict__) ) def __setattr__(self, k, v): self.__setitem__(k, v) class RoObject(RwObject): """Read-only Object mock: raise exception on unexpected __setitem__ or __setattr__ """ def __setitem__(self, k, v): cn = self.__class__.__name__ raise NotImplementedError( "Attempt to perform __setitem__ or __setattr__" " %r on %s %r" % (cn, k, self.__dict__) ) class MockObjectTest(base.BaseTestCase): def test_ro(self): o = RoObject(a=1) self.assertEqual(1, o['a']) self.assertEqual(1, o.a) with testtools.ExpectedException(NotImplementedError): o.a = 2 with testtools.ExpectedException(NotImplementedError): o.new = 1 with testtools.ExpectedException(NotImplementedError): o['a'] = 2 with testtools.ExpectedException(NotImplementedError): o['new'] = 1 def test_rw(self): o = RwObject(a=1) self.assertEqual(1, o['a']) self.assertEqual(1, o.a) o.a = 2 self.assertEqual(2, o.a) self.assertEqual(2, o['a']) o['a'] = 3 self.assertEqual(3, o.a) self.assertEqual(3, o['a']) with testtools.ExpectedException(NotImplementedError): o.new = 1 with testtools.ExpectedException(NotImplementedError): o['new'] = 1 def mock_out(name): def decorator(meth): def wrapper(self, a): return meth(self, a) wrapper = mock.patch(name)(wrapper) return wrapper return decorator class Mockzone(object): id = 1 name = 'example.org' pool_id = 1 tenant_id = 3 ttl = 1 type = "PRIMARY" serial = 123 def obj_attr_is_set(self, n): if n == 'recordsets': return False raise NotImplementedError() def __getitem__(self, k): items = { 'id': 3, 'email': 'foo@example.org', 'serial': 123, 'refresh': 20, 'retry': 33, 'expire': 9999, 'minimum': 2, 'name': 'example.org.', } try: return items[k] except KeyError: raise NotImplementedError(k) class MockRecordSet(object): id = 1 name = 'example.org.' pool_id = 1 tenant_id = 3 ttl = 1 type = "PRIMARY" serial = 123 def obj_attr_is_set(self, n): if n == 'records': return False raise NotImplementedError() class MockRecord(object): hostname = 'bar' def __getitem__(self, n): assert n == 'hostname' return 'bar' class MockPool(object): ns_records = [MockRecord(), ] # Fixtures fx_mdns_api = fixtures.MockPatch('designate.central.service.mdns_rpcapi') mdns_api = mock.PropertyMock( return_value=mock.NonCallableMagicMock(spec_set=[ 'a' ]) ) fx_pool_manager = fixtures.MockPatch( 'designate.central.service.pool_manager_rpcapi.Pool' 'ManagerAPI.get_instance', mock.MagicMock(spec_set=[ 'create_zone', 'update_zone', 'delete_zone' ]) ) fx_worker = fixtures.MockPatch( 'designate.central.service.worker_rpcapi.WorkerAPI.get_instance', mock.MagicMock(spec_set=[ 'create_zone', 'update_zone', 'delete_zone' ]) ) fx_disable_notification = fixtures.MockPatch('designate.central.notification') class NotMockedError(NotImplementedError): pass @patch('designate.central.service.storage', mock.NonCallableMock(side_effect=NotMockedError)) class CentralBasic(TestCase): def setUp(self): super(CentralBasic, self).setUp() self.CONF = self.useFixture(cfg_fixture.Config(cfg.CONF)).conf mock_storage = mock.Mock(spec=designate.storage.base.Storage) pool_list = objects.PoolList.from_list( [ {'id': '794ccc2c-d751-44fe-b57f-8894c9f5c842'} ] ) attrs = { 'count_zones.return_value': 0, 'find_zone.return_value': Mockzone(), 'get_pool.return_value': MockPool(), 'find_pools.return_value': pool_list, } mock_storage.configure_mock(**attrs) self.useFixture(fixtures.MockPatchObject( designate.central.service.storage, 'get_storage', return_value=mock_storage) ) designate.central.service.policy = mock.NonCallableMock(spec_set=[ 'reset', 'set_rules', 'init', 'check', ]) designate.central.service.quota = mock.NonCallableMock(spec_set=[ 'get_quota', ]) designate.central.service.storage = mock.NonCallableMock(spec_set=[ 'get_storage', ]) designate.central.service.rpcapi = mock.Mock() designate.central.service.pool_manager_rpcapi = mock.Mock() designate.central.service.worker_rpcapi = mock.Mock() self.context = mock.NonCallableMock(spec_set=[ 'elevated', 'sudo', 'abandon', 'all_tenants', ]) self.service = Service() self.service.check_for_tlds = True self.service.notifier = mock.Mock() class CentralServiceTestCase(CentralBasic): def test_mdns_api_patch(self): with fx_mdns_api: q = self.service.mdns_api assert 'mdns_rpcapi.MdnsAPI.get_instance' in repr(q) def test_conf_fixture(self): assert 'service:central' in designate.central.service.cfg.CONF def test_init(self): self.assertTrue(self.service.check_for_tlds) # Ensure these attributes are lazy self.assertFalse(designate.central.service.storage.get_storage.called) self.assertFalse(designate.central.service.quota.get_quota.called) def test_storage_loads_lazily(self): assert self.service.storage self.assertTrue(designate.central.service.storage.get_storage.called) def test_quota_loads_lazily(self): assert self.service.quota self.assertTrue(designate.central.service.quota.get_quota.called) def test__is_valid_ttl(self): self.CONF.set_override('min_ttl', 10, 'service:central') self.service._is_valid_ttl(self.context, 20) # policy.check() not to raise: the user is allowed to create low TTLs designate.central.service.policy.check = mock.Mock(return_value=None) self.service._is_valid_ttl(self.context, None) self.service._is_valid_ttl(self.context, 1) # policy.check() to raise designate.central.service.policy.check = mock.Mock( side_effect=exceptions.Forbidden ) with testtools.ExpectedException(exceptions.InvalidTTL): self.service._is_valid_ttl(self.context, 3) def test__update_soa_secondary(self): ctx = mock.Mock() mock_zone = RoObject(type='SECONDARY') self.service._update_soa(ctx, mock_zone) self.assertFalse(ctx.elevated.called) def test__update_soa(self): class MockZone(dict): type = 'PRIMARY' pool_id = 1 class MockRecord(object): data = None mock_soa = RoObject(records=[MockRecord()]) self.context.elevated = mock.Mock() self.service._update_zone_in_storage = mock.Mock() self.service.storage.get_pool = mock.Mock(return_value=MockPool()) self.service.find_recordset = mock.Mock(return_value=mock_soa) self.service._build_soa_record = mock.Mock() self.service._update_recordset_in_storage = mock.Mock() self.service._update_soa(self.context, Mockzone()) self.assertTrue(self.service._update_recordset_in_storage.called) self.assertTrue(self.context.elevated.called) def test_count_zones(self): self.service.count_zones(self.context) self.service.storage.count_zones.assert_called_once_with( self.context, {} ) def test_count_zones_criterion(self): self.service.count_zones(self.context, criterion={'a': 1}) self.service.storage.count_zones.assert_called_once_with( self.context, {'a': 1} ) def test_validate_new_recordset(self): self.service._is_valid_recordset_name = mock.Mock() self.service._is_valid_recordset_placement = mock.Mock() self.service._is_valid_recordset_placement_subzone = mock.Mock() self.service._is_valid_ttl = mock.Mock() MockRecordSet.id = None self.service._validate_recordset( self.context, Mockzone, MockRecordSet ) assert self.service._is_valid_recordset_name.called assert self.service._is_valid_recordset_placement.called assert self.service._is_valid_recordset_placement_subzone.called assert self.service._is_valid_ttl.called def test_validate_existing_recordset(self): self.service._is_valid_recordset_name = mock.Mock() self.service._is_valid_recordset_placement = mock.Mock() self.service._is_valid_recordset_placement_subzone = mock.Mock() self.service._is_valid_ttl = mock.Mock() MockRecordSet.obj_get_changes = Mock(return_value={'ttl': 3600}) self.service._validate_recordset( self.context, Mockzone, MockRecordSet ) assert self.service._is_valid_recordset_name.called assert self.service._is_valid_recordset_placement.called assert self.service._is_valid_recordset_placement_subzone.called assert self.service._is_valid_ttl.called def test_create_recordset_in_storage(self): self.service._enforce_recordset_quota = Mock() self.service._validate_recordset = mock.Mock() self.service.storage.create_recordset = Mock(return_value='rs') self.service._update_zone_in_storage = Mock() rs, zone = self.service._create_recordset_in_storage( self.context, Mockzone(), MockRecordSet() ) self.assertEqual(rs, 'rs') self.assertFalse(self.service._update_zone_in_storage.called) def test_create_recordset_with_records_in_storage(self): self.service._enforce_recordset_quota = mock.Mock() self.service._enforce_record_quota = mock.Mock() self.service._is_valid_recordset_name = mock.Mock() self.service._is_valid_recordset_placement = mock.Mock() self.service._is_valid_recordset_placement_subzone = mock.Mock() self.service._is_valid_ttl = mock.Mock() self.service.storage.create_recordset = mock.Mock(return_value='rs') self.service._update_zone_in_storage = mock.Mock() recordset = Mock() recordset.obj_attr_is_set.return_value = True recordset.records = [MockRecord()] rs, zone = self.service._create_recordset_in_storage( self.context, Mockzone(), recordset ) assert self.service._enforce_record_quota.called assert self.service._update_zone_in_storage.called def test_create_recordset_checking_DBDeadLock(self): self.service._enforce_recordset_quota = mock.Mock() self.service._enforce_record_quota = mock.Mock() self.service._is_valid_recordset_name = mock.Mock() self.service._is_valid_recordset_placement = mock.Mock() self.service._is_valid_recordset_placement_subzone = mock.Mock() self.service._is_valid_ttl = mock.Mock() self.service.storage.create_recordset = mock.Mock(return_value='rs') self.service._update_zone_in_storage = mock.Mock() # NOTE(thirose): Since this is a race condition we assume that # we will hit it if we try to do the operations in a loop 100 times. for num in range(100): recordset = Mock() recordset.name = "b%s".format(num) recordset.obj_attr_is_set.return_value = True recordset.records = [MockRecord()] rs, zone = self.service._create_recordset_in_storage( self.context, Mockzone(), recordset ) assert not self.service.storage._retry_on_deadlock.called assert self.service._update_zone_in_storage.called assert self.service.storage.create_recordset.called def test__create_soa(self): self.service._create_recordset_in_storage = Mock( return_value=(None, None) ) self.service._build_soa_record = Mock( return_value='example.org. foo.bar 1 60 5 999 1' ) zone = Mockzone() self.service._create_soa(self.context, zone) ctx, md, rset = self.service._create_recordset_in_storage.call_args[0] self.assertEqual('example.org.', rset.name) self.assertEqual('SOA', rset.type) self.assertEqual('SOA', rset.type) self.assertEqual(1, len(rset.records.objects)) self.assertTrue(rset.records.objects[0].managed) def test__create_zone_in_storage(self): self.service._create_soa = Mock() self.service._create_ns = Mock() self.service.get_zone_ns_records = Mock( return_value=[RoObject(hostname='host_foo')] ) def cr_dom(ctx, zone): return zone self.service.storage.create_zone = cr_dom zone = self.service._create_zone_in_storage( self.context, Mockzone() ) self.assertEqual('PENDING', zone.status) self.assertEqual('CREATE', zone.action) ctx, zone, hostnames = self.service._create_ns.call_args[0] self.assertEqual(['host_foo'], hostnames) @unittest.expectedFailure # FIXME def test_create_zone_forbidden(self): assert not self.service.storage.count_zones.called designate.central.service.policy.check = mock.Mock(return_value=None) self.service._enforce_zone_quota = mock.Mock(return_value=None) self.service._is_valid_zone_name = mock.Mock(return_value=None) self.service._is_valid_ttl = mock.Mock(return_value=True) self.service._is_subzone = mock.Mock() self.service._create_zone_in_storage = mock.Mock( return_value=Mockzone() ) self.service.storage.find_zone(self.context, {}) parent_zone = self.service._is_subzone( self.context, 'bogusname', 1234) # self.assertEqual('', parent_zone) self.service.check_for_tlds = False with testtools.ExpectedException(exceptions.Forbidden): self.service.create_zone(self.context, Mockzone()) # TODO(Federico) add more create_zone tests assert parent_zone class CentralZoneTestCase(CentralBasic): zone__id = '1c85d9b0-1e9d-4e99-aede-a06664f1af2e' zone__id_2 = '7c85d9b0-1e9d-4e99-aede-a06664f1af2e' record__id = 'b81ebcfb-6236-4424-b77f-2dd0179fa041' record__id_2 = 'c81ebcfb-6236-4424-b77f-2dd0179fa041' pool__id = '769ca3fc-5924-4a44-8c1f-7efbe52fbd59' recordset__id = '9c85d9b0-1e9d-4e99-aede-a06664f1af2e' recordset__id_2 = 'dc85d9b0-1e9d-4e99-aede-a06664f1af2e' recordset__id_3 = '2a94a9fe-30d1-4a15-9071-0bb21996d971' zone_export__id = 'e887597f-9697-47dd-a202-7a2711f8669c' def setUp(self): super(CentralZoneTestCase, self).setUp() def storage_find_tld(c, d): if d['name'] not in ('org',): raise exceptions.TldNotFound def storage_find_tlds(c): return objects.TldList.from_list( [objects.Tld.from_dict({'name': 'org'})] ) self.service.storage.find_tlds = storage_find_tlds self.service.storage.find_tld = storage_find_tld def test__is_valid_zone_name_valid(self): self.service._is_blacklisted_zone_name = Mock() self.service._is_valid_zone_name(self.context, 'valid.org.') def test__is_valid_zone_name_invalid(self): self.service._is_blacklisted_zone_name = Mock() with testtools.ExpectedException(exceptions.InvalidZoneName): self.service._is_valid_zone_name(self.context, 'example^org.') def test__is_valid_zone_name_invalid_2(self): self.service._is_blacklisted_zone_name = Mock() with testtools.ExpectedException(exceptions.InvalidZoneName): self.service._is_valid_zone_name(self.context, 'example.tld.') def test__is_valid_zone_name_invalid_same_as_tld(self): self.service._is_blacklisted_zone_name = Mock() with testtools.ExpectedException(exceptions.InvalidZoneName): self.service._is_valid_zone_name(self.context, 'com.com.') def test__is_valid_zone_name_invalid_tld(self): self.service._is_blacklisted_zone_name = Mock() with testtools.ExpectedException(exceptions.InvalidZoneName): self.service._is_valid_zone_name(self.context, 'tld.') def test__is_valid_zone_name_blacklisted(self): self.service._is_blacklisted_zone_name = Mock( side_effect=exceptions.InvalidZoneName) with testtools.ExpectedException(exceptions.InvalidZoneName): self.service._is_valid_zone_name(self.context, 'valid.com.') def test__is_blacklisted_zone_name(self): self.service.storage.find_blacklists.return_value = [ RoObject(pattern='a'), RoObject(pattern='b') ] blacklist_tests = ( ('example.org', True), ('example.net', True), ('hi', False), ('', False) ) for zone, expected in blacklist_tests: self.assertEqual( self.service._is_blacklisted_zone_name(self.context, zone), expected ) def test__is_valid_recordset_name(self): zone = RoObject(name='example.org.') self.service._is_valid_recordset_name(self.context, zone, 'foo..example.org.') def test__is_valid_recordset_name_no_dot(self): zone = RoObject(name='example.org.') with testtools.ExpectedException(ValueError): self.service._is_valid_recordset_name(self.context, zone, 'foo.example.org') def test__is_valid_recordset_name_too_long(self): zone = RoObject(name='example.org.') designate.central.service.cfg.CONF['service:central'].\ max_recordset_name_len = 255 rs_name = 'a' * 255 + '.org.' with testtools.ExpectedException(exceptions.InvalidRecordSetName) as e: self.service._is_valid_recordset_name(self.context, zone, rs_name) self.assertEqual(six.text_type(e), 'Name too long') def test__is_valid_recordset_name_wrong_zone(self): zone = RoObject(name='example.org.') with testtools.ExpectedException(exceptions.InvalidRecordSetLocation): self.service._is_valid_recordset_name(self.context, zone, 'foo.example.com.') def test_is_valid_recordset_placement_cname(self): zone = RoObject(name='example.org.') with testtools.ExpectedException(exceptions.InvalidRecordSetLocation) \ as e: self.service._is_valid_recordset_placement( self.context, zone, 'example.org.', 'CNAME', ) self.assertEqual( 'CNAME recordsets may not be created at the zone apex', six.text_type(e)) def test_is_valid_recordset_placement_failing(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service.storage.find_recordsets.return_value = [ RoObject(id=CentralZoneTestCase.recordset__id) ] with testtools.ExpectedException(exceptions.InvalidRecordSetLocation) \ as e: self.service._is_valid_recordset_placement( self.context, zone, 'example.org.', 'A', ) self.assertEqual( 'CNAME recordsets may not share a name with any other records', six.text_type(e)) def test_is_valid_recordset_placement_failing_2(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service.storage.find_recordsets.return_value = [ RoObject(), RoObject() ] with testtools.ExpectedException(exceptions.InvalidRecordSetLocation) \ as e: self.service._is_valid_recordset_placement( self.context, zone, 'example.org.', 'A', ) self.assertEqual( 'CNAME recordsets may not share a name with any other records', six.text_type(e)) def test_is_valid_recordset_placement(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service.storage.find_recordsets.return_value = [] ret = self.service._is_valid_recordset_placement( self.context, zone, 'example.org.', 'A', ) self.assertTrue(ret) def test__is_valid_recordset_placement_subzone(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service._is_valid_recordset_placement_subzone( self.context, zone, 'example.org.' ) def test__is_valid_recordset_placement_subzone_2(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service._is_valid_recordset_name = Mock(side_effect=Exception) self.service.storage.find_zones.return_value = [ RoObject(name='foo.example.org.') ] self.service._is_valid_recordset_placement_subzone( self.context, zone, 'bar.example.org.' ) def test__is_valid_recordset_placement_subzone_failing(self): zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id) self.service._is_valid_recordset_name = Mock() self.service.storage.find_zones.return_value = [ RoObject(name='foo.example.org.') ] with testtools.ExpectedException(exceptions.InvalidRecordSetLocation): self.service._is_valid_recordset_placement_subzone( self.context, zone, 'bar.example.org.' ) def test_is_valid_recordset_records(self): recordset = RoObject( records=[ 'ww1.example.com.', 'ww2.example.com.' ], type='CNAME' ) with testtools.ExpectedException(exceptions.BadRequest): self.service._is_valid_recordset_records( recordset ) def test__is_superzone(self): self.service.storage.find_zones = Mock() self.service._is_superzone(self.context, 'example.org.', '1') _class_self_, crit = self.service.storage.find_zones.call_args[0] self.assertEqual({'name': '%.example.org.', 'pool_id': '1'}, crit) @patch('designate.central.service.utils.increment_serial') def FIXME_test__increment_zone_serial(self, utils_inc_ser): fixtures.MockPatch('designate.central.service.utils.increment_serial') zone = RoObject(serial=1) self.service._increment_zone_serial(self.context, zone) def test__create_ns(self): self.service._create_recordset_in_storage = Mock(return_value=(0, 0)) self.service._create_ns( self.context, RoObject(type='PRIMARY', name='example.org.'), [RoObject(), RoObject(), RoObject()] ) ctx, zone, rset = \ self.service._create_recordset_in_storage.call_args[0] self.assertEqual('example.org.', rset.name) self.assertEqual('NS', rset.type) self.assertEqual(3, len(rset.records)) self.assertTrue(rset.records[0].managed) def test__create_ns_skip(self): self.service._create_recordset_in_storage = Mock() self.service._create_ns( self.context, RoObject(type='SECONDARY', name='example.org.'), [], ) self.assertFalse(self.service._create_recordset_in_storage.called) def test__add_ns_creation(self): self.service._create_ns = Mock() self.service.find_recordsets = Mock( return_value=[] ) self.service._add_ns( self.context, RoObject(id=CentralZoneTestCase.zone__id), RoObject(name='bar') ) ctx, zone, records = self.service._create_ns.call_args[0] self.assertTrue(len(records), 1) def test__add_ns(self): self.service._update_recordset_in_storage = Mock() recordsets = [ RoObject(records=objects.RecordList.from_list([]), managed=True) ] self.service.find_recordsets = Mock( return_value=recordsets ) self.service._add_ns( self.context, RoObject(id=CentralZoneTestCase.zone__id), RoObject(name='bar') ) ctx, zone, rset = \ self.service._update_recordset_in_storage.call_args[0] self.assertEqual(len(rset.records), 1) self.assertTrue(rset.records[0].managed) self.assertEqual('bar', rset.records[0].data.name) def test__add_ns_with_other_ns_rs(self): self.service._update_recordset_in_storage = Mock() recordsets = [ RoObject(records=objects.RecordList.from_list([]), managed=True), RoObject(records=objects.RecordList.from_list([]), managed=False) ] self.service.find_recordsets = Mock( return_value=recordsets ) self.service._add_ns( self.context, RoObject(id=CentralZoneTestCase.zone__id), RoObject(name='bar') ) ctx, zone, rset = \ self.service._update_recordset_in_storage.call_args[0] self.assertEqual(1, len(rset.records)) self.assertTrue(rset.records[0].managed) self.assertEqual('bar', rset.records[0].data.name) def test_create_zone_no_servers(self): self.service._enforce_zone_quota = Mock() self.service._is_valid_zone_name = Mock() self.service._is_valid_ttl = Mock() self.service._is_subzone = Mock( return_value=False ) self.service._is_superzone = Mock( return_value=[] ) self.service.storage.get_pool.return_value = RoObject( ns_records=[] ) self.useFixture( fixtures.MockPatchObject( self.service.storage, 'find_pools', return_value=objects.PoolList.from_list( [ {'id': '94ccc2c1-d751-44fe-b57f-8894c9f5c842'} ] ) ) ) with testtools.ExpectedException(exceptions.NoServersConfigured): self.service.create_zone( self.context, objects.Zone(tenant_id='1', name='example.com.', ttl=60, pool_id=CentralZoneTestCase.pool__id) ) def test_create_zone(self): self.service._enforce_zone_quota = Mock() self.service._create_zone_in_storage = Mock( return_value=objects.Zone( name='example.com.', type='PRIMARY', ) ) self.service._is_valid_zone_name = Mock() self.service._is_valid_ttl = Mock() self.service._is_subzone = Mock( return_value=False ) self.service._is_superzone = Mock( return_value=[] ) self.service.storage.get_pool.return_value = RoObject( ns_records=[RoObject()] ) self.useFixture( fixtures.MockPatchObject( self.service.storage, 'find_pools', return_value=objects.PoolList.from_list( [ {'id': '94ccc2c1-d751-44fe-b57f-8894c9f5c842'} ] ) ) ) # self.service.create_zone = unwrap(self.service.create_zone) out = self.service.create_zone( self.context, objects.Zone( tenant_id='1', name='example.com.', ttl=60, pool_id=CentralZoneTestCase.pool__id, refresh=0, type='PRIMARY' ) ) self.assertEqual('example.com.', out.name) def test_get_zone(self): self.service.storage.get_zone.return_value = RoObject( name='foo', tenant_id='2', ) self.service.get_zone(self.context, CentralZoneTestCase.zone__id) n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual(CentralZoneTestCase.zone__id, target['zone_id']) self.assertEqual('foo', target['zone_name']) self.assertEqual('2', target['tenant_id']) def test_get_zone_servers(self): self.service.storage.get_zone.return_value = RoObject( name='foo', tenant_id='2', pool_id=CentralZoneTestCase.pool__id, ) self.service.get_zone_ns_records( self.context, zone_id=CentralZoneTestCase.zone__id ) ctx, pool_id = self.service.storage.get_pool.call_args[0] self.assertEqual(CentralZoneTestCase.pool__id, pool_id) def test_find_zones(self): self.context = RoObject(tenant='t') self.service.storage.find_zones = Mock() self.service.find_zones(self.context) assert self.service.storage.find_zones.called pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('find_zones', pcheck) def test_find_zone(self): self.context = RoObject(tenant='t') self.service.storage.find_zone = Mock() self.service.find_zone(self.context) assert self.service.storage.find_zone.called pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('find_zone', pcheck) def test_delete_zone_has_subzone(self): self.context.abandon = False self.service.storage.get_zone.return_value = RoObject( name='foo', tenant_id='2', ) self.service.storage.count_zones.return_value = 2 with testtools.ExpectedException(exceptions.ZoneHasSubZone): self.service.delete_zone(self.context, CentralZoneTestCase.zone__id) pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('delete_zone', pcheck) def test_delete_zone_abandon(self): self.service.storage.get_zone.return_value = RoObject( name='foo', tenant_id='2', id=CentralZoneTestCase.zone__id_2 ) designate.central.service.policy = mock.NonCallableMock(spec_set=[ 'reset', 'set_rules', 'init', 'check', ]) self.context.abandon = True self.service.storage.count_zones.return_value = 0 self.service.delete_zone(self.context, CentralZoneTestCase.zone__id) assert self.service.storage.delete_zone.called assert not self.service.pool_manager_api.delete_zone.called pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('abandon_zone', pcheck) def test_delete_zone(self): self.context.abandon = False self.service.storage.get_zone.return_value = RoObject( name='foo', tenant_id='2', ) self.service._delete_zone_in_storage = Mock( return_value=RoObject( name='foo' ) ) self.service.storage.count_zones.return_value = 0 out = self.service.delete_zone(self.context, CentralZoneTestCase.zone__id) assert not self.service.storage.delete_zone.called assert self.service.zone_api.delete_zone.called assert designate.central.service.policy.check.called ctx, deleted_dom = \ self.service.zone_api.delete_zone.call_args[0] self.assertEqual('foo', deleted_dom.name) self.assertEqual('foo', out.name) pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('delete_zone', pcheck) def test__delete_zone_in_storage(self): self.service._delete_zone_in_storage( self.context, RwObject(action='', status=''), ) d = self.service.storage.update_zone.call_args[0][1] self.assertEqual('DELETE', d.action) self.assertEqual('PENDING', d.status) def test__xfr_zone_secondary(self): self.service.storage.get_zone.return_value = RoObject( name='example.org.', tenant_id='2', type='SECONDARY', masters=[RoObject(host='10.0.0.1', port=53)], serial=1, ) with fx_mdns_api: self.service.mdns_api.get_serial_number.return_value = \ "SUCCESS", 2, 1 self.service.xfr_zone(self.context, CentralZoneTestCase.zone__id) assert self.service.mdns_api.perform_zone_xfr.called assert designate.central.service.policy.check.called self.assertEqual( 'xfr_zone', designate.central.service.policy.check.call_args[0][0] ) def test__xfr_zone_not_secondary(self): self.service.storage.get_zone.return_value = RoObject( name='example.org.', tenant_id='2', type='PRIMARY' ) with testtools.ExpectedException(exceptions.BadRequest): self.service.xfr_zone(self.context, CentralZoneTestCase.zone__id) def test_count_report(self): self.service.count_zones = Mock(return_value=1) self.service.count_records = Mock(return_value=2) self.service.count_tenants = Mock(return_value=3) reports = self.service.count_report( self.context, criterion=None ) self.assertEqual([{'zones': 1, 'records': 2, 'tenants': 3}], reports) def test_count_report_zones(self): self.service.count_zones = Mock(return_value=1) self.service.count_records = Mock(return_value=2) self.service.count_tenants = Mock(return_value=3) reports = self.service.count_report( self.context, criterion='zones' ) self.assertEqual([{'zones': 1}], reports) def test_count_report_records(self): self.service.count_zones = Mock(return_value=1) self.service.count_records = Mock(return_value=2) self.service.count_tenants = Mock(return_value=3) reports = self.service.count_report( self.context, criterion='records' ) self.assertEqual([{'records': 2}], reports) def test_count_report_tenants(self): self.service.count_zones = Mock(return_value=1) self.service.count_records = Mock(return_value=2) self.service.count_tenants = Mock(return_value=3) reports = self.service.count_report( self.context, criterion='tenants' ) self.assertEqual([{'tenants': 3}], reports) def test_count_report_not_found(self): self.service.count_zones = Mock(return_value=1) self.service.count_records = Mock(return_value=2) self.service.count_tenants = Mock(return_value=3) with testtools.ExpectedException(exceptions.ReportNotFound): self.service.count_report( self.context, criterion='bogus' ) def _test_touch_zone(self, worker_enabled=True): if not worker_enabled: self.config( enabled="False", group="service:worker" ) self.service._touch_zone_in_storage = Mock() self.service.storage.get_zone.return_value = RoObject( name='example.org.', tenant_id='2', ) if worker_enabled: with fx_worker: self.service.touch_zone(self.context, CentralZoneTestCase.zone__id) else: with fx_pool_manager: self.service.touch_zone(self.context, CentralZoneTestCase.zone__id) assert designate.central.service.policy.check.called self.assertEqual( 'touch_zone', designate.central.service.policy.check.call_args[0][0] ) def test_touch_zone_with_worker_model(self): self._test_touch_zone(worker_enabled=True) def test_touch_zone_with_pool_manager_model(self): self._test_touch_zone(worker_enabled=False) def test_get_recordset_not_found(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id, ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id_2 ) with testtools.ExpectedException(exceptions.RecordSetNotFound): self.service.get_recordset( self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id ) def test_get_recordset(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', ) self.service.storage.get_recordset.return_value = objects.RecordSet( zone_id=CentralZoneTestCase.zone__id_2, zone_name='example.org.', id=CentralZoneTestCase.recordset__id ) self.service.get_recordset( self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id, ) self.assertEqual( 'get_recordset', designate.central.service.policy.check.call_args[0][0] ) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('get_recordset', t) self.assertEqual({ 'zone_id': CentralZoneTestCase.zone__id_2, 'zone_name': 'example.org.', 'recordset_id': CentralZoneTestCase.recordset__id, 'tenant_id': '2'}, target) def test_find_recordsets(self): self.context = Mock() self.context.tenant = 't' self.service.find_recordsets(self.context) assert self.service.storage.find_recordsets.called n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('find_recordsets', n) self.assertEqual({'tenant_id': 't'}, target) def test_find_recordset(self): self.context = Mock() self.context.tenant = 't' self.service.find_recordset(self.context) assert self.service.storage.find_recordset.called n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('find_recordset', n) self.assertEqual({'tenant_id': 't'}, target) def test_update_recordset_fail_on_changes(self): self.service.storage.get_zone.return_value = RoObject() recordset = Mock() recordset.obj_get_original_value.return_value = '1' recordset.obj_get_changes.return_value = ['tenant_id', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_recordset(self.context, recordset) recordset.obj_get_changes.return_value = ['zone_id', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_recordset(self.context, recordset) recordset.obj_get_changes.return_value = ['type', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_recordset(self.context, recordset) def test_update_recordset_action_delete(self): self.service.storage.get_zone.return_value = RoObject( action='DELETE', ) recordset = Mock() recordset.obj_get_changes.return_value = ['foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_recordset(self.context, recordset) def test_update_recordset_action_fail_on_managed(self): self.service.storage.get_zone.return_value = RoObject( type='foo', name='example.org.', tenant_id='2', action='bogus', ) recordset = Mock() recordset.obj_get_changes.return_value = ['foo'] recordset.managed = True self.context = Mock() self.context.edit_managed_records = False with testtools.ExpectedException(exceptions.BadRequest): self.service.update_recordset(self.context, recordset) def _test_update_recordset(self, worker_enabled=True): if not worker_enabled: self.config( enabled="False", group="service:worker" ) self.service.storage.get_zone.return_value = RoObject( type='foo', name='example.org.', tenant_id='2', action='bogus', ) recordset = Mock() recordset.obj_get_changes.return_value = ['foo'] recordset.obj_get_original_value.return_value =\ '9c85d9b0-1e9d-4e99-aede-a06664f1af2e' recordset.managed = False self.service._update_recordset_in_storage = Mock( return_value=('x', 'y') ) if worker_enabled: with fx_worker: self.service.update_recordset(self.context, recordset) else: with fx_pool_manager: self.service.update_recordset(self.context, recordset) assert self.service._update_recordset_in_storage.called n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('update_recordset', n) self.assertEqual({ 'zone_id': '9c85d9b0-1e9d-4e99-aede-a06664f1af2e', 'zone_name': 'example.org.', 'zone_type': 'foo', 'recordset_id': '9c85d9b0-1e9d-4e99-aede-a06664f1af2e', 'tenant_id': '2'}, target) def test_update_recordset_worker_model(self): self._test_update_recordset(worker_enabled=True) def test_update_recordset_pool_manager_model(self): self._test_update_recordset(worker_enabled=False) def test__update_recordset_in_storage(self): recordset = Mock() recordset.name = 'n' recordset.type = 't' recordset.id = CentralZoneTestCase.recordset__id recordset.obj_get_changes.return_value = {'ttl': 90} recordset.ttl = 90 recordset.records = [] self.service._is_valid_recordset_name = Mock() self.service._is_valid_recordset_placement = Mock() self.service._is_valid_recordset_placement_subzone = Mock() self.service._is_valid_ttl = Mock() self.service._update_zone_in_storage = Mock() self.service._update_recordset_in_storage( self.context, RoObject(serial=3), recordset, ) self.assertEqual( 'n', self.service._is_valid_recordset_name.call_args[0][2] ) self.assertEqual( ('n', 't', CentralZoneTestCase.recordset__id), self.service._is_valid_recordset_placement.call_args[0][2:] ) self.assertEqual( 'n', self.service._is_valid_recordset_placement_subzone. call_args[0][2] ) self.assertEqual( 90, self.service._is_valid_ttl.call_args[0][1] ) assert self.service.storage.update_recordset.called assert self.service._update_zone_in_storage.called def test__update_recordset_in_storage_2(self): recordset = Mock() recordset.name = 'n' recordset.type = 't' recordset.id = CentralZoneTestCase.recordset__id recordset.ttl = None recordset.obj_get_changes.return_value = {'ttl': None} recordset.records = [RwObject( action='a', status='s', serial=9, )] self.service._is_valid_recordset_name = Mock() self.service._is_valid_recordset_placement = Mock() self.service._is_valid_recordset_placement_subzone = Mock() self.service._update_zone_in_storage = Mock() self.service._enforce_record_quota = mock.Mock() self.service._update_recordset_in_storage( self.context, RoObject(serial=3), recordset, increment_serial=False, ) self.assertEqual( 'n', self.service._is_valid_recordset_name.call_args[0][2] ) self.assertEqual( ('n', 't', CentralZoneTestCase.recordset__id), self.service._is_valid_recordset_placement.call_args[0][2:] ) self.assertEqual( 'n', self.service._is_valid_recordset_placement_subzone. call_args[0][2] ) assert not self.service._update_zone_in_storage.called assert self.service.storage.update_recordset.called assert self.service._enforce_record_quota.called def test_delete_recordset_not_found(self): self.service.storage.get_zone.return_value = RoObject( action='bogus', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id, id=CentralZoneTestCase.recordset__id, managed=False, ) self.context = Mock() self.context.edit_managed_records = False with testtools.ExpectedException(exceptions.RecordSetNotFound): self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) def test_delete_recordset_action_delete(self): self.service.storage.get_zone.return_value = RoObject( action='DELETE', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id_2, id=CentralZoneTestCase.recordset__id, managed=False, ) self.context = Mock() self.context.edit_managed_records = False with testtools.ExpectedException(exceptions.BadRequest): self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) def test_delete_recordset_managed(self): self.service.storage.get_zone.return_value = RoObject( action='foo', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id_2, id=CentralZoneTestCase.recordset__id, managed=True, ) self.context = Mock() self.context.edit_managed_records = False with testtools.ExpectedException(exceptions.BadRequest): self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) def _test_delete_recordset(self, worker_enabled=True): if not worker_enabled: self.config( enabled="False", group="service:worker" ) mock_zone = RoObject( action='foo', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) mock_rs = objects.RecordSet( zone_id=CentralZoneTestCase.zone__id_2, zone_name='example.org.', id=CentralZoneTestCase.recordset__id, records=objects.RecordList.from_list([]), ) self.service.storage.get_zone.return_value = mock_zone self.service.storage.get_recordset.return_value = mock_rs self.context = Mock() self.context.edit_managed_records = False self.service._delete_recordset_in_storage = Mock( return_value=(mock_rs, mock_zone) ) if worker_enabled: with fx_worker: self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) assert self.service.zone_api.update_zone.called else: with fx_pool_manager: self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) assert self.service.zone_api.update_zone.called assert self.service._delete_recordset_in_storage.called def test_delete_recordset_worker(self): self._test_delete_recordset(worker_enabled=True) def test_delete_recordset_pool_manager(self): self._test_delete_recordset(worker_enabled=False) def test__delete_recordset_in_storage(self): def mock_uds(c, zone, inc): return zone self.service._update_zone_in_storage = mock_uds self.service._delete_recordset_in_storage( self.context, RoObject(serial=1), RoObject(id=2, records=[ RwObject( action='', status='', serial=0, ) ]) ) assert self.service.storage.update_recordset.called assert self.service.storage.delete_recordset.called rs = self.service.storage.update_recordset.call_args[0][1] self.assertEqual(1, len(rs.records)) self.assertEqual('DELETE', rs.records[0].action) self.assertEqual('PENDING', rs.records[0].status) self.assertEqual(1, rs.records[0].serial) def test__delete_recordset_in_storage_no_increment_serial(self): self.service._update_zone_in_storage = Mock() self.service._delete_recordset_in_storage( self.context, RoObject(serial=1), RoObject(id=2, records=[ RwObject( action='', status='', serial=0, ) ]), increment_serial=False, ) assert self.service.storage.update_recordset.called assert self.service.storage.delete_recordset.called assert not self.service._update_zone_in_storage.called def test_count_recordset(self): self.service.count_recordsets(self.context) n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('count_recordsets', n) self.assertEqual({'tenant_id': None}, target) self.assertEqual( {}, self.service.storage.count_recordsets.call_args[0][1] ) def test_create_record_fail_on_delete(self): self.service.storage.get_zone.return_value = RoObject( action='DELETE', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) with testtools.ExpectedException(exceptions.BadRequest): self.service.create_record( self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, RoObject(), ) def _test_create_record(self, worker_enabled=True): if not worker_enabled: self.config(enabled="False", group="service:worker") self.service._create_record_in_storage = Mock( return_value=(None, None) ) self.service.storage.get_zone.return_value = RoObject( action='a', id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id='2', type='foo', ) self.service.storage.get_recordset.return_value = RoObject( name='rs', ) if worker_enabled: with fx_worker: self.service.create_record( self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, RoObject()) assert self.service.zone_api.update_zone.called else: with fx_pool_manager: self.service.create_record( self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, RoObject()) assert self.service.zone_api.update_zone.called n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('create_record', n) self.assertEqual({ 'zone_id': CentralZoneTestCase.zone__id, 'zone_name': 'example.org.', 'zone_type': 'foo', 'recordset_id': CentralZoneTestCase.recordset__id, 'recordset_name': 'rs', 'tenant_id': '2'}, target) def test_create_record_worker(self): self._test_create_record(worker_enabled=True) def test_create_record_pool_manager(self): self._test_create_record(worker_enabled=False) def test__create_record_in_storage(self): self.service._enforce_record_quota = Mock() self.service._create_record_in_storage( self.context, RoObject(id=CentralZoneTestCase.zone__id, serial=4), RoObject(id=CentralZoneTestCase.recordset__id), RwObject( action='', status='', serial='', ), increment_serial=False ) ctx, did, rid, record = self.service.storage.create_record.call_args[0] self.assertEqual(CentralZoneTestCase.zone__id, did) self.assertEqual(CentralZoneTestCase.recordset__id, rid) self.assertEqual('CREATE', record.action) self.assertEqual('PENDING', record.status) self.assertEqual(4, record.serial) def test_get_record_not_found(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id_2, ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.recordset__id ) with testtools.ExpectedException(exceptions.RecordNotFound): self.service.get_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id, CentralZoneTestCase.record__id) def test_get_record_not_found_2(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id_2, name='example.org.', tenant_id=2, ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id_2, id=999, # not matching record.recordset_id name='foo' ) self.service.storage.get_record.return_value = RoObject( id=CentralZoneTestCase.record__id, zone_id=CentralZoneTestCase.zone__id_2, recordset_id=CentralZoneTestCase.recordset__id ) with testtools.ExpectedException(exceptions.RecordNotFound): self.service.get_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id, CentralZoneTestCase.record__id) def test_get_record(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id, name='example.org.', tenant_id=2, ) self.service.storage.get_recordset.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id, id=CentralZoneTestCase.recordset__id, name='foo' ) self.service.storage.get_record.return_value = RoObject( id=CentralZoneTestCase.record__id, zone_id=CentralZoneTestCase.zone__id, recordset_id=CentralZoneTestCase.recordset__id ) self.service.get_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id_2, CentralZoneTestCase.record__id_2) self.assertEqual( 'get_record', designate.central.service.policy.check.call_args[0][0] ) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('get_record', t) self.assertEqual({ 'zone_id': CentralZoneTestCase.zone__id_2, 'zone_name': 'example.org.', 'record_id': CentralZoneTestCase.record__id, 'recordset_id': CentralZoneTestCase.recordset__id_2, 'recordset_name': 'foo', 'tenant_id': 2}, target) def test_update_record_fail_on_changes(self): self.service.storage.get_zone.return_value = RoObject( action='a', name='n', type='t', tenant_id='tid', ) record = Mock() record.obj_get_original_value.return_value = 1 record.obj_get_changes.return_value = ['tenant_id', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_record(self.context, record) record.obj_get_changes.return_value = ['zone_id', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_record(self.context, record) record.obj_get_changes.return_value = ['recordset_id', 'foo'] with testtools.ExpectedException(exceptions.BadRequest): self.service.update_record(self.context, record) def test_update_record_action_delete(self): self.service.storage.get_zone.return_value = RoObject( action='DELETE', ) record = Mock() with testtools.ExpectedException(exceptions.BadRequest): self.service.update_record(self.context, record) def test_update_record_action_fail_on_managed(self): self.service.storage.get_zone.return_value = RoObject( action='a', name='n', tenant_id='tid', type='t', ) self.service.storage.get_recordset.return_value = RoObject( name='rsn', managed=True ) record = Mock() record.obj_get_changes.return_value = ['foo'] self.context = Mock() self.context.edit_managed_records = False with testtools.ExpectedException(exceptions.BadRequest): self.service.update_record(self.context, record) def _test_update_record(self, worker_enabled=True): if not worker_enabled: self.config(enabled="False", group="service:worker") self.service.storage.get_zone.return_value = RoObject( action='a', name='n', tenant_id='tid', type='t', ) self.service.storage.get_recordset.return_value = RoObject( name='rsn', managed=False ) record = Mock() record.obj_get_changes.return_value = ['foo'] record.obj_get_original_value.return_value =\ 'abc12a-1e9d-4e99-aede-a06664f1af2e' record.managed = False self.service._update_record_in_storage = Mock( return_value=('x', 'y') ) if worker_enabled: with fx_worker: self.service.update_record(self.context, record) else: with fx_pool_manager: self.service.update_record(self.context, record) assert self.service._update_record_in_storage.called n, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('update_record', n) self.assertEqual({ 'zone_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e', 'zone_name': 'n', 'zone_type': 't', 'record_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e', 'recordset_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e', 'recordset_name': 'rsn', 'tenant_id': 'tid'}, target) def test_update_record_worker(self): self._test_update_record(worker_enabled=True) def test_update_record_pool_manager(self): self._test_update_record(worker_enabled=False) def test__update_record_in_storage(self): self.service._update_zone_in_storage = Mock() self.service._update_record_in_storage( self.context, RoObject(serial=1), RwObject( action='', status='', serial='', ), increment_serial=False ) ctx, record = self.service.storage.update_record.call_args[0] self.assertEqual('UPDATE', record.action) self.assertEqual('PENDING', record.status) self.assertEqual(1, record.serial) def test_delete_record_action_delete(self): self.service.storage.get_zone.return_value = RoObject( action='DELETE', ) with testtools.ExpectedException(exceptions.BadRequest): self.service.delete_record(self.context, 1, 2, 3) def test_delete_record_not_found(self): self.service.storage.get_zone.return_value = RoObject( action='a', id=CentralZoneTestCase.zone__id ) self.service.storage.get_record.return_value = RoObject( zone_id=CentralZoneTestCase.record__id_2, ) self.service.storage.get_recordset.return_value = RoObject( id=CentralZoneTestCase.recordset__id_2, ) # zone.id != record.zone_id with testtools.ExpectedException(exceptions.RecordNotFound): self.service.delete_record(self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, CentralZoneTestCase.record__id) self.service.storage.get_record.return_value = RoObject( id=CentralZoneTestCase.record__id, zone_id=CentralZoneTestCase.zone__id, recordset_id=CentralZoneTestCase.recordset__id_3, ) # recordset.id != record.recordset_id with testtools.ExpectedException(exceptions.RecordNotFound): self.service.delete_record(self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, CentralZoneTestCase.record__id) def _test_delete_record(self, worker_enabled=True): if not worker_enabled: self.config(enabled="False", group="service:worker") self.service._delete_record_in_storage = Mock( return_value=(None, None) ) self.service.storage.get_zone.return_value = RoObject( action='a', id=CentralZoneTestCase.zone__id, name='dn', tenant_id='tid', type='t', ) self.service.storage.get_record.return_value = RoObject( id=CentralZoneTestCase.record__id_2, zone_id=CentralZoneTestCase.zone__id, recordset_id=CentralZoneTestCase.recordset__id, ) self.service.storage.get_recordset.return_value = RoObject( name='rsn', id=CentralZoneTestCase.recordset__id, managed=False, ) if worker_enabled: with fx_worker: self.service.delete_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id_2, CentralZoneTestCase.record__id) else: with fx_pool_manager: self.service.delete_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id_2, CentralZoneTestCase.record__id) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('delete_record', t) self.assertEqual({ 'zone_id': CentralZoneTestCase.zone__id_2, 'zone_name': 'dn', 'zone_type': 't', 'record_id': CentralZoneTestCase.record__id_2, 'recordset_id': CentralZoneTestCase.recordset__id_2, 'recordset_name': 'rsn', 'tenant_id': 'tid'}, target) def test_delete_record_worker(self): self._test_delete_record(worker_enabled=True) def test_delete_record_pool_manager(self): self._test_delete_record(worker_enabled=False) def test_delete_record_fail_on_managed(self): self.service._delete_record_in_storage = Mock( return_value=(None, None) ) self.service.storage.get_zone.return_value = RoObject( action='a', id=CentralZoneTestCase.zone__id, name='dn', tenant_id='tid', type='t', ) self.service.storage.get_record.return_value = RoObject( id=CentralZoneTestCase.record__id, zone_id=CentralZoneTestCase.zone__id, recordset_id=CentralZoneTestCase.recordset__id, ) self.service.storage.get_recordset.return_value = RoObject( name='rsn', id=CentralZoneTestCase.recordset__id, managed=True, ) self.context = Mock() self.context.edit_managed_records = False with fx_pool_manager: with testtools.ExpectedException(exceptions.BadRequest): self.service.delete_record(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id_2, CentralZoneTestCase.record__id_2) def test__delete_record_in_storage(self): self.service._delete_record_in_storage( self.context, RoObject(serial=2), RwObject(action='', status='', serial=''), increment_serial=False ) r = self.service.storage.update_record.call_args[0][1] self.assertEqual('DELETE', r.action) self.assertEqual('PENDING', r.status) self.assertEqual(2, r.serial) def test_count_records(self): self.service.count_records(self.context) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('count_records', t) self.assertEqual({'tenant_id': None}, target) def test_sync_zones(self): self.service._sync_zone = Mock() self.service.storage.find_zones.return_value = [ RoObject(id=CentralZoneTestCase.zone__id), RoObject(id=CentralZoneTestCase.zone__id_2) ] res = self.service.sync_zones(self.context) t, ctx = designate.central.service.policy.check.call_args[0] self.assertEqual('diagnostics_sync_zones', t) self.assertEqual(2, len(res)) def test_sync_zone(self): self.service._sync_zone = Mock() self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id, name='n', tenant_id='tid', ) self.service.sync_zone(self.context, CentralZoneTestCase.zone__id) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('diagnostics_sync_zone', t) self.assertEqual({'tenant_id': 'tid', 'zone_id': CentralZoneTestCase.zone__id, 'zone_name': 'n'}, target) def test_sync_record(self): self.service.storage.get_zone.return_value = RoObject( id=CentralZoneTestCase.zone__id, name='n', tenant_id='tid', ) self.service.storage.get_recordset.return_value = RoObject( name='n', ) self.service.sync_record(self.context, CentralZoneTestCase.zone__id, CentralZoneTestCase.recordset__id, CentralZoneTestCase.record__id) t, ctx, target = designate.central.service.policy.check.call_args[0] self.assertEqual('diagnostics_sync_record', t) self.assertEqual({ 'zone_id': CentralZoneTestCase.zone__id, 'zone_name': 'n', 'record_id': CentralZoneTestCase.record__id, 'recordset_id': CentralZoneTestCase.recordset__id, 'recordset_name': 'n', 'tenant_id': 'tid'}, target) def test_ping(self): self.service.storage.ping.return_value = True r = self.service.ping(self.context) self.assertEqual({'status': None}, r['backend']) self.assertTrue(r['status']) self.assertTrue(r['storage']) def test_ping2(self): self.service.storage.ping.return_value = False r = self.service.ping(self.context) self.assertEqual({'status': None}, r['backend']) self.assertFalse(r['status']) self.assertFalse(r['storage']) def test__determine_floatingips(self): self.context = Mock() self.context.tenant = 'tnt' self.service.find_records = Mock(return_value=[ RoObject(managed_extra='') ]) fips = {} data, invalid = self.service._determine_floatingips(self.context, fips) self.assertEqual({}, data) self.assertEqual([], invalid) def test__determine_floatingips_with_data(self): self.context = Mock() self.context.tenant = 2 self.service.find_records = Mock(return_value=[ RoObject(managed_extra=1, managed_tenant_id=1), RoObject(managed_extra=2, managed_tenant_id=2), ]) fips = { 'k': {'address': 1}, 'k2': {'address': 2}, } data, invalid = self.service._determine_floatingips(self.context, fips) self.assertEqual(1, len(invalid)) self.assertEqual(1, invalid[0].managed_tenant_id) self.assertEqual(data['k'], ({'address': 1}, None)) def test__generate_soa_refresh_interval(self): with random_seed(42): refresh_time = self.service._generate_soa_refresh_interval() self.assertEqual(3563, refresh_time) class IsSubzoneTestCase(CentralBasic): def setUp(self): super(IsSubzoneTestCase, self).setUp() def find_zone(ctx, criterion): LOG.debug("Calling find_zone on %r" % criterion) if criterion['name'] == 'example.com.': LOG.debug("Returning %r" % criterion['name']) return criterion['name'] LOG.debug("Not found") raise exceptions.ZoneNotFound self.service.storage.find_zone = find_zone def test__is_subzone_false(self): r = self.service._is_subzone(self.context, 'com', CentralZoneTestCase.pool__id) self.assertFalse(r) def FIXME_test__is_subzone_false2(self): r = self.service._is_subzone(self.context, 'com.', CentralZoneTestCase.pool__id) self.assertEqual('com.', r) def FIXME_test__is_subzone_false3(self): r = self.service._is_subzone(self.context, 'example.com.', CentralZoneTestCase.pool__id) self.assertEqual('example.com.', r) def test__is_subzone_false4(self): r = self.service._is_subzone(self.context, 'foo.a.b.example.com.', CentralZoneTestCase.pool__id) self.assertEqual('example.com.', r) class CentralZoneExportTests(CentralBasic): def setUp(self): super(CentralZoneExportTests, self).setUp() def storage_find_tld(c, d): if d['name'] not in ('org',): raise exceptions.TldNotFound self.service.storage.find_tld = storage_find_tld def test_create_zone_export(self): self.context = Mock() self.context.tenant = 't' self.service.storage.get_zone.return_value = RoObject( name='example.com.', id=CentralZoneTestCase.zone__id ) self.service.storage.create_zone_export = Mock( return_value=RwObject( id=CentralZoneTestCase.zone_export__id, zone_id=CentralZoneTestCase.zone__id, task_type='EXPORT', status='PENDING', message=None, tenant_id='t', location=None, ) ) self.service.zone_api.start_zone_export = Mock() out = self.service.create_zone_export( self.context, CentralZoneTestCase.zone_export__id ) self.assertEqual(CentralZoneTestCase.zone__id, out.zone_id) self.assertEqual('PENDING', out.status) self.assertEqual('EXPORT', out.task_type) self.assertIsNone(out.message) self.assertEqual('t', out.tenant_id) def test_get_zone_export(self): self.context = Mock() self.context.tenant = 't' self.service.storage.get_zone_export.return_value = RoObject( zone_id=CentralZoneTestCase.zone__id, task_type='EXPORT', status='PENDING', message=None, tenant_id='t' ) out = self.service.get_zone_export(self.context, CentralZoneTestCase.zone_export__id) n, ctx, target = designate.central.service.policy.check.call_args[0] # Check arguments to policy self.assertEqual('t', target['tenant_id']) # Check output self.assertEqual(CentralZoneTestCase.zone__id, out.zone_id) self.assertEqual('PENDING', out.status) self.assertEqual('EXPORT', out.task_type) self.assertIsNone(out.message) self.assertEqual('t', out.tenant_id) def test_find_zone_exports(self): self.context = Mock() self.context.tenant = 't' self.service.storage.find_zone_exports = Mock() self.service.find_zone_exports(self.context) assert self.service.storage.find_zone_exports.called pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('find_zone_exports', pcheck) def test_delete_zone_export(self): self.context = Mock() self.context.tenant = 't' self.service.storage.delete_zone_export = Mock( return_value=RoObject( zone_id=CentralZoneTestCase.zone__id, task_type='EXPORT', status='PENDING', message=None, tenant_id='t' ) ) out = self.service.delete_zone_export(self.context, CentralZoneTestCase.zone_export__id) assert self.service.storage.delete_zone_export.called self.assertEqual(CentralZoneTestCase.zone__id, out.zone_id) self.assertEqual('PENDING', out.status) self.assertEqual('EXPORT', out.task_type) self.assertIsNone(out.message) self.assertEqual('t', out.tenant_id) assert designate.central.service.policy.check.called pcheck, ctx, target = \ designate.central.service.policy.check.call_args[0] self.assertEqual('delete_zone_export', pcheck) self.assertEqual(pcheck, 'delete_zone_export') class CentralStatusTests(CentralBasic): def test__update_zone_or_record_status_no_zone(self): zone = RwObject( action='UPDATE', status='SUCCESS', serial=0, ) dom, deleted = self.service.\ _update_zone_or_record_status(zone, 'NO_ZONE', 0) self.assertEqual(dom.action, 'CREATE') self.assertEqual(dom.status, 'ERROR') class CentralQuotaTest(unittest.TestCase): def setUp(self): self.context = mock.Mock() self.zone = mock.Mock() @patch('designate.central.service.storage') @patch('designate.central.service.quota') def test_zone_record_quota_allows_lowering_value(self, quota, storage): service = Service() service.storage.count_records.return_value = 10 recordset = mock.Mock() recordset.managed = False recordset.records = ['1.1.1.%i' % (i + 1) for i in range(5)] service._enforce_record_quota( self.context, self.zone, recordset ) # Ensure we check against the number of records that will # result in the API call. The 5 value is as if there were 10 # unmanaged records unders a single recordset. We find 10 # total - 10 for the recordset being passed in and add the 5 # from the new recordset. check_zone_records = mock.call( self.context, self.zone.tenant_id, zone_records=10 - 10 + 5 ) assert check_zone_records in service.quota.limit_check.mock_calls # Check the recordset limit as well check_recordset_records = mock.call( self.context, self.zone.tenant_id, recordset_records=10 ) assert check_recordset_records in service.quota.limit_check.mock_calls