Agent cleanup/refactoring

- Take reusable code out of the Agent and into dnsutils
- Make use of the code already in dnsutils
- Delete some unnecessary code

Change-Id: Id2ba1ff1bcc86d1b3b1c30ddafe48eebdefb2039
This commit is contained in:
Tim Simmons 2015-01-23 06:48:44 +00:00
parent a80e6d0430
commit 1fc119d34d
9 changed files with 125 additions and 303 deletions

View File

@ -28,6 +28,8 @@ OPTS = [
help='mDNS Port Number'),
cfg.IntOpt('tcp-backlog', default=100,
help='The Agent TCP Backlog'),
cfg.FloatOpt('tcp-recv-timeout', default=0.5,
help='Agent TCP Receive Timeout'),
cfg.ListOpt('allow-notify', default=[],
help='List of IP addresses allowed to NOTIFY The Agent'),
cfg.ListOpt('masters', default=[],

View File

@ -1,63 +0,0 @@
# Copyright 2014 Rackspace Inc.
#
# Author: Tim Simmons <tim.simmons@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dns
import dns.zone
from oslo.config import cfg
from oslo_log import log as logging
from designate.i18n import _LI
from designate.i18n import _LE
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class AXFR(object):
def __init__(self):
self.masters = []
for server in CONF['service:agent'].masters:
raw_server = server.split(':')
master = {'ip': raw_server[0], 'port': int(raw_server[1])}
self.masters.append(master)
LOG.info(_LI("Agent masters: %(masters)s") %
{'masters': self.masters})
def do_axfr(self, zone_name):
"""
Performs an AXFR for a given zone name
"""
# TODO(Tim): Try the first master, try others if they exist
master = self.masters[0]
LOG.info(_LI("Doing AXFR for %(name)s from %(host)s") %
{'name': zone_name, 'host': master})
xfr = dns.query.xfr(master['ip'], zone_name, relativize=False,
port=master['port'])
try:
# TODO(Tim): Add a timeout to this function
raw_zone = dns.zone.from_xfr(xfr, relativize=False)
except Exception:
LOG.exception(_LE("There was a problem with the AXFR"))
raise
LOG.debug("AXFR Successful for %s" % raw_zone.origin.to_text())
return raw_zone

View File

@ -17,9 +17,11 @@ import dns
from oslo.config import cfg
from oslo_log import log as logging
from designate.agent import axfr
from designate import dnsutils
from designate.backend import agent_backend
from designate.i18n import _LW
from designate.i18n import _LI
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -39,7 +41,15 @@ DELETE = 65283
class RequestHandler(object):
def __init__(self):
self.xfr = axfr.AXFR()
self.masters = []
for server in CONF['service:agent'].masters:
raw_server = server.split(':')
master = {'ip': raw_server[0], 'port': int(raw_server[1])}
self.masters.append(master)
LOG.info(_LI("Agent masters: %(masters)s") %
{'masters': self.masters})
self.allow_notify = CONF['service:agent'].allow_notify
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
@ -108,7 +118,7 @@ class RequestHandler(object):
{'verb': "CREATE", 'name': domain_name, 'host': requester})
try:
zone = self.xfr.do_axfr(domain_name)
zone = dnsutils.do_axfr(domain_name, self.masters)
self.backend.create_domain(zone)
except Exception:
response.set_rcode(dns.rcode.from_text("SERVFAIL"))
@ -158,7 +168,7 @@ class RequestHandler(object):
# Check that the serial is < serial above
try:
zone = self.xfr.do_axfr(domain_name)
zone = dnsutils.do_axfr(domain_name, self.masters)
self.backend.update_domain(zone)
except Exception:
response.set_rcode(dns.rcode.from_text("SERVFAIL"))

View File

@ -1,42 +0,0 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Middleware(object):
def __init__(self, application):
self.application = application
def process_request(self, request):
"""Called on each request.
If this returns None, the next application down the stack will be
executed. If it returns a response then that response will be returned
and execution will stop here.
"""
return None
def process_response(self, response):
"""Do whatever you'd like to the response."""
return response
def __call__(self, request):
response = self.process_request(request)
if response:
return response
response = self.application(request)
return self.process_response(response)

