Add utils for managing DNS operations

Currently, there is no support in os_win for
managing Microsoft DNS Server related operations.

This patch addresses this issue by adding support
for DNS operations in os_win.

Change-Id: I029747555a58e0a8e362b65e6c0c470cf2774e42
Implements: blueprint os-win-dnsutils
This commit is contained in:
Alin Balutoiu 2016-06-09 21:09:39 +03:00
parent 3d48e1de3f
commit efb482181d
8 changed files with 464 additions and 0 deletions

View File

@ -141,3 +141,22 @@ IPV4_DEFAULT = '0.0.0.0'
FSK_COMPUTERNAME = 'ComputerName'
VTPM_SUPPORTED_OS = ['windows']
# DNSUtils constants
DNS_ZONE_TYPE_PRIMARY = 0
DNS_ZONE_TYPE_SECONDARY = 1
DNS_ZONE_TYPE_STUB = 2
DNS_ZONE_TYPE_FORWARD = 3
DNS_ZONE_NO_UPDATES_ALLOWED = 0
DNS_ZONE_SECURE_NONSECURE_UPDATES = 1
DNS_ZONE_SECURE_UPDATES_ONLY = 2
DNS_ZONE_DO_NOT_NOTIFY = 0
DNS_ZONE_NOTIFY_NAME_SERVERS_TAB = 1
DNS_ZONE_NOTIFY_SPECIFIED_SERVERS = 2
DNS_ZONE_TRANSFER_ALLOWED_ANY_HOST = 0
DNS_ZONE_TRANSFER_ALLOWED_NAME_SERVERS = 1
DNS_ZONE_TRANSFER_ALLOWED_SECONDARY_SERVERS = 2
DNS_ZONE_TRANSFER_NOT_ALLOWED = 3

View File

@ -140,3 +140,15 @@ class HyperVRemoteFXException(HyperVException):
class HyperVClusterException(HyperVException):
pass
class DNSException(OSWinException):
pass
class DNSZoneNotFound(NotFound, DNSException):
msg_fmt = _("DNS Zone not found: %(zone_name)s")
class DNSZoneAlreadyExists(DNSException):
msg_fmt = _("DNS Zone already exists: %(zone_name)s")

View File

@ -26,6 +26,7 @@ from os_win.utils.compute import clusterutils
from os_win.utils.compute import livemigrationutils
from os_win.utils.compute import rdpconsoleutils
from os_win.utils.compute import vmutils
from os_win.utils.dns import dnsutils
from os_win.utils import hostutils
from os_win.utils.network import networkutils
from os_win.utils import pathutils
@ -136,3 +137,8 @@ class TestHyperVUtilsFactory(test_base.OsWinBaseTestCase):
self._check_get_class(
expected_class=clusterutils.ClusterUtils,
class_type='clusterutils')
def test_get_dnsutils(self):
self._check_get_class(
expected_class=dnsutils.DNSUtils,
class_type='dnsutils')

View File

View File

