Refactor central/service and others

Extract _get_pool_ns_records
Improve context.elevated()

Change-Id: I4ac3c1311a0fbda49c9d9619b286875bbd95f048
This commit is contained in:
Federico Ceratto 2016-04-08 18:23:56 +01:00
parent ff5bbe4c5c
commit 74aabd8f24
4 changed files with 168 additions and 154 deletions

View File

@ -32,9 +32,7 @@ from dns import exception as dnsexception
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_concurrency import lockutils
from oslo_db import exception as db_exception
from designate.i18n import _LI
from designate.i18n import _LC
@ -52,89 +50,13 @@ from designate import utils
from designate import storage
from designate.mdns import rpcapi as mdns_rpcapi
from designate.pool_manager import rpcapi as pool_manager_rpcapi
from designate.storage import transaction
from designate.zone_manager import rpcapi as zone_manager_rpcapi
LOG = logging.getLogger(__name__)
ZONE_LOCKS = threading.local()
NOTIFICATION_BUFFER = threading.local()
RETRY_STATE = threading.local()
def _retry_on_deadlock(exc):
    """Retry filter: return True (and log) only for a database deadlock."""
    # TODO(kiall): This is a total leak of the SQLA Driver, we'll need a
    # better way to handle this.
    if not isinstance(exc, db_exception.DBDeadlock):
        return False
    LOG.warning(_LW("Deadlock detected. Retrying..."))
    return True
def retry(cb=None, retries=50, delay=150):
    """A retry decorator that ignores attempts at creating nested retries

    :param cb: optional exception filter; retrying continues only while it
        does not return False for the caught exception
    :param retries: maximum number of attempts before the exception is
        re-raised
    :param delay: pause between attempts, in milliseconds
    """
    def outer(f):
        @functools.wraps(f)
        def retry_wrapper(self, *args, **kwargs):
            if not hasattr(RETRY_STATE, 'held'):
                # Create the state vars if necessary
                RETRY_STATE.held = False
                RETRY_STATE.retries = 0

            if not RETRY_STATE.held:
                # We're the outermost retry decorator
                RETRY_STATE.held = True

                try:
                    while True:
                        try:
                            # Each attempt receives fresh deep copies, so a
                            # retried call never sees mutations made by a
                            # previous failed attempt.
                            result = f(self, *copy.deepcopy(args),
                                       **copy.deepcopy(kwargs))
                            break
                        except Exception as exc:
                            RETRY_STATE.retries += 1
                            if RETRY_STATE.retries >= retries:
                                # Exceeded retry attempts, raise.
                                raise
                            elif cb is not None and cb(exc) is False:
                                # We're not setup to retry on this exception.
                                raise
                            else:
                                # Retry, with a delay.
                                time.sleep(delay / float(1000))
                finally:
                    # Always reset the thread-local state so later calls on
                    # this thread start as "outermost" again.
                    RETRY_STATE.held = False
                    RETRY_STATE.retries = 0
            else:
                # We're an inner retry decorator, just pass on through.
                result = f(self, *copy.deepcopy(args), **copy.deepcopy(kwargs))

            return result

        # Metadata so other wrappers/introspection can detect this decorator.
        retry_wrapper.__wrapped_function = f
        retry_wrapper.__wrapper_name = 'retry'
        return retry_wrapper

    return outer
# TODO(kiall): Get this a better home :)
def transaction(f):
    """Wrap *f* in a transaction on self.storage.

    Begins before the call, commits on success, and rolls back (re-raising
    the original exception) on any failure.  The whole attempt is retried
    on database deadlocks via the retry decorator above.
    """
    @retry(cb=_retry_on_deadlock)
    @functools.wraps(f)
    def transaction_wrapper(self, *args, **kwargs):
        self.storage.begin()
        try:
            # commit() stays inside the try so a failed commit is also
            # rolled back below.
            result = f(self, *args, **kwargs)
            self.storage.commit()
            return result
        except Exception:
            with excutils.save_and_reraise_exception():
                self.storage.rollback()

    # Metadata so other wrappers/introspection can detect this decorator.
    transaction_wrapper.__wrapped_function = f
    transaction_wrapper.__wrapper_name = 'transaction'
    return transaction_wrapper
