Add X509KeyPair controller and conductor.

This patch adds X509KeyPair controller and conductor to handle all
x509keypair related operation and also add test for it.

Change-Id: I5773fcd5bdf8a30fd195714e3e0fdc9d1b0c962d
Partially-Implements: bp secure-kubernetes
This commit is contained in:
Madhuri 2015-07-23 09:52:57 +09:00
parent 09d8ef4e04
commit 4eb9425185
10 changed files with 697 additions and 1 deletions

View File

@ -32,6 +32,7 @@ from magnum.api.controllers.v1 import node
from magnum.api.controllers.v1 import pod
from magnum.api.controllers.v1 import replicationcontroller as rc
from magnum.api.controllers.v1 import service
from magnum.api.controllers.v1 import x509keypair
from magnum.api import expose
from magnum.i18n import _
@ -102,6 +103,8 @@ class V1(controllers_base.APIBase):
services = [link.Link]
"""Links to the services resource"""
x509keypairs = [link.Link]
@staticmethod
def convert():
v1 = V1()
@ -151,6 +154,12 @@ class V1(controllers_base.APIBase):
pecan.request.host_url,
'services', '',
bookmark=True)]
v1.x509keypairs = [link.Link.make_link('self', pecan.request.host_url,
'x509keypairs', ''),
link.Link.make_link('bookmark',
pecan.request.host_url,
'x509keypairs', '',
bookmark=True)]
return v1
@ -164,6 +173,7 @@ class Controller(rest.RestController):
pods = pod.PodsController()
rcs = rc.ReplicationControllersController()
services = service.ServicesController()
x509keypairs = x509keypair.X509KeyPairController()
@expose.expose(V1)
def get(self):

View File

