Add tsig key support to python-designateclient

This fix adds support for the designate tsig keys api to
python-designateclient.

It will add tsigkey related crud commands to the openstackclient.

Change-Id: I84336c3aca85ca62771fd2115481eda32ee980d2
Closes-Bug: #1702506
This commit is contained in:
Rudolf Vriend 2017-07-07 14:00:17 +02:00
parent 3bb401758c
commit b741282eaf
9 changed files with 479 additions and 1 deletions

View File

@ -273,8 +273,44 @@ class TLDCommands(object):
return self.parsed_cmd(cmd, FieldValueModel, *args, **kwargs)
class BlacklistCommands(object):
class TSIGKeyCommands(object):
def tsigkey_list(self, *args, **kwargs):
return self.parsed_cmd('tsigkey list', ListModel, *args, **kwargs)
def tsigkey_show(self, id, *args, **kwargs):
return self.parsed_cmd('tsigkey show {0}'.format(id), FieldValueModel,
*args, **kwargs)
def tsigkey_delete(self, id, *args, **kwargs):
return self.parsed_cmd('tsigkey delete {0}'.format(id), *args,
**kwargs)
def tsigkey_create(self, name, algorithm, secret, scope, resource_id,
*args, **kwargs):
options_str = build_option_string({
'--name': name,
'--algorithm': algorithm,
'--secret': secret,
'--scope': scope,
'--resource-id': resource_id,
})
cmd = 'tsigkey create {0}'.format(options_str)
return self.parsed_cmd(cmd, FieldValueModel, *args, **kwargs)
def tsigkey_set(self, id, name=None, algorithm=None, secret=None,
scope=None,
*args, **kwargs):
options_str = build_option_string({
'--name': name,
'--algorithm': algorithm,
'--secret': secret,
'--scope': scope,
})
cmd = 'tsigkey set {0} {1}'.format(id, options_str)
return self.parsed_cmd(cmd, FieldValueModel, *args, **kwargs)
class BlacklistCommands(object):
def zone_blacklist_list(self, *args, **kwargs):
cmd = 'zone blacklist list'
return self.parsed_cmd(cmd, ListModel, *args, **kwargs)

View File

@ -25,6 +25,14 @@ def random_tld(name='testtld'):
return "{0}{1}".format(name, random_digits())
def random_tsigkey_name(name='testtsig'):
return "{0}{1}".format(name, random_digits())
def random_tsigkey_secret(name='test-secret'):
return "{0}-{1}".format(name, random_digits(254 - len(name)))
def random_zone_name(name='testdomain', tld='com'):
return "{0}{1}.{2}.".format(name, random_digits(), tld)

View File

@ -193,6 +193,25 @@ class TLDFixture(BaseFixture):
pass
class TSIGKeyFixture(BaseFixture):
"""See DesignateCLI.tsigkey_create for __init__ args"""
def __init__(self, user='admin', *args, **kwargs):
super(TSIGKeyFixture, self).__init__(user=user, *args, **kwargs)
def _setUp(self):
super(TSIGKeyFixture, self)._setUp()
self.tsigkey = self.client.tsigkey_create(*self.args, **self.kwargs)
self.addCleanup(self.cleanup_tsigkey(self.client, self.tsigkey.id))
@classmethod
def cleanup_tsigkey(cls, client, tsigkey_id):
try:
client.tsigkey_delete(tsigkey_id)
except CommandFailed:
pass
class BlacklistFixture(BaseFixture):
"""See DesignateCLI.zone_blacklist_create for __init__ args"""

View File