def synchronized_zone(zone_arg=1, new_zone=False):
@ -422,8 +344,7 @@ class Service(service.RPCService, service.Service):
criterion = criterion or {}
context = context.elevated()
context.all_tenants = True
context = context.elevated(all_tenants=True)
if zone.name == recordset_name:
return
@ -495,8 +416,7 @@ class Service(service.RPCService, service.Service):
Ensures the provided zone_name is the subzone
of an existing zone (checks across all tenants)
"""
context = context.elevated()
context.all_tenants = True
context = context.elevated(all_tenants=True)
# Break the name up into it's component labels
labels = zone_name.split(".")
@ -523,8 +443,7 @@ class Service(service.RPCService, service.Service):
Ensures the provided zone_name is the parent zone
of an existing subzone (checks across all tenants)
"""
context = context.elevated()
context.all_tenants = True
context = context.elevated(all_tenants=True)
# Create wildcard term to catch all subzones
search_term = "%%.%(name)s" % {"name": zone_name}
@ -571,14 +490,9 @@ class Service(service.RPCService, service.Service):
zone['minimum'])
def _create_soa(self, context, zone):
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, zone.pool_id)
soa_values = [self._build_soa_record(zone, pool.ns_records)]
soa_values = [self._build_soa_record(zone, pool_ns_records)]
recordlist = objects.RecordList(objects=[
objects.Record(data=r, managed=True) for r in soa_values])
values = {
@ -596,18 +510,14 @@ class Service(service.RPCService, service.Service):
if zone.type != 'PRIMARY':
return
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, zone.pool_id)
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
soa = self.find_recordset(context,
criterion={'zone_id': zone['id'],
'type': "SOA"})
soa.records[0].data = self._build_soa_record(zone, pool.ns_records)
soa.records[0].data = self._build_soa_record(zone, pool_ns_records)
self._update_recordset_in_storage(context, zone, soa,
increment_serial=False)
@ -891,14 +801,25 @@ class Service(service.RPCService, service.Service):
refresh_time = cfg.CONF.default_soa_refresh_min + dispersion
return int(refresh_time)
    def _get_pool_ns_records(self, context, pool_id):
        """Get pool ns_records using an elevated context and all_tenants = True

        :param context: request context, elevated internally before the
            storage lookup
        :param pool_id: Pool ID
        :returns: ns_records
        """
        elevated_context = context.elevated(all_tenants=True)
        pool = self.storage.get_pool(elevated_context, pool_id)
        return pool.ns_records
