Make notifications pluggable

- Defines a plugin interface for what's actually emitted as part
  of designate "notifications".
- The default plugin emits the same thing as notifications did prior
  to this patch.
- The "audit" notification plugin emits recordset data changes and
  zone/recordset names, if they exist, the notifications with this
  plugin look like http://paste.openstack.org/show/545210/
- Adds support for multiple notifications for a single change
- Also adds client IP to the context object, as it's a field that
  may be of interest to some types of notifications
- Many tests

Change-Id: I01118fae8ce6e38ccc61b0ce763fd759affd9a86
This commit is contained in:
Tim Simmons 2016-07-25 17:45:08 +01:00
parent 8966e7166b
commit 93c0161242
8 changed files with 772 additions and 7 deletions

39
contrib/consume.py Normal file
View File

@ -0,0 +1,39 @@
# Copyright 2016 Rackspace, Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
This dumb script allows you to see what's being dumped onto
the notifications.info queue
nabbed from:
https://pika.readthedocs.io/en/latest/examples/blocking_consume.html
"""
import pika
def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume(on_message, 'notifications.info')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()

View File

@ -107,6 +107,10 @@ class ContextMiddleware(base.Middleware):
strutils.bool_from_string(
request.headers.get('X-Designate-Edit-Managed-Records'))
def _extract_client_addr(self, ctxt, request):
if hasattr(request, 'client_addr'):
ctxt.client_addr = request.client_addr
def make_context(self, request, *args, **kwargs):
req_id = request.environ.get(request_id.ENV_REQUEST_ID)
kwargs.setdefault('request_id', req_id)
@ -118,6 +122,7 @@ class ContextMiddleware(base.Middleware):
self._extract_all_projects(ctxt, request)
self._extract_edit_managed_records(ctxt, request)
self._extract_dns_hide_counts(ctxt, request)
self._extract_client_addr(ctxt, request)
finally:
request.environ['context'] = ctxt
return ctxt

View File

@ -42,6 +42,7 @@ from designate import context as dcontext
from designate import exceptions
from designate import dnsutils
from designate import network_api
from designate import notifications
from designate import objects
from designate import policy
from designate import quota
@ -157,11 +158,17 @@ def notification(notification_type):
# Call the wrapped function
result = f(self, *args, **kwargs)
# Feed the args/result to a notification plugin
# to determine what is emitted
payloads = notifications.get_plugin().emit(
notification_type, context, result, args, kwargs)
# Enqueue the notification
LOG.debug('Queueing notification for %(type)s ',
{'type': notification_type})
NOTIFICATION_BUFFER.queue.appendleft(
(context, notification_type, result,))
for payload in payloads:
LOG.debug('Queueing notification for %(type)s ',
{'type': notification_type})
NOTIFICATION_BUFFER.queue.appendleft(
(context, notification_type, payload,))
return result

View File

@ -32,10 +32,12 @@ class DesignateContext(context.RequestContext):
_abandon = None
original_tenant = None
_edit_managed_records = False
_client_addr = None
def __init__(self, service_catalog=None, all_tenants=False, abandon=None,
tsigkey_id=None, user_identity=None, original_tenant=None,
edit_managed_records=False, hide_counts=False, **kwargs):
edit_managed_records=False, hide_counts=False,
client_addr=None, **kwargs):
# NOTE: user_identity may be passed in, but will be silently dropped as
# it is a generated field based on several others.
@ -50,6 +52,7 @@ class DesignateContext(context.RequestContext):
self.abandon = abandon
self.edit_managed_records = edit_managed_records
self.hide_counts = hide_counts
self.client_addr = client_addr
def deepcopy(self):
d = self.to_dict()
@ -83,7 +86,8 @@ class DesignateContext(context.RequestContext):
'abandon': self.abandon,
'edit_managed_records': self.edit_managed_records,
'tsigkey_id': self.tsigkey_id,
'hide_counts': self.hide_counts
'hide_counts': self.hide_counts,
'client_addr': self.client_addr,
})
return copy.deepcopy(d)
@ -184,6 +188,14 @@ class DesignateContext(context.RequestContext):
policy.check('edit_managed_records', self)
self._edit_managed_records = value
@property
def client_addr(self):
return self._client_addr
@client_addr.setter
def client_addr(self, value):
self._client_addr = value
def get_current():
return context.get_current()

View File

@ -15,21 +15,28 @@
# under the License.
#
# Copied: nova.notifications
import abc
from oslo_config import cfg
from oslo_log import log as logging
from designate.i18n import _LW
from designate.plugin import DriverPlugin
from designate import objects
from designate import rpc
LOG = logging.getLogger(__name__)
notify_opts = [
cfg.BoolOpt('notify_api_faults', default=False,
help='Send notifications if there\'s a failure in the API.')
help='Send notifications if there\'s a failure in the API.'),
cfg.StrOpt('notification-plugin', default='default',
help='The notification plugin to use'),
]
CONF = cfg.CONF
CONF.register_opts(notify_opts)
NOTIFICATION_PLUGIN = None
def send_api_fault(context, url, status, exception):
@ -41,3 +48,176 @@ def send_api_fault(context, url, status, exception):
payload = {'url': url, 'exception': str(exception), 'status': status}
rpc.get_notifier('api').error(context, 'dns.api.fault', payload)
def init_notification_plugin():
LOG.debug("Loading notification plugin: %s" % cfg.CONF.notification_plugin)
cls = NotificationPlugin.get_driver(cfg.CONF.notification_plugin)
global NOTIFICATION_PLUGIN
NOTIFICATION_PLUGIN = cls()
def get_plugin():
if NOTIFICATION_PLUGIN is None:
init_notification_plugin()
return NOTIFICATION_PLUGIN
class NotificationPlugin(DriverPlugin):
"""Base class for Notification Driver implementations"""
__plugin_type__ = 'notification'
__plugin_ns__ = 'designate.notification.plugin'
def __init__(self):
super(NotificationPlugin, self).__init__()
@abc.abstractmethod
def emit(self, notification_type, context, result, *args, **kwargs):
"""Return a payload to emit as part of the notification"""
class Default(NotificationPlugin):
"""Returns the result, as implemented in the base class"""
__plugin_name__ = 'default'
def emit(self, notification_type, context, result, *args, **kwargs):
"""Return the result of the function called"""
return [result]
class Audit(NotificationPlugin):
"""Grabs Zone/Recordset names and RRData changes"""
__plugin_name__ = 'audit'
def zone_name(self, arglist, result):
for arg in arglist + [result]:
if isinstance(arg, objects.Zone):
if arg.name is not None:
return arg.name
if hasattr(arg, 'zone_name'):
if arg.zone_name is not None:
return arg.zone_name
return None
def zone_id(self, arglist, result):
for arg in arglist + [result]:
if isinstance(arg, objects.Zone):
if arg.id is not None:
return arg.id
if hasattr(arg, 'zone_id'):
if arg.zone_id is not None:
return arg.zone_id
return None
def recordset_name(self, arglist, result):
for arg in arglist + [result]:
if isinstance(arg, objects.RecordSet):
if arg.name is not None:
return arg.name
return None
def recordset_data(self, arglist, result):
if not isinstance(result, objects.RecordSet):
return []
for arg in arglist:
if isinstance(arg, objects.RecordSet):
if 'records' not in arg.obj_what_changed():
return []
original_rrs = arg.obj_get_original_value('records')
old_value = ' '.join(
[obj['data'] for obj in original_rrs])
new_value = ' '.join(
[rr.data for rr in result.records])
if old_value == new_value:
return []
return [{
'change': 'records',
'old_value': old_value,
'new_value': new_value,
}]
return []
def other_data(self, arglist, result):
changes = []
for arg in arglist:
if isinstance(arg, objects.DesignateObject):
for change in arg.obj_what_changed():
if change != 'records':
old_value = arg.obj_get_original_value(change)
new_value = getattr(arg, change)
# Just in case something odd makes it here
if any(type(val) not in
(int, float, bool, str, type(None))
for val in (old_value, new_value)):
old_value, new_value = None, None
msg = _LW("Nulling notification values after "
"unexpected values %s")
LOG.warning(msg, (old_value, new_value))
if old_value == new_value:
continue
changes.append({
'change': change,
'old_value': str(old_value),
'new_value': str(new_value),
})
return changes
def blank_event(self):
return [{
'change': None,
'old_value': None,
'new_value': None,
}]
def gather_changes(self, arglist, result, notification_type):
changes = []
if 'update' in notification_type:
changes.extend(self.other_data(arglist, result))
if notification_type == 'dns.recordset.update':
changes.extend(self.recordset_data(arglist, result))
elif 'create' in notification_type:
if notification_type == 'dns.recordset.create':
changes.extend(self.recordset_data(arglist, result))
else:
changes.extend(self.blank_event())
else:
changes.extend(self.blank_event())
return changes
def emit(self, notification_type, context, result, *args, **kwargs):
arglist = []
for item in args:
if isinstance(item, tuple) or isinstance(item, list):
arglist.extend(item)
if isinstance(item, dict):
arglist.extend(list(item.values()))
payloads = []
for change in self.gather_changes(arglist, result, notification_type):
payloads.append({
'zone_name': self.zone_name(arglist, result),
'zone_id': self.zone_id(arglist, result),
'recordset_name': self.recordset_name(arglist, result),
'old_data': change['old_value'],
'new_data': change['new_value'],
'changed_field': change['change'],
})
return payloads

View File

@ -0,0 +1,498 @@
# Copyright 2016 Rackspace
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from oslo_log import log as logging
from designate import objects
from designate import notifications
LOG = logging.getLogger(__name__)
class DefaultNotificationTest(unittest.TestCase):
def setUp(self):
self.driver = notifications.Default()
self.context = mock.Mock()
def test_default_notifications(self):
result = 'result'
event = 'dns.zone.create'
args = ('foo', 'bar',)
kwargs = {'wumbo': 'mumbo'}
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, ['result'])
class AuditNotificationTest(unittest.TestCase):
def setUp(self):
self.driver = notifications.Audit()
self.context = mock.Mock()
self.maxDiff = None
#
# Zone changes
#
def test_audit_zone_name(self):
zone = objects.Zone(
name='example.com.',
type='PRIMARY',
)
result = zone
event = 'dns.zone.create'
args = (zone,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': None,
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_zone_id(self):
zone = objects.Zone(
id='123',
name='example.com.',
type='PRIMARY',
)
result = zone
event = 'dns.zone.create'
args = (zone,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_zone_update(self):
zone = objects.Zone(
id='123',
name='example.com.',
type='PRIMARY',
ttl=1
)
zone.ttl = 300
result = zone
event = 'dns.zone.update'
args = (zone,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'ttl',
'new_data': '300',
'old_data': '1',
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_zone_delete(self):
zone = objects.Zone(
id='123',
name='example.com.',
type='PRIMARY',
ttl=1
)
result = zone
event = 'dns.zone.delete'
args = ('123',)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
#
# Recordset Changes
#
def test_audit_rrset_name(self):
rrset = objects.RecordSet(
name='foo.example.com.',
type='PRIMARY',
records=objects.RecordList.from_list([])
)
rrset.records = objects.RecordList.from_list(
[{'data': '192.168.1.1'}])
result = rrset
event = 'dns.recordset.create'
args = (rrset,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'records',
'new_data': '192.168.1.1',
'old_data': '',
'recordset_name': 'foo.example.com.',
'zone_id': None,
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_rrset_create(self):
rrset = objects.RecordSet(
name='foo.example.com.',
type='PRIMARY',
records=[],
zone_id='123',
zone_name='example.com.'
)
rrset.records = objects.RecordList.from_list(
[{'data': '192.168.1.1'}])
result = rrset
event = 'dns.recordset.create'
args = (rrset,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'records',
'new_data': '192.168.1.1',
'old_data': '',
'recordset_name': 'foo.example.com.',
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_rrset_update_records(self):
rrset = objects.RecordSet(
name='foo.example.com.',
type='PRIMARY',
records=objects.RecordList.from_list(
[{'data': '192.168.1.1'}]),
zone_id='123',
zone_name='example.com.'
)
rrset.records = objects.RecordList.from_list(
[{'data': '192.168.1.2'}])
result = rrset
event = 'dns.recordset.update'
args = (rrset,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'records',
'new_data': '192.168.1.2',
'old_data': '192.168.1.1',
'recordset_name': 'foo.example.com.',
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_rrset_update_other(self):
rrset = objects.RecordSet(
name='foo.example.com.',
type='PRIMARY',
records=objects.RecordList.from_list(
[{'data': '192.168.1.1'}]),
zone_id='123',
zone_name='example.com.',
ttl=300
)
rrset.ttl = 400
result = rrset
event = 'dns.recordset.update'
args = (rrset,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'ttl',
'new_data': '400',
'old_data': '300',
'recordset_name': 'foo.example.com.',
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_rrset_delete(self):
rrset = objects.RecordSet(
name='foo.example.com.',
type='PRIMARY',
records=objects.RecordList.from_list([]),
zone_id='123',
zone_name='example.com.',
id='1',
)
result = rrset
event = 'dns.recordset.delete'
args = ('123', '1',)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': 'foo.example.com.',
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
#
# Zone Imports
#
def test_audit_import_create(self):
zimport = objects.ZoneImport(
zone_id='123',
)
result = zimport
event = 'dns.zone_import.create'
args = (zimport,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_import_delete(self):
zimport = objects.ZoneImport(
zone_id='123',
)
result = zimport
event = 'dns.zone_import.create'
args = ('1')
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
#
# Zone Exports
#
def test_audit_export_create(self):
zexport = objects.ZoneExport(
zone_id='123',
)
result = zexport
event = 'dns.zone_export.create'
args = (zexport,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_export_delete(self):
zexport = objects.ZoneExport(
zone_id='123',
)
result = zexport
event = 'dns.zone_export.create'
args = ('1')
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
#
# Zone Transfer Requests
#
def test_audit_transfer_request_create(self):
ztransfer_request = objects.ZoneTransferRequest(
zone_id='123',
zone_name='example.com.',
target_tenant_id='tenant_a',
)
result = ztransfer_request
event = 'dns.zone_transfer_request.create'
args = (ztransfer_request,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_transfer_request_update(self):
ztransfer_request = objects.ZoneTransferRequest(
zone_id='123',
zone_name='example.com.',
target_tenant_id='tenant_a',
)
ztransfer_request.target_tenant_id = 'tenant_b'
result = ztransfer_request
event = 'dns.zone_transfer_request.update'
args = (ztransfer_request,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': 'target_tenant_id',
'new_data': 'tenant_b',
'old_data': 'tenant_a',
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
def test_audit_transfer_request_delete(self):
ztransfer_request = objects.ZoneTransferRequest(
zone_id='123',
zone_name='example.com.',
target_tenant_id='tenant_a',
)
result = ztransfer_request
event = 'dns.zone_transfer_request.create'
args = ('1')
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': 'example.com.'
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)
#
# Zone Transfer Requests
#
def test_audit_transfer_accept_create(self):
ztransfer_accept = objects.ZoneTransferAccept(
zone_id='123',
)
result = ztransfer_accept
event = 'dns.zone_transfer_accept.create'
args = (ztransfer_accept,)
kwargs = {'wumbo': 'mumbo'}
expected = [{
'changed_field': None,
'new_data': None,
'old_data': None,
'recordset_name': None,
'zone_id': '123',
'zone_name': None
}]
driver_result = self.driver.emit(
event, self.context, result, args, kwargs)
self.assertEqual(driver_result, expected)

View File

@ -0,0 +1,21 @@
---
features:
- Operators now have a choice in the type of notification payload
content that Designate can emit via oslo.messaging's Notifier.
The default plugin is configured to emit the same information
that the notifications previous to this patch emitted. So there
is no functional change.
Operators can write their own notification plugins that exist
in designate/notifications.py.
An "audit" plugin is included. This plugin emits object changes,
if they exist, along with zone ids, zone/recordset names.
A plugin can define multiple payloads from a single notification
to be emitted at once, if desirable.
The selection of a plugin is defined by python entrypoints (for
driver availability) and the new "notification_plugin" option
in the "DEFAULT" config section.

View File

@ -137,6 +137,9 @@ designate.heartbeat_emitter =
noop = designate.service_status:NoopEmitter
rpc = designate.service_status:RpcEmitter
designate.notification.plugin =
default = designate.notifications:Default
audit = designate.notifications:Audit
[build_sphinx]
all_files = 1