@ -0,0 +1,268 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import pecan
from pecan import rest
import wsme
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from magnum.api.controllers import base
from magnum.api.controllers import link
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api.controllers.v1 import utils as api_utils
from magnum.common import exception
from magnum import objects
class X509KeyPairPatchType(types.JsonPatchType):
@staticmethod
def mandatory_attrs():
return ['/bay_uuid']
class X509KeyPair(base.APIBase):
"""API representation of a x509keypair.
This class enforces type checking and value constraints, and converts
between the internal object model and the API representation of a
x509keypair.
"""
_bay_uuid = None
def _get_bay_uuid(self):
return self._bay_uuid
def _set_bay_uuid(self, value):
if value and self._bay_uuid != value:
try:
bay = api_utils.get_rpc_resource('Bay', value)
self._bay_uuid = bay.uuid
except exception.BayNotFound as e:
# Change error code because 404 (NotFound) is inappropriate
# response for a POST request to create a Bay
e.code = 400 # BadRequest
raise e
elif value == wtypes.Unset:
self._bay_uuid = wtypes.Unset
uuid = types.uuid
"""Unique UUID for this x509keypair"""
name = wtypes.StringType(min_length=1, max_length=255)
"""Name of the x509keypair"""
bay_uuid = wsme.wsproperty(wtypes.text, _get_bay_uuid,
_set_bay_uuid, mandatory=True)
"""The bay UUID or id"""
links = wsme.wsattr([link.Link], readonly=True)
"""A list containing a self link and associated x509keypair links"""
ca_cert = wtypes.StringType(min_length=1)
""""The CA certificate"""
certificate = wtypes.StringType(min_length=1)
"""The certificate"""
private_key = wtypes.StringType(min_length=1)
"""The private key"""
def __init__(self, **kwargs):
super(X509KeyPair, self).__init__()
self.fields = []
for field in objects.X509KeyPair.fields:
# Skip fields we do not expose.
if not hasattr(self, field):
continue
self.fields.append(field)
setattr(self, field, kwargs.get(field, wtypes.Unset))
@staticmethod
def _convert_with_links(x509keypair, url, expand=True):
if not expand:
x509keypair.unset_fields_except(['uuid', 'name', 'bay_uuid',
'ca_cert', 'certificate',
'private_key'])
x509keypair.links = [link.Link.make_link('self', url,
'x509keypairs',
x509keypair.uuid),
link.Link.make_link('bookmark', url,
'x509keypairs',
x509keypair.uuid,
bookmark=True)]
return x509keypair
@classmethod
def convert_with_links(cls, rpc_x509keypair, expand=True):
x509keypair = X509KeyPair(**rpc_x509keypair.as_dict())
return cls._convert_with_links(x509keypair,
pecan.request.host_url, expand)
@classmethod
def sample(cls, expand=True):
sample = cls(uuid='f978db47-9a37-4e9f-8572-804a10abc0aa',
name='MyX509KeyPair',
bay_uuid='7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
created_at=datetime.datetime.utcnow(),
ca_cert='AAA....AAA',
certificate='BBB....BBB',
private_key='CCC....CCC')
return cls._convert_with_links(sample, 'http://localhost:9511', expand)
class X509KeyPairCollection(collection.Collection):
"""API representation of a collection of x509keypairs."""
x509keypairs = [X509KeyPair]
"""A list containing x509keypairs objects"""
def __init__(self, **kwargs):
self._type = 'x509keypairs'
@staticmethod
def convert_with_links(rpc_x509keypairs, limit, url=None,
expand=False, **kwargs):
collection = X509KeyPairCollection()
collection.x509keypairs = [X509KeyPair.convert_with_links(p, expand)
for p in rpc_x509keypairs]
collection.next = collection.get_next(limit, url=url, **kwargs)
return collection
@classmethod
def sample(cls):
sample = cls()
sample.x509keypairs = [X509KeyPair.sample(expand=False)]
return sample
class X509KeyPairController(rest.RestController):
"""REST controller for X509KeyPair."""
def __init__(self):
super(X509KeyPairController, self).__init__()
_custom_actions = {
'detail': ['GET'],
}
def _get_x509keypairs_collection(self, marker, limit,
sort_key, sort_dir, expand=False,
resource_url=None):
limit = api_utils.validate_limit(limit)
sort_dir = api_utils.validate_sort_dir(sort_dir)
marker_obj = None
if marker:
marker_obj = objects.X509KeyPair.get_by_uuid(pecan.request.context,
marker)
x509keypairs = pecan.request.rpcapi.x509keypair_list(
pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
return X509KeyPairCollection.convert_with_links(x509keypairs, limit,
url=resource_url,
expand=expand,
sort_key=sort_key,
sort_dir=sort_dir)
@wsme_pecan.wsexpose(X509KeyPairCollection, types.uuid,
types.uuid, int, wtypes.text, wtypes.text)
def get_all(self, x509keypair_uuid=None, marker=None, limit=None,
sort_key='id', sort_dir='asc'):
"""Retrieve a list of x509keypairs.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
return self._get_x509keypairs_collection(marker, limit, sort_key,
sort_dir)
@wsme_pecan.wsexpose(X509KeyPairCollection, types.uuid,
types.uuid, int, wtypes.text, wtypes.text)
def detail(self, x509keypair_uuid=None, marker=None, limit=None,
sort_key='id', sort_dir='asc'):
"""Retrieve a list of x509keypairs with detail.
:param x509keypair_uuid: UUID of a x509keypair, to get onlyi
x509keypairs for that x509keypair.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
# NOTE(lucasagomes): /detail should only work agaist collections
parent = pecan.request.path.split('/')[:-1][-1]
if parent != "x509keypairs":
raise exception.HTTPNotFound
expand = True
resource_url = '/'.join(['x509keypairs', 'detail'])
return self._get_x509keypairs_collection(marker, limit,
sort_key, sort_dir, expand,
resource_url)
@wsme_pecan.wsexpose(X509KeyPair, types.uuid_or_name)
def get_one(self, x509keypair_ident):
"""Retrieve information about the given x509keypair.
:param x509keypair_ident: UUID of a x509keypair or
logical name of the x509keypair.
"""
rpc_x509keypair = api_utils.get_rpc_resource('X509KeyPair',
x509keypair_ident)
return X509KeyPair.convert_with_links(rpc_x509keypair)
@wsme_pecan.wsexpose(X509KeyPair, body=X509KeyPair, status_code=201)
def post(self, x509keypair):
"""Create a new x509keypair.
:param x509keypair: a x509keypair within the request body.
"""
x509keypair_dict = x509keypair.as_dict()
context = pecan.request.context
auth_token = context.auth_token_info['token']
x509keypair_dict['project_id'] = auth_token['project']['id']
x509keypair_dict['user_id'] = auth_token['user']['id']
x509keypair_obj = objects.X509KeyPair(context, **x509keypair_dict)
new_x509keypair = pecan.request.rpcapi.x509keypair_create(
x509keypair_obj)
# Set the HTTP Location Header
pecan.response.location = link.build_url('x509keypairs',
new_x509keypair.uuid)
return X509KeyPair.convert_with_links(new_x509keypair)
@wsme_pecan.wsexpose(None, types.uuid_or_name, status_code=204)
def delete(self, x509keypair_ident):
"""Delete a x509keypair.
:param x509keypair_ident: UUID of a x509keypair or logical
name of the x509keypair.
"""
rpc_x509keypair = api_utils.get_rpc_resource('X509KeyPair',
x509keypair_ident)
pecan.request.rpcapi.x509keypair_delete(rpc_x509keypair.uuid)

View File

@ -29,6 +29,7 @@ from magnum.conductor.handlers import bay_conductor
from magnum.conductor.handlers import conductor_listener
from magnum.conductor.handlers import docker_conductor
from magnum.conductor.handlers import k8s_conductor
from magnum.conductor.handlers import x509keypair_conductor
from magnum.i18n import _LE
from magnum.i18n import _LI
@ -49,6 +50,7 @@ def main():
docker_conductor.Handler(),
k8s_conductor.Handler(),
bay_conductor.Handler(),
x509keypair_conductor.Handler(),
conductor_listener.Handler(),
]

