Add Designate recordsets support
Change-Id: Ica8531c402cb10cee5aae38690ff95ebd80b21f2
This commit is contained in:
parent
983f8e1420
commit
31ac451e12
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- Add support for Designate recordsets resources, with the
|
||||
usual methods (search/list/get/create/update/delete).
|
@ -797,3 +797,28 @@ class ZoneUpdate(task_manager.Task):
|
||||
class ZoneDelete(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.zones.delete(**self.args)
|
||||
|
||||
|
||||
class RecordSetList(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.recordsets.list(**self.args)
|
||||
|
||||
|
||||
class RecordSetGet(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.recordsets.get(**self.args)
|
||||
|
||||
|
||||
class RecordSetCreate(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.recordsets.create(**self.args)
|
||||
|
||||
|
||||
class RecordSetUpdate(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.recordsets.update(**self.args)
|
||||
|
||||
|
||||
class RecordSetDelete(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.designate_client.recordsets.delete(**self.args)
|
||||
|
@ -5423,3 +5423,124 @@ class OpenStackCloud(object):
|
||||
_tasks.ZoneDelete(zone=zone['id']))
|
||||
|
||||
return True
|
||||
|
||||
def list_recordsets(self, zone):
|
||||
"""List all available recordsets.
|
||||
|
||||
:param zone: Name or id of the zone managing the recordset
|
||||
|
||||
:returns: A list of recordsets.
|
||||
|
||||
"""
|
||||
with _utils.shade_exceptions("Error fetching recordsets list"):
|
||||
return self.manager.submitTask(_tasks.RecordSetList(zone=zone))
|
||||
|
||||
def get_recordset(self, zone, name_or_id):
|
||||
"""Get a recordset by name or ID.
|
||||
|
||||
:param zone: Name or ID of the zone managing the recordset
|
||||
:param name_or_id: Name or ID of the recordset
|
||||
|
||||
:returns: A recordset dict or None if no matching recordset is
|
||||
found.
|
||||
|
||||
"""
|
||||
try:
|
||||
return self.manager.submitTask(_tasks.RecordSetGet(
|
||||
zone=zone,
|
||||
recordset=name_or_id))
|
||||
except:
|
||||
return None
|
||||
|
||||
def search_recordsets(self, zone, name_or_id=None, filters=None):
|
||||
recordsets = self.list_recordsets(zone=zone)
|
||||
return _utils._filter_list(recordsets, name_or_id, filters)
|
||||
|
||||
def create_recordset(self, zone, name, recordset_type, records,
|
||||
description=None, ttl=None):
|
||||
"""Create a recordset.
|
||||
|
||||
:param zone: Name or ID of the zone managing the recordset
|
||||
:param name: Name of the recordset
|
||||
:param recordset_type: Type of the recordset
|
||||
:param records: List of the recordset definitions
|
||||
:param description: Description of the recordset
|
||||
:param ttl: TTL value of the recordset
|
||||
|
||||
:returns: a dict representing the created recordset.
|
||||
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
|
||||
"""
|
||||
if self.get_zone(zone) is None:
|
||||
raise OpenStackCloudException(
|
||||
"Zone %s not found." % zone)
|
||||
|
||||
# We capitalize the type in case the user sends in lowercase
|
||||
recordset_type = recordset_type.upper()
|
||||
|
||||
with _utils.shade_exceptions(
|
||||
"Unable to create recordset {name}".format(name=name)):
|
||||
return self.manager.submitTask(_tasks.RecordSetCreate(
|
||||
zone=zone, name=name, type_=recordset_type, records=records,
|
||||
description=description, ttl=ttl))
|
||||
|
||||
@_utils.valid_kwargs('email', 'description', 'ttl', 'masters')
|
||||
def update_recordset(self, zone, name_or_id, **kwargs):
|
||||
"""Update a recordset.
|
||||
|
||||
:param zone: Name or id of the zone managing the recordset
|
||||
:param name_or_id: Name or ID of the recordset being updated.
|
||||
:param records: List of the recordset definitions
|
||||
:param description: Description of the recordset
|
||||
:param ttl: TTL (Time to live) value in seconds of the recordset
|
||||
|
||||
:returns: a dict representing the updated recordset.
|
||||
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
zone_obj = self.get_zone(zone)
|
||||
if zone_obj is None:
|
||||
raise OpenStackCloudException(
|
||||
"Zone %s not found." % zone)
|
||||
|
||||
recordset_obj = self.get_recordset(zone, name_or_id)
|
||||
if recordset_obj is None:
|
||||
raise OpenStackCloudException(
|
||||
"Recordset %s not found." % name_or_id)
|
||||
|
||||
with _utils.shade_exceptions(
|
||||
"Error updating recordset {0}".format(name_or_id)):
|
||||
new_recordset = self.manager.submitTask(
|
||||
_tasks.RecordSetUpdate(
|
||||
zone=zone, recordset=name_or_id, values=kwargs))
|
||||
|
||||
return new_recordset
|
||||
|
||||
def delete_recordset(self, zone, name_or_id):
|
||||
"""Delete a recordset.
|
||||
|
||||
:param zone: Name or ID of the zone managing the recordset.
|
||||
:param name_or_id: Name or ID of the recordset being deleted.
|
||||
|
||||
:returns: True if delete succeeded, False otherwise.
|
||||
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
|
||||
zone = self.get_zone(zone)
|
||||
if zone is None:
|
||||
self.log.debug("Zone %s not found for deleting" % zone)
|
||||
return False
|
||||
|
||||
recordset = self.get_recordset(zone['id'], name_or_id)
|
||||
if recordset is None:
|
||||
self.log.debug("Recordset %s not found for deleting" % name_or_id)
|
||||
return False
|
||||
|
||||
with _utils.shade_exceptions(
|
||||
"Error deleting recordset {0}".format(name_or_id)):
|
||||
self.manager.submitTask(
|
||||
_tasks.RecordSetDelete(zone=zone['id'], recordset=name_or_id))
|
||||
|
||||
return True
|
||||
|
@ -260,3 +260,15 @@ class FakeZone(object):
|
||||
self.description = description
|
||||
self.ttl = ttl
|
||||
self.masters = masters
|
||||
|
||||
|
||||
class FakeRecordset(object):
|
||||
def __init__(self, zone, id, name, type_, description,
|
||||
ttl, records):
|
||||
self.zone = zone
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.type_ = type_
|
||||
self.description = description
|
||||
self.ttl = ttl
|
||||
self.records = records
|
||||
|
95
shade/tests/functional/test_recordset.py
Normal file
95
shade/tests/functional/test_recordset.py
Normal file
@ -0,0 +1,95 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
test_recordset
|
||||
----------------------------------
|
||||
|
||||
Functional tests for `shade` recordset methods.
|
||||
"""
|
||||
|
||||
from testtools import content
|
||||
|
||||
from shade.tests.functional import base
|
||||
|
||||
|
||||
class TestRecordset(base.BaseFunctionalTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRecordset, self).setUp()
|
||||
if not self.demo_cloud.has_service('dns'):
|
||||
self.skipTest('dns service not supported by cloud')
|
||||
|
||||
def test_recordsets(self):
|
||||
'''Test DNS recordsets functionality'''
|
||||
zone = 'example2.net.'
|
||||
email = 'test@example2.net'
|
||||
name = 'www'
|
||||
type_ = 'a'
|
||||
description = 'Test recordset'
|
||||
ttl = 3600
|
||||
records = ['192.168.1.1']
|
||||
|
||||
self.addDetail('zone', content.text_content(zone))
|
||||
self.addDetail('recordset', content.text_content(name))
|
||||
self.addCleanup(self.cleanup, zone, name)
|
||||
|
||||
# Create a zone to hold the tested recordset
|
||||
zone_obj = self.demo_cloud.create_zone(name=zone, email=email)
|
||||
|
||||
# Test we can create a recordset and we get it returned
|
||||
created_recordset = self.demo_cloud.create_recordset(zone, name, type_,
|
||||
records,
|
||||
description, ttl)
|
||||
self.assertEquals(created_recordset['zone_id'], zone_obj['id'])
|
||||
self.assertEquals(created_recordset['name'], name + '.' + zone)
|
||||
self.assertEquals(created_recordset['type'], type_.upper())
|
||||
self.assertEquals(created_recordset['records'], records)
|
||||
self.assertEquals(created_recordset['description'], description)
|
||||
self.assertEquals(created_recordset['ttl'], ttl)
|
||||
|
||||
# Test that we can list recordsets
|
||||
recordsets = self.demo_cloud.list_recordsets(zone)
|
||||
self.assertIsNotNone(recordsets)
|
||||
|
||||
# Test we get the same recordset with the get_recordset method
|
||||
get_recordset = self.demo_cloud.get_recordset(zone,
|
||||
created_recordset['id'])
|
||||
self.assertEquals(get_recordset['id'], created_recordset['id'])
|
||||
|
||||
# Test the get method also works by name
|
||||
get_recordset = self.demo_cloud.get_recordset(zone, name + '.' + zone)
|
||||
self.assertEquals(get_recordset['id'], created_recordset['id'])
|
||||
|
||||
# Test we can update a field on the recordset and only that field
|
||||
# is updated
|
||||
updated_recordset = self.demo_cloud.update_recordset(zone_obj['id'],
|
||||
name + '.' + zone,
|
||||
ttl=7200)
|
||||
self.assertEquals(updated_recordset['id'], created_recordset['id'])
|
||||
self.assertEquals(updated_recordset['name'], name + '.' + zone)
|
||||
self.assertEquals(updated_recordset['type'], type_.upper())
|
||||
self.assertEquals(updated_recordset['records'], records)
|
||||
self.assertEquals(updated_recordset['description'], description)
|
||||
self.assertEquals(updated_recordset['ttl'], 7200)
|
||||
|
||||
# Test we can delete and get True returned
|
||||
deleted_recordset = self.demo_cloud.delete_recordset(
|
||||
zone, name + '.' + zone)
|
||||
self.assertTrue(deleted_recordset)
|
||||
|
||||
def cleanup(self, zone_name, recordset_name):
|
||||
self.demo_cloud.delete_recordset(
|
||||
zone_name, recordset_name + '.' + zone_name)
|
||||
self.demo_cloud.delete_zone(zone_name)
|
114
shade/tests/unit/test_recordset.py
Normal file
114
shade/tests/unit/test_recordset.py
Normal file
@ -0,0 +1,114 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
import shade
|
||||
from shade.tests.unit import base
|
||||
from shade.tests import fakes
|
||||
|
||||
zone_obj = fakes.FakeZone(
|
||||
id='1',
|
||||
name='example.net.',
|
||||
type_='PRIMARY',
|
||||
email='test@example.net',
|
||||
description='Example zone',
|
||||
ttl=3600,
|
||||
masters=None
|
||||
)
|
||||
|
||||
recordset_obj = fakes.FakeRecordset(
|
||||
zone='1',
|
||||
id='1',
|
||||
name='www.example.net.',
|
||||
type_='A',
|
||||
description='Example zone',
|
||||
ttl=3600,
|
||||
records=['192.168.1.1']
|
||||
)
|
||||
|
||||
|
||||
class TestRecordset(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRecordset, self).setUp()
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_create_recordset(self, mock_designate):
|
||||
mock_designate.zones.list.return_value = [zone_obj]
|
||||
self.cloud.create_recordset(zone=recordset_obj.zone,
|
||||
name=recordset_obj.name,
|
||||
recordset_type=recordset_obj.type_,
|
||||
records=recordset_obj.records,
|
||||
description=recordset_obj.description,
|
||||
ttl=recordset_obj.ttl)
|
||||
mock_designate.recordsets.create.assert_called_once_with(
|
||||
zone=recordset_obj.zone, name=recordset_obj.name,
|
||||
type_=recordset_obj.type_.upper(),
|
||||
records=recordset_obj.records,
|
||||
description=recordset_obj.description,
|
||||
ttl=recordset_obj.ttl
|
||||
)
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_create_recordset_exception(self, mock_designate):
|
||||
mock_designate.zones.list.return_value = [zone_obj]
|
||||
mock_designate.recordsets.create.side_effect = Exception()
|
||||
with testtools.ExpectedException(
|
||||
shade.OpenStackCloudException,
|
||||
"Unable to create recordset www2.example.net."
|
||||
):
|
||||
self.cloud.create_recordset('1', 'www2.example.net.',
|
||||
'a', ['192.168.1.2'])
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_update_recordset(self, mock_designate):
|
||||
new_ttl = 7200
|
||||
mock_designate.zones.list.return_value = [zone_obj]
|
||||
mock_designate.recordsets.list.return_value = [recordset_obj]
|
||||
self.cloud.update_recordset('1', '1', ttl=new_ttl)
|
||||
mock_designate.recordsets.update.assert_called_once_with(
|
||||
zone='1', recordset='1', values={'ttl': new_ttl}
|
||||
)
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_delete_recordset(self, mock_designate):
|
||||
mock_designate.zones.list.return_value = [zone_obj]
|
||||
mock_designate.recordsets.list.return_value = [recordset_obj]
|
||||
self.cloud.delete_recordset('1', '1')
|
||||
mock_designate.recordsets.delete.assert_called_once_with(
|
||||
zone='1', recordset='1'
|
||||
)
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_get_recordset_by_id(self, mock_designate):
|
||||
mock_designate.recordsets.get.return_value = recordset_obj
|
||||
recordset = self.cloud.get_recordset('1', '1')
|
||||
self.assertTrue(mock_designate.recordsets.get.called)
|
||||
self.assertEqual(recordset['id'], '1')
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_get_recordset_by_name(self, mock_designate):
|
||||
mock_designate.recordsets.get.return_value = recordset_obj
|
||||
recordset = self.cloud.get_recordset('1', 'www.example.net.')
|
||||
self.assertTrue(mock_designate.recordsets.get.called)
|
||||
self.assertEqual(recordset['name'], 'www.example.net.')
|
||||
|
||||
@mock.patch.object(shade.OpenStackCloud, 'designate_client')
|
||||
def test_get_recordset_not_found_returns_false(self, mock_designate):
|
||||
mock_designate.recordsets.get.return_value = None
|
||||
recordset = self.cloud.get_recordset('1', 'www.nonexistingrecord.net.')
|
||||
self.assertFalse(recordset)
|
Loading…
Reference in New Issue
Block a user