Implement mDNS Service

This service listens on a TCP/UDP port, and dispatches requests to a
single method for parsing and handling.

Each incoming request is handled in a greenthread spawned specifically
for that request, allow for >1 concurrent requests.

Additionally, the "workers" config option allows for multiple distict
processes to be spawned for the service, all sharing the same socket,
to allow for >1 CPU core usage.

Change-Id: I173f4640a50b56f52c0371bae47b84ce44301263
This commit is contained in:
Kiall Mac Innes 2014-06-01 12:55:29 +01:00
parent 68728c019f
commit 36e501c6f7
18 changed files with 416 additions and 15 deletions

30
contrib/dns_dump_dnspy.py Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dns
import dns.message
import dns.rdatatype
import binascii
# Prepare a Packet
request = dns.message.make_query(
qname='example.com.',
rdtype=dns.rdatatype.A,
)
request.set_opcode(dns.opcode.UPDATE)
# Print the hex representation of the Request
print(binascii.b2a_hex(request.to_wire()))

31
contrib/dns_dump_server.py Executable file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import binascii
# Bind to UDP 5355
sock_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_udp.bind(('', 5355))
# Receive a Packet
payload, addr = sock_udp.recvfrom(8192)
# Print the hex representation of the packet
print(binascii.b2a_hex(payload))
# The request just happens to be a kinda-ish valid response
sock_udp.sendto(payload, addr)

View File

@ -33,6 +33,7 @@ cfg.CONF.register_opts([
cfg.StrOpt('central-topic', default='central', help='Central Topic'),
cfg.StrOpt('agent-topic', default='agent', help='Agent Topic'),
cfg.StrOpt('mdns-topic', default='mdns', help='mDNS Topic'),
# Default TTL
cfg.IntOpt('default-ttl', default=3600),

View File

@ -60,7 +60,8 @@ class Service(service.Service):
policy.init()
# Get a storage connection
self.storage_api = storage_api.StorageAPI()
storage_driver = cfg.CONF['service:central'].storage_driver
self.storage_api = storage_api.StorageAPI(storage_driver)
# Get a quota manager instance
self.quota = quota.get_quota()

34
designate/cmd/mdns.py Normal file
View File

@ -0,0 +1,34 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import service
from designate import utils
from designate.mdns import service as mdns_service
CONF = cfg.CONF
CONF.import_opt('workers', 'designate.mdns', group='service:mdns')
def main():
utils.read_config('designate', sys.argv)
logging.setup('designate')
server = mdns_service.Service.create(
binary='designate-mdns')
service.serve(server, workers=CONF['service:mdns'].workers)
service.wait()

View File

@ -0,0 +1,33 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='service:mdns', title="Configuration for mDNS Service"
))
cfg.CONF.register_opts([
cfg.IntOpt('workers', default=None,
help='Number of worker processes to spawn'),
cfg.StrOpt('host', default='0.0.0.0',
help='mDNS Bind Host'),
cfg.IntOpt('port', default=5354,
help='mDNS Port Number'),
cfg.IntOpt('tcp_backlog', default=100,
help='mDNS TCP Backlog'),
cfg.StrOpt('storage-driver', default='sqlalchemy',
help='The storage driver to use'),
], group='service:mdns')

65
designate/mdns/handler.py Normal file
View File

@ -0,0 +1,65 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dns
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.storage import api as storage_api
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class RequestHandler(object):
def __init__(self):
# Get a storage connection
storage_driver = cfg.CONF['service:mdns'].storage_driver
self.storage_api = storage_api.StorageAPI(storage_driver)
def handle(self, payload):
request = dns.message.from_wire(payload)
# As we move furthur with the implementation, we'll want to:
# 1) Decord the payload using DNSPython
# 2) Hand off to either _handle_query or _handle_unsupported
# based on the OpCode
# 3) Gather the query results from storage
# 4) Build and return response using DNSPython.
if request.opcode() == dns.opcode.QUERY:
response = self._handle_query(request)
else:
response = self._handle_unsupported(request)
return response.to_wire()
def _handle_query(self, request):
""" Handle a DNS QUERY request """
response = dns.message.make_response(request)
response.set_rcode(dns.rcode.SERVFAIL)
return response
def _handle_unsupported(self, request):
"""
Handle Unsupported DNS OpCode's
Unspoorted OpCode's include STATUS, IQUERY, NOTIFY, UPDATE
"""
response = dns.message.make_response(request)
response.set_rcode(dns.rcode.REFUSED)
return response

89
designate/mdns/service.py Normal file
View File