@ -0,0 +1,90 @@
"""
Copyright 2017 SAP SE
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from tempest.lib.exceptions import CommandFailed
from designateclient.functionaltests.base import BaseDesignateTest
from designateclient.functionaltests.datagen import random_tsigkey_name
from designateclient.functionaltests.datagen import random_tsigkey_secret
from designateclient.functionaltests.datagen import random_zone_name
from designateclient.functionaltests.v2.fixtures import TSIGKeyFixture
from designateclient.functionaltests.v2.fixtures import ZoneFixture
class TestTSIGKey(BaseDesignateTest):
def setUp(self):
super(TestTSIGKey, self).setUp()
self.ensure_tsigkey_exists('com')
self.zone = self.useFixture(ZoneFixture(
name=random_zone_name(),
email='test@example.com',
)).zone
tsig_name = random_tsigkey_name()
tsig_algorithm = "hmac-sha256"
tsig_secret = random_tsigkey_secret()
tsig_scope = 'ZONE'
self.tsig = self.useFixture(TSIGKeyFixture(
name=tsig_name,
algorithm=tsig_algorithm,
secret=tsig_secret,
scope=tsig_scope,
resource_id=self.zone.id
)).tsig
self.assertEqual(self.tsig.name, tsig_name)
self.assertEqual(self.tsig.algorithm, tsig_algorithm)
self.assertEqual(self.tsig.secret, tsig_secret)
self.assertEqual(self.tsig.scope, tsig_scope)
self.assertEqual(self.tsig.resource_id, self.zone.id)
def test_tsigkey_list(self):
tsigkeys = self.clients.as_user('admin').tsigkey_list()
self.assertGreater(len(tsigkeys), 0)
def test_tsigkey_create_and_show(self):
tsigkey = self.clients.as_user('admin').tsigkey_show(self.tsigkey.id)
self.assertEqual(tsigkey.name, self.tsigkey.name)
self.assertEqual(tsigkey.created_at, self.tsigkey.created_at)
self.assertEqual(tsigkey.id, self.tsigkey.id)
self.assertEqual(self.tsig.algorithm, self.tsig_algorithm)
self.assertEqual(self.tsig.secret, self.tsig_secret)
self.assertEqual(self.tsig.scope, self.tsig_scope)
self.assertEqual(self.tsig.resource_id, self.zone.id)
self.assertEqual(tsigkey.updated_at, self.tsigkey.updated_at)
def test_tsigkey_delete(self):
client = self.clients.as_user('admin')
client.tsigkey_delete(self.tsigkey.id)
self.assertRaises(CommandFailed, client.tsigkey_show, self.tsigkey.id)
def test_tsigkey_set(self):
client = self.clients.as_user('admin')
updated_name = random_tsigkey_name('updated')
tsigkey = client.tsigkey_set(self.tsigkey.id, name=updated_name,
secret='An updated tsigsecret')
self.assertEqual(tsigkey.secret, 'An updated tsigsecret')
self.assertEqual(tsigkey.name, updated_name)
class TestTSIGKeyNegative(BaseDesignateTest):
def test_tsigkey_invalid_commmand(self):
client = self.clients.as_user('admin')
self.assertRaises(CommandFailed, client.openstack,
'tsigkey notacommand')
def test_tsigkey_create_invalid_flag(self):
client = self.clients.as_user('admin')
self.assertRaises(CommandFailed, client.openstack,
'tsigkey create --notanoption "junk"')

View File

@ -0,0 +1,96 @@
# Copyright 2017 SAP SE
#
# Author: Rudolf Vriend <rudolf.vriend@sap.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import v2
class TestTSIGKeys(v2.APIV2TestCase, v2.CrudMixin):
RESOURCE = 'tsigkeys'
def new_ref(self, **kwargs):
ref = super(TestTSIGKeys, self).new_ref(**kwargs)
ref.setdefault("name", uuid.uuid4().hex)
ref.setdefault("algorithm", "hmac-sha256")
ref.setdefault("secret", uuid.uuid4().hex)
ref.setdefault("scope", "POOL")
ref.setdefault("resource_id", uuid.uuid4().hex)
return ref
def test_create(self):
ref = self.new_ref()
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.tsigkeys.create(**values)
self.assertRequestBodyIs(json=values)
def test_get(self):
ref = self.new_ref()
self.stub_entity("GET", entity=ref, id=ref["id"])
response = self.client.tsigkeys.get(ref["id"])
self.assertEqual(ref, response)
def test_get_by_name(self):
ref = self.new_ref(name="www")
self.stub_entity("GET", entity=ref, id=ref["id"])
self.stub_url("GET", parts=[self.RESOURCE], json={"tsigkeys": [ref]})
response = self.client.tsigkeys.get(ref['name'])
self.assertEqual("GET", self.requests.request_history[0].method)
self.assertEqual(
"http://127.0.0.1:9001/v2/tsigkeys?name=www",
self.requests.request_history[0].url)
self.assertEqual(ref, response)
def test_list(self):
items = [
self.new_ref(),
self.new_ref()
]
self.stub_url("GET", parts=[self.RESOURCE], json={"tsigkeys": items})
listed = self.client.tsigkeys.list()
self.assertList(items, listed)
self.assertQueryStringIs("")
def test_update(self):
ref = self.new_ref()
self.stub_entity("PATCH", entity=ref, id=ref["id"])
values = ref.copy()
del values["id"]
self.client.tsigkeys.update(ref["id"], values)
self.assertRequestBodyIs(json=values)
def test_delete(self):
ref = self.new_ref()
self.stub_entity("DELETE", id=ref["id"])
self.client.tsigkeys.delete(ref["id"])
self.assertRequestBodyIs(None)

View File

@ -0,0 +1,170 @@
# Copyright 2017 SAP SE
#
# Author: Rudolf Vriend <rudolf.vriend@sap.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.command import command
import six
from designateclient import utils
from designateclient.v2.cli import common
from designateclient.v2.utils import get_all
LOG = logging.getLogger(__name__)
def _format_tsigkey(tsigkey):
# Remove unneeded fields for display output formatting
tsigkey.pop('links', None)
class ListTSIGKeysCommand(command.Lister):
"""List tsigkeys"""
columns = ['id', 'name', 'algorithm', 'secret', 'scope', 'resource_id']
def get_parser(self, prog_name):
parser = super(ListTSIGKeysCommand, self).get_parser(prog_name)
parser.add_argument('--name', help="TSIGKey NAME", required=False)
parser.add_argument('--algorithm', help="TSIGKey algorithm",
required=False)
parser.add_argument('--scope', help="TSIGKey scope", required=False)
common.add_all_common_options(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.dns
common.set_all_common_headers(client, parsed_args)
criterion = {}
if parsed_args.name is not None:
criterion["name"] = parsed_args.name
if parsed_args.algorithm is not None:
criterion["algorithm"] = parsed_args.algorithm
if parsed_args.scope is not None:
criterion["scope"] = parsed_args.scope
data = get_all(client.tsigkeys.list, criterion)
cols = self.columns
return cols, (utils.get_item_properties(s, cols) for s in data)
class ShowTSIGKeyCommand(command.ShowOne):
"""Show tsigkey details"""
def get_parser(self, prog_name):
parser = super(ShowTSIGKeyCommand, self).get_parser(prog_name)
parser.add_argument('id', help="TSIGKey ID")
common.add_all_common_options(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.dns
common.set_all_common_headers(client, parsed_args)
data = client.tsigkeys.get(parsed_args.id)
_format_tsigkey(data)
return six.moves.zip(*sorted(six.iteritems(data)))
class CreateTSIGKeyCommand(command.ShowOne):
"""Create new tsigkey"""
def get_parser(self, prog_name):
parser = super(CreateTSIGKeyCommand, self).get_parser(prog_name)
parser.add_argument('--name', help="TSIGKey Name", required=True)
parser.add_argument('--algorithm', help="TSIGKey algorithm",
required=True)
parser.add_argument('--secret', help="TSIGKey secret", required=True)
parser.add_argument('--scope', help="TSIGKey scope", required=True)
parser.add_argument('--resource-id', help="TSIGKey resource_id",
required=True)
common.add_all_common_options(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.dns
common.set_all_common_headers(client, parsed_args)
data = client.tsigkeys.create(parsed_args.name, parsed_args.algorithm,
parsed_args.secret, parsed_args.scope,
parsed_args.resource_id)
_format_tsigkey(data)
return six.moves.zip(*sorted(six.iteritems(data)))
class SetTSIGKeyCommand(command.ShowOne):
"""Set tsigkey properties"""
def get_parser(self, prog_name):
parser = super(SetTSIGKeyCommand, self).get_parser(prog_name)
parser.add_argument('id', help="TSIGKey ID")
parser.add_argument('--name', help="TSIGKey Name")
parser.add_argument('--algorithm', help="TSIGKey algorithm")
parser.add_argument('--secret', help="TSIGKey secret")
parser.add_argument('--scope', help="TSIGKey scope")
common.add_all_common_options(parser)
return parser
def take_action(self, parsed_args):
data = {}
if parsed_args.name:
data['name'] = parsed_args.name
if parsed_args.algorithm:
data['algorithm'] = parsed_args.algorithm
if parsed_args.secret:
data['secret'] = parsed_args.secret
if parsed_args.scope:
data['scope'] = parsed_args.scope
client = self.app.client_manager.dns
common.set_all_common_headers(client, parsed_args)
data = client.tsigkeys.update(parsed_args.id, data)
_format_tsigkey(data)
return six.moves.zip(*sorted(six.iteritems(data)))
class DeleteTSIGKeyCommand(command.Command):
"""Delete tsigkey"""
def get_parser(self, prog_name):
parser = super(DeleteTSIGKeyCommand, self).get_parser(prog_name)
parser.add_argument('id', help="TSIGKey ID")
common.add_all_common_options(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.dns
common.set_all_common_headers(client, parsed_args)
client.tsigkeys.delete(parsed_args.id)
LOG.info('TSIGKey %s was deleted', parsed_args.id)

View File

@ -25,6 +25,7 @@ from designateclient.v2.recordsets import RecordSetController
from designateclient.v2.reverse import FloatingIPController
from designateclient.v2.service_statuses import ServiceStatusesController
from designateclient.v2.tlds import TLDController
from designateclient.v2.tsigkeys import TSIGKeysController
from designateclient.v2.zones import ZoneController
from designateclient.v2.zones import ZoneExportsController
from designateclient.v2.zones import ZoneImportsController
@ -138,3 +139,4 @@ class Client(object):
self.zone_imports = ZoneImportsController(self)
self.pools = PoolController(self)
self.quotas = QuotasController(self)
self.tsigkeys = TSIGKeysController(self)

View File

@ -0,0 +1,50 @@
# Copyright 2017 SAP SE
#
# Author: Rudolf Vriend <rudolf.vriend@sap.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient.v2.base import V2Controller
from designateclient.v2 import utils as v2_utils
class TSIGKeysController(V2Controller):
def create(self, name, algorithm, secret, scope, resource_id):
data = {
'name': name,
'algorithm': algorithm,
'secret': secret,
'scope': scope,
'resource_id': resource_id
}
return self._post('/tsigkeys', data=data)
def list(self, criterion=None, marker=None, limit=None):
url = self.build_url('/tsigkeys', criterion, marker, limit)
return self._get(url, response_key='tsigkeys')
def get(self, tsigkey):
tsigkey = v2_utils.resolve_by_name(self.list, tsigkey)
return self._get('/tsigkeys/%s' % tsigkey)
def update(self, tsigkey, values):
tsigkey = v2_utils.resolve_by_name(self.list, tsigkey)
return self._patch('/tsigkeys/%s' % tsigkey, data=values)
def delete(self, tsigkey):
tsigkey = v2_utils.resolve_by_name(self.list, tsigkey)
return self._delete('/tsigkeys/%s' % tsigkey)

View File

@ -147,6 +147,13 @@ openstack.dns.v2 =
dns_quota_set = designateclient.v2.cli.quotas:SetQuotasCommand
dns_quota_reset = designateclient.v2.cli.quotas:ResetQuotasCommand
tsigkey_create = designateclient.v2.cli.tsigkeys:CreateTSIGKeyCommand
tsigkey_list = designateclient.v2.cli.tsigkeys:ListTSIGKeysCommand
tsigkey_show = designateclient.v2.cli.tsigkeys:ShowTSIGKeyCommand
tsigkey_set = designateclient.v2.cli.tsigkeys:SetTSIGKeyCommand
tsigkey_delete = designateclient.v2.cli.tsigkeys:DeleteTSIGKeyCommand
[build_sphinx]
builders = html,man
all-files = 1