Fix TsigKeyring issues with dnspython 2.x

- Fixed issues in TsigKeyring.
- Fixed tsgi issues in mdns handler.
- Fixed invalid secret used in tests.
- Added additional test coverage.
- Re-enabled previously broken test.

Additionally modified unit test to provide a storage provider,
as this does not exist in the next release.

Closes-Bug: #1982252
Change-Id: I04c104cfc9ee2f03d0c5adca3c80bbfff20afb70
(cherry picked from commit 38c591eaa1)
This commit is contained in:
Erik Olof Gunnar Andersson 2023-06-27 22:06:41 +02:00
parent 6e7b7e898e
commit a99367cff1
5 changed files with 101 additions and 16 deletions

View File

@ -167,10 +167,11 @@ class TsigInfoMiddleware(DNSMiddleware):
return None
class TsigKeyring(object):
class TsigKeyring(dict):
"""Implements the DNSPython KeyRing API, backed by the Designate DB"""
def __init__(self, storage):
super(TsigKeyring, self).__init__()
self.storage = storage
def __getitem__(self, key):

View File

@ -414,19 +414,22 @@ class RequestHandler(object):
if request.had_tsig:
# Make the space we reserved for TSIG available for use
renderer.max_size += TSIG_RRSIZE
if multi_messages:
# The first message context will be None then the
# context for the prev message is used for the next
multi_messages_context = renderer.add_multi_tsig(
multi_messages_context, request.keyname,
request.keyring[request.keyname], request.fudge,
request.original_id, request.tsig_error,
request.other_data, request.mac, request.keyalgorithm)
request.keyring.secret, 300,
request.id, request.tsig_error,
b'', request.mac, request.keyalgorithm
)
else:
renderer.add_tsig(request.keyname,
request.keyring[request.keyname], request.fudge,
request.original_id, request.tsig_error,
request.other_data, request.mac, request.keyalgorithm)
renderer.add_tsig(
request.keyname, request.keyring.secret, 300,
request.id, request.tsig_error,
b'', request.mac, request.keyalgorithm
)
return renderer, multi_messages_context
@staticmethod

View File

@ -117,7 +117,7 @@ class TestCase(base.BaseTestCase):
tsigkey_fixtures = [{
'name': 'test-key-one',
'algorithm': 'hmac-md5',
'secret': 'SomeSecretKey',
'secret': 'SomeOldSecretKey',
'scope': 'POOL',
'resource_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
}, {

View File

@ -17,11 +17,14 @@ from unittest import mock
import dns
import dns.query
import dns.tsigkeyring
from oslo_config import cfg
from designate import dnsutils
from designate import exceptions
from designate.mdns import handler
from designate import objects
from designate import storage
import designate.tests
CONF = cfg.CONF
@ -81,6 +84,82 @@ SAMPLES = {
}
class TestSerializationMiddleware(designate.tests.TestCase):
def setUp(self):
super(TestSerializationMiddleware, self).setUp()
storage_driver = CONF['service:central'].storage_driver
self.storage = storage.get_storage(storage_driver)
self.tg = mock.Mock()
def test_with_tsigkeyring(self):
self.create_tsigkey(fixture=1)
query = dns.message.make_query(
'example.com.', dns.rdatatype.SOA,
)
query.use_tsig(dns.tsigkeyring.from_text(
{'test-key-two': 'AnotherSecretKey'})
)
payload = query.to_wire()
application = handler.RequestHandler(self.storage, self.tg)
application = dnsutils.SerializationMiddleware(
application, dnsutils.TsigKeyring(self.storage)
)
self.assertTrue(next(application(
{'payload': payload, 'addr': ['192.0.2.1', 5353]}
)))
def test_without_tsigkeyring(self):
query = dns.message.make_query(
'example.com.', dns.rdatatype.SOA,
)
payload = query.to_wire()
application = handler.RequestHandler(self.storage, self.tg)
application = dnsutils.SerializationMiddleware(
application, dnsutils.TsigKeyring(self.storage)
)
self.assertTrue(next(application(
{'payload': payload, 'addr': ['192.0.2.1', 5353]}
)))
class TestTsigUtils(designate.tests.TestCase):
def setUp(self):
super(TestTsigUtils, self).setUp()
storage_driver = CONF['service:central'].storage_driver
self.storage = storage.get_storage(storage_driver)
self.tsig_keyring = dnsutils.TsigKeyring(self.storage)
def test_tsig_keyring(self):
expected_result = b'J\x89\x9e:WRy\xca\xde\xb4\xa7\xb2'
self.create_tsigkey(fixture=0)
query = dns.message.make_query(
'example.com.', dns.rdatatype.SOA,
)
query.use_tsig(dns.tsigkeyring.from_text(
{'test-key-one': 'SomeOldSecretKey'})
)
self.assertEqual(expected_result, self.tsig_keyring.get(query.keyname))
self.assertEqual(expected_result, self.tsig_keyring[query.keyname])
def test_tsig_keyring_not_found(self):
query = dns.message.make_query(
'example.com.', dns.rdatatype.SOA,
)
query.use_tsig(dns.tsigkeyring.from_text(
{'test-key-one': 'SomeOldSecretKey'})
)
self.assertIsNone(self.tsig_keyring.get(query.keyname))
class TestUtils(designate.tests.TestCase):
def setUp(self):
super(TestUtils, self).setUp()

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
from unittest import expectedFailure
from unittest import mock
import dns
@ -22,6 +21,7 @@ import dns.rdataclass
import dns.rdatatype
import dns.resolver
import dns.rrset
import dns.tsigkeyring
from oslo_config import cfg
import testtools
@ -638,7 +638,6 @@ class MdnsRequestHandlerTest(MdnsTestCase):
self.assertEqual(
expected_response[1], binascii.b2a_hex(response_two))
@expectedFailure
@mock.patch.object(dns.renderer.Renderer, 'add_multi_tsig')
def test_dispatch_opcode_query_AXFR_multiple_messages_with_tsig(self,
mock_multi_tsig):
@ -698,11 +697,14 @@ class MdnsRequestHandlerTest(MdnsTestCase):
side_effect=_find_recordsets_axfr):
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
request.keyring = {request.keyname: ''}
request.had_tsig = True
args = [request.keyname, request.keyring[request.keyname],
request.fudge, request.original_id, request.tsig_error,
request.other_data, request.mac, request.keyalgorithm]
request.use_tsig(dns.tsigkeyring.from_text(
{'test-key-two': 'AnotherSecretKey'})
)
args = [request.keyname, request.keyring.secret,
300, request.id, request.tsig_error,
b'', request.mac, request.keyalgorithm]
response_generator = self.handler(request)
# Validate the first response
response_one = next(response_generator).get_wire()