@ -0,0 +1,89 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import service
from designate.mdns import handler
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.Service):
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
# Create an instance of the RequestHandler class
self.handler = handler.RequestHandler()
# Bind to the TCP port
LOG.info('Opening TCP Listening Socket')
self._sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._sock_tcp.bind((CONF['service:mdns'].host,
CONF['service:mdns'].port))
self._sock_tcp.listen(CONF['service:mdns'].tcp_backlog)
# Bind to the UDP port
LOG.info('Opening UDP Listening Socket')
self._sock_udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock_udp.bind((CONF['service:mdns'].host,
CONF['service:mdns'].port))
def start(self):
super(Service, self).start()
self.tg.add_thread(self._handle_tcp)
self.tg.add_thread(self._handle_udp)
def _handle_tcp(self):
while True:
client, addr = self._sock_tcp.accept()
LOG.warn("Handling TCP Request from: %s", addr)
payload = client.recv(65535)
self.tg.add_thread(self._handle, addr, payload, client)
def _handle_udp(self):
while True:
# TODO(kiall): Determine the approperiate default value for
# UDP recvfrom.
payload, addr = self._sock_udp.recvfrom(8192)
LOG.warn("Handling UDP Request from: %s", addr)
self.tg.add_thread(self._handle, addr, payload)
def _handle(self, addr, payload, client=None):
"""
Handle a DNS Query
:param addr: Tuple of the client's (IP, Port)
:param payload: Raw DNS query payload
:param client: Client socket (for TCP only)
"""
response = self.handler.handle(payload)
if client is not None:
# Handle TCP Responses
client.send(response)
client.close()
else:
# Handle UDP Responses
self._sock_udp.sendto(response, addr)

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate import exceptions
from designate.openstack.common import log as logging
from designate.quota.base import Quota
@ -28,7 +29,9 @@ class StorageQuota(Quota):
super(StorageQuota, self).__init__()
if storage_api is None:
storage_api = sapi.StorageAPI()
# TODO(kiall): Should this be tied to central's config?
storage_driver = cfg.CONF['service:central'].storage_driver
storage_api = sapi.StorageAPI(storage_driver)
self.storage_api = storage_api

View File

@ -13,17 +13,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.storage.base import Storage
LOG = logging.getLogger(__name__)
def get_storage():
def get_storage(storage_driver):
""" Return the engine class from the provided engine name """
storage_driver = cfg.CONF['service:central'].storage_driver
cls = Storage.get_driver(storage_driver)
return cls()

View File

@ -21,8 +21,8 @@ from designate.openstack.common import excutils
class StorageAPI(object):
""" Storage API """
def __init__(self):
self.storage = storage.get_storage()
def __init__(self, storage_driver):
self.storage = storage.get_storage(storage_driver)
def _extract_dict_subset(self, d, keys):
return dict([(k, d[k]) for k in keys if k in d])

View File

@ -0,0 +1,20 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests import TestCase
class MdnsTestCase(TestCase):
pass

View File

@ -0,0 +1,66 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from mock import patch
from designate.tests.test_mdns import MdnsTestCase
from designate.mdns import handler
class MdnsRequestHandlerTest(MdnsTestCase):
def setUp(self):
super(MdnsRequestHandlerTest, self).setUp()
self.handler = handler.RequestHandler()
@patch.object(handler.RequestHandler, '_handle_query')
def test_dispatch_opcode_query(self, mock):
# DNS packet with QUERY opcode
payload = ("abbe01200001000000000001076578616d706c6503636f6d0000010001"
"0000291000000000000000")
self.handler.handle(binascii.a2b_hex(payload))
self.assertTrue(mock.called)
@patch.object(handler.RequestHandler, '_handle_unsupported')
def test_dispatch_opcode_iquery(self, mock):
# DNS packet with IQUERY opcode
payload = "60e509000001000000000000076578616d706c6503636f6d0000010001"
self.handler.handle(binascii.a2b_hex(payload))
self.assertTrue(mock.called)
@patch.object(handler.RequestHandler, '_handle_unsupported')
def test_dispatch_opcode_status(self, mock):
# DNS packet with STATUS opcode
payload = "5e0811000001000000000000076578616d706c6503636f6d0000010001"
self.handler.handle(binascii.a2b_hex(payload))
self.assertTrue(mock.called)
@patch.object(handler.RequestHandler, '_handle_unsupported')
def test_dispatch_opcode_notify(self, mock):
# DNS packet with NOTIFY opcode`
payload = "93e121000001000000000000076578616d706c6503636f6d0000010001"
self.handler.handle(binascii.a2b_hex(payload))
self.assertTrue(mock.called)
@patch.object(handler.RequestHandler, '_handle_unsupported')
def test_dispatch_opcode_update(self, mock):
# DNS packet with UPDATE opcode`
payload = "5a7029000001000000000000076578616d706c6503636f6d0000010001"
self.handler.handle(binascii.a2b_hex(payload))
self.assertTrue(mock.called)

View File

@ -0,0 +1,26 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_mdns import MdnsTestCase
class MdnsServiceTest(MdnsTestCase):
def setUp(self):
super(MdnsServiceTest, self).setUp()
self.service = self.start_service('mdns')
def test_stop(self):
# NOTE: Start is already done by the fixture in start_service()
self.service.stop()

View File

@ -29,7 +29,7 @@ class SentinelException(Exception):
class StorageAPITest(TestCase):
def setUp(self):
super(StorageAPITest, self).setUp()
self.storage_api = storage_api.StorageAPI()
self.storage_api = storage_api.StorageAPI('sqlalchemy')
self.storage_mock = mock.Mock()
self.storage_api.storage = self.storage_mock

View File

@ -25,9 +25,4 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase):
def setUp(self):
super(SqlalchemyStorageTest, self).setUp()
self.config(
storage_driver='sqlalchemy',
group='service:central'
)
self.storage = storage.get_storage()
self.storage = storage.get_storage('sqlalchemy')

View File

@ -116,6 +116,15 @@ debug = False
# correspond to a [handler:my_driver] section below or else in the config
#enabled_notification_handlers = nova_fixed
#-----------------------
# mDNS Service
#-----------------------
[service:mdns]
#workers = None
#host = 0.0.0.0
#port = 5354
#tcp_backlog =1 00
##############
## Network API
##############

View File

@ -35,6 +35,7 @@ console_scripts =
designate-api = designate.cmd.api:main
designate-central = designate.cmd.central:main
designate-manage = designate.cmd.manage:main
designate-mdns = designate.cmd.mdns:main
designate-sink = designate.cmd.sink:main
designate.api.v1 =