View File

@ -152,6 +152,17 @@ class API(rpc_service.API):
return self._call('container_exec', container_uuid=container_uuid,
command=command)
# X509KeyPair Operations
def x509keypair_create(self, x509keypair):
return self._call('x509keypair_create', x509keypair=x509keypair)
def x509keypair_delete(self, uuid):
return self._call('x509keypair_delete', uuid=uuid)
def x509keypair_list(self, context, limit, marker, sort_key, sort_dir):
return objects.X509KeyPair.list(context, limit, marker,
sort_key, sort_dir)
class ListenerAPI(rpc_service.API):
def __init__(self, context=None, topic=None, server=None, timeout=None):

View File

@ -0,0 +1,42 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Magnum X509KeyPair RPC handler."""
from oslo_log import log as logging
from magnum import objects
LOG = logging.getLogger(__name__)
class Handler(object):
"""These are the backend operations. They are executed by the backend
service. API calls via AMQP (within the ReST API) trigger the
handlers to be called.
"""
def __init__(self):
super(Handler, self).__init__()
def x509keypair_create(self, context, x509keypair):
LOG.debug("Creating x509keypair")
x509keypair.create(context)
return x509keypair
def x509keypair_delete(self, context, uuid):
LOG.debug("Deleting x509keypair_delete %s" % uuid)
x509keypair = objects.X509KeyPair.get_by_uuid(context, uuid)
x509keypair.destroy(context)

View File

@ -69,7 +69,11 @@ class TestRootController(api_base.FunctionalTest):
u'containers': [{u'href': u'http://localhost/v1/containers/',
u'rel': u'self'},
{u'href': u'http://localhost/containers/',
u'rel': u'bookmark'}]}
u'rel': u'bookmark'}],
u'x509keypairs': [{u'href': u'http://localhost/v1/x509keypairs/',
u'rel': u'self'},
{u'href': u'http://localhost/x509keypairs/',
u'rel': u'bookmark'}]}
response = self.app.get('/v1/')
self.assertEqual(expected, response.json)

View File

@ -0,0 +1,290 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from six.moves.urllib import parse as urlparse
from magnum.api.controllers.v1 import x509keypair as api_x509keypair
from magnum.common import utils
from magnum.conductor import api as rpcapi
from magnum import objects
from magnum.tests import base
from magnum.tests.unit.api import base as api_base
from magnum.tests.unit.api import utils as apiutils
from magnum.tests.unit.objects import utils as obj_utils
class TestX509KeyPairObject(base.TestCase):
def test_x509keypair_init(self):
x509keypair_dict = apiutils.x509keypair_post_data(bay_uuid=None)
x509keypair = api_x509keypair.X509KeyPair(**x509keypair_dict)
self.assertEqual('certificate', x509keypair.certificate)
class TestListX509KeyPair(api_base.FunctionalTest):
def setUp(self):
super(TestListX509KeyPair, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
def test_empty(self):
response = self.get_json('/x509keypairs')
self.assertEqual([], response['x509keypairs'])
def test_one(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs')
self.assertEqual(x509keypair.uuid, response['x509keypairs'][0]["uuid"])
self.assertIn('name', response['x509keypairs'][0])
self.assertIn('bay_uuid', response['x509keypairs'][0])
self.assertIn('certificate', response['x509keypairs'][0])
self.assertIn('private_key', response['x509keypairs'][0])
def test_get_one(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/%s' % x509keypair['uuid'])
self.assertEqual(x509keypair.uuid, response['uuid'])
self.assertIn('name', response)
self.assertIn('bay_uuid', response)
self.assertIn('certificate', response)
self.assertIn('private_key', response)
def test_get_one_by_name(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/%s' % x509keypair['name'])
self.assertEqual(x509keypair.uuid, response['uuid'])
self.assertIn('name', response)
self.assertIn('bay_uuid', response)
self.assertIn('certificate', response)
self.assertIn('private_key', response)
def test_get_one_by_name_not_found(self):
response = self.get_json(
'/x509keypairs/not_found',
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_get_one_by_name_multiple_x509keypair(self):
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=utils.generate_uuid())
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=utils.generate_uuid())
response = self.get_json('/x509keypairs/test_x509keypair',
expect_errors=True)
self.assertEqual(409, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_detail(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/detail')
self.assertEqual(x509keypair.uuid, response['x509keypairs'][0]["uuid"])
self.assertIn('name', response['x509keypairs'][0])
self.assertIn('bay_uuid', response['x509keypairs'][0])
self.assertIn('certificate', response['x509keypairs'][0])
self.assertIn('private_key', response['x509keypairs'][0])
def test_detail_against_single(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json(
'/x509keypairs/%s/detail' % x509keypair['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
keypair_list = []
for id_ in range(5):
x509keypair = obj_utils.create_test_x509keypair(
self.context, id=id_,
uuid=utils.generate_uuid())
keypair_list.append(x509keypair.uuid)
response = self.get_json('/x509keypairs')
self.assertEqual(len(keypair_list), len(response['x509keypairs']))
uuids = [b['uuid'] for b in response['x509keypairs']]
self.assertEqual(sorted(keypair_list), sorted(uuids))
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_x509keypair(self.context, id=1, uuid=uuid)
response = self.get_json('/x509keypairs/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_test_x509keypair(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/x509keypairs/?limit=3')
self.assertEqual(3, len(response['x509keypairs']))
next_marker = response['x509keypairs'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_x509keypair(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/x509keypairs')
self.assertEqual(3, len(response['x509keypairs']))
next_marker = response['x509keypairs'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestPost(api_base.FunctionalTest):
def setUp(self):
super(TestPost, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
p = mock.patch.object(rpcapi.API, 'x509keypair_create')
self.mock_x509keypair_create = p.start()
self.mock_x509keypair_create.side_effect = \
self._simulate_rpc_x509keypair_create
self.addCleanup(p.stop)
def _simulate_rpc_x509keypair_create(self, x509keypair):
x509keypair.create()
return x509keypair
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_x509keypair(self, mock_utcnow):
cdict = apiutils.x509keypair_post_data()
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.post_json('/x509keypairs', cdict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/x509keypairs/%s' % cdict['uuid']
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertEqual(cdict['uuid'], response.json['uuid'])
self.assertNotIn('updated_at', response.json.keys)
return_created_at = timeutils.parse_isotime(
response.json['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
def test_create_x509keypair_doesnt_contain_id(self):
with mock.patch.object(self.dbapi, 'create_x509keypair',
wraps=self.dbapi.create_x509keypair) as cc_mock:
cdict = apiutils.x509keypair_post_data(
name='x509keypair_example_A')
response = self.post_json('/x509keypairs', cdict)
self.assertEqual(cdict['name'], response.json['name'])
cc_mock.assert_called_once_with(mock.ANY)
# Check that 'id' is not in first arg of positional args
self.assertNotIn('id', cc_mock.call_args[0][0])
def test_create_x509keypair_generate_uuid(self):
cdict = apiutils.x509keypair_post_data()
del cdict['uuid']
response = self.post_json('/x509keypairs', cdict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(cdict['name'], response.json['name'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
def test_create_x509keypair_no_bay_uuid(self):
cdict = apiutils.x509keypair_post_data()
del cdict['bay_uuid']
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
def test_create_x509keypair_with_non_existent_bay_uuid(self):
cdict = apiutils.x509keypair_post_data(bay_uuid=utils.generate_uuid())
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['error_message'])
def test_create_x509keypair_with_bay_name(self):
cdict = apiutils.x509keypair_post_data(bay_uuid=self.bay.name)
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
self.x509keypair = obj_utils.create_test_x509keypair(self.context)
p = mock.patch.object(rpcapi.API, 'x509keypair_delete')
self.mock_x509keypair_delete = p.start()
self.mock_x509keypair_delete.side_effect = \
self._simulate_rpc_x509keypair_delete
self.addCleanup(p.stop)
def _simulate_rpc_x509keypair_delete(self, x509keypair_uuid):
x509keypair = objects.X509KeyPair.get_by_uuid(self.context,
x509keypair_uuid)
x509keypair.destroy()
def test_delete_x509keypair(self):
self.delete('/x509keypairs/%s' % self.x509keypair.uuid)
response = self.get_json('/x509keypairs/%s' % self.x509keypair.uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_delete_x509keypair_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/x509keypairs/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_delete_x509keypair_with_name_not_found(self):
response = self.delete('/x509keypairs/not_found', expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_delete_x509keypair_with_name(self):
response = self.delete('/x509keypairs/%s' % self.x509keypair.name,
expect_errors=True)
self.assertEqual(204, response.status_int)
def test_delete_multiple_x509keypair_by_name(self):
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=utils.generate_uuid())
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=utils.generate_uuid())
response = self.delete('/x509keypairs/test_x509keypair',
expect_errors=True)
self.assertEqual(409, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])

View File

@ -19,6 +19,7 @@ from magnum.api.controllers.v1 import node as node_controller
from magnum.api.controllers.v1 import pod as pod_controller
from magnum.api.controllers.v1 import replicationcontroller as rc_controller
from magnum.api.controllers.v1 import service as service_controller
from magnum.api.controllers.v1 import x509keypair as x509keypair_controller
from magnum.tests.unit.db import utils
@ -133,3 +134,9 @@ def node_post_data(**kw):
node = utils.get_test_node(**kw)
internal = node_controller.NodePatchType.internal_attrs()
return remove_internal(node, internal)
def x509keypair_post_data(**kw):
x509keypair = utils.get_test_x509keypair(**kw)
internal = x509keypair_controller.X509KeyPairPatchType.internal_attrs()
return remove_internal(x509keypair, internal)

View File

@ -0,0 +1,43 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from magnum.conductor.handlers import x509keypair_conductor
from magnum import objects
from magnum.tests import base
import mock
from mock import patch
class TestX509KeyPairConductor(base.TestCase):
def setUp(self):
super(TestX509KeyPairConductor, self).setUp()
self.x509keypair_handler = x509keypair_conductor.Handler()
def test_x509keypair_create(self):
expected_x509keypair = objects.X509KeyPair({})
expected_x509keypair.create = mock.MagicMock()
self.x509keypair_handler.x509keypair_create(self.context,
expected_x509keypair)
expected_x509keypair.create.assert_called_once_with(self.context)
@patch('magnum.objects.X509KeyPair.get_by_uuid')
def test_x509keypair_delete(self, mock_x509keypair_get_by_uuid):
mock_x509keypair = mock.MagicMock()
mock_x509keypair.name = 'test-x509keypair'
mock_x509keypair.uuid = 'test-uuid'
mock_x509keypair_get_by_uuid.return_value = mock_x509keypair
self.x509keypair_handler.x509keypair_delete(self.context, "test-uuid")
mock_x509keypair.destroy.assert_called_once_with(self.context)

View File

@ -31,6 +31,8 @@ class RPCAPITestCase(base.DbTestCase):
self.fake_pod = dbutils.get_test_pod(driver='fake-driver')
self.fake_rc = dbutils.get_test_rc(driver='fake-driver')
self.fake_service = dbutils.get_test_service(driver='fake-driver')
self.fake_x509keypair = dbutils.get_test_x509keypair(
driver='fake-driver')
def _test_rpcapi(self, method, rpc_method, **kwargs):
rpcapi_cls = kwargs.pop('rpcapi_cls', conductor_rpcapi.API)
@ -231,3 +233,20 @@ class RPCAPITestCase(base.DbTestCase):
'call',
rpcapi_cls=conductor_rpcapi.ListenerAPI,
version='1.0')
def test_x509keypair_create(self):
self._test_rpcapi('x509keypair_create',
'call',
version='1.0',
x509keypair=self.fake_x509keypair)
def test_x509keypair_delete(self):
self._test_rpcapi('x509keypair_delete',
'call',
version='1.0',
uuid=self.fake_x509keypair['uuid'])
self._test_rpcapi('x509keypair_delete',
'call',
version='1.1',
uuid=self.fake_x509keypair['name'])