X509keypair cleanup

This patch does following:

* Removes X509keypair controller as there is already Certificate
  controller for same purpose.
* Removes X509keypair conductor.
* Removes name, ca_cert and bay_uuid from x509keypair model as
  Bay model already holds certificate references.
* Add intermediates and private_key_passphrase to x509keypair
  model.
* Remove related tests and changes.

Change-Id: I9271221cd1d07c672c4a380a4ae3593237fca66a
Partially-Implements: blueprint barbican-alternative-storeX
changes/09/322009/6
Madhuri Kumari 2016-05-27 14:12:23 +05:30
parent 41b9ae644d
commit 303d14dde0
20 changed files with 57 additions and 865 deletions

View File

@ -30,7 +30,6 @@ from magnum.api.controllers.v1 import bay
from magnum.api.controllers.v1 import baymodel
from magnum.api.controllers.v1 import certificate
from magnum.api.controllers.v1 import magnum_services
from magnum.api.controllers.v1 import x509keypair
from magnum.api import expose
from magnum.i18n import _
@ -91,8 +90,6 @@ class V1(controllers_base.APIBase):
bays = [link.Link]
"""Links to the bays resource"""
x509keypairs = [link.Link]
certificates = [link.Link]
"""Links to the certificates resource"""
@ -124,12 +121,6 @@ class V1(controllers_base.APIBase):
pecan.request.host_url,
'bays', '',
bookmark=True)]
v1.x509keypairs = [link.Link.make_link('self', pecan.request.host_url,
'x509keypairs', ''),
link.Link.make_link('bookmark',
pecan.request.host_url,
'x509keypairs', '',
bookmark=True)]
v1.certificates = [link.Link.make_link('self', pecan.request.host_url,
'certificates', ''),
link.Link.make_link('bookmark',
@ -150,7 +141,6 @@ class Controller(rest.RestController):
bays = bay.BaysController()
baymodels = baymodel.BayModelsController()
x509keypairs = x509keypair.X509KeyPairController()
certificates = certificate.CertificateController()
mservices = magnum_services.MagnumServiceController()

View File

@ -1,264 +0,0 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
import pecan
from pecan import rest
import wsme
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from magnum.api.controllers import base
from magnum.api.controllers import link
from magnum.api.controllers.v1 import collection
from magnum.api.controllers.v1 import types
from magnum.api import utils as api_utils
from magnum.common import exception
from magnum import objects
class X509KeyPairPatchType(types.JsonPatchType):
@staticmethod
def mandatory_attrs():
return ['/bay_uuid']
class X509KeyPair(base.APIBase):
"""API representation of a x509keypair.
This class enforces type checking and value constraints, and converts
between the internal object model and the API representation of a
x509keypair.
"""
_bay_uuid = None
def _get_bay_uuid(self):
return self._bay_uuid
def _set_bay_uuid(self, value):
if value and self._bay_uuid != value:
try:
bay = api_utils.get_resource('Bay', value)
self._bay_uuid = bay.uuid
except exception.BayNotFound as e:
# Change error code because 404 (NotFound) is inappropriate
# response for a POST request to create a Bay
e.code = 400 # BadRequest
raise e
elif value == wtypes.Unset:
self._bay_uuid = wtypes.Unset
uuid = types.uuid
"""Unique UUID for this x509keypair"""
name = wtypes.StringType(min_length=1, max_length=255)
"""Name of the x509keypair"""
bay_uuid = wsme.wsproperty(wtypes.text, _get_bay_uuid,
_set_bay_uuid, mandatory=True)
"""The bay UUID or id"""
links = wsme.wsattr([link.Link], readonly=True)
"""A list containing a self link and associated x509keypair links"""
ca_cert = wtypes.StringType(min_length=1)
""""The CA certificate"""
certificate = wtypes.StringType(min_length=1)
"""The certificate"""
private_key = wtypes.StringType(min_length=1)
"""The private key"""
def __init__(self, **kwargs):
super(X509KeyPair, self).__init__()
self.fields = []
for field in objects.X509KeyPair.fields:
# Skip fields we do not expose.
if not hasattr(self, field):
continue
self.fields.append(field)
setattr(self, field, kwargs.get(field, wtypes.Unset))
@staticmethod
def _convert_with_links(x509keypair, url, expand=True):
if not expand:
x509keypair.unset_fields_except(['uuid', 'name', 'bay_uuid',
'ca_cert', 'certificate',
'private_key'])
x509keypair.links = [link.Link.make_link('self', url,
'x509keypairs',
x509keypair.uuid),
link.Link.make_link('bookmark', url,
'x509keypairs',
x509keypair.uuid,
bookmark=True)]
return x509keypair
@classmethod
def convert_with_links(cls, rpc_x509keypair, expand=True):
x509keypair = X509KeyPair(**rpc_x509keypair.as_dict())
return cls._convert_with_links(x509keypair,
pecan.request.host_url, expand)
@classmethod
def sample(cls, expand=True):
sample = cls(uuid='f978db47-9a37-4e9f-8572-804a10abc0aa',
name='MyX509KeyPair',
bay_uuid='7ae81bb3-dec3-4289-8d6c-da80bd8001ae',
created_at=timeutils.utcnow(),
ca_cert='AAA....AAA',
certificate='BBB....BBB',
private_key='CCC....CCC')
return cls._convert_with_links(sample, 'http://localhost:9511', expand)
class X509KeyPairCollection(collection.Collection):
"""API representation of a collection of x509keypairs."""
x509keypairs = [X509KeyPair]
"""A list containing x509keypairs objects"""
def __init__(self, **kwargs):
self._type = 'x509keypairs'
@staticmethod
def convert_with_links(rpc_x509keypairs, limit, url=None,
expand=False, **kwargs):
collection = X509KeyPairCollection()
collection.x509keypairs = [X509KeyPair.convert_with_links(p, expand)
for p in rpc_x509keypairs]
collection.next = collection.get_next(limit, url=url, **kwargs)
return collection
@classmethod
def sample(cls):
sample = cls()
sample.x509keypairs = [X509KeyPair.sample(expand=False)]
return sample
class X509KeyPairController(rest.RestController):
"""REST controller for X509KeyPair."""
def __init__(self):
super(X509KeyPairController, self).__init__()
_custom_actions = {
'detail': ['GET'],
}
def _get_x509keypairs_collection(self, marker, limit,
sort_key, sort_dir, expand=False,
resource_url=None):
limit = api_utils.validate_limit(limit)
sort_dir = api_utils.validate_sort_dir(sort_dir)
marker_obj = None
if marker:
marker_obj = objects.X509KeyPair.get_by_uuid(pecan.request.context,
marker)
x509keypairs = objects.X509KeyPair.list(
pecan.request.context, limit,
marker_obj, sort_key=sort_key,
sort_dir=sort_dir)
return X509KeyPairCollection.convert_with_links(x509keypairs, limit,
url=resource_url,
expand=expand,
sort_key=sort_key,
sort_dir=sort_dir)
@wsme_pecan.wsexpose(X509KeyPairCollection, types.uuid, int,
wtypes.text, wtypes.text)
def get_all(self, marker=None, limit=None, sort_key='id',
sort_dir='asc'):
"""Retrieve a list of x509keypairs.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
return self._get_x509keypairs_collection(marker, limit, sort_key,
sort_dir)
@wsme_pecan.wsexpose(X509KeyPairCollection, types.uuid, int,
wtypes.text, wtypes.text)
def detail(self, marker=None, limit=None, sort_key='id',
sort_dir='asc'):
"""Retrieve a list of x509keypairs with detail.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
# NOTE(lucasagomes): /detail should only work against collections
parent = pecan.request.path.split('/')[:-1][-1]
if parent != "x509keypairs":
raise exception.HTTPNotFound
expand = True
resource_url = '/'.join(['x509keypairs', 'detail'])
return self._get_x509keypairs_collection(marker, limit,
sort_key, sort_dir, expand,
resource_url)
@wsme_pecan.wsexpose(X509KeyPair, types.uuid_or_name)
def get_one(self, x509keypair_ident):
"""Retrieve information about the given x509keypair.
:param x509keypair_ident: UUID of a x509keypair or
logical name of the x509keypair.
"""
x509keypair = api_utils.get_resource('X509KeyPair',
x509keypair_ident)
return X509KeyPair.convert_with_links(x509keypair)
@wsme_pecan.wsexpose(X509KeyPair, body=X509KeyPair, status_code=201)
def post(self, x509keypair):
"""Create a new x509keypair.
:param x509keypair: a x509keypair within the request body.
"""
x509keypair_dict = x509keypair.as_dict()
context = pecan.request.context
x509keypair_dict['project_id'] = context.project_id
x509keypair_dict['user_id'] = context.user_id
x509keypair_obj = objects.X509KeyPair(context, **x509keypair_dict)
new_x509keypair = pecan.request.rpcapi.x509keypair_create(
x509keypair_obj)
# Set the HTTP Location Header
pecan.response.location = link.build_url('x509keypairs',
new_x509keypair.uuid)
return X509KeyPair.convert_with_links(new_x509keypair)
@wsme_pecan.wsexpose(None, types.uuid_or_name, status_code=204)
def delete(self, x509keypair_ident):
"""Delete a x509keypair.
:param x509keypair_ident: UUID of a x509keypair or logical
name of the x509keypair.
"""
x509keypair = api_utils.get_resource('X509KeyPair',
x509keypair_ident)
pecan.request.rpcapi.x509keypair_delete(x509keypair.uuid)

View File

@ -31,7 +31,6 @@ from magnum.conductor.handlers import conductor_listener
from magnum.conductor.handlers import docker_conductor
from magnum.conductor.handlers import indirection_api
from magnum.conductor.handlers import k8s_conductor
from magnum.conductor.handlers import x509keypair_conductor
from magnum.i18n import _LE
from magnum.i18n import _LI
from magnum import version
@ -56,7 +55,6 @@ def main():
docker_conductor.Handler(),
k8s_conductor.Handler(),
bay_conductor.Handler(),
x509keypair_conductor.Handler(),
conductor_listener.Handler(),
ca_conductor.Handler(),
]

View File

@ -502,7 +502,7 @@ class KubernetesAPIFailed(MagnumException):
class X509KeyPairNotFound(ResourceNotFound):
message = _("A key pair %(keypair)s could not be found.")
message = _("A key pair %(x509keypair)s could not be found.")
class X509KeyPairAlreadyExists(Conflict):

View File

@ -113,14 +113,6 @@ class API(rpc_service.API):
return self._call('container_exec', container_uuid=container_uuid,
command=command)
# X509KeyPair Operations
def x509keypair_create(self, x509keypair):
return self._call('x509keypair_create', x509keypair=x509keypair)
def x509keypair_delete(self, uuid):
return self._call('x509keypair_delete', uuid=uuid)
# CA operations
def sign_certificate(self, bay, certificate):

View File

@ -1,42 +0,0 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from magnum import objects
LOG = logging.getLogger(__name__)
class Handler(object):
"""Magnum X509KeyPair RPC handler.
These are the backend operations. They are executed by the backend service.
API calls via AMQP (within the ReST API) trigger the handlers to be called.
"""
def __init__(self):
super(Handler, self).__init__()
def x509keypair_create(self, context, x509keypair):
LOG.debug("Creating x509keypair")
x509keypair.create(context)
return x509keypair
def x509keypair_delete(self, context, uuid):
LOG.debug("Deleting x509keypair %s", uuid)
x509keypair = objects.X509KeyPair.get_by_uuid(context, uuid)
x509keypair.destroy(context)

View File

@ -463,10 +463,10 @@ class Connection(object):
{
'uuid': uuidutils.generate_uuid(),
'name': 'example',
'ca_cert': 'AAA...',
'certificate': 'BBB...',
'private_key': 'CCC...',
'certificate': 'AAA...',
'private_key': 'BBB...',
'private_key_passphrase': 'CCC...',
'intermediates': 'DDD...',
}
:returns: A X509KeyPair.
"""
@ -489,15 +489,6 @@ class Connection(object):
:returns: A x509keypair.
"""
@abc.abstractmethod
def get_x509keypair_by_name(self, context, x509keypair_name):
"""Return a x509keypair.
:param context: The security context
:param x509keypair_name: The name of a x509keypair.
:returns: A x509keypair.
"""
@abc.abstractmethod
def destroy_x509keypair(self, x509keypair_id):
"""Destroy a x509keypair.
@ -533,13 +524,6 @@ class Connection(object):
:returns: A list of tuples of the specified columns.
"""
def get_x509keypair_by_bay_uuid(self, bay_uuid):
"""Returns the cert for a given bay.
:param bay_uuid: The uuid of a bay.
:returns: A cert.
"""
@abc.abstractmethod
def destroy_magnum_service(self, magnum_service_id):
"""Destroys a magnum_service record.

View File

@ -0,0 +1,39 @@
# Copyright 2016 Intel Technologies India Pvt. Ld.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""modify x509keypair table
Revision ID: d072f58ab240
Revises: e647f5931da8
Create Date: 2016-05-27 15:29:22.955268
"""
# revision identifiers, used by Alembic.
revision = 'd072f58ab240'
down_revision = 'ef08a5e057bd'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_column('x509keypair', 'bay_uuid')
op.drop_column('x509keypair', 'name')
op.drop_column('x509keypair', 'ca_cert')
op.add_column('x509keypair', sa.Column('intermediates',
sa.Text(), nullable=True))
op.add_column('x509keypair', sa.Column('private_key_passphrase',
sa.Text(), nullable=True))

View File

@ -687,19 +687,6 @@ class Connection(api.Connection):
except NoResultFound:
raise exception.X509KeyPairNotFound(x509keypair=x509keypair_id)
def get_x509keypair_by_name(self, context, x509keypair_name):
query = model_query(models.X509KeyPair)
query = self._add_tenant_filters(context, query)
query = query.filter_by(name=x509keypair_name)
try:
return query.one()
except MultipleResultsFound:
raise exception.Conflict('Multiple x509keypairs exist with '
'same name. Please use the x509keypair '
'uuid instead.')
except NoResultFound:
raise exception.X509KeyPairNotFound(x509keypair=x509keypair_name)
def get_x509keypair_by_uuid(self, context, x509keypair_uuid):
query = model_query(models.X509KeyPair)
query = self._add_tenant_filters(context, query)
@ -746,10 +733,6 @@ class Connection(api.Connection):
if filters is None:
filters = {}
if 'bay_uuid' in filters:
query = query.filter_by(bay_uuid=filters['bay_uuid'])
if 'name' in filters:
query = query.filter_by(name=filters['name'])
if 'project_id' in filters:
query = query.filter_by(project_id=filters['project_id'])
if 'user_id' in filters:
@ -765,13 +748,6 @@ class Connection(api.Connection):
return _paginate_query(models.X509KeyPair, limit, marker,
sort_key, sort_dir, query)
def get_x509keypair_by_bay_uuid(self, context, bay_uuid):
query = model_query(models.X509KeyPair).filter_by(bay_uuid=bay_uuid)
try:
return query.one()
except NoResultFound:
raise exception.BayNotFound(bay=bay_uuid)
def destroy_magnum_service(self, magnum_service_id):
session = get_session()
with session.begin():

View File

@ -244,11 +244,10 @@ class X509KeyPair(Base):
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36))
name = Column(String(255))
bay_uuid = Column(String(36))
ca_cert = Column(Text())
certificate = Column(Text())
private_key = Column(Text())
private_key_passphrase = Column(Text())
intermediates = Column(Text())
project_id = Column(String(255))
user_id = Column(String(255))

