From 67cc68e903dbfca16d6d35cd4f2e89795c61f519 Mon Sep 17 00:00:00 2001 From: Vinod Mangalpally Date: Wed, 25 Jun 2014 10:16:17 -0500 Subject: [PATCH] mdns: Add support for NOTIFY's designate-mdns will be notified by Central when changes are made to a zone. The service will issue a NOTIFY to all slave nameservers. Currently this is a pre-configured list of slaves to NOTIFY. After Pools, the list of slaves to NOTIFY will be loaded dynamically. Change-Id: Iead2287f79726eddbb225538de961332cda49f24 Implements: blueprint mdns-designate-mdns-notify --- designate/central/__init__.py | 18 ++++ designate/central/service.py | 8 ++ designate/mdns/__init__.py | 23 +++- designate/mdns/handler.py | 6 +- designate/mdns/notify.py | 131 +++++++++++++++++++++++ designate/mdns/rpcapi.py | 51 +++++++++ designate/mdns/service.py | 18 +++- designate/tests/test_mdns/test_notify.py | 69 ++++++++++++ 8 files changed, 313 insertions(+), 11 deletions(-) create mode 100644 designate/mdns/notify.py create mode 100644 designate/mdns/rpcapi.py create mode 100644 designate/tests/test_mdns/test_notify.py diff --git a/designate/central/__init__.py b/designate/central/__init__.py index 832c7f7d..3b7cb4d8 100644 --- a/designate/central/__init__.py +++ b/designate/central/__init__.py @@ -13,6 +13,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from designate.mdns import rpcapi as mdns_rpcapi + from oslo.config import cfg cfg.CONF.register_group(cfg.OptGroup( @@ -39,3 +41,19 @@ cfg.CONF.register_opts([ help="The Tenant ID that will own any managed resources."), cfg.StrOpt('min_ttl', default="None", help="Minimum TTL allowed") ], group='service:central') + +MDNS_API = None + + +def get_mdns_api(): + """ + The rpc.get_client() which is called upon the API object initialization + will cause a assertion error if the designate.rpc.TRANSPORT isn't setup by + rpc.init() before. + + This fixes that by creating the rpcapi when demanded. + """ + global MDNS_API + if not MDNS_API: + MDNS_API = mdns_rpcapi.MdnsAPI() + return MDNS_API diff --git a/designate/central/service.py b/designate/central/service.py index 613af8d7..7dea86e7 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -24,6 +24,7 @@ from designate.openstack.common import log as logging from designate.openstack.common.gettextutils import _LI from designate.openstack.common.gettextutils import _LC from designate import backend +from designate import central from designate import exceptions from designate import network_api from designate import objects @@ -91,6 +92,10 @@ class Service(service.Service): self.backend.stop() + @property + def mdns_api(self): + return central.get_mdns_api() + def _is_valid_domain_name(self, context, domain_name): # Validate domain name length if len(domain_name) > cfg.CONF['service:central'].max_domain_name_len: @@ -621,6 +626,7 @@ class Service(service.Service): self.backend.update_domain(context, domain) self.notifier.info(context, 'dns.domain.update', domain) + self.mdns_api.notify_zone_changed(context, domain.name) return domain @@ -797,6 +803,7 @@ class Service(service.Service): # Send RecordSet update notification self.notifier.info(context, 'dns.recordset.update', recordset) + self.mdns_api.notify_zone_changed(context, domain.name) return recordset @@ -952,6 +959,7 @@ class Service(service.Service): # Send Record update notification self.notifier.info(context, 'dns.record.update', record) + self.mdns_api.notify_zone_changed(context, domain.name) return record diff --git a/designate/mdns/__init__.py b/designate/mdns/__init__.py index 5afdac8a..8e489252 100644 --- a/designate/mdns/__init__.py +++ b/designate/mdns/__init__.py @@ -19,15 +19,30 @@ cfg.CONF.register_group(cfg.OptGroup( name='service:mdns', title="Configuration for mDNS Service" )) -cfg.CONF.register_opts([ +OPTS = [ cfg.IntOpt('workers', default=None, - help='Number of worker processes to spawn'), + help='Number of mdns worker processes to spawn'), cfg.StrOpt('host', default='0.0.0.0', help='mDNS Bind Host'), + cfg.ListOpt('slave-nameserver-ips-and-ports', default=[], + help='Ips and ports of slave nameservers that are notified of ' + 'zone changes. The format of each item in the list is' + '"ipaddress:port"'), + cfg.IntOpt('notify-timeout', default=60, + help='The number of seconds to wait before the notify query ' + 'times out.'), + cfg.IntOpt('notify-retries', default=0, + help='The number of retries of a notify to a slave ' + 'nameserver. A notify-retries of 0 implies that on an ' + 'error after sending a NOTIFY, there would not be any ' + 'retries. A -ve number implies that NOTIFYs are not sent ' + 'at all'), cfg.IntOpt('port', default=5354, help='mDNS Port Number'), - cfg.IntOpt('tcp_backlog', default=100, + cfg.IntOpt('tcp-backlog', default=100, help='mDNS TCP Backlog'), cfg.StrOpt('storage-driver', default='sqlalchemy', help='The storage driver to use'), -], group='service:mdns') +] + +cfg.CONF.register_opts(OPTS, group='service:mdns') diff --git a/designate/mdns/handler.py b/designate/mdns/handler.py index 88bea079..dd9de3db 100644 --- a/designate/mdns/handler.py +++ b/designate/mdns/handler.py @@ -33,8 +33,8 @@ class RequestHandler(object): def handle(self, payload): request = dns.message.from_wire(payload) - # As we move furthur with the implementation, we'll want to: - # 1) Decord the payload using DNSPython + # As we move further with the implementation, we'll want to: + # 1) Decode the payload using DNSPython # 2) Hand off to either _handle_query or _handle_unsupported # based on the OpCode # 3) Gather the query results from storage @@ -58,7 +58,7 @@ class RequestHandler(object): """ Handle Unsupported DNS OpCode's - Unspoorted OpCode's include STATUS, IQUERY, NOTIFY, UPDATE + Unsupported OpCode's include STATUS, IQUERY, NOTIFY, UPDATE """ response = dns.message.make_response(request) response.set_rcode(dns.rcode.REFUSED) diff --git a/designate/mdns/notify.py b/designate/mdns/notify.py new file mode 100644 index 00000000..4b98f640 --- /dev/null +++ b/designate/mdns/notify.py @@ -0,0 +1,131 @@ +# Copyright (c) 2014 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import dns +from oslo import messaging +from oslo.config import cfg + +from designate import exceptions +from designate.openstack.common import log as logging +from designate.openstack.common.gettextutils import _LI +from designate.openstack.common.gettextutils import _LW + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class NotifyEndpoint(object): + RPC_NOTIFY_API_VERSION = '0.1' + + target = messaging.Target( + namespace='notify', version=RPC_NOTIFY_API_VERSION) + + def __init__(self, *args, **kwargs): + # Parse the slave-nameserver-ips-and-ports. + self._slave_server_ips = [] + self._slave_server_ports = [] + self._total_slave_nameservers = 0 + for slave in CONF['service:mdns'].slave_nameserver_ips_and_ports: + slave_details = slave.split(':') + + # Check each entry to ensure that it has an IP and port. + if (len(slave_details) != 2): + raise exceptions.ConfigurationError( + "'slave-nameserver-ips-and-ports' in ['service:mdns'] is " + "not in the correct format. Expected format 'ipaddress:" + "port'. Got %(list_item)s" % {'list_item': slave}) + + self._slave_server_ips.append(slave_details[0]) + self._slave_server_ports.append(int(slave_details[1])) + self._total_slave_nameservers += 1 + + LOG.info(_LI("slave nameserver ips = %(slave_server_ips)s") % + {"slave_server_ips": self._slave_server_ips}) + LOG.info(_LI("slave nameserver ports = %(slave_server_ports)s") % + {"slave_server_ports": self._slave_server_ports}) + LOG.info(_LI("started mdns notify endpoint")) + + def notify_zone_changed(self, context, zone_name): + """ + :param context: The user context. + :param zone_name: The zone name for which some data changed. + :return: None + """ + notify_message = self._get_notify_message(context, zone_name) + for current in range(0, self._total_slave_nameservers): + retry = -1 + + # retry sending NOTIFY if specified by configuration file. + while retry < CONF['service:mdns'].notify_retries: + retry = retry + 1 + response = self._send_notify_message( + context, zone_name, notify_message, + self._slave_server_ips[current], + self._slave_server_ports[current], + timeout=CONF['service:mdns'].notify_timeout) + if isinstance(response, dns.exception.Timeout): + # retry sending the message if we get a Timeout. + continue + else: + break + + def _get_notify_message(self, context, zone_name): + """ + :param context: The user context. + :param zone_name: The zone name for which a NOTIFY needs to be sent. + :return: The constructed notify message. + """ + notify_message = dns.message.make_query(zone_name, dns.rdatatype.SOA) + notify_message.flags = 0 + notify_message.set_opcode(dns.opcode.NOTIFY) + notify_message.set_rcode(dns.rcode.NOERROR) + notify_message.flags = notify_message.flags | dns.flags.AA + + return notify_message + + def _send_notify_message(self, context, zone_name, notify_message, dest_ip, + dest_port, timeout): + """ + :param context: The user context. + :param zone_name: The zone name for which a NOTIFY needs to be sent. + :param notify_message: The notify message that needs to be sent to the + slave name servers. + :param dest_ip: The destination ip. + :param dest_port: The destination port. + :param timeout: The timeout in seconds to wait for a response. + :return: None + """ + try: + response = dns.query.udp( + notify_message, dest_ip, port=dest_port, timeout=timeout) + + # Check that we actually got a NOERROR in the rcode + if dns.rcode.from_flags( + response.flags, response.ednsflags) != dns.rcode.NOERROR: + LOG.warn(_LW("Failed to get NOERROR while trying to notify " + "change in %(zone)s to %(server)s:%(port)d. " + "Response message = %(resp)s") % + {'zone': zone_name, 'server': dest_ip, + 'port': dest_port, 'resp': str(response)}) + return response + except dns.exception.Timeout as timeout: + LOG.warn(_LW("Got Timeout while trying to notify change in" + " %(zone)s to %(server)s:%(port)d. ") % + {'zone': zone_name, 'server': dest_ip, 'port': dest_port}) + return timeout + except dns.query.BadResponse as badResponse: + LOG.warn(_LW("Got BadResponse while trying to notify " + "change in %(zone)s to %(server)s:%(port)d") % + {'zone': zone_name, 'server': dest_ip, 'port': dest_port}) + return badResponse diff --git a/designate/mdns/rpcapi.py b/designate/mdns/rpcapi.py new file mode 100644 index 00000000..54bb9652 --- /dev/null +++ b/designate/mdns/rpcapi.py @@ -0,0 +1,51 @@ +# Copyright (c) 2014 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo.config import cfg +from oslo import messaging + +from designate.openstack.common import log as logging +from designate.openstack.common.gettextutils import _LI +from designate import rpc + + +LOG = logging.getLogger(__name__) + + +class MdnsAPI(object): + """ + Client side of the mdns RPC API. + + Notify API version history: + + 0.1 - Initial version under development. This will be bumped to 1.0 + after a reasonably usable version is implemented. + """ + RPC_NOTIFY_API_VERSION = '0.1' + + def __init__(self, topic=None): + topic = topic if topic else cfg.CONF.mdns_topic + + notify_target = messaging.Target(topic=topic, + namespace='notify', + version=self.RPC_NOTIFY_API_VERSION) + self.notify_client = rpc.get_client(notify_target, version_cap='0.1') + + def notify_zone_changed(self, context, zone_name): + LOG.info(_LI("notify_zone_changed: Calling mdns's notify_zone_changed " + "for zone '%(zone_name)s'") % {'zone_name': zone_name}) + # The notify_zone_changed method is a cast rather than a call since the + # caller need not wait for the notify to complete. + return self.notify_client.cast( + context, 'notify_zone_changed', zone_name=zone_name) diff --git a/designate/mdns/service.py b/designate/mdns/service.py index 60a5f928..506ae1ce 100644 --- a/designate/mdns/service.py +++ b/designate/mdns/service.py @@ -17,12 +17,12 @@ import socket from oslo.config import cfg +from designate import service +from designate.mdns import handler +from designate.mdns import notify from designate.openstack.common import log as logging from designate.openstack.common.gettextutils import _LI from designate.openstack.common.gettextutils import _LW -from designate import service -from designate.mdns import handler - LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -30,6 +30,8 @@ CONF = cfg.CONF class Service(service.Service): def __init__(self, *args, **kwargs): + notify_endpoint = notify.NotifyEndpoint() + kwargs['endpoints'] = [notify_endpoint] super(Service, self).__init__(*args, **kwargs) # Create an instance of the RequestHandler class @@ -55,8 +57,15 @@ class Service(service.Service): self.tg.add_thread(self._handle_tcp) self.tg.add_thread(self._handle_udp) + LOG.info(_LI("started mdns service")) + + def stop(self): + # When the service is stopped, the threads for _handle_tcp and + # _handle_udp are stopped too. + super(Service, self).stop() def _handle_tcp(self): + LOG.info(_LI("_handle_tcp thread started")) while True: client, addr = self._sock_tcp.accept() LOG.warn(_LW("Handling TCP Request from: %s") % addr) @@ -66,8 +75,9 @@ class Service(service.Service): self.tg.add_thread(self._handle, addr, payload, client) def _handle_udp(self): + LOG.info(_LI("_handle_udp thread started")) while True: - # TODO(kiall): Determine the approperiate default value for + # TODO(kiall): Determine the appropriate default value for # UDP recvfrom. payload, addr = self._sock_udp.recvfrom(8192) LOG.warn(_LW("Handling UDP Request from: %s") % addr) diff --git a/designate/tests/test_mdns/test_notify.py b/designate/tests/test_mdns/test_notify.py new file mode 100644 index 00000000..6ed02f03 --- /dev/null +++ b/designate/tests/test_mdns/test_notify.py @@ -0,0 +1,69 @@ +# Copyright (c) 2014 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import binascii + +import dns +from mock import patch + +from designate.tests.test_mdns import MdnsTestCase +from designate.mdns import notify + + +class MdnsNotifyTest(MdnsTestCase): + def setUp(self): + super(MdnsNotifyTest, self).setUp() + + # Ensure that notify options are set + self.config(slave_nameserver_ips_and_ports=['127.0.0.1:65255'], + group='service:mdns') + self.notify = notify.NotifyEndpoint() + + @patch.object(notify.NotifyEndpoint, '_send_notify_message') + def test_notify_opcode(self, mock): + context = self.get_context() + self.notify.notify_zone_changed(context, 'example.com') + self.assertTrue(mock.called) + + def test_get_notify_message(self): + context = self.get_context() + # DNS message with NOTIFY opcode + ref_message = \ + "4d2824000001000000000000076578616d706c6503636f6d0000060001" + msg = self.notify._get_notify_message(context, 'example.com') + # The first 11 characters of the on wire message change on every run. + msg_tail = binascii.b2a_hex(msg.to_wire())[11:] + self.assertEqual(ref_message[11:], msg_tail) + + @patch.object(dns.query, 'udp', side_effect=dns.exception.Timeout()) + def test_send_notify_message_timeout(self, _): + context = self.get_context() + # DNS message with NOTIFY opcode + notify_message = dns.message.from_wire(binascii.a2b_hex( + "4d2824000001000000000000076578616d706c6503636f6d0000060001")) + + msg = self.notify._send_notify_message( + context, 'example.com', notify_message, '127.0.0.1', 65255, 1) + self.assertIsInstance(msg, dns.exception.Timeout) + + @patch.object(dns.query, 'udp', side_effect=dns.query.BadResponse) + def test_send_notify_message_badresponse(self, _): + context = self.get_context() + # DNS message with NOTIFY opcode + notify_message = dns.message.from_wire(binascii.a2b_hex( + "4d2824000001000000000000076578616d706c6503636f6d0000060001")) + + msg = self.notify._send_notify_message( + context, 'example.com', notify_message, '127.0.0.1', 65255, 1) + self.assertIsInstance(msg, dns.query.BadResponse)