View File

@ -13,29 +13,24 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import struct
import dns
from oslo.config import cfg
from oslo_log import log as logging
from designate import dnsutils
from designate import service
from designate.agent import handler
from designate.agent import middleware
from designate.backend import agent_backend
from designate.i18n import _LE
from designate.i18n import _LI
from designate.i18n import _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.TCPService):
class Service(service.DNSService):
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
super(Service, self).__init__(cfg.CONF['service:agent'], *args,
**kwargs)
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
@ -43,138 +38,13 @@ class Service(service.TCPService):
# Create an instance of the RequestHandler class
self.application = handler.RequestHandler()
# Wrap the application in any middleware required
# TODO(kiall): In the future, we want to allow users to pick+choose
# the middleware to be applied, similar to how we do this
# in the API.
self.application = middleware.Middleware(self.application)
# Bind to the TCP port
LOG.info(_LI('Opening TCP Listening Socket on %(host)s:%(port)d') %
{'host': CONF['service:agent'].host,
'port': CONF['service:agent'].port})
self._sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._sock_tcp.bind((CONF['service:agent'].host,
CONF['service:agent'].port))
self._sock_tcp.listen(CONF['service:agent'].tcp_backlog)
# Bind to the UDP port
LOG.info(_LI('Opening UDP Listening Socket on %(host)s:%(port)d') %
{'host': CONF['service:agent'].host,
'port': CONF['service:agent'].port})
self._sock_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock_udp.bind((CONF['service:agent'].host,
CONF['service:agent'].port))
self.application = dnsutils.DNSMiddleware(self.application)
def start(self):
super(Service, self).start()
self.backend.start()
self.tg.add_thread(self._handle_tcp)
self.tg.add_thread(self._handle_udp)
LOG.info(_LI("Started Agent Service"))
def stop(self):
super(Service, self).stop()
LOG.info(_LI("Stopped Agent Service"))
def _deserialize_request(self, payload, addr):
"""
Deserialize a DNS Request Packet
:param payload: Raw DNS query payload
:param addr: Tuple of the client's (IP, Port)
"""
try:
request = dns.message.from_wire(payload)
except dns.exception.DNSException:
LOG.error(_LE("Failed to deserialize packet from "
"%(host)s:%(port)d") %
{'host': addr[0], 'port': addr[1]})
return None
else:
# Create + Attach the initial "environ" dict. This is similar to
# the environ dict used in typical WSGI middleware.
request.environ = {'addr': addr}
return request
def _serialize_response(self, response):
"""
Serialize a DNS Response Packet
:param response: DNS Response Message
"""
return response.to_wire()
def _handle_tcp(self):
LOG.info(_LI("_handle_tcp thread started"))
while True:
client, addr = self._sock_tcp.accept()
LOG.debug("Handling TCP Request from: %(host)s:%(port)d" %
{'host': addr[0], 'port': addr[1]})
payload = client.recv(65535)
(expected_length,) = struct.unpack('!H', payload[0:2])
actual_length = len(payload[2:])
# For now we assume all requests are one packet
# TODO(vinod): Handle multipacket requests
if (expected_length != actual_length):
LOG.warn(_LW("got a packet with unexpected length from "
"%(host)s:%(port)d. Expected length=%(elen)d. "
"Actual length=%(alen)d.") %
{'host': addr[0], 'port': addr[1],
'elen': expected_length, 'alen': actual_length})
client.close()
else:
self.tg.add_thread(self._handle, addr, payload[2:], client)
def _handle_udp(self):
LOG.info(_LI("_handle_udp thread started"))
while True:
# TODO(kiall): Determine the appropriate default value for
# UDP recvfrom.
payload, addr = self._sock_udp.recvfrom(8192)
LOG.debug("Handling UDP Request from: %(host)s:%(port)d" %
{'host': addr[0], 'port': addr[1]})
self.tg.add_thread(self._handle, addr, payload)
def _handle(self, addr, payload, client=None):
"""
Handle a DNS Query
:param addr: Tuple of the client's (IP, Port)
:param payload: Raw DNS query payload
:param client: Client socket (for TCP only)
"""
try:
request = self._deserialize_request(payload, addr)
if request is None:
# We failed to deserialize the request, generate a failure
# response using a made up request.
response = dns.message.make_response(
dns.message.make_query('unknown', dns.rdatatype.A))
response.set_rcode(dns.rcode.FORMERR)
else:
response = self.application(request)
# send back a response only if present
if response:
response = self._serialize_response(response)
if client is not None:
# Handle TCP Responses
msg_length = len(response)
tcp_response = struct.pack("!H", msg_length) + response
client.send(tcp_response)
client.close()
else:
# Handle UDP Responses
self._sock_udp.sendto(response, addr)
except Exception:
LOG.exception(_LE("Unhandled exception while processing request "
"from %(host)s:%(port)d") %
{'host': addr[0], 'port': addr[1]})