@ -0,0 +1,245 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from os_win import constants
from os_win import exceptions
from os_win.tests import test_base
from os_win.utils.dns import dnsutils
class DNSUtilsTestCase(test_base.OsWinBaseTestCase):
"""Unit tests for the Hyper-V DNSUtils class."""
def setUp(self):
super(DNSUtilsTestCase, self).setUp()
self._dnsutils = dnsutils.DNSUtils()
self._dnsutils._dns_manager_attr = mock.MagicMock()
@mock.patch.object(dnsutils.DNSUtils, '_get_wmi_obj')
def test_dns_manager(self, mock_get_wmi_obj):
self._dnsutils._dns_manager_attr = None
self.assertEqual(mock_get_wmi_obj.return_value,
self._dnsutils._dns_manager)
mock_get_wmi_obj.assert_called_once_with(
self._dnsutils._DNS_NAMESPACE % self._dnsutils._host)
@mock.patch.object(dnsutils.DNSUtils, '_get_wmi_obj')
def test_dns_manager_fail(self, mock_get_wmi_obj):
self._dnsutils._dns_manager_attr = None
expected_exception = exceptions.DNSException
mock_get_wmi_obj.side_effect = expected_exception
self.assertRaises(expected_exception,
lambda: self._dnsutils._dns_manager)
mock_get_wmi_obj.assert_called_once_with(
self._dnsutils._DNS_NAMESPACE % self._dnsutils._host)
def test_get_zone(self):
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.return_value = [mock.sentinel.zone]
zone_found = self._dnsutils._get_zone(mock.sentinel.zone_name)
zone_manager.assert_called_once_with(Name=mock.sentinel.zone_name)
self.assertEqual(mock.sentinel.zone, zone_found)
def test_get_zone_ignore_missing(self):
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.return_value = []
zone_found = self._dnsutils._get_zone(mock.sentinel.zone_name)
zone_manager.assert_called_once_with(Name=mock.sentinel.zone_name)
self.assertIsNone(zone_found)
def test_get_zone_missing(self):
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.return_value = []
self.assertRaises(exceptions.DNSZoneNotFound,
self._dnsutils._get_zone,
mock.sentinel.zone_name,
ignore_missing=False)
zone_manager.assert_called_once_with(Name=mock.sentinel.zone_name)
def test_zone_list(self):
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.return_value = [mock.Mock(Name=mock.sentinel.fake_name1),
mock.Mock(Name=mock.sentinel.fake_name2)]
zone_list = self._dnsutils.zone_list()
expected_zone_list = [mock.sentinel.fake_name1,
mock.sentinel.fake_name2]
self.assertEqual(expected_zone_list, zone_list)
zone_manager.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_exists(self, mock_get_zone):
zone_already_exists = self._dnsutils.zone_exists(
mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(mock.sentinel.zone_name)
self.assertTrue(zone_already_exists)
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_exists_false(self, mock_get_zone):
mock_get_zone.return_value = None
zone_already_exists = self._dnsutils.zone_exists(
mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(mock.sentinel.zone_name)
self.assertFalse(zone_already_exists)
@mock.patch.object(dnsutils.DNSUtils, 'zone_exists')
def test_zone_create(self, mock_zone_exists):
mock_zone_exists.return_value = False
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.CreateZone.return_value = (mock.sentinel.zone_path,)
zone_path = self._dnsutils.zone_create(
zone_name=mock.sentinel.zone_name,
zone_type=mock.sentinel.zone_type,
ds_integrated=mock.sentinel.ds_integrated,
data_file_name=mock.sentinel.data_file_name,
ip_addrs=mock.sentinel.ip_addrs,
admin_email_name=mock.sentinel.admin_email_name)
zone_manager.CreateZone.assert_called_once_with(
ZoneName=mock.sentinel.zone_name,
ZoneType=mock.sentinel.zone_type,
DsIntegrated=mock.sentinel.ds_integrated,
DataFileName=mock.sentinel.data_file_name,
IpAddr=mock.sentinel.ip_addrs,
AdminEmailname=mock.sentinel.admin_email_name)
mock_zone_exists.assert_called_once_with(mock.sentinel.zone_name)
self.assertEqual(mock.sentinel.zone_path, zone_path)
@mock.patch.object(dnsutils.DNSUtils, 'zone_exists')
def test_zone_create_existing_zone(self, mock_zone_exists):
mock_zone_exists.return_value = True
zone_manager = self._dnsutils._dns_manager.MicrosoftDNS_Zone
zone_manager.CreateZone.return_value = (mock.sentinel.zone_path,)
self.assertRaises(exceptions.DNSZoneAlreadyExists,
self._dnsutils.zone_create,
zone_name=mock.sentinel.zone_name,
zone_type=mock.sentinel.zone_type,
ds_integrated=mock.sentinel.ds_integrated)
mock_zone_exists.assert_called_once_with(mock.sentinel.zone_name)
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_delete(self, mock_get_zone):
self._dnsutils.zone_delete(mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(mock.sentinel.zone_name)
mock_get_zone.return_value.Delete_.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_modify(self, mock_get_zone):
mock_zone = mock.MagicMock(
AllowUpdate=mock.sentinel.allowupdate,
DisableWINSRecordReplication=mock.sentinel.disablewins,
Notify=mock.sentinel.notify,
SecureSecondaries=mock.sentinel.securesecondaries)
mock_get_zone.return_value = mock_zone
self._dnsutils.zone_modify(
mock.sentinel.zone_name,
allow_update=None,
disable_wins=mock.sentinel.disable_wins,
notify=None,
reverse=mock.sentinel.reverse,
secure_secondaries=None)
self.assertEqual(mock.sentinel.allowupdate, mock_zone.AllowUpdate)
self.assertEqual(mock.sentinel.disable_wins,
mock_zone.DisableWINSRecordReplication)
self.assertEqual(mock.sentinel.notify, mock_zone.Notify)
self.assertEqual(mock.sentinel.reverse,
mock_zone.Reverse)
self.assertEqual(mock.sentinel.securesecondaries,
mock_zone.SecureSecondaries)
mock_zone.put.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_update_force_refresh(self, mock_get_zone):
mock_zone = mock.MagicMock(DsIntegrated=False,
ZoneType=constants.DNS_ZONE_TYPE_SECONDARY)
mock_get_zone.return_value = mock_zone
self._dnsutils.zone_update(mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(
mock.sentinel.zone_name,
ignore_missing=False)
mock_zone.ForceRefresh.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_update_from_ds(self, mock_get_zone):
mock_zone = mock.MagicMock(DsIntegrated=True,
ZoneType=constants.DNS_ZONE_TYPE_PRIMARY)
mock_get_zone.return_value = mock_zone
self._dnsutils.zone_update(mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(
mock.sentinel.zone_name,
ignore_missing=False)
mock_zone.UpdateFromDS.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, '_get_zone')
def test_zone_update_reload_zone(self, mock_get_zone):
mock_zone = mock.MagicMock(DsIntegrated=False,
ZoneType=constants.DNS_ZONE_TYPE_PRIMARY)
mock_get_zone.return_value = mock_zone
self._dnsutils.zone_update(mock.sentinel.zone_name)
mock_get_zone.assert_called_once_with(
mock.sentinel.zone_name,
ignore_missing=False)
mock_zone.ReloadZone.assert_called_once_with()
@mock.patch.object(dnsutils.DNSUtils, 'zone_exists')
def test_get_zone_serial(self, mock_zone_exists):
mock_zone_exists.return_value = True
fake_serial_number = 1
msdns_soatype = self._dnsutils._dns_manager.MicrosoftDNS_SOAType
msdns_soatype.return_value = [
mock.Mock(SerialNumber=fake_serial_number)]
serial_number = self._dnsutils.get_zone_serial(mock.sentinel.zone_name)
expected_serial_number = fake_serial_number
self.assertEqual(expected_serial_number, serial_number)
msdns_soatype.assert_called_once_with(
ContainerName=mock.sentinel.zone_name)
mock_zone_exists.assert_called_once_with(mock.sentinel.zone_name)
@mock.patch.object(dnsutils.DNSUtils, 'zone_exists')
def test_get_zone_serial_zone_not_found(self, mock_zone_exists):
mock_zone_exists.return_value = False
serial_number = self._dnsutils.get_zone_serial(mock.sentinel.zone_name)
self.assertIsNone(serial_number)
mock_zone_exists.assert_called_once_with(mock.sentinel.zone_name)

View File

View File

@ -0,0 +1,173 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from os_win._i18n import _
from os_win import constants
from os_win import exceptions
from os_win.utils import baseutils
LOG = logging.getLogger(__name__)
class DNSUtils(baseutils.BaseUtils):
_DNS_NAMESPACE = '//%s/root/MicrosoftDNS'
def __init__(self, host='.'):
self._dns_manager_attr = None
self._host = host
@property
def _dns_manager(self):
if not self._dns_manager_attr:
try:
namespace = self._DNS_NAMESPACE % self._host
self._dns_manager_attr = self._get_wmi_obj(namespace)
except Exception:
raise exceptions.DNSException(
_("Namespace %(namespace)s not found. Make sure "
"DNS Server feature is installed.") %
{'namespace': namespace})
return self._dns_manager_attr
def _get_zone(self, zone_name, ignore_missing=True):
zones = self._dns_manager.MicrosoftDNS_Zone(Name=zone_name)
if zones:
return zones[0]
if not ignore_missing:
raise exceptions.DNSZoneNotFound(zone_name=zone_name)
def zone_list(self):
"""Returns the current list of DNS Zones.
"""
zones = self._dns_manager.MicrosoftDNS_Zone()
return [x.Name for x in zones]
def zone_exists(self, zone_name):
return self._get_zone(zone_name) is not None
def zone_create(self, zone_name, zone_type, ds_integrated,
data_file_name=None, ip_addrs=None,
admin_email_name=None):
"""Creates a DNS Zone and returns the path to the associated object.
:param zone_name: string representing the name of the zone.
:param zone_type: type of zone
0 = Primary zone
1 = Secondary zone, MUST include at least one master IP
2 = Stub zone, MUST include at least one master IP
3 = Zone forwarder, MUST include at least one master IP
:param ds_integrated: Only Primary zones cand be stored in AD
True = the zone data is stored in the Active Directory
False = the data zone is stored in files
:param data_file_name(Optional): name of the data file associated
with the zone.
:param ip_addrs(Optional): IP addresses of the master DNS servers
for this zone. Parameter type MUST be list
:param admin_email_name(Optional): email address of the administrator
responsible for the zone.
"""
LOG.debug("Creating DNS Zone '%s'" % zone_name)
if self.zone_exists(zone_name):
raise exceptions.DNSZoneAlreadyExists(zone_name=zone_name)
dns_zone_manager = self._dns_manager.MicrosoftDNS_Zone
(zone_path,) = dns_zone_manager.CreateZone(
ZoneName=zone_name,
ZoneType=zone_type,
DsIntegrated=ds_integrated,
DataFileName=data_file_name,
IpAddr=ip_addrs,
AdminEmailname=admin_email_name)
return zone_path
def zone_delete(self, zone_name):
LOG.debug("Deleting DNS Zone '%s'" % zone_name)
zone_to_be_deleted = self._get_zone(zone_name)
if zone_to_be_deleted:
zone_to_be_deleted.Delete_()
def zone_modify(self, zone_name, allow_update=None, disable_wins=None,
notify=None, reverse=None, secure_secondaries=None):
"""Modifies properties of an existing zone. If any parameter is None,
then that parameter will be skipped and will not be taken into
consideration.
:param zone_name: string representing the name of the zone.
:param allow_update:
0 = No updates allowed.
1 = Zone accepts both secure and nonsecure updates.
2 = Zone accepts secure updates only.
:param disable_wins: Indicates whether the WINS record is replicated.
If set to TRUE, WINS record replication is disabled.
:param notify:
0 = Do not notify secondaries
1 = Notify Servers listed on the Name Servers Tab
2 = Notify the specified servers
:param reverse: Indicates whether the Zone is reverse (TRUE)
or forward (FALSE).
:param securese_condaries:
0 = Allowed to Any host
1 = Only to the Servers listed on the Name Servers tab
2 = To the following servers (destination servers IP addresses
are specified in SecondaryServers value)
3 = Zone tranfers not allowed
"""
zone = self._get_zone(zone_name, ignore_missing=False)
if allow_update is not None:
zone.AllowUpdate = allow_update
if disable_wins is not None:
zone.DisableWINSRecordReplication = disable_wins
if notify is not None:
zone.Notify = notify
if reverse is not None:
zone.Reverse = reverse
if secure_secondaries is not None:
zone.SecureSecondaries = secure_secondaries
zone.put()
def zone_update(self, zone_name):
LOG.debug("Updating DNS Zone '%s'" % zone_name)
zone = self._get_zone(zone_name, ignore_missing=False)
if (zone.DsIntegrated and
zone.ZoneType == constants.DNS_ZONE_TYPE_PRIMARY):
zone.UpdateFromDS()
elif zone.ZoneType in [constants.DNS_ZONE_TYPE_SECONDARY,
constants.DNS_ZONE_TYPE_STUB]:
zone.ForceRefresh()
elif zone.ZoneType in [constants.DNS_ZONE_TYPE_PRIMARY,
constants.DNS_ZONE_TYPE_FORWARD]:
zone.ReloadZone()
def get_zone_serial(self, zone_name):
# Performing a manual check to make sure the zone exists before
# trying to retrieve the MicrosoftDNS_SOAType object. Otherwise,
# the query for MicrosoftDNS_SOAType will fail with "Generic Failure"
if not self.zone_exists(zone_name):
# Return None if zone was not found
return None
zone_soatype = self._dns_manager.MicrosoftDNS_SOAType(
ContainerName=zone_name)
# Serial number of the SOA record
SOA = zone_soatype[0].SerialNumber
return int(SOA)

View File

@ -134,6 +134,11 @@ utils_map = {
'min_version': 6.2,
'max_version': None,
'path': 'os_win.utils.compute.clusterutils.ClusterUtils'}},
'dnsutils': {
'DNSUtils': {
'min_version': 6.2,
'max_version': None,
'path': 'os_win.utils.dns.dnsutils.DNSUtils'}}
}
@ -224,3 +229,7 @@ def get_diskutils():
def get_clusterutils():
return _get_class(class_type='clusterutils')
def get_dnsutils():
return _get_class(class_type='dnsutils')