View File

@ -24,18 +24,19 @@ class X509KeyPair(base.MagnumPersistentObject, base.MagnumObject,
base.MagnumObjectDictCompat):
# Version 1.0: Initial version
# Version 1.1: Added new method get_x509keypair_by_bay_uuid
VERSION = '1.1'
# Version 1.2: Remove bay_uuid, name, ca_cert and add intermediates
# and private_key_passphrase
VERSION = '1.2'
dbapi = dbapi.get_instance()
fields = {
'id': fields.IntegerField(),
'uuid': fields.UUIDField(nullable=True),
'name': fields.StringField(nullable=True),
'bay_uuid': fields.StringField(nullable=True),
'ca_cert': fields.StringField(nullable=True),
'certificate': fields.StringField(nullable=True),
'private_key': fields.StringField(nullable=True),
'intermediates': fields.StringField(nullable=True),
'private_key_passphrase': fields.StringField(nullable=True),
'project_id': fields.StringField(nullable=True),
'user_id': fields.StringField(nullable=True),
}
@ -99,32 +100,6 @@ class X509KeyPair(base.MagnumPersistentObject, base.MagnumObject,
x509keypair = X509KeyPair._from_db_object(cls(context), db_x509keypair)
return x509keypair
@base.remotable_classmethod
def get_by_name(cls, context, name):
"""Find a x509keypair based on name and return a X509KeyPair object.
:param name: the logical name of a x509keypair.
:param context: Security context
:returns: a :class:`X509KeyPair` object.
"""
db_x509keypair = cls.dbapi.get_x509keypair_by_name(context, name)
x509keypair = X509KeyPair._from_db_object(cls(context), db_x509keypair)
return x509keypair
@base.remotable_classmethod
def get_by_bay_uuid(cls, context, bay_uuid):
"""Find a x509keypair based on a bay uuid and return a :class:`X509KeyPair`
object.
:param bay_uuid: the uuid of a bay.
:param context: Security context.
:returns: a :class:`X509KeyPair` object.
"""
db_cert = cls.dbapi.get_x509keypair_by_bay_uuid(context, bay_uuid)
x509keypair = X509KeyPair._from_db_object(cls(context), db_cert)
return x509keypair
@base.remotable_classmethod
def list(cls, context, limit=None, marker=None,
sort_key=None, sort_dir=None, filters=None):
@ -135,8 +110,8 @@ class X509KeyPair(base.MagnumPersistentObject, base.MagnumObject,
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:param filters: filter dict, can include 'x509keypairmodel_id', 'name',
'bay_uuid', 'project_id', 'user_id'.
:param filters: filter dict, can include 'x509keypairmodel_id',
'project_id', 'user_id'.
:returns: a list of :class:`X509KeyPair` object.
"""

View File

@ -62,10 +62,6 @@ class TestRootController(api_base.FunctionalTest):
{u'href': u'http://localhost/baymodels/',
u'rel': u'bookmark'}],
u'id': u'v1',
u'x509keypairs': [{u'href': u'http://localhost/v1/x509keypairs/',
u'rel': u'self'},
{u'href': u'http://localhost/x509keypairs/',
u'rel': u'bookmark'}],
u'certificates': [{u'href': u'http://localhost/v1/certificates/',
u'rel': u'self'},
{u'href': u'http://localhost/certificates/',

View File

@ -1,303 +0,0 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils
from six.moves.urllib import parse as urlparse
from magnum.api.controllers.v1 import x509keypair as api_x509keypair
from magnum.conductor import api as rpcapi
from magnum import objects
from magnum.tests import base
from magnum.tests.unit.api import base as api_base
from magnum.tests.unit.api import utils as apiutils
from magnum.tests.unit.objects import utils as obj_utils
class TestX509KeyPairObject(base.TestCase):
def test_x509keypair_init(self):
x509keypair_dict = apiutils.x509keypair_post_data(bay_uuid=None)
x509keypair = api_x509keypair.X509KeyPair(**x509keypair_dict)
self.assertEqual('certificate', x509keypair.certificate)
class TestListX509KeyPair(api_base.FunctionalTest):
def setUp(self):
super(TestListX509KeyPair, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
def test_empty(self):
response = self.get_json('/x509keypairs')
self.assertEqual([], response['x509keypairs'])
def test_one(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs')
self.assertEqual(x509keypair.uuid, response['x509keypairs'][0]["uuid"])
self.assertIn('name', response['x509keypairs'][0])
self.assertIn('bay_uuid', response['x509keypairs'][0])
self.assertIn('certificate', response['x509keypairs'][0])
self.assertIn('private_key', response['x509keypairs'][0])
def test_get_one(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/%s' % x509keypair['uuid'])
self.assertEqual(x509keypair.uuid, response['uuid'])
self.assertIn('name', response)
self.assertIn('bay_uuid', response)
self.assertIn('certificate', response)
self.assertIn('private_key', response)
def test_get_one_by_name(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/%s' % x509keypair['name'])
self.assertEqual(x509keypair.uuid, response['uuid'])
self.assertIn('name', response)
self.assertIn('bay_uuid', response)
self.assertIn('certificate', response)
self.assertIn('private_key', response)
def test_get_one_by_name_not_found(self):
response = self.get_json(
'/x509keypairs/not_found',
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])
def test_get_one_by_name_multiple_x509keypair(self):
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=uuidutils.generate_uuid())
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=uuidutils.generate_uuid())
response = self.get_json('/x509keypairs/test_x509keypair',
expect_errors=True)
self.assertEqual(409, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])
def test_detail(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json('/x509keypairs/detail')
self.assertEqual(x509keypair.uuid, response['x509keypairs'][0]["uuid"])
self.assertIn('name', response['x509keypairs'][0])
self.assertIn('bay_uuid', response['x509keypairs'][0])
self.assertIn('certificate', response['x509keypairs'][0])
self.assertIn('private_key', response['x509keypairs'][0])
def test_detail_against_single(self):
x509keypair = obj_utils.create_test_x509keypair(self.context)
response = self.get_json(
'/x509keypairs/%s/detail' % x509keypair['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
keypair_list = []
for id_ in range(5):
x509keypair = obj_utils.create_test_x509keypair(
self.context, id=id_,
uuid=uuidutils.generate_uuid())
keypair_list.append(x509keypair.uuid)
response = self.get_json('/x509keypairs')
self.assertEqual(len(keypair_list), len(response['x509keypairs']))
uuids = [b['uuid'] for b in response['x509keypairs']]
self.assertEqual(sorted(keypair_list), sorted(uuids))
def test_links(self):
uuid = uuidutils.generate_uuid()
obj_utils.create_test_x509keypair(self.context, id=1, uuid=uuid)
response = self.get_json('/x509keypairs/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_test_x509keypair(self.context, id=id_,
uuid=uuidutils.generate_uuid())
response = self.get_json('/x509keypairs/?limit=3')
self.assertEqual(3, len(response['x509keypairs']))
next_marker = response['x509keypairs'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_x509keypair(self.context, id=id_,
uuid=uuidutils.generate_uuid())
response = self.get_json('/x509keypairs')
self.assertEqual(3, len(response['x509keypairs']))
next_marker = response['x509keypairs'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestPost(api_base.FunctionalTest):
def setUp(self):
super(TestPost, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
p = mock.patch.object(rpcapi.API, 'x509keypair_create')
self.mock_x509keypair_create = p.start()
self.mock_x509keypair_create.side_effect = \
self._simulate_rpc_x509keypair_create
self.addCleanup(p.stop)
def _simulate_rpc_x509keypair_create(self, x509keypair):
x509keypair.create()
return x509keypair
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_x509keypair(self, mock_utcnow):
cdict = apiutils.x509keypair_post_data()
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.post_json('/x509keypairs', cdict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/x509keypairs/%s' % cdict['uuid']
self.assertEqual(expected_location,
urlparse.urlparse(response.location).path)
self.assertEqual(cdict['uuid'], response.json['uuid'])
self.assertNotIn('updated_at', response.json.keys)
return_created_at = timeutils.parse_isotime(
response.json['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
def test_create_x509keypair_set_project_id_and_user_id(self):
cdict = apiutils.x509keypair_post_data()
def _simulate_keypair_create(x509keypair):
self.assertEqual(self.context.project_id, x509keypair.project_id)
self.assertEqual(self.context.user_id, x509keypair.user_id)
x509keypair.create()
return x509keypair
self.mock_x509keypair_create.side_effect = _simulate_keypair_create
self.post_json('/x509keypairs', cdict)
def test_create_x509keypair_doesnt_contain_id(self):
with mock.patch.object(self.dbapi, 'create_x509keypair',
wraps=self.dbapi.create_x509keypair) as cc_mock:
cdict = apiutils.x509keypair_post_data(
name='x509keypair_example_A')
response = self.post_json('/x509keypairs', cdict)
self.assertEqual(cdict['name'], response.json['name'])
cc_mock.assert_called_once_with(mock.ANY)
# Check that 'id' is not in first arg of positional args
self.assertNotIn('id', cc_mock.call_args[0][0])
def test_create_x509keypair_generate_uuid(self):
cdict = apiutils.x509keypair_post_data()
del cdict['uuid']
response = self.post_json('/x509keypairs', cdict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(cdict['name'], response.json['name'])
self.assertTrue(uuidutils.is_uuid_like(response.json['uuid']))
def test_create_x509keypair_no_bay_uuid(self):
cdict = apiutils.x509keypair_post_data()
del cdict['bay_uuid']
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
def test_create_x509keypair_with_non_existent_bay_uuid(self):
cdict = apiutils.x509keypair_post_data(
bay_uuid=uuidutils.generate_uuid())
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['errors'])
def test_create_x509keypair_with_bay_name(self):
cdict = apiutils.x509keypair_post_data(bay_uuid=self.bay.name)
response = self.post_json('/x509keypairs', cdict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
self.bay = obj_utils.create_test_bay(self.context)
self.x509keypair = obj_utils.create_test_x509keypair(self.context)
p = mock.patch.object(rpcapi.API, 'x509keypair_delete')
self.mock_x509keypair_delete = p.start()
self.mock_x509keypair_delete.side_effect = \
self._simulate_rpc_x509keypair_delete
self.addCleanup(p.stop)
def _simulate_rpc_x509keypair_delete(self, x509keypair_uuid):
x509keypair = objects.X509KeyPair.get_by_uuid(self.context,
x509keypair_uuid)
x509keypair.destroy()
def test_delete_x509keypair(self):
self.delete('/x509keypairs/%s' % self.x509keypair.uuid)
response = self.get_json('/x509keypairs/%s' % self.x509keypair.uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])
def test_delete_x509keypair_not_found(self):
uuid = uuidutils.generate_uuid()
response = self.delete('/x509keypairs/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])
def test_delete_x509keypair_with_name_not_found(self):
response = self.delete('/x509keypairs/not_found', expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])
def test_delete_x509keypair_with_name(self):
response = self.delete('/x509keypairs/%s' % self.x509keypair.name,
expect_errors=True)
self.assertEqual(204, response.status_int)
def test_delete_multiple_x509keypair_by_name(self):
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=uuidutils.generate_uuid())
obj_utils.create_test_x509keypair(self.context,
name='test_x509keypair',
uuid=uuidutils.generate_uuid())
response = self.delete('/x509keypairs/test_x509keypair',
expect_errors=True)
self.assertEqual(409, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['errors'])

View File

@ -18,7 +18,6 @@ import pytz
from magnum.api.controllers.v1 import bay as bay_controller
from magnum.api.controllers.v1 import baymodel as baymodel_controller
from magnum.api.controllers.v1 import x509keypair as x509keypair_controller
from magnum.tests.unit.db import utils
@ -49,12 +48,6 @@ def cert_post_data(**kw):
}
def x509keypair_post_data(**kw):
x509keypair = utils.get_test_x509keypair(**kw)
internal = x509keypair_controller.X509KeyPairPatchType.internal_attrs()
return remove_internal(x509keypair, internal)
def mservice_get_data(**kw):
"""Simulate what the RPC layer will get from DB """
faketime = datetime.datetime(2001, 1, 1, tzinfo=pytz.UTC)

View File

@ -1,43 +0,0 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import patch
from magnum.conductor.handlers import x509keypair_conductor
from magnum import objects
from magnum.tests import base
class TestX509KeyPairConductor(base.TestCase):
def setUp(self):
super(TestX509KeyPairConductor, self).setUp()
self.x509keypair_handler = x509keypair_conductor.Handler()
def test_x509keypair_create(self):
expected_x509keypair = objects.X509KeyPair({})
expected_x509keypair.create = mock.MagicMock()
self.x509keypair_handler.x509keypair_create(self.context,
expected_x509keypair)
expected_x509keypair.create.assert_called_once_with(self.context)
@patch('magnum.objects.X509KeyPair.get_by_uuid')
def test_x509keypair_delete(self, mock_x509keypair_get_by_uuid):
mock_x509keypair = mock.MagicMock()
mock_x509keypair.name = 'test-x509keypair'
mock_x509keypair.uuid = 'test-uuid'
mock_x509keypair_get_by_uuid.return_value = mock_x509keypair
self.x509keypair_handler.x509keypair_delete(self.context, "test-uuid")
mock_x509keypair.destroy.assert_called_once_with(self.context)

View File

@ -31,8 +31,6 @@ class RPCAPITestCase(base.DbTestCase):
self.fake_container = dbutils.get_test_container(driver='fake-driver')
self.fake_rc = dbutils.get_test_rc(driver='fake-driver')
self.fake_service = dbutils.get_test_service(driver='fake-driver')
self.fake_x509keypair = dbutils.get_test_x509keypair(
driver='fake-driver')
self.fake_certificate = objects.Certificate.from_db_bay(self.fake_bay)
self.fake_certificate.csr = 'fake-csr'
@ -221,23 +219,6 @@ class RPCAPITestCase(base.DbTestCase):
rpcapi_cls=conductor_rpcapi.ListenerAPI,
version='1.0')
def test_x509keypair_create(self):
self._test_rpcapi('x509keypair_create',
'call',
version='1.0',
x509keypair=self.fake_x509keypair)
def test_x509keypair_delete(self):
self._test_rpcapi('x509keypair_delete',
'call',
version='1.0',
uuid=self.fake_x509keypair['uuid'])
self._test_rpcapi('x509keypair_delete',
'call',
version='1.1',
uuid=self.fake_x509keypair['name'])
def test_sign_certificate(self):
self._test_rpcapi('sign_certificate',
'call',

View File

@ -29,9 +29,6 @@ class DbX509KeyPairTestCase(base.DbTestCase):
def test_create_x509keypair(self):
utils.create_test_x509keypair()
def test_create_x509keypair_nullable_bay_uuid(self):
utils.create_test_x509keypair(bay_uuid=None)
def test_create_x509keypair_already_exists(self):
utils.create_test_x509keypair()
self.assertRaises(exception.X509KeyPairAlreadyExists,
@ -43,13 +40,6 @@ class DbX509KeyPairTestCase(base.DbTestCase):
self.assertEqual(x509keypair.id, res.id)
self.assertEqual(x509keypair.uuid, res.uuid)
def test_get_x509keypair_by_name(self):
x509keypair = utils.create_test_x509keypair()
res = self.dbapi.get_x509keypair_by_name(self.context,
x509keypair.name)
self.assertEqual(x509keypair.name, res.name)
self.assertEqual(x509keypair.uuid, res.uuid)
def test_get_x509keypair_by_uuid(self):
x509keypair = utils.create_test_x509keypair()
res = self.dbapi.get_x509keypair_by_uuid(self.context,
@ -76,41 +66,6 @@ class DbX509KeyPairTestCase(base.DbTestCase):
res_uuids = [r.uuid for r in res]
self.assertEqual(sorted(uuids), sorted(res_uuids))
def test_get_x509keypair_list_with_filters(self):
bay1 = utils.get_test_bay(id=1, uuid=uuidutils.generate_uuid())
bay2 = utils.get_test_bay(id=2, uuid=uuidutils.generate_uuid())
self.dbapi.create_bay(bay1)
self.dbapi.create_bay(bay2)
x509keypair1 = utils.create_test_x509keypair(
name='x509keypair-one',
uuid=uuidutils.generate_uuid(),
bay_uuid=bay1['uuid'])
x509keypair2 = utils.create_test_x509keypair(
name='x509keypair-two',
uuid=uuidutils.generate_uuid(),
bay_uuid=bay2['uuid'])
x509keypair3 = utils.create_test_x509keypair(
name='x509keypair-three',
bay_uuid=bay2['uuid'])
res = self.dbapi.get_x509keypair_list(
self.context, filters={'bay_uuid': bay1['uuid']})
self.assertEqual([x509keypair1.id], [r.id for r in res])
res = self.dbapi.get_x509keypair_list(
self.context, filters={'bay_uuid': bay2['uuid']})
self.assertEqual([x509keypair2.id, x509keypair3.id],
[r.id for r in res])
res = self.dbapi.get_x509keypair_list(
self.context, filters={'name': 'x509keypair-one'})
self.assertEqual([x509keypair1.id], [r.id for r in res])
res = self.dbapi.get_x509keypair_list(
self.context, filters={'name': 'bad-x509keypair'})
self.assertEqual([], [r.id for r in res])
def test_get_x509keypair_list_by_admin_all_tenants(self):
uuids = []
for i in range(1, 6):
@ -124,13 +79,6 @@ class DbX509KeyPairTestCase(base.DbTestCase):
res_uuids = [r.uuid for r in res]
self.assertEqual(sorted(uuids), sorted(res_uuids))
def test_get_x509keypair_list_bay_not_exist(self):
utils.create_test_x509keypair()
self.assertEqual(1, len(self.dbapi.get_x509keypair_list(self.context)))
res = self.dbapi.get_x509keypair_list(self.context, filters={
'bay_uuid': uuidutils.generate_uuid()})
self.assertEqual(0, len(res))
def test_destroy_x509keypair(self):
x509keypair = utils.create_test_x509keypair()
self.assertIsNotNone(self.dbapi.get_x509keypair_by_id(

View File

@ -212,15 +212,14 @@ def get_test_x509keypair(**kw):
return {
'id': kw.get('id', 42),
'uuid': kw.get('uuid', '72625085-c507-4410-9b28-cd7cf1fbf1ad'),
'name': kw.get('name', 'x509keypair1'),
'project_id': kw.get('project_id', 'fake_project'),
'user_id': kw.get('user_id', 'fake_user'),
'bay_uuid': kw.get('bay_uuid',
'5d12f6fd-a196-4bf0-ae4c-1f639a523a52'),
'ca_cert': kw.get('ca_cert', 'client_ca'),
'certificate': kw.get('certificate',
'certificate'),
'private_key': kw.get('private_key', 'private_key'),
'private_key_passphrase': kw.get('private_key_passphrase',
'private_key_passphrase'),
'intermediates': kw.get('intermediates', 'intermediates'),
'created_at': kw.get('created_at'),
'updated_at': kw.get('updated_at'),
}

View File

@ -364,7 +364,7 @@ object_data = {
'MyObj': '1.0-b43567e512438205e32f4e95ca616697',
'ReplicationController': '1.0-a471c2429c212ed91833cfcf0f934eab',
'Service': '1.0-f4a1c5a4618708824a553568c1ada0ea',
'X509KeyPair': '1.1-4aecc268e23e32b8a762d43ba1a4b159',
'X509KeyPair': '1.2-d81950af36c59a71365e33ce539d24f9',
'MagnumService': '1.0-2d397ec59b0046bd5ec35cd3e06efeca',
}

View File

@ -48,15 +48,6 @@ class TestX509KeyPairObject(base.DbTestCase):
mock_get_x509keypair.assert_called_once_with(self.context, uuid)
self.assertEqual(self.context, x509keypair._context)
def test_get_by_name(self):
name = self.fake_x509keypair['name']
with mock.patch.object(self.dbapi, 'get_x509keypair_by_name',
autospec=True) as mock_get_x509keypair:
mock_get_x509keypair.return_value = self.fake_x509keypair
x509keypair = objects.X509KeyPair.get_by_name(self.context, name)
mock_get_x509keypair.assert_called_once_with(self.context, name)
self.assertEqual(self.context, x509keypair._context)
def test_get_bad_id_and_uuid(self):
self.assertRaises(exception.InvalidIdentity,
objects.X509KeyPair.get, self.context, 'not-a-uuid')
@ -85,23 +76,6 @@ class TestX509KeyPairObject(base.DbTestCase):
self.assertIsInstance(x509keypairs[0], objects.X509KeyPair)
self.assertEqual(self.context, x509keypairs[0]._context)
def test_list_with_filters(self):
with mock.patch.object(self.dbapi, 'get_x509keypair_list',
autospec=True) as mock_get_list:
mock_get_list.return_value = [self.fake_x509keypair]
filters = {'name': 'x509keypair1'}
x509keypairs = objects.X509KeyPair.list(self.context,
filters=filters)
mock_get_list.assert_called_once_with(self.context, sort_key=None,
sort_dir=None,
filters=filters, limit=None,
marker=None)
self.assertEqual(1, mock_get_list.call_count)
self.assertThat(x509keypairs, HasLength(1))
self.assertIsInstance(x509keypairs[0], objects.X509KeyPair)
self.assertEqual(self.context, x509keypairs[0]._context)
def test_create(self):
with mock.patch.object(self.dbapi, 'create_x509keypair',
autospec=True) as mock_create_x509keypair: