Implement microversions, bring back secret consumers API

The implementation follows nova and implements an is_supported
function, that can be used in controllers, to check the requested
version and take different code paths depending on the result.

This reverts commit 7b14d983e0.

Change-Id: I5651a69f93288ac1dfdc1c8b1ad0f904e370c127
This commit is contained in:
Grzegorz Grasza 2022-02-10 19:38:16 +00:00 committed by Douglas Mendizábal
parent 935c7158b0
commit 0f74877c77
10 changed files with 459 additions and 345 deletions

View File

@ -19,6 +19,7 @@ from barbican.api import controllers
from barbican.api.controllers import acls
from barbican.api.controllers import consumers
from barbican.api.controllers import secretmeta
from barbican.api.controllers import versions
from barbican.common import accept
from barbican.common import exception
from barbican.common import hrefs
@ -118,6 +119,9 @@ class SecretController(controllers.ACLMixin):
LOG.info('Retrieved secret metadata for project: %s',
external_project_id)
if versions.is_supported(pecan.request, max_version='1.0'):
# NOTE(xek): consumers are being introduced in 1.1
del resp['consumers']
return resp
else:
LOG.warning('Decrypted secret %s requested using deprecated '
@ -360,8 +364,14 @@ class SecretsController(controllers.ACLMixin):
@controllers.handle_exceptions(u._('Secret(s) retrieval'))
@controllers.enforce_rbac('secrets:get')
def on_get(self, external_project_id, **kw):
no_consumers = versions.is_supported(pecan.request, max_version='1.0')
# NOTE(xek): consumers are being introduced in 1.1
def secret_fields(field):
return putil.mime_types.augment_fields_with_content_types(field)
resp = putil.mime_types.augment_fields_with_content_types(field)
if no_consumers:
del resp['consumers']
return resp
LOG.debug('Start secrets on_get '
'for project-ID %s:', external_project_id)

View File

@ -26,12 +26,49 @@ from barbican import version
LOG = utils.getLogger(__name__)
_MIN_MICROVERSION = 0
_MAX_MICROVERSION = 1
_LAST_UPDATED = '2021-02-10T00:00:00Z'
# NOTE(xek): The above defines the minimum and maximum version of the API
# across all of the v1 REST API.
# When introducing a new microversion, the _MAX_MICROVERSION
# needs to be incremented by 1 and the _LAST_UPDATED string updated.
# The following is the complete (ordered) list of supported versions
# used by the microversion middleware to parse what is allowed and
# supported.
VERSIONS = ['1.{}'.format(v) for v in range(_MIN_MICROVERSION,
_MAX_MICROVERSION + 1)]
MIN_API_VERSION = VERSIONS[0]
MAX_API_VERSION = VERSIONS[-1]
MIME_TYPE_JSON = 'application/json'
MIME_TYPE_JSON_HOME = 'application/json-home'
MEDIA_TYPE_JSON = 'application/vnd.openstack.key-manager-%s+json'
def is_supported(req, min_version=MIN_API_VERSION,
max_version=MAX_API_VERSION):
"""Check if API request version satisfies version restrictions.
:param req: request object
:param min_version: minimal version of API needed for correct
request processing
:param max_version: maximum version of API needed for correct
request processing
:returns: True if request satisfies minimal and maximum API version
requirements. False in other case.
"""
requested_version = str(req.environ.get('key-manager.microversion',
MIN_API_VERSION))
return (VERSIONS.index(max_version) >=
VERSIONS.index(requested_version) >=
VERSIONS.index(min_version))
def _version_not_found():
"""Throw exception indicating version not found."""
pecan.abort(404, u._("The version you requested wasn't found"))
@ -84,7 +121,9 @@ class V1Controller(BaseVersionController):
# this is the same as the version string.
version_id = 'v1'
last_updated = '2015-04-28T00:00:00Z'
version = MAX_API_VERSION
min_version = MIN_API_VERSION
last_updated = _LAST_UPDATED
def __init__(self):
LOG.debug('=== Creating V1Controller ===')

View File

@ -0,0 +1,35 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A filter middleware that just outputs to logs, for instructive/sample
purposes only.
"""
from microversion_parse import middleware as microversion_middleware
from barbican.api.controllers import versions
from barbican.api import middleware
from barbican.common import utils
LOG = utils.getLogger(__name__)
class MicroversionMiddleware(
microversion_middleware.MicroversionMiddleware,
middleware.Middleware):
def __init__(self, app):
super(MicroversionMiddleware, self).__init__(
app, 'key-manager', versions.VERSIONS)

View File

@ -374,14 +374,13 @@ class Secret(BASE, SoftDeleteMixIn, ModelBase):
'bit_length': self.bit_length,
'mode': self.mode,
'creator_id': self.creator_id,
# TODO(redrobot): Uncomment this after adding microversions
# "consumers": [
# {
# "service": consumer.service,
# "resource_type": consumer.resource_type,
# "resource_id": consumer.resource_id,
# } for consumer in self.consumers if not consumer.deleted
# ],
"consumers": [
{
"service": consumer.service,
"resource_type": consumer.resource_type,
"resource_id": consumer.resource_id,
} for consumer in self.consumers if not consumer.deleted
],
}

View File

@ -329,335 +329,335 @@ class WhenTestingContainerConsumersResource(utils.BarbicanAPIBaseTestCase):
)
self.assertEqual(400, consumer_resp.status_int)
# TODO(redrobot): Uncomment this after adding microversion
# class WhenTestingSecretConsumersResource(utils.BarbicanAPIBaseTestCase):
#
# def setUp(self):
# super(WhenTestingSecretConsumersResource, self).setUp()
#
# self.consumer_a = {
# "service": "service_a",
# "resource_type": "resource_type_a",
# "resource_id": "resource_id_a",
# }
#
# self.consumer_b = {
# "service": "service_b",
# "resource_type": "resource_type_b",
# "resource_id": "resource_id_b",
# }
#
# self.consumer_c = {
# "service": "service_c",
# "resource_type": "resource_type_c",
# "resource_id": "resource_id_c",
# }
#
# def test_can_create_new_consumer(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_a["service"],
# resource_type=self.consumer_a["resource_type"],
# resource_id=self.consumer_a["resource_id"],
# )
#
# self.assertEqual(200, consumer_resp.status_int)
# self.assertEqual([self.consumer_a], consumer)
#
# def test_can_get_consumers(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_a["service"],
# resource_type=self.consumer_a["resource_type"],
# resource_id=self.consumer_a["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_b["service"],
# resource_type=self.consumer_b["resource_type"],
# resource_id=self.consumer_b["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_c["service"],
# resource_type=self.consumer_c["resource_type"],
# resource_id=self.consumer_c["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_get_resp = self.app.get(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id=secret_id))
#
# self.assertEqual(200, consumer_get_resp.status_int)
# self.assertIn(consumers[0]["service"],
# consumer_get_resp.json["consumers"][0]["service"])
# self.assertIn(consumers[0]["resource_type"],
# consumer_get_resp.json["consumers"][0]["resource_type"])
# self.assertIn(consumers[0]["resource_id"],
# consumer_get_resp.json["consumers"][0]["resource_id"])
# self.assertIn(consumers[1]["service"],
# consumer_get_resp.json["consumers"][1]["service"])
# self.assertIn(consumers[1]["resource_type"],
# consumer_get_resp.json["consumers"][1]["resource_type"])
# self.assertIn(consumers[1]["resource_id"],
# consumer_get_resp.json["consumers"][1]["resource_id"])
# self.assertIn(consumers[2]["service"],
# consumer_get_resp.json["consumers"][2]["service"])
# self.assertIn(consumers[2]["resource_type"],
# consumer_get_resp.json["consumers"][2]["resource_type"])
# self.assertIn(consumers[2]["resource_id"],
# consumer_get_resp.json["consumers"][2]["resource_id"])
#
# def test_can_get_consumers_with_limit_and_offset(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_a["service"],
# resource_type=self.consumer_a["resource_type"],
# resource_id=self.consumer_a["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_b["service"],
# resource_type=self.consumer_b["resource_type"],
# resource_id=self.consumer_b["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_c["service"],
# resource_type=self.consumer_c["resource_type"],
# resource_id=self.consumer_c["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# consumer_get_resp = self.app.get(
# '/secrets/{secret_id}/consumers/?limit=1&offset=1'.format(
# secret_id=secret_id))
# self.assertEqual(200, consumer_get_resp.status_int)
#
# secret_url = resp.json["secret_ref"]
#
# prev_cons = u"{secret_url}/consumers?limit=1&offset=0".format(
# secret_url=secret_url)
# self.assertEqual(prev_cons, consumer_get_resp.json["previous"])
#
# next_cons = u"{secret_url}/consumers?limit=1&offset=2".format(
# secret_url=secret_url)
# self.assertEqual(next_cons, consumer_get_resp.json["next"])
#
# self.assertEqual(
# self.consumer_b["service"],
# consumer_get_resp.json["consumers"][0]["service"]
# )
# self.assertEqual(
# self.consumer_b["resource_type"],
# consumer_get_resp.json["consumers"][0]["resource_type"]
# )
# self.assertEqual(
# self.consumer_b["resource_id"],
# consumer_get_resp.json["consumers"][0]["resource_id"]
# )
#
# self.assertEqual(3, consumer_get_resp.json["total"])
#
# def test_can_delete_consumer(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service=self.consumer_a["service"],
# resource_type=self.consumer_a["resource_type"],
# resource_id=self.consumer_a["resource_id"],
# )
# self.assertEqual(200, consumer_resp.status_int)
#
# request = {
# "service": self.consumer_a["service"],
# "resource_type": self.consumer_a["resource_type"],
# "resource_id": self.consumer_a["resource_id"],
# }
# cleaned_request = {key: val for key, val in request.items()
# if val is not None}
#
# consumer_del_resp = self.app.delete_json(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id=secret_id
# ), cleaned_request, headers={'Content-Type': 'application/json'})
#
# self.assertEqual(200, consumer_del_resp.status_int)
#
# def test_can_get_no_consumers(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_get_resp = self.app.get(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id=secret_id))
#
# self.assertEqual(200, consumer_get_resp.status_int)
# self.assertEqual([], consumer_get_resp.json["consumers"])
#
# def test_fail_create_secret_not_found(self):
# consumer_resp, consumers = create_secret_consumer(
# self.app,
# secret_id="bad_secret_id",
# service=self.consumer_a["service"],
# resource_type=self.consumer_a["resource_type"],
# resource_id=self.consumer_a["resource_id"],
# expect_errors=True
# )
# self.assertEqual(404, consumer_resp.status_int)
#
# def test_fail_get_secret_not_found(self):
# consumer_get_resp = self.app.get(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id="bad_secret_id"), expect_errors=True)
#
# self.assertEqual(404, consumer_get_resp.status_int)
#
# def test_fail_delete_secret_not_found(self):
# request = {
# "service": self.consumer_a["service"],
# "resource_type": self.consumer_a["resource_type"],
# "resource_id": self.consumer_a["resource_id"],
# }
# cleaned_request = {key: val for key, val in request.items()
# if val is not None}
#
# consumer_del_resp = self.app.delete_json(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id="bad_secret_id"
# ), cleaned_request, headers={'Content-Type': 'application/json'},
# expect_errors=True)
#
# self.assertEqual(404, consumer_del_resp.status_int)
#
# def test_fail_delete_consumer_not_found(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# request = {
# "service": self.consumer_a["service"],
# "resource_type": self.consumer_a["resource_type"],
# "resource_id": self.consumer_a["resource_id"],
# }
# cleaned_request = {key: val for key, val in request.items()
# if val is not None}
#
# consumer_del_resp = self.app.delete_json(
# '/secrets/{secret_id}/consumers/'.format(
# secret_id=secret_id
# ), cleaned_request, headers={'Content-Type': 'application/json'},
# expect_errors=True)
#
# self.assertEqual(404, consumer_del_resp.status_int)
#
# def test_fail_create_no_service(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# resource_type="resource_type",
# resource_id="resource_id",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
#
# def test_fail_create_no_resource_type(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service="service",
# resource_id="resource_id",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
#
# def test_fail_create_no_resource_id(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service="service",
# resource_type="resource_type",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
#
# def test_fail_create_empty_service(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service="",
# resource_type="resource_type",
# resource_id="resource_id",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
#
# def test_fail_create_empty_resource_type(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service="service",
# resource_type="",
# resource_id="resource_id",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
#
# def test_fail_create_empty_resource_id(self):
# resp, secret_id = create_secret(self.app)
# self.assertEqual(201, resp.status_int)
#
# consumer_resp, consumer = create_secret_consumer(
# self.app,
# secret_id=secret_id,
# service="service",
# resource_type="resource_type",
# resource_id="",
# expect_errors=True
# )
# self.assertEqual(400, consumer_resp.status_int)
class WhenTestingSecretConsumersResource(utils.BarbicanAPIBaseTestCase):
def setUp(self):
super(WhenTestingSecretConsumersResource, self).setUp()
self.consumer_a = {
"service": "service_a",
"resource_type": "resource_type_a",
"resource_id": "resource_id_a",
}
self.consumer_b = {
"service": "service_b",
"resource_type": "resource_type_b",
"resource_id": "resource_id_b",
}
self.consumer_c = {
"service": "service_c",
"resource_type": "resource_type_c",
"resource_id": "resource_id_c",
}
def test_can_create_new_consumer(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
self.assertEqual([self.consumer_a], consumer)
def test_can_get_consumers(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_b["service"],
resource_type=self.consumer_b["resource_type"],
resource_id=self.consumer_b["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_c["service"],
resource_type=self.consumer_c["resource_type"],
resource_id=self.consumer_c["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
self.assertIn(consumers[0]["service"],
consumer_get_resp.json["consumers"][0]["service"])
self.assertIn(consumers[0]["resource_type"],
consumer_get_resp.json["consumers"][0]["resource_type"])
self.assertIn(consumers[0]["resource_id"],
consumer_get_resp.json["consumers"][0]["resource_id"])
self.assertIn(consumers[1]["service"],
consumer_get_resp.json["consumers"][1]["service"])
self.assertIn(consumers[1]["resource_type"],
consumer_get_resp.json["consumers"][1]["resource_type"])
self.assertIn(consumers[1]["resource_id"],
consumer_get_resp.json["consumers"][1]["resource_id"])
self.assertIn(consumers[2]["service"],
consumer_get_resp.json["consumers"][2]["service"])
self.assertIn(consumers[2]["resource_type"],
consumer_get_resp.json["consumers"][2]["resource_type"])
self.assertIn(consumers[2]["resource_id"],
consumer_get_resp.json["consumers"][2]["resource_id"])
def test_can_get_consumers_with_limit_and_offset(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_b["service"],
resource_type=self.consumer_b["resource_type"],
resource_id=self.consumer_b["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_c["service"],
resource_type=self.consumer_c["resource_type"],
resource_id=self.consumer_c["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/?limit=1&offset=1'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
secret_url = resp.json["secret_ref"]
prev_cons = u"{secret_url}/consumers?limit=1&offset=0".format(
secret_url=secret_url)
self.assertEqual(prev_cons, consumer_get_resp.json["previous"])
next_cons = u"{secret_url}/consumers?limit=1&offset=2".format(
secret_url=secret_url)
self.assertEqual(next_cons, consumer_get_resp.json["next"])
self.assertEqual(
self.consumer_b["service"],
consumer_get_resp.json["consumers"][0]["service"]
)
self.assertEqual(
self.consumer_b["resource_type"],
consumer_get_resp.json["consumers"][0]["resource_type"]
)
self.assertEqual(
self.consumer_b["resource_id"],
consumer_get_resp.json["consumers"][0]["resource_id"]
)
self.assertEqual(3, consumer_get_resp.json["total"])
def test_can_delete_consumer(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id=secret_id,
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
)
self.assertEqual(200, consumer_resp.status_int)
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id
), cleaned_request, headers={'Content-Type': 'application/json'})
self.assertEqual(200, consumer_del_resp.status_int)
def test_can_get_no_consumers(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id))
self.assertEqual(200, consumer_get_resp.status_int)
self.assertEqual([], consumer_get_resp.json["consumers"])
def test_fail_create_secret_not_found(self):
consumer_resp, consumers = create_secret_consumer(
self.app,
secret_id="bad_secret_id",
service=self.consumer_a["service"],
resource_type=self.consumer_a["resource_type"],
resource_id=self.consumer_a["resource_id"],
expect_errors=True
)
self.assertEqual(404, consumer_resp.status_int)
def test_fail_get_secret_not_found(self):
consumer_get_resp = self.app.get(
'/secrets/{secret_id}/consumers/'.format(
secret_id="bad_secret_id"), expect_errors=True)
self.assertEqual(404, consumer_get_resp.status_int)
def test_fail_delete_secret_not_found(self):
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id="bad_secret_id"
), cleaned_request, headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(404, consumer_del_resp.status_int)
def test_fail_delete_consumer_not_found(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
request = {
"service": self.consumer_a["service"],
"resource_type": self.consumer_a["resource_type"],
"resource_id": self.consumer_a["resource_id"],
}
cleaned_request = {key: val for key, val in request.items()
if val is not None}
consumer_del_resp = self.app.delete_json(
'/secrets/{secret_id}/consumers/'.format(
secret_id=secret_id
), cleaned_request, headers={'Content-Type': 'application/json'},
expect_errors=True)
self.assertEqual(404, consumer_del_resp.status_int)
def test_fail_create_no_service(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
resource_type="resource_type",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_no_resource_type(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_no_resource_id(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="resource_type",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_service(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="",
resource_type="resource_type",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_resource_type(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="",
resource_id="resource_id",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
def test_fail_create_empty_resource_id(self):
resp, secret_id = create_secret(self.app)
self.assertEqual(201, resp.status_int)
consumer_resp, consumer = create_secret_consumer(
self.app,
secret_id=secret_id,
service="service",
resource_type="resource_type",
resource_id="",
expect_errors=True
)
self.assertEqual(400, consumer_resp.status_int)
# ----------------------- Helper Functions ---------------------------

View File

@ -224,7 +224,7 @@ class WhenGettingSecretsList(utils.BarbicanAPIBaseTestCase):
secret_list = get_resp.json.get('secrets')
self.assertEqual('secret mission', secret_list[0].get('name'))
def test_list_secrets(self):
def _test_list_secrets(self):
# Creating a secret to be retrieved later
create_resp, _ = create_secret(
self.app,
@ -240,6 +240,17 @@ class WhenGettingSecretsList(utils.BarbicanAPIBaseTestCase):
secret_list = get_resp.json.get('secrets')
self.assertGreater(len(secret_list), 0)
return secret_list
def test_list_secrets_v0(self):
secret_list = self._test_list_secrets()
self.assertNotIn('consumers', secret_list[0])
def test_list_secrets_v1(self):
utils.set_version(self.app, '1.1')
secret_list = self._test_list_secrets()
self.assertIn('consumers', secret_list[0])
def test_pagination_attributes(self):
# Create a list of secrets greater than default limit (10)
for _ in range(11):

View File

@ -383,7 +383,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.transport_key_id = 'tkey12345'
@mock.patch('barbican.plugin.resources.get_transport_key_id_for_retrieval')
def test_should_get_secret_as_json(self, mock_get_transport_key):
def _test_should_get_secret_as_json(self, mock_get_transport_key):
mock_get_transport_key.return_value = None
resp = self.app.get(
'/secrets/{0}/'.format(self.secret.id),
@ -400,6 +400,17 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
resp.namespace['content_types'].values())
self.assertNotIn('mime_type', resp.namespace)
return resp.json
def test_should_get_secret_as_json_v0(self):
secret = self._test_should_get_secret_as_json()
self.assertNotIn('consumers', secret)
def test_should_get_secret_as_json_v1(self):
utils.set_version(self.app, '1.1')
secret = self._test_should_get_secret_as_json()
self.assertIn('consumers', secret)
@testcase.attr('deprecated')
@mock.patch('barbican.plugin.resources.get_secret')
def test_should_get_secret_as_plain_based_on_content_type(self,

View File

@ -684,3 +684,8 @@ def is_pkcs11_enabled():
class DummyClassForTesting(object):
pass
def set_version(app, version):
"""Sets the requested version in the environ"""
app.extra_environ['key-manager.microversion'] = version

View File

@ -10,20 +10,20 @@ pipeline = cors http_proxy_to_wsgi versionapp
# Use this pipeline for Barbican API - DEFAULT no authentication
[pipeline:barbican_api]
pipeline = cors http_proxy_to_wsgi unauthenticated-context apiapp
pipeline = cors http_proxy_to_wsgi unauthenticated-context microversion apiapp
#Use this pipeline to activate a repoze.profile middleware and HTTP port,
# to provide profiling information for the REST API processing.
[pipeline:barbican-profile]
pipeline = cors http_proxy_to_wsgi unauthenticated-context egg:Paste#cgitb egg:Paste#httpexceptions profile apiapp
pipeline = cors http_proxy_to_wsgi unauthenticated-context microversion egg:Paste#cgitb egg:Paste#httpexceptions profile apiapp
#Use this pipeline for keystone auth
[pipeline:barbican-api-keystone]
pipeline = cors http_proxy_to_wsgi authtoken context apiapp
pipeline = cors http_proxy_to_wsgi authtoken context microversion apiapp
#Use this pipeline for keystone auth with audit feature
[pipeline:barbican-api-keystone-audit]
pipeline = http_proxy_to_wsgi authtoken context audit apiapp
pipeline = http_proxy_to_wsgi authtoken context microversion audit apiapp
[app:apiapp]
paste.app_factory = barbican.api.app:create_main_app
@ -40,6 +40,9 @@ paste.filter_factory = barbican.api.middleware.context:UnauthenticatedContextMid
[filter:context]
paste.filter_factory = barbican.api.middleware.context:ContextMiddleware.factory
[filter:microversion]
paste.filter_factory = barbican.api.middleware.microversion:MicroversionMiddleware.factory
[filter:audit]
paste.filter_factory = keystonemiddleware.audit:filter_factory
audit_map_file = /etc/barbican/api_audit_map.conf

View File

@ -31,3 +31,4 @@ SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
stevedore>=1.20.0 # Apache-2.0
WebOb>=1.7.1 # MIT
castellan >= 1.2.1 # Apache-2.0
microversion-parse>=0.2.1 # Apache-2.0