@notification('dns.domain.create')
@notification('dns.zone.create')
@synchronized_zone(new_zone=True)
def create_zone(self, context, zone):
# TODO(kiall): Refactor this method into *MUCH* smaller chunks.
"""Create zone: perform checks and then call _create_zone()
"""
# Default to creating in the current users tenant
if zone.tenant_id is None:
zone.tenant_id = context.tenant
zone.tenant_id = zone.tenant_id or context.tenant
target = {
'tenant_id': zone.tenant_id,
@ -934,29 +855,34 @@ class Service(service.RPCService, service.Service):
# Handle super-zones appropriately
subzones = self._is_superzone(context, zone.name, zone.pool_id)
msg = 'Unable to create zone because another tenant owns a ' \
'subzone of the zone'
if subzones:
LOG.debug("Zone '{0}' is a superzone.".format(zone.name))
for subzone in subzones:
if subzone.tenant_id != zone.tenant_id:
raise exceptions.IllegalParentZone('Unable to create '
'zone because another '
'tenant owns a subzone '
'of the zone')
raise exceptions.IllegalParentZone(msg)
# If this succeeds, subzone parent IDs will be updated
# after zone is created
# NOTE(kiall): Fetch the servers before creating the zone, this way
# we can prevent zone creation if no servers are
# configured.
elevated_context = context.elevated()
elevated_context.all_tenants = True
pool = self.storage.get_pool(elevated_context, zone.pool_id)
if len(pool.ns_records) == 0:
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
if len(pool_ns_records) == 0:
LOG.critical(_LC('No nameservers configured. '
'Please create at least one nameserver'))
raise exceptions.NoServersConfigured()
# End of pre-flight checks, create zone
return self._create_zone(context, zone, subzones)
def _create_zone(self, context, zone, subzones):
"""Create zone straight away
"""
if zone.type == 'SECONDARY' and zone.serial is None:
zone.serial = 1
@ -1036,8 +962,7 @@ class Service(service.RPCService, service.Service):
policy.check('get_zone_ns_records', context, target)
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, pool_id)
@ -1064,7 +989,10 @@ class Service(service.RPCService, service.Service):
@notification('dns.zone.update')
@synchronized_zone()
def update_zone(self, context, zone, increment_serial=True):
# TODO(kiall): Refactor this method into *MUCH* smaller chunks.
"""Update zone. Perform checks and then call _update_zone()
:returns: updated zone
"""
target = {
'zone_id': zone.obj_get_original_value('id'),
'zone_name': zone.obj_get_original_value('name'),
@ -1091,6 +1019,11 @@ class Service(service.RPCService, service.Service):
if ttl is not None:
self._is_valid_ttl(context, ttl)
return self._update_zone(context, zone, increment_serial, changes)
def _update_zone(self, context, zone, increment_serial, changes):
"""Update zone
"""
zone = self._update_zone_in_storage(
context, zone, increment_serial=increment_serial)
@ -1874,10 +1807,8 @@ class Service(service.RPCService, service.Service):
"""
tenant_id = tenant_id or context.tenant
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context.edit_managed_records = True
elevated_context = context.elevated(all_tenants=True,
edit_managed_records=True)
criterion = {
'managed': True,
'managed_resource_type': 'ptr:floatingip',
@ -1915,10 +1846,8 @@ class Service(service.RPCService, service.Service):
"""
Utility method to delete a list of records.
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context.edit_managed_records = True
elevated_context = context.elevated(all_tenants=True,
edit_managed_records=True)
if len(records) > 0:
for r in records:
msg = 'Deleting record %s for FIP %s'
@ -1931,8 +1860,7 @@ class Service(service.RPCService, service.Service):
Given a list of FloatingIP and Record tuples we look through creating
a new dict of FloatingIPs
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
fips = objects.FloatingIPList()
for key, value in data.items():
@ -2006,9 +1934,8 @@ class Service(service.RPCService, service.Service):
token pr Neutron in the SC
B) We lookup FIPs using the configured values for this deployment.
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context.edit_managed_records = True
elevated_context = context.elevated(all_tenants=True,
edit_managed_records=True)
tenant_fips = self._list_floatingips(context)
@ -2023,8 +1950,7 @@ class Service(service.RPCService, service.Service):
"""
Get Floating IP PTR
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
tenant_fips = self._list_floatingips(context, region=region)
@ -2042,9 +1968,8 @@ class Service(service.RPCService, service.Service):
Set the FloatingIP's PTR record based on values.
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context.edit_managed_records = True
elevated_context = context.elevated(all_tenants=True,
edit_managed_records=True)
tenant_fips = self._list_floatingips(context, region=region)
@ -2143,10 +2068,8 @@ class Service(service.RPCService, service.Service):
We find the record based on the criteria and delete it or raise.
"""
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context.edit_managed_records = True
elevated_context = context.elevated(all_tenants=True,
edit_managed_records=True)
criterion = {
'managed_resource_id': floatingip_id,
'managed_tenant_id': context.tenant
@ -2277,15 +2200,14 @@ class Service(service.RPCService, service.Service):
# Since these are treated as mutable objects, we're only going to
# be comparing the nameserver.value which is the FQDN
if pool.obj_attr_is_set('ns_records'):
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
# TODO(kiall): ListObjects should be able to give you their
# original set of values.
original_pool = self.storage.get_pool(elevated_context, pool.id)
original_pool_ns_records = self._get_pool_ns_records(context,
pool.id)
# Find the current NS hostnames
existing_ns = set([n.hostname for n in original_pool.ns_records])
existing_ns = set([n.hostname for n in original_pool_ns_records])
# Find the desired NS hostnames
request_ns = set([n.hostname for n in pool.ns_records])
@ -2331,8 +2253,7 @@ class Service(service.RPCService, service.Service):
policy.check('delete_pool', context)
# Make sure that there are no existing zones in the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
zones = self.find_zones(
context=elevated_context,
criterion={'pool_id': pool_id, 'action': '!DELETE'})
@ -2480,9 +2401,6 @@ class Service(service.RPCService, service.Service):
@transaction
def create_zone_transfer_request(self, context, zone_transfer_request):
elevated_context = context.elevated()
elevated_context.all_tenants = True
# get zone
zone = self.get_zone(context, zone_transfer_request.zone_id)
@ -2508,8 +2426,7 @@ class Service(service.RPCService, service.Service):
def get_zone_transfer_request(self, context, zone_transfer_request_id):
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
# Get zone transfer request
zone_transfer_request = self.storage.get_zone_transfer_request(
@ -2576,8 +2493,7 @@ class Service(service.RPCService, service.Service):
@notification('dns.zone_transfer_accept.create')
@transaction
def create_zone_transfer_accept(self, context, zone_transfer_accept):
elevated_context = context.elevated()
elevated_context.all_tenants = True
elevated_context = context.elevated(all_tenants=True)
zone_transfer_request = self.get_zone_transfer_request(
context, zone_transfer_accept.zone_transfer_request_id)

View File

@ -107,8 +107,11 @@ class DesignateContext(context.RequestContext):
def from_dict(cls, values):
return cls(**values)
def elevated(self, show_deleted=None):
"""Return a version of this context with admin flag set."""
def elevated(self, show_deleted=None, all_tenants=False,
edit_managed_records=False):
"""Return a version of this context with admin flag set.
Optionally set all_tenants and edit_managed_records
"""
context = self.deepcopy()
context.is_admin = True
@ -118,6 +121,12 @@ class DesignateContext(context.RequestContext):
if show_deleted is not None:
context.show_deleted = show_deleted
if all_tenants:
context.all_tenants = True
if edit_managed_records:
context.edit_managed_records = True
return context
def sudo(self, tenant):

View File

@ -20,8 +20,8 @@ from oslo_log import log as logging
from designate import exceptions
from designate import storage
from designate import objects
from designate.central import service as central_service
from designate.quota.base import Quota
from designate.storage import transaction
LOG = logging.getLogger(__name__)
@ -55,7 +55,7 @@ class StorageQuota(Quota):
return {resource: quota['hard_limit']}
@central_service.transaction
@transaction
def set_quota(self, context, tenant_id, resource, hard_limit):
context = context.deepcopy()
context.all_tenants = True
@ -87,7 +87,7 @@ class StorageQuota(Quota):
return {resource: hard_limit}
@central_service.transaction
@transaction
def reset_quotas(self, context, tenant_id):
context = context.deepcopy()
context.all_tenants = True

View File

@ -13,11 +13,22 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.storage.base import Storage
import copy
import functools
import threading
import time
from oslo_log import log as logging
from oslo_db import exception as db_exception
from oslo_utils import excutils
from designate.storage.base import Storage
from designate.i18n import _LW
LOG = logging.getLogger(__name__)
RETRY_STATE = threading.local()
def get_storage(storage_driver):
@ -25,3 +36,81 @@ def get_storage(storage_driver):
cls = Storage.get_driver(storage_driver)
return cls()
def _retry_on_deadlock(exc):
    """Retry filter: return True (and log) only for a database deadlock."""
    # TODO(kiall): This is a total leak of the SQLA Driver, we'll need a
    # better way to handle this.
    if not isinstance(exc, db_exception.DBDeadlock):
        return False
    LOG.warning(_LW("Deadlock detected. Retrying..."))
    return True
def retry(cb=None, retries=50, delay=150):
    """A retry decorator that ignores attempts at creating nested retries

    :param cb: optional exception filter; retrying continues only while it
        does not return False for the caught exception
    :param retries: maximum number of attempts before the exception is
        re-raised
    :param delay: pause between attempts, in milliseconds
    """
    def outer(f):
        @functools.wraps(f)
        def retry_wrapper(self, *args, **kwargs):
            if not hasattr(RETRY_STATE, 'held'):
                # Create the state vars if necessary
                RETRY_STATE.held = False
                RETRY_STATE.retries = 0

            if not RETRY_STATE.held:
                # We're the outermost retry decorator
                RETRY_STATE.held = True

                try:
                    while True:
                        try:
                            # Each attempt receives fresh deep copies, so a
                            # retried call never sees mutations made by a
                            # previous failed attempt.
                            result = f(self, *copy.deepcopy(args),
                                       **copy.deepcopy(kwargs))
                            break
                        except Exception as exc:
                            RETRY_STATE.retries += 1
                            if RETRY_STATE.retries >= retries:
                                # Exceeded retry attempts, raise.
                                raise
                            elif cb is not None and cb(exc) is False:
                                # We're not setup to retry on this exception.
                                raise
                            else:
                                # Retry, with a delay.
                                time.sleep(delay / float(1000))
                finally:
                    # Always reset the thread-local state so later calls on
                    # this thread start as "outermost" again.
                    RETRY_STATE.held = False
                    RETRY_STATE.retries = 0
            else:
                # We're an inner retry decorator, just pass on through.
                result = f(self, *copy.deepcopy(args), **copy.deepcopy(kwargs))

            return result

        # Metadata so other wrappers/introspection can detect this decorator.
        retry_wrapper.__wrapped_function = f
        retry_wrapper.__wrapper_name = 'retry'
        return retry_wrapper

    return outer
def transaction(f):
    """Transaction decorator, to be used on class instances with a
    self.storage attribute

    Begins before the call, commits on success, and rolls back (re-raising
    the original exception) on any failure.  The whole attempt is retried
    on database deadlocks via the retry decorator above.
    """
    @retry(cb=_retry_on_deadlock)
    @functools.wraps(f)
    def transaction_wrapper(self, *args, **kwargs):
        self.storage.begin()
        try:
            # commit() stays inside the try so a failed commit is also
            # rolled back below.
            result = f(self, *args, **kwargs)
            self.storage.commit()
            return result
        except Exception:
            with excutils.save_and_reraise_exception():
                self.storage.rollback()

    # Metadata so other wrappers/introspection can detect this decorator.
    transaction_wrapper.__wrapped_function = f
    transaction_wrapper.__wrapper_name = 'transaction'
    return transaction_wrapper