diff --git a/designate/central/service.py b/designate/central/service.py index 690fd39b..3b3fe543 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -32,9 +32,7 @@ from dns import exception as dnsexception from oslo_config import cfg import oslo_messaging as messaging from oslo_log import log as logging -from oslo_utils import excutils from oslo_concurrency import lockutils -from oslo_db import exception as db_exception from designate.i18n import _LI from designate.i18n import _LC @@ -52,89 +50,13 @@ from designate import utils from designate import storage from designate.mdns import rpcapi as mdns_rpcapi from designate.pool_manager import rpcapi as pool_manager_rpcapi +from designate.storage import transaction from designate.zone_manager import rpcapi as zone_manager_rpcapi LOG = logging.getLogger(__name__) ZONE_LOCKS = threading.local() NOTIFICATION_BUFFER = threading.local() -RETRY_STATE = threading.local() - - -def _retry_on_deadlock(exc): - """Filter to trigger retry a when a Deadlock is received.""" - # TODO(kiall): This is a total leak of the SQLA Driver, we'll need a better - # way to handle this. - if isinstance(exc, db_exception.DBDeadlock): - LOG.warning(_LW("Deadlock detected. Retrying...")) - return True - return False - - -def retry(cb=None, retries=50, delay=150): - """A retry decorator that ignores attempts at creating nested retries""" - def outer(f): - @functools.wraps(f) - def retry_wrapper(self, *args, **kwargs): - if not hasattr(RETRY_STATE, 'held'): - # Create the state vars if necessary - RETRY_STATE.held = False - RETRY_STATE.retries = 0 - - if not RETRY_STATE.held: - # We're the outermost retry decorator - RETRY_STATE.held = True - - try: - while True: - try: - result = f(self, *copy.deepcopy(args), - **copy.deepcopy(kwargs)) - break - except Exception as exc: - RETRY_STATE.retries += 1 - if RETRY_STATE.retries >= retries: - # Exceeded retry attempts, raise. - raise - elif cb is not None and cb(exc) is False: - # We're not setup to retry on this exception. - raise - else: - # Retry, with a delay. - time.sleep(delay / float(1000)) - - finally: - RETRY_STATE.held = False - RETRY_STATE.retries = 0 - - else: - # We're an inner retry decorator, just pass on through. - result = f(self, *copy.deepcopy(args), **copy.deepcopy(kwargs)) - - return result - retry_wrapper.__wrapped_function = f - retry_wrapper.__wrapper_name = 'retry' - return retry_wrapper - return outer - - -# TODO(kiall): Get this a better home :) -def transaction(f): - @retry(cb=_retry_on_deadlock) - @functools.wraps(f) - def transaction_wrapper(self, *args, **kwargs): - self.storage.begin() - try: - result = f(self, *args, **kwargs) - self.storage.commit() - return result - except Exception: - with excutils.save_and_reraise_exception(): - self.storage.rollback() - - transaction_wrapper.__wrapped_function = f - transaction_wrapper.__wrapper_name = 'transaction' - return transaction_wrapper def synchronized_zone(zone_arg=1, new_zone=False): @@ -422,8 +344,7 @@ class Service(service.RPCService, service.Service): criterion = criterion or {} - context = context.elevated() - context.all_tenants = True + context = context.elevated(all_tenants=True) if zone.name == recordset_name: return @@ -495,8 +416,7 @@ class Service(service.RPCService, service.Service): Ensures the provided zone_name is the subzone of an existing zone (checks across all tenants) """ - context = context.elevated() - context.all_tenants = True + context = context.elevated(all_tenants=True) # Break the name up into it's component labels labels = zone_name.split(".") @@ -523,8 +443,7 @@ class Service(service.RPCService, service.Service): Ensures the provided zone_name is the parent zone of an existing subzone (checks across all tenants) """ - context = context.elevated() - context.all_tenants = True + context = context.elevated(all_tenants=True) # Create wildcard term to catch all subzones search_term = "%%.%(name)s" % {"name": zone_name} @@ -571,14 +490,9 @@ class Service(service.RPCService, service.Service): zone['minimum']) def _create_soa(self, context, zone): - # Need elevated context to get the pool - elevated_context = context.elevated() - elevated_context.all_tenants = True + pool_ns_records = self._get_pool_ns_records(context, zone.pool_id) - # Get the pool for it's list of ns_records - pool = self.storage.get_pool(elevated_context, zone.pool_id) - - soa_values = [self._build_soa_record(zone, pool.ns_records)] + soa_values = [self._build_soa_record(zone, pool_ns_records)] recordlist = objects.RecordList(objects=[ objects.Record(data=r, managed=True) for r in soa_values]) values = { @@ -596,18 +510,14 @@ class Service(service.RPCService, service.Service): if zone.type != 'PRIMARY': return - # Need elevated context to get the pool - elevated_context = context.elevated() - elevated_context.all_tenants = True - # Get the pool for it's list of ns_records - pool = self.storage.get_pool(elevated_context, zone.pool_id) + pool_ns_records = self._get_pool_ns_records(context, zone.pool_id) soa = self.find_recordset(context, criterion={'zone_id': zone['id'], 'type': "SOA"}) - soa.records[0].data = self._build_soa_record(zone, pool.ns_records) + soa.records[0].data = self._build_soa_record(zone, pool_ns_records) self._update_recordset_in_storage(context, zone, soa, increment_serial=False) @@ -891,14 +801,25 @@ class Service(service.RPCService, service.Service): refresh_time = cfg.CONF.default_soa_refresh_min + dispersion return int(refresh_time) + def _get_pool_ns_records(self, context, pool_id): + """Get pool ns_records using an elevated context and all_tenants = True + + :param pool_id: Pool ID + :returns: ns_records + """ + elevated_context = context.elevated(all_tenants=True) + pool = self.storage.get_pool(elevated_context, pool_id) + return pool.ns_records + @notification('dns.domain.create') @notification('dns.zone.create') @synchronized_zone(new_zone=True) def create_zone(self, context, zone): - # TODO(kiall): Refactor this method into *MUCH* smaller chunks. + """Create zone: perform checks and then call _create_zone() + """ + # Default to creating in the current users tenant - if zone.tenant_id is None: - zone.tenant_id = context.tenant + zone.tenant_id = zone.tenant_id or context.tenant target = { 'tenant_id': zone.tenant_id, @@ -934,29 +855,34 @@ class Service(service.RPCService, service.Service): # Handle super-zones appropriately subzones = self._is_superzone(context, zone.name, zone.pool_id) + msg = 'Unable to create zone because another tenant owns a ' \ + 'subzone of the zone' if subzones: LOG.debug("Zone '{0}' is a superzone.".format(zone.name)) for subzone in subzones: if subzone.tenant_id != zone.tenant_id: - raise exceptions.IllegalParentZone('Unable to create ' - 'zone because another ' - 'tenant owns a subzone ' - 'of the zone') + raise exceptions.IllegalParentZone(msg) + # If this succeeds, subzone parent IDs will be updated # after zone is created # NOTE(kiall): Fetch the servers before creating the zone, this way # we can prevent zone creation if no servers are # configured. - elevated_context = context.elevated() - elevated_context.all_tenants = True - pool = self.storage.get_pool(elevated_context, zone.pool_id) - if len(pool.ns_records) == 0: + pool_ns_records = self._get_pool_ns_records(context, zone.pool_id) + if len(pool_ns_records) == 0: LOG.critical(_LC('No nameservers configured. ' 'Please create at least one nameserver')) raise exceptions.NoServersConfigured() + # End of pre-flight checks, create zone + return self._create_zone(context, zone, subzones) + + def _create_zone(self, context, zone, subzones): + """Create zone straight away + """ + if zone.type == 'SECONDARY' and zone.serial is None: zone.serial = 1 @@ -1036,8 +962,7 @@ class Service(service.RPCService, service.Service): policy.check('get_zone_ns_records', context, target) # Need elevated context to get the pool - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) # Get the pool for it's list of ns_records pool = self.storage.get_pool(elevated_context, pool_id) @@ -1064,7 +989,10 @@ class Service(service.RPCService, service.Service): @notification('dns.zone.update') @synchronized_zone() def update_zone(self, context, zone, increment_serial=True): - # TODO(kiall): Refactor this method into *MUCH* smaller chunks. + """Update zone. Perform checks and then call _update_zone() + + :returns: updated zone + """ target = { 'zone_id': zone.obj_get_original_value('id'), 'zone_name': zone.obj_get_original_value('name'), @@ -1091,6 +1019,11 @@ class Service(service.RPCService, service.Service): if ttl is not None: self._is_valid_ttl(context, ttl) + return self._update_zone(context, zone, increment_serial, changes) + + def _update_zone(self, context, zone, increment_serial, changes): + """Update zone + """ zone = self._update_zone_in_storage( context, zone, increment_serial=increment_serial) @@ -1874,10 +1807,8 @@ class Service(service.RPCService, service.Service): """ tenant_id = tenant_id or context.tenant - elevated_context = context.elevated() - elevated_context.all_tenants = True - elevated_context.edit_managed_records = True - + elevated_context = context.elevated(all_tenants=True, + edit_managed_records=True) criterion = { 'managed': True, 'managed_resource_type': 'ptr:floatingip', @@ -1915,10 +1846,8 @@ class Service(service.RPCService, service.Service): """ Utility method to delete a list of records. """ - elevated_context = context.elevated() - elevated_context.all_tenants = True - elevated_context.edit_managed_records = True - + elevated_context = context.elevated(all_tenants=True, + edit_managed_records=True) if len(records) > 0: for r in records: msg = 'Deleting record %s for FIP %s' @@ -1931,8 +1860,7 @@ class Service(service.RPCService, service.Service): Given a list of FloatingIP and Record tuples we look through creating a new dict of FloatingIPs """ - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) fips = objects.FloatingIPList() for key, value in data.items(): @@ -2006,9 +1934,8 @@ class Service(service.RPCService, service.Service): token pr Neutron in the SC B) We lookup FIPs using the configured values for this deployment. """ - elevated_context = context.elevated() - elevated_context.all_tenants = True - elevated_context.edit_managed_records = True + elevated_context = context.elevated(all_tenants=True, + edit_managed_records=True) tenant_fips = self._list_floatingips(context) @@ -2023,8 +1950,7 @@ class Service(service.RPCService, service.Service): """ Get Floating IP PTR """ - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) tenant_fips = self._list_floatingips(context, region=region) @@ -2042,9 +1968,8 @@ class Service(service.RPCService, service.Service): Set the FloatingIP's PTR record based on values. """ - elevated_context = context.elevated() - elevated_context.all_tenants = True - elevated_context.edit_managed_records = True + elevated_context = context.elevated(all_tenants=True, + edit_managed_records=True) tenant_fips = self._list_floatingips(context, region=region) @@ -2143,10 +2068,8 @@ class Service(service.RPCService, service.Service): We find the record based on the criteria and delete it or raise. """ - elevated_context = context.elevated() - elevated_context.all_tenants = True - elevated_context.edit_managed_records = True - + elevated_context = context.elevated(all_tenants=True, + edit_managed_records=True) criterion = { 'managed_resource_id': floatingip_id, 'managed_tenant_id': context.tenant @@ -2277,15 +2200,14 @@ class Service(service.RPCService, service.Service): # Since these are treated as mutable objects, we're only going to # be comparing the nameserver.value which is the FQDN if pool.obj_attr_is_set('ns_records'): - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) # TODO(kiall): ListObjects should be able to give you their # original set of values. - original_pool = self.storage.get_pool(elevated_context, pool.id) - + original_pool_ns_records = self._get_pool_ns_records(context, + pool.id) # Find the current NS hostnames - existing_ns = set([n.hostname for n in original_pool.ns_records]) + existing_ns = set([n.hostname for n in original_pool_ns_records]) # Find the desired NS hostnames request_ns = set([n.hostname for n in pool.ns_records]) @@ -2331,8 +2253,7 @@ class Service(service.RPCService, service.Service): policy.check('delete_pool', context) # Make sure that there are no existing zones in the pool - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) zones = self.find_zones( context=elevated_context, criterion={'pool_id': pool_id, 'action': '!DELETE'}) @@ -2480,9 +2401,6 @@ class Service(service.RPCService, service.Service): @transaction def create_zone_transfer_request(self, context, zone_transfer_request): - elevated_context = context.elevated() - elevated_context.all_tenants = True - # get zone zone = self.get_zone(context, zone_transfer_request.zone_id) @@ -2508,8 +2426,7 @@ class Service(service.RPCService, service.Service): def get_zone_transfer_request(self, context, zone_transfer_request_id): - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) # Get zone transfer request zone_transfer_request = self.storage.get_zone_transfer_request( @@ -2576,8 +2493,7 @@ class Service(service.RPCService, service.Service): @notification('dns.zone_transfer_accept.create') @transaction def create_zone_transfer_accept(self, context, zone_transfer_accept): - elevated_context = context.elevated() - elevated_context.all_tenants = True + elevated_context = context.elevated(all_tenants=True) zone_transfer_request = self.get_zone_transfer_request( context, zone_transfer_accept.zone_transfer_request_id) diff --git a/designate/context.py b/designate/context.py index a2204ef7..27097cfa 100644 --- a/designate/context.py +++ b/designate/context.py @@ -107,8 +107,11 @@ class DesignateContext(context.RequestContext): def from_dict(cls, values): return cls(**values) - def elevated(self, show_deleted=None): - """Return a version of this context with admin flag set.""" + def elevated(self, show_deleted=None, all_tenants=False, + edit_managed_records=False): + """Return a version of this context with admin flag set. + Optionally set all_tenants and edit_managed_records + """ context = self.deepcopy() context.is_admin = True @@ -118,6 +121,12 @@ class DesignateContext(context.RequestContext): if show_deleted is not None: context.show_deleted = show_deleted + if all_tenants: + context.all_tenants = True + + if edit_managed_records: + context.edit_managed_records = True + return context def sudo(self, tenant): diff --git a/designate/quota/impl_storage.py b/designate/quota/impl_storage.py index 735e881b..280940da 100644 --- a/designate/quota/impl_storage.py +++ b/designate/quota/impl_storage.py @@ -20,8 +20,8 @@ from oslo_log import log as logging from designate import exceptions from designate import storage from designate import objects -from designate.central import service as central_service from designate.quota.base import Quota +from designate.storage import transaction LOG = logging.getLogger(__name__) @@ -55,7 +55,7 @@ class StorageQuota(Quota): return {resource: quota['hard_limit']} - @central_service.transaction + @transaction def set_quota(self, context, tenant_id, resource, hard_limit): context = context.deepcopy() context.all_tenants = True @@ -87,7 +87,7 @@ class StorageQuota(Quota): return {resource: hard_limit} - @central_service.transaction + @transaction def reset_quotas(self, context, tenant_id): context = context.deepcopy() context.all_tenants = True diff --git a/designate/storage/__init__.py b/designate/storage/__init__.py index 827459c7..1c431217 100644 --- a/designate/storage/__init__.py +++ b/designate/storage/__init__.py @@ -13,11 +13,22 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from designate.storage.base import Storage + +import copy +import functools +import threading +import time from oslo_log import log as logging +from oslo_db import exception as db_exception +from oslo_utils import excutils + +from designate.storage.base import Storage +from designate.i18n import _LW + LOG = logging.getLogger(__name__) +RETRY_STATE = threading.local() def get_storage(storage_driver): @@ -25,3 +36,81 @@ def get_storage(storage_driver): cls = Storage.get_driver(storage_driver) return cls() + + +def _retry_on_deadlock(exc): + """Filter to trigger retry a when a Deadlock is received.""" + # TODO(kiall): This is a total leak of the SQLA Driver, we'll need a better + # way to handle this. + if isinstance(exc, db_exception.DBDeadlock): + LOG.warning(_LW("Deadlock detected. Retrying...")) + return True + return False + + +def retry(cb=None, retries=50, delay=150): + """A retry decorator that ignores attempts at creating nested retries""" + def outer(f): + @functools.wraps(f) + def retry_wrapper(self, *args, **kwargs): + if not hasattr(RETRY_STATE, 'held'): + # Create the state vars if necessary + RETRY_STATE.held = False + RETRY_STATE.retries = 0 + + if not RETRY_STATE.held: + # We're the outermost retry decorator + RETRY_STATE.held = True + + try: + while True: + try: + result = f(self, *copy.deepcopy(args), + **copy.deepcopy(kwargs)) + break + except Exception as exc: + RETRY_STATE.retries += 1 + if RETRY_STATE.retries >= retries: + # Exceeded retry attempts, raise. + raise + elif cb is not None and cb(exc) is False: + # We're not setup to retry on this exception. + raise + else: + # Retry, with a delay. + time.sleep(delay / float(1000)) + + finally: + RETRY_STATE.held = False + RETRY_STATE.retries = 0 + + else: + # We're an inner retry decorator, just pass on through. + result = f(self, *copy.deepcopy(args), **copy.deepcopy(kwargs)) + + return result + retry_wrapper.__wrapped_function = f + retry_wrapper.__wrapper_name = 'retry' + return retry_wrapper + return outer + + +def transaction(f): + """Transaction decorator, to be used on class instances with a + self.storage attribute + """ + @retry(cb=_retry_on_deadlock) + @functools.wraps(f) + def transaction_wrapper(self, *args, **kwargs): + self.storage.begin() + try: + result = f(self, *args, **kwargs) + self.storage.commit() + return result + except Exception: + with excutils.save_and_reraise_exception(): + self.storage.rollback() + + transaction_wrapper.__wrapped_function = f + transaction_wrapper.__wrapper_name = 'transaction' + return transaction_wrapper