mdns: Add support for NOTIFY's

designate-mdns will be notified by Central when changes are made
to a zone. The service will issue a NOTIFY to all slave nameservers.

Currently this is a pre-configured list of slaves to NOTIFY.  After
Pools, the list of slaves to NOTIFY will be loaded dynamically.

Change-Id: Iead2287f79726eddbb225538de961332cda49f24
Implements: blueprint mdns-designate-mdns-notify
This commit is contained in:
Vinod Mangalpally 2014-06-25 10:16:17 -05:00
parent 0f05754412
commit 67cc68e903
8 changed files with 313 additions and 11 deletions

View File

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from designate.mdns import rpcapi as mdns_rpcapi
from oslo.config import cfg from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup( cfg.CONF.register_group(cfg.OptGroup(
@ -39,3 +41,19 @@ cfg.CONF.register_opts([
help="The Tenant ID that will own any managed resources."), help="The Tenant ID that will own any managed resources."),
cfg.StrOpt('min_ttl', default="None", help="Minimum TTL allowed") cfg.StrOpt('min_ttl', default="None", help="Minimum TTL allowed")
], group='service:central') ], group='service:central')
MDNS_API = None
def get_mdns_api():
"""
The rpc.get_client() which is called upon the API object initialization
will cause a assertion error if the designate.rpc.TRANSPORT isn't setup by
rpc.init() before.
This fixes that by creating the rpcapi when demanded.
"""
global MDNS_API
if not MDNS_API:
MDNS_API = mdns_rpcapi.MdnsAPI()
return MDNS_API

View File

@ -24,6 +24,7 @@ from designate.openstack.common import log as logging
from designate.openstack.common.gettextutils import _LI from designate.openstack.common.gettextutils import _LI
from designate.openstack.common.gettextutils import _LC from designate.openstack.common.gettextutils import _LC
from designate import backend from designate import backend
from designate import central
from designate import exceptions from designate import exceptions
from designate import network_api from designate import network_api
from designate import objects from designate import objects
@ -91,6 +92,10 @@ class Service(service.Service):
self.backend.stop() self.backend.stop()
@property
def mdns_api(self):
return central.get_mdns_api()
def _is_valid_domain_name(self, context, domain_name): def _is_valid_domain_name(self, context, domain_name):
# Validate domain name length # Validate domain name length
if len(domain_name) > cfg.CONF['service:central'].max_domain_name_len: if len(domain_name) > cfg.CONF['service:central'].max_domain_name_len:
@ -621,6 +626,7 @@ class Service(service.Service):
self.backend.update_domain(context, domain) self.backend.update_domain(context, domain)
self.notifier.info(context, 'dns.domain.update', domain) self.notifier.info(context, 'dns.domain.update', domain)
self.mdns_api.notify_zone_changed(context, domain.name)
return domain return domain
@ -797,6 +803,7 @@ class Service(service.Service):
# Send RecordSet update notification # Send RecordSet update notification
self.notifier.info(context, 'dns.recordset.update', recordset) self.notifier.info(context, 'dns.recordset.update', recordset)
self.mdns_api.notify_zone_changed(context, domain.name)
return recordset return recordset
@ -952,6 +959,7 @@ class Service(service.Service):
# Send Record update notification # Send Record update notification
self.notifier.info(context, 'dns.record.update', record) self.notifier.info(context, 'dns.record.update', record)
self.mdns_api.notify_zone_changed(context, domain.name)
return record return record

View File