View File

@ -17,6 +17,7 @@ import socket
import struct
import dns
import dns.zone
from dns import rdatatype
from oslo_log import log as logging
@ -29,6 +30,33 @@ from designate.i18n import _LW
LOG = logging.getLogger(__name__)
class DNSMiddleware(object):
def __init__(self, application):
self.application = application
def process_request(self, request):
"""Called on each request.
If this returns None, the next application down the stack will be
executed. If it returns a response then that response will be returned
and execution will stop here.
"""
return None
def process_response(self, response):
"""Do whatever you'd like to the response."""
return response
def __call__(self, request):
response = self.process_request(request)
if response:
return response
response = self.application(request)
return self.process_response(response)
def from_dnspython_zone(dnspython_zone):
# dnspython never builds a zone with more than one SOA, even if we give
# it a zonefile that contains more than one
@ -144,7 +172,7 @@ def handle_tcp(sock_tcp, tg, handle, application, timeout=None):
if timeout:
client.settimeout(timeout)
LOG.info(_LI("Handling TCP Request from: %(host)s:%(port)d") %
LOG.debug("Handling TCP Request from: %(host)s:%(port)d" %
{'host': addr[0], 'port': addr[1]})
# Prepare a variable for the payload to be buffered
@ -177,7 +205,7 @@ def handle_udp(sock_udp, tg, handle, application):
# TODO(kiall): Determine the appropriate default value for
# UDP recvfrom.
payload, addr = sock_udp.recvfrom(8192)
LOG.info(_LI("Handling UDP Request from: %(host)s:%(port)d") %
LOG.debug("Handling UDP Request from: %(host)s:%(port)d" %
{'host': addr[0], 'port': addr[1]})
tg.add_thread(handle, addr, payload, application, sock_udp=sock_udp)
@ -222,3 +250,28 @@ def handle(addr, payload, application, sock_udp=None, client=None):
LOG.exception(_LE("Unhandled exception while processing request "
"from %(host)s:%(port)d") %
{'host': addr[0], 'port': addr[1]})
def do_axfr(zone_name, masters):
"""
Performs an AXFR for a given zone name
"""
# TODO(Tim): Try the first master, try others if they exist
master = masters[0]
LOG.info(_LI("Doing AXFR for %(name)s from %(host)s") %
{'name': zone_name, 'host': master})
xfr = dns.query.xfr(master['ip'], zone_name, relativize=False,
port=master['port'])
try:
# TODO(Tim): Add a timeout to this function
raw_zone = dns.zone.from_xfr(xfr, relativize=False)
except Exception:
LOG.exception(_LE("There was a problem with the AXFR"))
raise
LOG.debug("AXFR Successful for %s" % raw_zone.origin.to_text())
return raw_zone

View File

@ -14,36 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate import context
from designate import dnsutils
class Middleware(object):
def __init__(self, application):
self.application = application
def process_request(self, request):
"""Called on each request.
If this returns None, the next application down the stack will be
executed. If it returns a response then that response will be returned
and execution will stop here.
"""
return None
def process_response(self, response):
"""Do whatever you'd like to the response."""
return response
def __call__(self, request):
response = self.process_request(request)
if response:
return response
response = self.application(request)
return self.process_response(response)
class ContextMiddleware(Middleware):
class ContextMiddleware(dnsutils.DNSMiddleware):
"""Temporary ContextMiddleware which attaches an admin context to every
request

View File

@ -27,6 +27,7 @@ from designate.i18n import _
from designate import rpc
from designate import policy
from designate import version
from designate import dnsutils
CONF = cfg.CONF
@ -128,19 +129,49 @@ class RPCService(Service):
super(RPCService, self).wait()
class TCPService(Service):
class WSGIService(wsgi.Service, Service):
"""
Service class to be shared by all Designate WSGI Services
"""
def __init__(self, application, port, host='0.0.0.0', backlog=4096,
threads=1000):
# NOTE(kiall): We avoid calling super(cls, self) here, as our parent
# classes have different argspecs. Additionally, if we
# manually call both parent's __init__, the openstack
# common Service class's __init__ method will be called
# twice. As a result, we only call the designate base
# Service's __init__ method, and duplicate the
# wsgi.Service's constructor functionality here.
#
Service.__init__(self, threads)
self.application = application
self._port = port
self._host = host
self._backlog = backlog if backlog else CONF.backlog
class DNSService(Service):
"""
Service class to be used for a service that only works in TCP
"""
def __init__(self, host=None, binary=None, service_name=None,
def __init__(self, config, host=None, binary=None, service_name=None,
endpoints=None, threads=1000):
super(TCPService, self).__init__(threads)
super(DNSService, self).__init__(threads)
self.host = host
self.binary = binary
self.service_name = service_name
self.endpoints = endpoints or [self]
self.config = config
self._sock_tcp = dnsutils.bind_tcp(
self.config.host, self.config.port,
self.config.tcp_backlog)
self._sock_udp = dnsutils.bind_udp(
self.config.host, self.config.port)
@classmethod
def create(cls, host=None, binary=None, service_name=None,
@ -164,36 +195,23 @@ class TCPService(Service):
if e != self and hasattr(e, 'start'):
e.start()
super(TCPService, self).start()
self.tg.add_thread(
dnsutils.handle_tcp, self._sock_tcp, self.tg, dnsutils.handle,
self.application, timeout=self.config.tcp_recv_timeout)
self.tg.add_thread(
dnsutils.handle_udp, self._sock_udp, self.tg, dnsutils.handle,
self.application)
super(DNSService, self).start()
def stop(self):
for e in self.endpoints:
if e != self and hasattr(e, 'stop'):
e.stop()
super(TCPService, self).stop()
class WSGIService(wsgi.Service, Service):
"""
Service class to be shared by all Designate WSGI Services
"""
def __init__(self, application, port, host='0.0.0.0', backlog=4096,
threads=1000):
# NOTE(kiall): We avoid calling super(cls, self) here, as our parent
# classes have different argspecs. Additionally, if we
# manually call both parent's __init__, the openstack
# common Service class's __init__ method will be called
# twice. As a result, we only call the designate base
# Service's __init__ method, and duplicate the
# wsgi.Service's constructor functionality here.
#
Service.__init__(self, threads)
self.application = application
self._port = port
self._host = host
self._backlog = backlog if backlog else CONF.backlog
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(DNSService, self).stop()
_launcher = None

View File

@ -33,7 +33,7 @@ class AgentRequestHandlerTest(AgentTestCase):
self.addr = ["0.0.0.0", 5558]
@mock.patch.object(dns.resolver.Resolver, 'query')
@mock.patch('designate.agent.axfr.AXFR.do_axfr')
@mock.patch('designate.dnsutils.do_axfr')
def test_receive_notify(self, func, axfrfunc):
"""
Get a NOTIFY and ensure the response is right,
@ -81,7 +81,7 @@ class AgentRequestHandlerTest(AgentTestCase):
self.assertEqual(expected_response, binascii.b2a_hex(response))
@mock.patch.object(dns.resolver.Resolver, 'query')
@mock.patch('designate.agent.axfr.AXFR.do_axfr')
@mock.patch('designate.dnsutils.do_axfr')
def test_receive_create(self, func, func2):
"""
Get a CREATE and ensure the response is right,