Move Central notifications to a decorator

We now emit the notifications outside of the database transaction,
which removes one place where eventlet may choose to context
switch.

Change-Id: I95bf89d0a0605e63c29380961df483686ffb3092
Partial-Bug: 1392762
This commit is contained in:
Kiall Mac Innes
2014-11-15 14:03:27 +00:00
parent 768ee18830
commit 2c781c694e
2 changed files with 81 additions and 54 deletions

View File

@@ -16,6 +16,7 @@
# under the License.
import re
import contextlib
import collections
import functools
import threading
import itertools
@@ -30,6 +31,7 @@ from designate.i18n import _LI
from designate.i18n import _LC
from designate import backend
from designate import central
from designate import context as dcontext
from designate import exceptions
from designate import network_api
from designate import objects
@@ -42,6 +44,7 @@ from designate import storage
LOG = logging.getLogger(__name__)
DOMAIN_LOCKS = threading.local()
NOTIFICATON_BUFFER = threading.local()
@contextlib.contextmanager
@@ -146,6 +149,52 @@ def synchronized_domain(domain_arg=1, new_domain=False):
return outer
def notification(notification_type):
def outer(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
if not hasattr(NOTIFICATON_BUFFER, 'queue'):
# Create the notifications queue if necessary
NOTIFICATON_BUFFER.stack = 0
NOTIFICATON_BUFFER.queue = collections.deque()
NOTIFICATON_BUFFER.stack += 1
try:
# Find the context argument
context = dcontext.DesignateContext.\
get_context_from_function_and_args(f, args, kwargs)
# Call the wrapped function
result = f(self, *args, **kwargs)
# Enqueue the notification
LOG.debug('Queueing notification for %(type)s ',
{'type': notification_type})
NOTIFICATON_BUFFER.queue.appendleft(
(context, notification_type, result,))
return result
finally:
NOTIFICATON_BUFFER.stack -= 1
if NOTIFICATON_BUFFER.stack == 0:
LOG.debug('Emitting %(count)d notifications',
{'count': len(NOTIFICATON_BUFFER.queue)})
# Send the queued notifications, in order.
for value in NOTIFICATON_BUFFER.queue:
LOG.debug('Emitting %(type)s notification',
{'type': value[1]})
self.notifier.info(value[0], value[1], value[2])
# Reset the queue
NOTIFICATON_BUFFER.queue.clear()
return wrapper
return outer
class Service(service.RPCService):
RPC_API_VERSION = '4.2'
@@ -555,6 +604,7 @@ class Service(service.RPCService):
self.quota.reset_quotas(context, tenant_id)
# Server Methods
@notification('dns.server.create')
@transaction
def create_server(self, context, server):
policy.check('create_server', context)
@@ -565,11 +615,10 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.create_server(context, created_server)
self.notifier.info(context, 'dns.server.create', created_server)
# Update NS recordsets for all zones
elevated_context = context.elevated()
elevated_context.all_tenants = True
zones = self.find_domains(elevated_context)
# Create a new NS recordset for for every zone
for z in zones:
@@ -589,6 +638,7 @@ class Service(service.RPCService):
return self.storage.get_server(context, server_id)
@notification('dns.server.update')
@transaction
def update_server(self, context, server):
target = {
@@ -604,8 +654,6 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.update_server(context, server)
self.notifier.info(context, 'dns.server.update', server)
# Update NS recordsets for all zones
elevated_context = context.elevated()
elevated_context.all_tenants = True
@@ -616,6 +664,7 @@ class Service(service.RPCService):
return server
@notification('dns.server.delete')
@transaction
def delete_server(self, context, server_id):
policy.check('delete_server', context, {'server_id': server_id})
@@ -639,9 +688,10 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.delete_server(context, server)
self.notifier.info(context, 'dns.server.delete', server)
return server
# TLD Methods
@notification('dns.tld.create')
@transaction
def create_tld(self, context, tld):
policy.check('create_tld', context)
@@ -649,8 +699,6 @@ class Service(service.RPCService):
# The TLD is only created on central's storage and not on the backend.
created_tld = self.storage.create_tld(context, tld)
self.notifier.info(context, 'dns.tld.create', created_tld)
# Set check for tlds to be true
self.check_for_tlds = True
return created_tld
@@ -667,6 +715,7 @@ class Service(service.RPCService):
return self.storage.get_tld(context, tld_id)
@notification('dns.tld.update')
@transaction
def update_tld(self, context, tld):
target = {
@@ -676,10 +725,9 @@ class Service(service.RPCService):
tld = self.storage.update_tld(context, tld)
self.notifier.info(context, 'dns.tld.update', tld)
return tld
@notification('dns.tld.delete')
@transaction
def delete_tld(self, context, tld_id):
# Known issue - self.check_for_tld is not reset here. So if the last
@@ -691,9 +739,10 @@ class Service(service.RPCService):
tld = self.storage.delete_tld(context, tld_id)
self.notifier.info(context, 'dns.tld.delete', tld)
return tld
# TSIG Key Methods
@notification('dns.tsigkey.create')
@transaction
def create_tsigkey(self, context, tsigkey):
policy.check('create_tsigkey', context)
@@ -703,8 +752,6 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.create_tsigkey(context, created_tsigkey)
self.notifier.info(context, 'dns.tsigkey.create', created_tsigkey)
return created_tsigkey
def find_tsigkeys(self, context, criterion=None, marker=None, limit=None,
@@ -719,6 +766,7 @@ class Service(service.RPCService):
return self.storage.get_tsigkey(context, tsigkey_id)
@notification('dns.tsigkey.update')
@transaction
def update_tsigkey(self, context, tsigkey):
target = {
@@ -731,10 +779,9 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.update_tsigkey(context, tsigkey)
self.notifier.info(context, 'dns.tsigkey.update', tsigkey)
return tsigkey
@notification('dns.tsigkey.delete')
@transaction
def delete_tsigkey(self, context, tsigkey_id):
policy.check('delete_tsigkey', context, {'tsigkey_id': tsigkey_id})
@@ -744,7 +791,7 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.delete_tsigkey(context, tsigkey)
self.notifier.info(context, 'dns.tsigkey.delete', tsigkey)
return tsigkey
# Tenant Methods
def find_tenants(self, context):
@@ -765,6 +812,7 @@ class Service(service.RPCService):
return self.storage.count_tenants(context)
# Domain Methods
@notification('dns.domain.create')
@synchronized_domain(new_domain=True)
@transaction
def create_domain(self, context, domain):
@@ -848,8 +896,6 @@ class Service(service.RPCService):
self.create_recordset(context, created_domain['id'], rrset,
increment_serial=False)
self.notifier.info(context, 'dns.domain.create', created_domain)
# If domain is a superdomain, update subdomains
# with new parent IDs
for subdomain in subdomains:
@@ -911,6 +957,7 @@ class Service(service.RPCService):
return self.storage.find_domain(context, criterion)
@notification('dns.domain.update')
@synchronized_domain()
@transaction
def update_domain(self, context, domain, increment_serial=True):
@@ -954,8 +1001,6 @@ class Service(service.RPCService):
# Update the SOA Record
self._update_soa(context, domain)
self.notifier.info(context, 'dns.domain.update', domain)
# TODO(vinod): Remove the following call to mdns once pool manager
# calls mdns.
self.mdns_api.notify_zone_changed(context, domain, None, None, None,
@@ -963,6 +1008,7 @@ class Service(service.RPCService):
return domain
@notification('dns.domain.delete')
@synchronized_domain()
@transaction
def delete_domain(self, context, domain_id):
@@ -988,8 +1034,6 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.delete_domain(context, domain)
self.notifier.info(context, 'dns.domain.delete', domain)
return domain
def count_domains(self, context, criterion=None):
@@ -1024,6 +1068,7 @@ class Service(service.RPCService):
return reports
@notification('dns.domain.touch')
@synchronized_domain()
@transaction
def touch_domain(self, context, domain_id):
@@ -1039,11 +1084,10 @@ class Service(service.RPCService):
domain = self._increment_domain_serial(context, domain_id)
self.notifier.info(context, 'dns.domain.touch', domain)
return domain
# RecordSet Methods
@notification('dns.recordset.create')
@synchronized_domain()
@transaction
def create_recordset(self, context, domain_id, recordset,
@@ -1080,9 +1124,6 @@ class Service(service.RPCService):
with wrap_backend_call():
self.backend.create_recordset(context, domain, created_recordset)
# Send RecordSet creation notification
self.notifier.info(context, 'dns.recordset.create', created_recordset)
# Only increment the serial # if records exist and
# increment_serial = True
if increment_serial:
@@ -1130,6 +1171,7 @@ class Service(service.RPCService):
return recordset
@notification('dns.recordset.update')
@synchronized_domain()
@transaction
def update_recordset(self, context, recordset, increment_serial=True):
@@ -1181,11 +1223,9 @@ class Service(service.RPCService):
if increment_serial:
self._increment_domain_serial(context, domain.id)
# Send RecordSet update notification
self.notifier.info(context, 'dns.recordset.update', recordset)
return recordset
@notification('dns.recordset.create')
@synchronized_domain()
@transaction
def delete_recordset(self, context, domain_id, recordset_id,
@@ -1214,9 +1254,6 @@ class Service(service.RPCService):
if increment_serial:
self._increment_domain_serial(context, domain_id)
# Send Record deletion notification
self.notifier.info(context, 'dns.recordset.delete', recordset)
return recordset
def count_recordsets(self, context, criterion=None):
@@ -1232,6 +1269,7 @@ class Service(service.RPCService):
return self.storage.count_recordsets(context, criterion)
# Record Methods
@notification('dns.record.create')
@synchronized_domain()
@transaction
def create_record(self, context, domain_id, recordset_id, record,
@@ -1268,9 +1306,6 @@ class Service(service.RPCService):
if increment_serial:
self._increment_domain_serial(context, domain_id)
# Send Record creation notification
self.notifier.info(context, 'dns.record.create', created_record)
return created_record
def get_record(self, context, domain_id, recordset_id, record_id):
@@ -1313,6 +1348,7 @@ class Service(service.RPCService):
return self.storage.find_record(context, criterion)
@notification('dns.record.update')
@synchronized_domain()
@transaction
def update_record(self, context, record, increment_serial=True):
@@ -1357,11 +1393,9 @@ class Service(service.RPCService):
if increment_serial:
self._increment_domain_serial(context, domain.id)
# Send Record update notification
self.notifier.info(context, 'dns.record.update', record)
return record
@notification('dns.record.delete')
@synchronized_domain()
@transaction
def delete_record(self, context, domain_id, recordset_id, record_id,
@@ -1397,9 +1431,6 @@ class Service(service.RPCService):
if increment_serial:
self._increment_domain_serial(context, domain_id)
# Send Record deletion notification
self.notifier.info(context, 'dns.record.delete', record)
return record
def count_records(self, context, criterion=None):
@@ -1801,14 +1832,13 @@ class Service(service.RPCService):
context, region, floatingip_id, values)
# Blacklisted Domains
@notification('dns.blacklist.create')
@transaction
def create_blacklist(self, context, blacklist):
policy.check('create_blacklist', context)
created_blacklist = self.storage.create_blacklist(context, blacklist)
self.notifier.info(context, 'dns.blacklist.create', created_blacklist)
return created_blacklist
def get_blacklist(self, context, blacklist_id):
@@ -1835,6 +1865,7 @@ class Service(service.RPCService):
return blacklist
@notification('dns.blacklist.update')
@transaction
def update_blacklist(self, context, blacklist):
target = {
@@ -1844,19 +1875,19 @@ class Service(service.RPCService):
blacklist = self.storage.update_blacklist(context, blacklist)
self.notifier.info(context, 'dns.blacklist.update', blacklist)
return blacklist
@notification('dns.blacklist.delete')
@transaction
def delete_blacklist(self, context, blacklist_id):
policy.check('delete_blacklist', context)
blacklist = self.storage.delete_blacklist(context, blacklist_id)
self.notifier.info(context, 'dns.blacklist.delete', blacklist)
return blacklist
# Server Pools
@notification('dns.pool.create')
@transaction
def create_pool(self, context, pool):
# Verify that there is a tenant_id
@@ -1867,8 +1898,6 @@ class Service(service.RPCService):
created_pool = self.storage.create_pool(context, pool)
self.notifier.info(context, 'dns.pool.create', created_pool)
return created_pool
def find_pools(self, context, criterion=None, marker=None, limit=None,
@@ -1891,6 +1920,7 @@ class Service(service.RPCService):
return self.storage.get_pool(context, pool_id)
@notification('dns.pool.update')
@transaction
def update_pool(self, context, pool):
@@ -1898,10 +1928,9 @@ class Service(service.RPCService):
updated_pool = self.storage.update_pool(context, pool)
self.notifier.info(context, 'dns.pool.update', updated_pool)
return updated_pool
@notification('dns.pool.delete')
@transaction
def delete_pool(self, context, pool_id):
@@ -1909,7 +1938,7 @@ class Service(service.RPCService):
pool = self.storage.delete_pool(context, pool_id)
self.notifier.info(context, 'dns.pool.delete', pool)
return pool
# Pool Manager Integration
def update_status(self, context, domain_id, status, serial):

View File

@@ -479,7 +479,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(len(notifications), 3)
# Ensure the notification wrapper contains the correct info
ctxt, message, priority, retry = notifications[0]
self.assertEqual(message['event_type'], 'dns.domain.create')
@@ -857,8 +856,7 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(len(notifications), 2)
# Ensure the notification wrapper contains the correct info
# The SOA is the first notification, so test the second one
ctxt, message, priority, retry = notifications[1]
ctxt, message, priority, retry = notifications[0]
self.assertEqual(message['event_type'], 'dns.domain.update')
self.assertEqual(message['priority'], 'INFO')