@ -19,15 +19,30 @@ cfg.CONF.register_group(cfg.OptGroup(
name='service:mdns', title="Configuration for mDNS Service" name='service:mdns', title="Configuration for mDNS Service"
)) ))
cfg.CONF.register_opts([ OPTS = [
cfg.IntOpt('workers', default=None, cfg.IntOpt('workers', default=None,
help='Number of worker processes to spawn'), help='Number of mdns worker processes to spawn'),
cfg.StrOpt('host', default='0.0.0.0', cfg.StrOpt('host', default='0.0.0.0',
help='mDNS Bind Host'), help='mDNS Bind Host'),
cfg.ListOpt('slave-nameserver-ips-and-ports', default=[],
help='Ips and ports of slave nameservers that are notified of '
'zone changes. The format of each item in the list is'
'"ipaddress:port"'),
cfg.IntOpt('notify-timeout', default=60,
help='The number of seconds to wait before the notify query '
'times out.'),
cfg.IntOpt('notify-retries', default=0,
help='The number of retries of a notify to a slave '
'nameserver. A notify-retries of 0 implies that on an '
'error after sending a NOTIFY, there would not be any '
'retries. A -ve number implies that NOTIFYs are not sent '
'at all'),
cfg.IntOpt('port', default=5354, cfg.IntOpt('port', default=5354,
help='mDNS Port Number'), help='mDNS Port Number'),
cfg.IntOpt('tcp_backlog', default=100, cfg.IntOpt('tcp-backlog', default=100,
help='mDNS TCP Backlog'), help='mDNS TCP Backlog'),
cfg.StrOpt('storage-driver', default='sqlalchemy', cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use'), help='The storage driver to use'),
], group='service:mdns') ]
cfg.CONF.register_opts(OPTS, group='service:mdns')

View File

@ -33,8 +33,8 @@ class RequestHandler(object):
def handle(self, payload): def handle(self, payload):
request = dns.message.from_wire(payload) request = dns.message.from_wire(payload)
# As we move furthur with the implementation, we'll want to: # As we move further with the implementation, we'll want to:
# 1) Decord the payload using DNSPython # 1) Decode the payload using DNSPython
# 2) Hand off to either _handle_query or _handle_unsupported # 2) Hand off to either _handle_query or _handle_unsupported
# based on the OpCode # based on the OpCode
# 3) Gather the query results from storage # 3) Gather the query results from storage
@ -58,7 +58,7 @@ class RequestHandler(object):
""" """
Handle Unsupported DNS OpCode's Handle Unsupported DNS OpCode's
Unspoorted OpCode's include STATUS, IQUERY, NOTIFY, UPDATE Unsupported OpCode's include STATUS, IQUERY, NOTIFY, UPDATE
""" """
response = dns.message.make_response(request) response = dns.message.make_response(request)
response.set_rcode(dns.rcode.REFUSED) response.set_rcode(dns.rcode.REFUSED)

131
designate/mdns/notify.py Normal file
View File

@ -0,0 +1,131 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dns
from oslo import messaging
from oslo.config import cfg
from designate import exceptions
from designate.openstack.common import log as logging
from designate.openstack.common.gettextutils import _LI
from designate.openstack.common.gettextutils import _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class NotifyEndpoint(object):
RPC_NOTIFY_API_VERSION = '0.1'
target = messaging.Target(
namespace='notify', version=RPC_NOTIFY_API_VERSION)
def __init__(self, *args, **kwargs):
# Parse the slave-nameserver-ips-and-ports.
self._slave_server_ips = []
self._slave_server_ports = []
self._total_slave_nameservers = 0
for slave in CONF['service:mdns'].slave_nameserver_ips_and_ports:
slave_details = slave.split(':')
# Check each entry to ensure that it has an IP and port.
if (len(slave_details) != 2):
raise exceptions.ConfigurationError(
"'slave-nameserver-ips-and-ports' in ['service:mdns'] is "
"not in the correct format. Expected format 'ipaddress:"
"port'. Got %(list_item)s" % {'list_item': slave})
self._slave_server_ips.append(slave_details[0])
self._slave_server_ports.append(int(slave_details[1]))
self._total_slave_nameservers += 1
LOG.info(_LI("slave nameserver ips = %(slave_server_ips)s") %
{"slave_server_ips": self._slave_server_ips})
LOG.info(_LI("slave nameserver ports = %(slave_server_ports)s") %
{"slave_server_ports": self._slave_server_ports})
LOG.info(_LI("started mdns notify endpoint"))
def notify_zone_changed(self, context, zone_name):
"""
:param context: The user context.
:param zone_name: The zone name for which some data changed.
:return: None
"""
notify_message = self._get_notify_message(context, zone_name)
for current in range(0, self._total_slave_nameservers):
retry = -1
# retry sending NOTIFY if specified by configuration file.
while retry < CONF['service:mdns'].notify_retries:
retry = retry + 1
response = self._send_notify_message(
context, zone_name, notify_message,
self._slave_server_ips[current],
self._slave_server_ports[current],
timeout=CONF['service:mdns'].notify_timeout)
if isinstance(response, dns.exception.Timeout):
# retry sending the message if we get a Timeout.
continue
else:
break
def _get_notify_message(self, context, zone_name):
"""
:param context: The user context.
:param zone_name: The zone name for which a NOTIFY needs to be sent.
:return: The constructed notify message.
"""
notify_message = dns.message.make_query(zone_name, dns.rdatatype.SOA)
notify_message.flags = 0
notify_message.set_opcode(dns.opcode.NOTIFY)
notify_message.set_rcode(dns.rcode.NOERROR)
notify_message.flags = notify_message.flags | dns.flags.AA
return notify_message
def _send_notify_message(self, context, zone_name, notify_message, dest_ip,
dest_port, timeout):
"""
:param context: The user context.
:param zone_name: The zone name for which a NOTIFY needs to be sent.
:param notify_message: The notify message that needs to be sent to the
slave name servers.
:param dest_ip: The destination ip.
:param dest_port: The destination port.
:param timeout: The timeout in seconds to wait for a response.
:return: None
"""
try:
response = dns.query.udp(
notify_message, dest_ip, port=dest_port, timeout=timeout)
# Check that we actually got a NOERROR in the rcode
if dns.rcode.from_flags(
response.flags, response.ednsflags) != dns.rcode.NOERROR:
LOG.warn(_LW("Failed to get NOERROR while trying to notify "
"change in %(zone)s to %(server)s:%(port)d. "
"Response message = %(resp)s") %
{'zone': zone_name, 'server': dest_ip,
'port': dest_port, 'resp': str(response)})
return response
except dns.exception.Timeout as timeout:
LOG.warn(_LW("Got Timeout while trying to notify change in"
" %(zone)s to %(server)s:%(port)d. ") %
{'zone': zone_name, 'server': dest_ip, 'port': dest_port})
return timeout
except dns.query.BadResponse as badResponse:
LOG.warn(_LW("Got BadResponse while trying to notify "
"change in %(zone)s to %(server)s:%(port)d") %
{'zone': zone_name, 'server': dest_ip, 'port': dest_port})
return badResponse

51
designate/mdns/rpcapi.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from designate.openstack.common import log as logging
from designate.openstack.common.gettextutils import _LI
from designate import rpc
LOG = logging.getLogger(__name__)
class MdnsAPI(object):
"""
Client side of the mdns RPC API.
Notify API version history:
0.1 - Initial version under development. This will be bumped to 1.0
after a reasonably usable version is implemented.
"""
RPC_NOTIFY_API_VERSION = '0.1'
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.mdns_topic
notify_target = messaging.Target(topic=topic,
namespace='notify',
version=self.RPC_NOTIFY_API_VERSION)
self.notify_client = rpc.get_client(notify_target, version_cap='0.1')
def notify_zone_changed(self, context, zone_name):
LOG.info(_LI("notify_zone_changed: Calling mdns's notify_zone_changed "
"for zone '%(zone_name)s'") % {'zone_name': zone_name})
# The notify_zone_changed method is a cast rather than a call since the
# caller need not wait for the notify to complete.
return self.notify_client.cast(
context, 'notify_zone_changed', zone_name=zone_name)

View File

@ -17,12 +17,12 @@ import socket
from oslo.config import cfg from oslo.config import cfg
from designate import service
from designate.mdns import handler
from designate.mdns import notify
from designate.openstack.common import log as logging from designate.openstack.common import log as logging
from designate.openstack.common.gettextutils import _LI from designate.openstack.common.gettextutils import _LI
from designate.openstack.common.gettextutils import _LW from designate.openstack.common.gettextutils import _LW
from designate import service
from designate.mdns import handler
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -30,6 +30,8 @@ CONF = cfg.CONF
class Service(service.Service): class Service(service.Service):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
notify_endpoint = notify.NotifyEndpoint()
kwargs['endpoints'] = [notify_endpoint]
super(Service, self).__init__(*args, **kwargs) super(Service, self).__init__(*args, **kwargs)
# Create an instance of the RequestHandler class # Create an instance of the RequestHandler class
@ -55,8 +57,15 @@ class Service(service.Service):
self.tg.add_thread(self._handle_tcp) self.tg.add_thread(self._handle_tcp)
self.tg.add_thread(self._handle_udp) self.tg.add_thread(self._handle_udp)
LOG.info(_LI("started mdns service"))
def stop(self):
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(Service, self).stop()
def _handle_tcp(self): def _handle_tcp(self):
LOG.info(_LI("_handle_tcp thread started"))
while True: while True:
client, addr = self._sock_tcp.accept() client, addr = self._sock_tcp.accept()
LOG.warn(_LW("Handling TCP Request from: %s") % addr) LOG.warn(_LW("Handling TCP Request from: %s") % addr)
@ -66,8 +75,9 @@ class Service(service.Service):
self.tg.add_thread(self._handle, addr, payload, client) self.tg.add_thread(self._handle, addr, payload, client)
def _handle_udp(self): def _handle_udp(self):
LOG.info(_LI("_handle_udp thread started"))
while True: while True:
# TODO(kiall): Determine the approperiate default value for # TODO(kiall): Determine the appropriate default value for
# UDP recvfrom. # UDP recvfrom.
payload, addr = self._sock_udp.recvfrom(8192) payload, addr = self._sock_udp.recvfrom(8192)
LOG.warn(_LW("Handling UDP Request from: %s") % addr) LOG.warn(_LW("Handling UDP Request from: %s") % addr)

View File

@ -0,0 +1,69 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import dns
from mock import patch
from designate.tests.test_mdns import MdnsTestCase
from designate.mdns import notify
class MdnsNotifyTest(MdnsTestCase):
def setUp(self):
super(MdnsNotifyTest, self).setUp()
# Ensure that notify options are set
self.config(slave_nameserver_ips_and_ports=['127.0.0.1:65255'],
group='service:mdns')
self.notify = notify.NotifyEndpoint()
@patch.object(notify.NotifyEndpoint, '_send_notify_message')
def test_notify_opcode(self, mock):
context = self.get_context()
self.notify.notify_zone_changed(context, 'example.com')
self.assertTrue(mock.called)
def test_get_notify_message(self):
context = self.get_context()
# DNS message with NOTIFY opcode
ref_message = \
"4d2824000001000000000000076578616d706c6503636f6d0000060001"
msg = self.notify._get_notify_message(context, 'example.com')
# The first 11 characters of the on wire message change on every run.
msg_tail = binascii.b2a_hex(msg.to_wire())[11:]
self.assertEqual(ref_message[11:], msg_tail)
@patch.object(dns.query, 'udp', side_effect=dns.exception.Timeout())
def test_send_notify_message_timeout(self, _):
context = self.get_context()
# DNS message with NOTIFY opcode
notify_message = dns.message.from_wire(binascii.a2b_hex(
"4d2824000001000000000000076578616d706c6503636f6d0000060001"))
msg = self.notify._send_notify_message(
context, 'example.com', notify_message, '127.0.0.1', 65255, 1)
self.assertIsInstance(msg, dns.exception.Timeout)
@patch.object(dns.query, 'udp', side_effect=dns.query.BadResponse)
def test_send_notify_message_badresponse(self, _):
context = self.get_context()
# DNS message with NOTIFY opcode
notify_message = dns.message.from_wire(binascii.a2b_hex(
"4d2824000001000000000000076578616d706c6503636f6d0000060001"))
msg = self.notify._send_notify_message(
context, 'example.com', notify_message, '127.0.0.1', 65255, 1)
self.assertIsInstance(msg, dns.query.BadResponse)