remove project-id from resource URIs

All Barbican resources have a project-id in their URI. This helps to
correlate the resource with the specified project when no external
authentication mechanism is used. However, most Barbican deployments
would use Keystone to authenticate the API requests. With Keystone,
the project-id information is already obtained when the X-Auth-Token
header from the request is validated. This makes the project-id in the
URI redundant. This commit removes project-id from the URI.

Tests have been updated as well to not expect project-id in the URI.

Patch 3:
  removed unauthenticated-context from admin pipeline to fix
  tempest failure. version api doesn't need auth context now
Patch 4:
  refactored based on review comments
  removed the Noop policy enforcer class
Patch 5:
  rebased and fixed failing tests
  fixed issue in parsing performance_uri
Patch 6:
  updated functional tests to remove project-id from uri
  removed rbac check for versions controller
  expecting X-Project-ID to be present in request for all requests
  this needs to be fixed in the next pass when the admin paste
  pipeline is updated
Patch 7:
  fixed failing functional test
  refactoring based on review comments
Patch 8:
  rebased and fixed failing unit tests
Patch 9:
  fixed merge issue by removing keystone_id from few more lines
  and files
Patch 10:
  fixed failing functional tests (consumers)

Spec: https://review.openstack.org/#/c/100386
Client CR: https://review.openstack.org/#/c/112149
Change-Id: If54911718188eb26b7380331d0b61d70206522a5
Blueprint: https://blueprints.launchpad.net/barbican/+spec/api-remove-uri-tenant-id
This commit is contained in:
tsv 2014-07-05 22:53:12 -06:00
parent aed3b30ab3
commit d15f8db597
19 changed files with 294 additions and 284 deletions

View File

@ -67,20 +67,20 @@ class PecanAPI(pecan.Pecan):
super(PecanAPI, self).__init__(*args, **kwargs)
def route(self, req, node, path):
# Pop the tenant ID from the path
path = path.split('/')[1:]
first_path = path.pop(0)
# parse the first part of the URL. It could be the
# resource name or the ID of the performance controller
# example: /secrets
parts = path.split('/')
if len(parts) > 1:
first_path = parts[1]
else:
first_path = None
# Route to the special performance controller
if first_path == self.performance_uri:
return self.performance_controller.index, []
path = '/%s' % '/'.join(path)
controller, remainder = super(PecanAPI, self).route(req, node, path)
# Pass the tenant ID as the first argument to the controller
remainder = list(remainder)
remainder.insert(0, first_path)
return controller, remainder

View File

@ -30,29 +30,30 @@ def is_json_request_accept(req):
or req.accept.header_value == '*/*'
def _do_enforce_rbac(req, action_name, keystone_id=None):
def _get_barbican_context(req):
if 'barbican.context' in req.environ:
return req.environ['barbican.context']
else:
return None
def _do_enforce_rbac(req, action_name, ctx):
"""Enforce RBAC based on 'request' information."""
if action_name and 'barbican.context' in req.environ:
if action_name and ctx:
# Prepare credentials information.
ctx = req.environ['barbican.context'] # Placed here by context.py
# middleware
credentials = {
'roles': ctx.roles,
'user': ctx.user,
'tenant': ctx.tenant,
'tenant': ctx.tenant
}
# Verify keystone_id matches the tenant ID.
if keystone_id and keystone_id != ctx.tenant:
pecan.abort(403, u._("URI tenant does not match "
"authenticated tenant."))
# Enforce special case: secret GET decryption
if 'secret:get' == action_name and not is_json_request_accept(req):
action_name = 'secret:decrypt' # Override to perform special rules
# Enforce access controls.
if ctx.policy_enforcer:
ctx.policy_enforcer.enforce(action_name, {}, credentials,
do_raise=True)
@ -62,11 +63,20 @@ def enforce_rbac(action_name='default'):
def rbac_decorator(fn):
def enforcer(inst, *args, **kwargs):
# Enforce RBAC rules.
_do_enforce_rbac(pecan.request, action_name,
keystone_id=kwargs.get('keystone_id'))
# context placed here by context.py
# middleware
ctx = _get_barbican_context(pecan.request)
if ctx:
keystone_id = ctx.tenant
else:
keystone_id = None
_do_enforce_rbac(pecan.request, action_name, ctx)
# insert keystone_id as the first arg to the guarded method
args = list(args)
args.insert(0, keystone_id)
# Execute guarded method now.
return fn(inst, *args, **kwargs)

View File

@ -53,8 +53,7 @@ class ContainerConsumerController(object):
dict_fields = consumer.to_dict_fields()
return controllers.hrefs.convert_to_hrefs(
keystone_id,
controllers.hrefs.convert_to_hrefs(keystone_id, dict_fields)
controllers.hrefs.convert_to_hrefs(dict_fields)
)
@ -99,13 +98,11 @@ class ContainerConsumersController(object):
resp_ctrs_overall = {'consumers': [], 'total': total}
else:
resp_ctrs = [
controllers.hrefs.convert_to_hrefs(keystone_id,
c.to_dict_fields())
controllers.hrefs.convert_to_hrefs(c.to_dict_fields())
for c in consumers
]
resp_ctrs_overall = controllers.hrefs.add_nav_hrefs(
'consumers',
keystone_id,
offset,
limit,
total,
@ -172,9 +169,8 @@ class ContainerConsumersController(object):
controllers.containers.container_not_found()
for secret_ref in dict_fields['secret_refs']:
controllers.hrefs.convert_to_hrefs(keystone_id, secret_ref)
controllers.hrefs.convert_to_hrefs(secret_ref)
return controllers.hrefs.convert_to_hrefs(
keystone_id,
controllers.hrefs.convert_to_hrefs(keystone_id, dict_fields)
controllers.hrefs.convert_to_hrefs(dict_fields)
)

View File

@ -60,18 +60,16 @@ class ContainerController(object):
dict_fields = container.to_dict_fields()
for secret_ref in dict_fields['secret_refs']:
controllers.hrefs.convert_to_hrefs(keystone_id, secret_ref)
controllers.hrefs.convert_to_hrefs(secret_ref)
return controllers.hrefs.convert_to_hrefs(
keystone_id,
controllers.hrefs.convert_to_hrefs(keystone_id, dict_fields)
controllers.hrefs.convert_to_hrefs(dict_fields)
)
@index.when(method='DELETE', template='')
@controllers.handle_exceptions(u._('Container deletion'))
@controllers.enforce_rbac('container:delete')
def on_delete(self, keystone_id, **kwargs):
try:
self.container_repo.delete_entity_by_id(
entity_id=self.container_id,
@ -120,13 +118,11 @@ class ContainersController(object):
resp_ctrs_overall = {'containers': [], 'total': total}
else:
resp_ctrs = [
controllers.hrefs.convert_to_hrefs(keystone_id,
c.to_dict_fields())
controllers.hrefs.convert_to_hrefs(c.to_dict_fields())
for c in containers
]
resp_ctrs_overall = controllers.hrefs.add_nav_hrefs(
'containers',
keystone_id,
offset,
limit,
total,
@ -167,6 +163,5 @@ class ContainersController(object):
pecan.response.headers['Location'] = '/{0}/containers/{1}'.format(
keystone_id, new_container.id
)
url = controllers.hrefs.convert_container_to_href(keystone_id,
new_container.id)
url = controllers.hrefs.convert_container_to_href(new_container.id)
return {'container_ref': url}

View File

@ -13,81 +13,78 @@
from barbican.common import utils
def convert_secret_to_href(keystone_id, secret_id):
"""Convert the tenant/secret IDs to a HATEOS-style href."""
def convert_secret_to_href(secret_id):
"""Convert the secret IDs to a HATEOS-style href."""
if secret_id:
resource = 'secrets/' + secret_id
else:
resource = 'secrets/????'
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
return utils.hostname_for_refs(resource=resource)
def convert_order_to_href(keystone_id, order_id):
"""Convert the tenant/order IDs to a HATEOS-style href."""
def convert_order_to_href(order_id):
"""Convert the order IDs to a HATEOS-style href."""
if order_id:
resource = 'orders/' + order_id
else:
resource = 'orders/????'
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
return utils.hostname_for_refs(resource=resource)
def convert_container_to_href(keystone_id, container_id):
"""Convert the tenant/container IDs to a HATEOS-style href."""
def convert_container_to_href(container_id):
"""Convert the container IDs to a HATEOS-style href."""
if container_id:
resource = 'containers/' + container_id
else:
resource = 'containers/????'
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
return utils.hostname_for_refs(resource=resource)
def convert_transport_key_to_href(keystone_id, transport_key_id):
def convert_transport_key_to_href(transport_key_id):
"""Convert the transport key IDs to a HATEOS-style href."""
if transport_key_id:
resource = 'transport_keys/' + transport_key_id
else:
resource = 'transport_keys/????'
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
return utils.hostname_for_refs(resource=resource)
#TODO(hgedikli) handle list of fields in here
def convert_to_hrefs(keystone_id, fields):
def convert_to_hrefs(fields):
"""Convert id's within a fields dict to HATEOS-style hrefs."""
if 'secret_id' in fields:
fields['secret_ref'] = convert_secret_to_href(keystone_id,
fields['secret_id'])
fields['secret_ref'] = convert_secret_to_href(fields['secret_id'])
del fields['secret_id']
if 'order_id' in fields:
fields['order_ref'] = convert_order_to_href(keystone_id,
fields['order_id'])
fields['order_ref'] = convert_order_to_href(fields['order_id'])
del fields['order_id']
if 'container_id' in fields:
fields['container_ref'] = \
convert_container_to_href(keystone_id, fields['container_id'])
convert_container_to_href(fields['container_id'])
del fields['container_id']
if 'transport_key_id' in fields:
fields['transport_key_ref'] = convert_transport_key_to_href(
keystone_id,
fields['transport_key_id'])
del fields['transport_key_id']
return fields
def convert_list_to_href(resources_name, keystone_id, offset, limit):
def convert_list_to_href(resources_name, offset, limit):
"""Supports pretty output of paged-list hrefs.
Convert the tenant ID and offset/limit info to a HATEOS-style href
Convert the offset/limit info to a HATEOS-style href
suitable for use in a list navigation paging interface.
"""
resource = '{0}?limit={1}&offset={2}'.format(resources_name, limit,
offset)
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
return utils.hostname_for_refs(resource=resource)
def previous_href(resources_name, keystone_id, offset, limit):
def previous_href(resources_name, offset, limit):
"""Supports pretty output of previous-page hrefs.
Create a HATEOS-style 'previous' href suitable for use in a list
@ -95,10 +92,10 @@ def previous_href(resources_name, keystone_id, offset, limit):
currently viewed page.
"""
offset = max(0, offset - limit)
return convert_list_to_href(resources_name, keystone_id, offset, limit)
return convert_list_to_href(resources_name, offset, limit)
def next_href(resources_name, keystone_id, offset, limit):
def next_href(resources_name, offset, limit):
"""Supports pretty output of next-page hrefs.
Create a HATEOS-style 'next' href suitable for use in a list
@ -106,15 +103,14 @@ def next_href(resources_name, keystone_id, offset, limit):
currently viewed page.
"""
offset = offset + limit
return convert_list_to_href(resources_name, keystone_id, offset, limit)
return convert_list_to_href(resources_name, offset, limit)
def add_nav_hrefs(resources_name, keystone_id, offset, limit,
def add_nav_hrefs(resources_name, offset, limit,
total_elements, data):
"""Adds next and/or previous hrefs to paged list responses.
:param resources_name: Name of api resource
:param keystone_id: Keystone id of the tenant
:param offset: Element number (ie. index) where current page starts
:param limit: Max amount of elements listed on current page
:param num_elements: Total number of elements
@ -122,12 +118,10 @@ def add_nav_hrefs(resources_name, keystone_id, offset, limit,
"""
if offset > 0:
data.update({'previous': previous_href(resources_name,
keystone_id,
offset,
limit)})
if total_elements > (offset + limit):
data.update({'next': next_href(resources_name,
keystone_id,
offset,
limit)})
return data

View File

@ -60,7 +60,7 @@ class OrderController(object):
if not order:
_order_not_found()
return hrefs.convert_to_hrefs(keystone_id, order.to_dict_fields())
return hrefs.convert_to_hrefs(order.to_dict_fields())
@index.when(method='PUT')
@controllers.handle_exceptions(u._('Order update'))
@ -115,10 +115,10 @@ class OrdersController(object):
'total': total}
else:
orders_resp = [
hrefs.convert_to_hrefs(keystone_id, o.to_dict_fields())
hrefs.convert_to_hrefs(o.to_dict_fields())
for o in orders
]
orders_resp_overall = hrefs.add_nav_hrefs('orders', keystone_id,
orders_resp_overall = hrefs.add_nav_hrefs('orders',
offset, limit, total,
{'orders': orders_resp})
orders_resp_overall.update({'total': total})
@ -168,5 +168,5 @@ class OrdersController(object):
pecan.response.headers['Location'] = '/{0}/orders/{1}'.format(
keystone_id, new_order.id
)
url = hrefs.convert_order_to_href(keystone_id, new_order.id)
url = hrefs.convert_order_to_href(new_order.id)
return {'order_ref': url}

View File

@ -104,7 +104,7 @@ class SecretController(object):
secret)
if transport_key_id is not None:
secret_fields['transport_key_id'] = transport_key_id
return hrefs.convert_to_hrefs(keystone_id, secret_fields)
return hrefs.convert_to_hrefs(secret_fields)
else:
tenant = res.get_or_create_tenant(keystone_id,
self.repos.tenant_repo)
@ -250,11 +250,11 @@ class SecretsController(object):
secret_fields = lambda sf: putil.mime_types\
.augment_fields_with_content_types(sf)
secrets_resp = [
hrefs.convert_to_hrefs(keystone_id, secret_fields(s))
hrefs.convert_to_hrefs(secret_fields(s))
for s in secrets
]
secrets_resp_overall = hrefs.add_nav_hrefs(
'secrets', keystone_id, offset, limit, total,
'secrets', offset, limit, total,
{'secrets': secrets_resp}
)
secrets_resp_overall.update({'total': total})
@ -285,14 +285,14 @@ class SecretsController(object):
transport_key_id=data.get('transport_key_id'))
pecan.response.status = 201
pecan.response.headers['Location'] = '/{0}/secrets/{1}'.format(
keystone_id, new_secret.id
pecan.response.headers['Location'] = '/secrets/{0}'.format(
new_secret.id
)
url = hrefs.convert_secret_to_href(keystone_id, new_secret.id)
url = hrefs.convert_secret_to_href(new_secret.id)
LOG.debug('URI to secret is {0}'.format(url))
if transport_key_model is not None:
tkey_url = hrefs.convert_transport_key_to_href(
keystone_id, transport_key_model.id)
transport_key_model.id)
return {'secret_ref': url, 'transport_key_ref': tkey_url}
else:
return {'secret_ref': url}

View File

@ -105,11 +105,11 @@ class TransportKeysController(object):
'total': total}
else:
transport_keys_resp = [
hrefs.convert_transport_key_to_href(keystone_id, s.id)
hrefs.convert_transport_key_to_href(s.id)
for s in transport_keys
]
transport_keys_resp_overall = hrefs.add_nav_hrefs(
'transport_keys', keystone_id, offset, limit, total,
'transport_keys', offset, limit, total,
{'transport_keys': transport_keys_resp}
)
transport_keys_resp_overall.update({'total': total})
@ -135,9 +135,9 @@ class TransportKeysController(object):
self.repo.create_from(new_key)
pecan.response.status = 201
pecan.response.headers['Location'] = '/{0}/transport_keys/{1}'.format(
keystone_id, new_key.id
pecan.response.headers['Location'] = '/transport_keys/{0}'.format(
new_key.id
)
url = hrefs.convert_transport_key_to_href(keystone_id, new_key.id)
url = hrefs.convert_transport_key_to_href(new_key.id)
LOG.debug('URI to transport key is {0}'.format(url))
return {'transport_key_ref': url}

View File

@ -27,7 +27,6 @@ class VersionController(object):
@pecan.expose('json')
@controllers.handle_exceptions(u._('Version retrieval'))
@controllers.enforce_rbac('version:get')
def index(self):
return {
'v1': 'current',

View File

@ -140,13 +140,27 @@ class ContextMiddleware(BaseContextMiddleware):
class UnauthenticatedContextMiddleware(BaseContextMiddleware):
def _get_project_id_from_header(self, req):
project_id = req.headers.get('X-Project-Id')
if not project_id:
accept_header = req.headers.get('Accept')
if not accept_header:
req.headers['Accept'] = 'text/plain'
raise webob.exc.HTTPBadRequest(detail=u._('Missing X-Project-Id'))
return project_id
def process_request(self, req):
"""Create a context without an authorized user."""
project_id = self._get_project_id_from_header(req)
kwargs = {
'user': None,
'tenant': None,
'tenant': project_id,
'roles': [],
'is_admin': True,
'is_admin': True
}
req.context = barbican.context.RequestContext(**kwargs)
context = barbican.context.RequestContext(**kwargs)
context.policy_enforcer = None
req.environ['barbican.context'] = context

View File

@ -38,12 +38,9 @@ CONF.register_opts(host_opts)
API_VERSION = 'v1'
def hostname_for_refs(keystone_id=None, resource=None):
def hostname_for_refs(resource=None):
"""Return the HATEOS-style return URI reference for this service."""
ref = ['{0}/{1}'.format(CONF.host_href, API_VERSION)]
if not keystone_id:
return ref[0]
ref.append('/' + keystone_id)
if resource:
ref.append('/' + resource)
return ''.join(ref)

View File

@ -35,6 +35,7 @@ from barbican.api import controllers
from barbican.api.controllers import hrefs
from barbican.common import exception as excep
from barbican.common import validators
import barbican.context
from barbican.model import models
from barbican.openstack.common import timeutils
@ -42,6 +43,21 @@ from barbican.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def get_barbican_env(keystone_id):
"""Create and return a barbican.context for use with
the RBAC decorator by injecting the provided
keystone_id
"""
kwargs = {'roles': None,
'user': None,
'tenant': keystone_id,
'is_admin': True}
ctx = barbican.context.RequestContext(**kwargs)
ctx.policy_enforcer = None
barbican_env = {'barbican.context': ctx}
return barbican_env
def create_secret(id_ref="id", name="name",
algorithm=None, bit_length=None,
mode=None, encrypted_datum=None):
@ -171,6 +187,7 @@ class BaseSecretsResource(FunctionalTest):
def setUp(self):
super(BaseSecretsResource, self).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -243,7 +260,6 @@ class BaseSecretsResource(FunctionalTest):
'default_plugin_name', 'XXXABCDEF')
self.transport_key_id = 'tkey12345'
self.tkey_url = hrefs.convert_transport_key_to_href(
self.keystone_id,
self.transport_key.id)
self.transport_key_repo = mock.MagicMock()
@ -255,7 +271,7 @@ class BaseSecretsResource(FunctionalTest):
self.secret_req.update({'expiration': expiration})
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req
)
@ -289,7 +305,7 @@ class BaseSecretsResource(FunctionalTest):
mock_store_secret.return_value = self.secret, None
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req
)
self.assertEqual(resp.status_int, 201)
@ -320,10 +336,7 @@ class BaseSecretsResource(FunctionalTest):
mock_store_secret.return_value = self.secret, None
self.secret_req['transport_key_id'] = self.transport_key_id
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
self.secret_req
)
resp = self.app.post_json('/secrets/', self.secret_req)
self.assertEqual(resp.status_int, 201)
expected = dict(self.secret_req)
@ -354,7 +367,7 @@ class BaseSecretsResource(FunctionalTest):
def _test_should_add_new_secret_metadata_without_payload(self):
self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
{'name': self.name}
)
@ -376,9 +389,8 @@ class BaseSecretsResource(FunctionalTest):
mock_store_secret):
mock_store_secret.return_value = self.secret, self.transport_key
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
{'name': self.name,
'transport_key_needed': 'true'}
)
@ -409,7 +421,7 @@ class BaseSecretsResource(FunctionalTest):
if self.payload_content_encoding:
self.secret_req['payload_content_encoding'] = \
self.payload_content_encoding
self.app.post_json('/%s/secrets/' % self.keystone_id, self.secret_req)
self.app.post_json('/secrets/', self.secret_req)
def _test_should_raise_due_to_payload_too_large(self):
big_text = ''.join(['A' for x
@ -427,7 +439,7 @@ class BaseSecretsResource(FunctionalTest):
self.payload_content_encoding
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -446,7 +458,7 @@ class BaseSecretsResource(FunctionalTest):
self.payload_content_encoding
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -491,7 +503,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
'payload': self.payload}
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -512,7 +524,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# 'payload': self.payload}
#
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req
# )
# self.assertEqual(resp.status_int, 201)
@ -525,7 +537,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# 'payload': self.payload}
#
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req
# )
# self.assertEqual(resp.status_int, 201)
@ -541,7 +553,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# 'payload': self.payload}
#
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req
# )
# self.assertEqual(resp.status_int, 201)
@ -555,7 +567,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# 'payload': self.payload}
#
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req
# )
# self.assertEqual(resp.status_int, 201)
@ -565,7 +577,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# self.secret_req = {'payload_content_type':
# 'text/plain'}
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req,
# expect_errors=True
# )
@ -575,7 +587,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
# 'text/plain',
# 'payload': 'somejunk'}
# resp = self.app.post_json(
# '/%s/secrets/' % self.keystone_id,
# '/secrets/',
# self.secret_req
# )
# self.assertEqual(resp.status_int, 201)
@ -627,7 +639,7 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
'payload_content_type': 'application/octet-stream'
}
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -645,7 +657,7 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
}
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -662,7 +674,7 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
}
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -680,7 +692,7 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
}
resp = self.app.post_json(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
self.secret_req,
expect_errors=True
)
@ -692,6 +704,7 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
def setUp(self):
super(WhenGettingSecretsListUsingSecretsResource, self).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -761,7 +774,7 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
self.params['name'] = urllib.quote_plus(self.name)
resp = self.app.get(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
dict((k, v) for k, v in self.params.items() if v is not None)
)
# Verify that the name is unquoted correctly in the
@ -782,7 +795,7 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
def test_should_get_list_secrets(self):
resp = self.app.get(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
dict((k, v) for k, v in self.params.items() if v is not None)
)
@ -811,7 +824,7 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
def test_response_should_include_total(self):
resp = self.app.get(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
dict((k, v) for k, v in self.params.items() if v is not None)
)
@ -823,7 +836,7 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
del self.secrets[:]
resp = self.app.get(
'/%s/secrets/' % self.keystone_id,
'/secrets/',
dict((k, v) for k, v in self.params.items() if v is not None)
)
@ -842,11 +855,10 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/{0}/secrets?limit={1}&offset={2}'.format(keystone_id,
limit,
return '/secrets?limit={0}&offset={1}'.format(limit,
offset)
else:
return '/{0}/secrets'.format(keystone_id)
return '/secrets'
class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
@ -855,6 +867,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
WhenGettingPuttingOrDeletingSecretUsingSecretResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -933,7 +946,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
def test_should_get_secret_as_json(self, mock_get_transport_key):
mock_get_transport_key.return_value = None
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
self.secret_repo \
@ -954,7 +967,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
mock_get_secret.return_value = data
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'text/plain'}
)
@ -979,8 +992,8 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
twsk = "trans_wrapped_session_key"
resp = self.app.get(
'/%s/secrets/%s/?trans_wrapped_session_key=%s&transport_key_id=%s'
% (self.keystone_id, self.secret.id, twsk, self.transport_key_id),
'/secrets/{0}/?trans_wrapped_session_key={1}&transport_key_id={2}'
.format(self.secret.id, twsk, self.transport_key_id),
headers={'Accept': 'text/plain'}
)
@ -1006,8 +1019,8 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
twsk = "trans_wrapped_session_key"
resp = self.app.get(
'/%s/secrets/%s/?trans_wrapped_session_key=%s'
% (self.keystone_id, self.secret.id, twsk),
'/secrets/{0}/?trans_wrapped_session_key={1}'.format(
self.secret.id, twsk),
headers={'Accept': 'text/plain'},
expect_errors=True
)
@ -1025,7 +1038,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.datum.cypher_text = 'aaaa'
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
@ -1049,8 +1062,8 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.datum.cypher_text = 'aaaa'
resp = self.app.get(
'/%s/secrets/%s/?transport_key_needed=true'
% (self.keystone_id, self.secret.id),
'/secrets/{0}/?transport_key_needed=true'.format(
self.secret.id),
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
@ -1069,7 +1082,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(
resp.namespace['transport_key_ref'],
hrefs.convert_transport_key_to_href(
self.keystone_id, self.transport_key_id)
self.transport_key_id)
)
@mock.patch('barbican.plugin.resources.get_secret')
@ -1081,7 +1094,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.datum.cypher_text = 'aaaa'
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={
'Accept': 'application/octet-stream',
'Accept-Encoding': 'gzip'
@ -1101,7 +1114,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret_repo.get.return_value = None
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'},
expect_errors=True
)
@ -1109,7 +1122,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
def test_should_throw_exception_for_get_when_accept_not_supported(self):
resp = self.app.get(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'bogusaccept', 'Accept-Encoding': 'gzip'},
expect_errors=True
)
@ -1120,7 +1133,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
)
@ -1138,8 +1151,8 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/?transport_key_id=%s' %
(self.keystone_id, self.secret.id, self.transport_key_id),
'/secrets/{0}/?transport_key_id={1}'.format(
self.secret.id, self.transport_key_id),
'plain text',
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
)
@ -1157,7 +1170,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={
'Accept': 'text/plain',
@ -1179,8 +1192,8 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/?transport_key_id=%s' %
(self.keystone_id, self.secret.id, self.transport_key_id),
'/secrets/{0}/?transport_key_id={1}'.format(
self.secret.id, self.transport_key_id),
'plain text',
headers={
'Accept': 'text/plain',
@ -1202,7 +1215,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
payload = base64.b64encode('plain text')
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
payload,
headers={
'Accept': 'text/plain',
@ -1222,7 +1235,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
def test_should_raise_to_put_secret_with_unsupported_encoding(self):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={
'Accept': 'text/plain',
@ -1237,7 +1250,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
def test_should_raise_put_secret_as_json(self):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={
'Accept': 'text/plain',
@ -1254,7 +1267,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
expect_errors=True
@ -1265,7 +1278,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
def test_should_raise_put_secret_no_payload(self):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
# response.body = None
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
expect_errors=True
@ -1278,7 +1291,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = [self.datum]
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'plain text',
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
expect_errors=True
@ -1289,7 +1302,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
'',
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
expect_errors=True
@ -1303,7 +1316,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret.encrypted_data = []
resp = self.app.put(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
big_text,
headers={'Accept': 'text/plain', 'Content-Type': 'text/plain'},
expect_errors=True
@ -1313,7 +1326,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
@mock.patch('barbican.plugin.resources.delete_secret')
def test_should_delete_secret(self, mock_delete_secret):
self.app.delete(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id)
'/secrets/{0}/'.format(self.secret.id)
)
mock_delete_secret\
@ -1323,7 +1336,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.secret_repo.get.return_value = None
resp = self.app.delete(
'/%s/secrets/%s/' % (self.keystone_id, self.secret.id),
'/secrets/{0}/'.format(self.secret.id),
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
@ -1337,6 +1350,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
WhenCreatingOrdersUsingOrdersResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -1385,7 +1399,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
def test_should_add_new_order(self):
resp = self.app.post_json(
'/%s/orders/' % self.tenant_keystone_id,
'/orders/',
self.order_req
)
self.assertEqual(resp.status_int, 202)
@ -1400,7 +1414,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
def test_should_raise_add_new_order_no_secret(self):
resp = self.app.post_json(
'/%s/orders/' % self.tenant_keystone_id,
'/orders/',
{},
expect_errors=True
)
@ -1408,7 +1422,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
def test_should_raise_add_new_order_bad_json(self):
resp = self.app.post(
'/%s/orders/' % self.tenant_keystone_id,
'/orders/',
'',
expect_errors=True,
headers={'Content-Type': 'application/json'},
@ -1428,7 +1442,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
}
}
resp = self.app.post_json(
'/%s/orders/' % self.tenant_keystone_id,
'/orders/',
self.unsupported_req,
expect_errors=True
)
@ -1437,7 +1451,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
def test_should_raise_add_new_order_no_content_type_header(self):
resp = self.app.post(
'/%s/orders/' % self.tenant_keystone_id,
'/orders/',
self.order_req,
expect_errors=True,
)
@ -1450,6 +1464,7 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
WhenGettingOrdersListUsingOrdersResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -1500,7 +1515,7 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
}
def test_should_get_list_orders(self):
resp = self.app.get('/%s/orders/' % self.keystone_id, self.params)
resp = self.app.get('/orders/', self.params)
self.order_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
@ -1524,7 +1539,7 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
(self.num_orders + 2))
def test_response_should_include_total(self):
resp = self.app.get('/%s/orders/' % self.keystone_id, self.params)
resp = self.app.get('/orders/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(resp.namespace['total'], self.total)
@ -1532,7 +1547,7 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
del self.orders[:]
resp = self.app.get('/%s/orders/' % self.keystone_id, self.params)
resp = self.app.get('/orders/', self.params)
self.order_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
@ -1547,11 +1562,10 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/{0}/orders?limit={1}&offset={2}'.format(keystone_id,
limit,
return '/orders?limit={0}&offset={1}'.format(limit,
offset)
else:
return '/{0}/orders'.format(self.keystone_id)
return '/orders'
class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
@ -1560,6 +1574,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
WhenGettingOrDeletingOrderUsingOrderResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -1586,8 +1601,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
self.queue_resource = mock.MagicMock()
def test_should_get_order(self):
self.app.get('/%s/orders/%s/' % (self.tenant_keystone_id,
self.order.id))
self.app.get('/orders/{0}/'.format(self.order.id))
self.order_repo.get \
.assert_called_once_with(entity_id=self.order.id,
@ -1595,8 +1609,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
suppress_exception=True)
def test_should_delete_order(self):
self.app.delete('/%s/orders/%s/' % (self.tenant_keystone_id,
self.order.id))
self.app.delete('/orders/{0}/'.format(self.order.id))
self.order_repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.order.id,
keystone_id=self.tenant_keystone_id)
@ -1604,7 +1617,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
def test_should_throw_exception_for_get_when_order_not_found(self):
self.order_repo.get.return_value = None
resp = self.app.get(
'/%s/orders/%s/' % (self.tenant_keystone_id, self.order.id),
'/orders/{0}/'.format(self.order.id),
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
@ -1613,7 +1626,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
self.order_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
resp = self.app.delete(
'/%s/orders/%s/' % (self.tenant_keystone_id, self.order.id),
'/orders/{0}/'.format(self.order.id),
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
@ -1636,7 +1649,6 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
self.keystone_id,
offset, limit,
self.num_elements,
self.data)
@ -1649,7 +1661,6 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
self.keystone_id,
offset, limit,
self.num_elements,
self.data)
@ -1662,7 +1673,6 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
self.keystone_id,
offset, limit,
self.num_elements,
self.data)
@ -1715,6 +1725,7 @@ class WhenCreatingContainersUsingContainersResource(FunctionalTest):
WhenCreatingContainersUsingContainersResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -1771,7 +1782,7 @@ class WhenCreatingContainersUsingContainersResource(FunctionalTest):
def test_should_add_new_container(self):
resp = self.app.post_json(
'/%s/containers/' % self.tenant_keystone_id,
'/containers/',
self.container_req
)
self.assertEqual(resp.status_int, 201)
@ -1782,7 +1793,7 @@ class WhenCreatingContainersUsingContainersResource(FunctionalTest):
def test_should_raise_container_bad_json(self):
resp = self.app.post(
'/%s/containers/' % self.tenant_keystone_id,
'/containers/',
'',
expect_errors=True,
headers={'Content-Type': 'application/json'},
@ -1791,7 +1802,7 @@ class WhenCreatingContainersUsingContainersResource(FunctionalTest):
def test_should_raise_container_no_content_type_header(self):
resp = self.app.post(
'/%s/containers/' % self.tenant_keystone_id,
'/containers/',
self.container_req,
expect_errors=True,
)
@ -1800,7 +1811,7 @@ class WhenCreatingContainersUsingContainersResource(FunctionalTest):
def test_should_throw_exception_when_secret_ref_doesnt_exist(self):
self.secret_repo.get.return_value = None
resp = self.app.post_json(
'/%s/containers/' % self.tenant_keystone_id,
'/containers/',
self.container_req,
expect_errors=True
)
@ -1813,6 +1824,7 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
WhenGettingOrDeletingContainerUsingContainerResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -1848,8 +1860,8 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
self.consumer_repo = mock.MagicMock()
def test_should_get_container(self):
self.app.get('/%s/containers/%s/' % (
self.tenant_keystone_id, self.container.id
self.app.get('/containers/{0}/'.format(
self.container.id
))
self.container_repo.get \
@ -1858,8 +1870,8 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
suppress_exception=True)
def test_should_delete_container(self):
self.app.delete('/%s/containers/%s/' % (
self.tenant_keystone_id, self.container.id
self.app.delete('/containers/{0}/'.format(
self.container.id
))
self.container_repo.delete_entity_by_id \
@ -1868,8 +1880,8 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
def test_should_throw_exception_for_get_when_container_not_found(self):
self.container_repo.get.return_value = None
resp = self.app.get('/%s/containers/%s/' % (
self.tenant_keystone_id, self.container.id
resp = self.app.get('/containers/{0}/'.format(
self.container.id
), expect_errors=True)
self.assertEqual(resp.status_int, 404)
@ -1877,8 +1889,8 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
self.container_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
resp = self.app.delete('/%s/containers/%s/' % (
self.tenant_keystone_id, self.container.id
resp = self.app.delete('/containers/{0}/'.format(
self.container.id
), expect_errors=True)
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
@ -1891,6 +1903,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
WhenCreatingConsumersUsingConsumersResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -1953,8 +1966,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
def test_should_add_new_consumer(self):
resp = self.app.post_json(
'/{0}/containers/{1}/consumers/'.format(self.tenant_keystone_id,
self.container.id),
'/containers/{0}/consumers/'.format(self.container.id),
self.consumer_ref
)
self.assertEqual(resp.status_int, 200)
@ -1965,8 +1977,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
def test_should_fail_consumer_bad_json(self):
resp = self.app.post(
'/{0}/containers/{1}/consumers/'.format(self.tenant_keystone_id,
self.container.id),
'/containers/{0}/consumers/'.format(self.container.id),
'',
expect_errors=True
)
@ -1975,8 +1986,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
def test_should_404_consumer_bad_container_id(self):
self.container_repo.get.side_effect = excep.NotFound()
resp = self.app.post_json(
'/{0}/containers/{1}/consumers/'.format(self.tenant_keystone_id,
'bad_id'),
'/containers/{0}/consumers/'.format('bad_id'),
self.consumer_ref, expect_errors=True
)
self.container_repo.get.side_effect = None
@ -1985,8 +1995,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
def test_should_raise_exception_when_container_ref_doesnt_exist(self):
self.container_repo.get.return_value = None
resp = self.app.post_json(
'/{0}/containers/{1}/consumers/'.format(self.tenant_keystone_id,
self.container.id),
'/containers/{0}/consumers/'.format(self.container.id),
self.consumer_ref,
expect_errors=True
)
@ -1999,6 +2008,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
WhenGettingOrDeletingConsumersUsingConsumerResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -2043,8 +2053,8 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
def test_should_get_consumer(self):
self.consumer_repo.get_by_container_id.return_value = \
([self.consumer], 0, 0, 1)
resp = self.app.get('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, self.container.id
resp = self.app.get('/containers/{0}/consumers/'.format(
self.container.id
))
self.assertEqual(resp.status_int, 200)
@ -2058,37 +2068,37 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
def test_should_404_with_bad_container_id(self):
self.container_repo.get.side_effect = excep.NotFound()
resp = self.app.get('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, 'bad_id'
resp = self.app.get('/containers/{0}/consumers/'.format(
'bad_id'
), expect_errors=True)
self.container_repo.get.side_effect = None
self.assertEqual(resp.status_int, 404)
def test_should_get_consumer_by_id(self):
self.consumer_repo.get.return_value = self.consumer
resp = self.app.get('/{0}/containers/{1}/consumers/{2}/'.format(
self.tenant_keystone_id, self.container.id, self.consumer.id
resp = self.app.get('/containers/{0}/consumers/{1}/'.format(
self.container.id, self.consumer.id
))
self.assertEqual(resp.status_int, 200)
def test_should_404_with_bad_consumer_id(self):
self.consumer_repo.get.return_value = None
resp = self.app.get('/{0}/containers/{1}/consumers/{2}/'.format(
self.tenant_keystone_id, self.container.id, 'bad_id'
resp = self.app.get('/containers/{0}/consumers/{1}/'.format(
self.container.id, 'bad_id'
), expect_errors=True)
self.assertEqual(resp.status_int, 404)
def test_should_get_no_consumers(self):
self.consumer_repo.get_by_container_id.return_value = \
([], 0, 0, 0)
resp = self.app.get('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, self.container.id
resp = self.app.get('/containers/{0}/consumers/'.format(
self.container.id
))
self.assertEqual(resp.status_int, 200)
def test_should_delete_consumer(self):
self.app.delete_json('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, self.container.id
self.app.delete_json('/containers/{0}/consumers/'.format(
self.container.id
), self.consumer_ref)
self.consumer_repo.delete_entity_by_id \
@ -2097,8 +2107,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
def test_should_fail_deleting_consumer_bad_json(self):
resp = self.app.delete(
'/{0}/containers/{1}/consumers/'.format(self.tenant_keystone_id,
self.container.id),
'/containers/{0}/consumers/'.format(self.container.id),
'',
expect_errors=True
)
@ -2107,8 +2116,8 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
def test_should_404_on_delete_when_consumer_not_found(self):
old_return = self.consumer_repo.get_by_values.return_value
self.consumer_repo.get_by_values.return_value = None
resp = self.app.delete_json('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, self.container.id
resp = self.app.delete_json('/containers/{0}/consumers/'.format(
self.container.id
), self.consumer_ref, expect_errors=True)
self.consumer_repo.get_by_values.return_value = old_return
self.assertEqual(resp.status_int, 404)
@ -2117,8 +2126,8 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
def test_should_404_on_delete_when_consumer_not_found_later(self):
self.consumer_repo.delete_entity_by_id.side_effect = excep.NotFound()
resp = self.app.delete_json('/{0}/containers/{1}/consumers/'.format(
self.tenant_keystone_id, self.container.id
resp = self.app.delete_json('/containers/{0}/consumers/'.format(
self.container.id
), self.consumer_ref, expect_errors=True)
self.consumer_repo.delete_entity_by_id.side_effect = None
self.assertEqual(resp.status_int, 404)
@ -2132,6 +2141,7 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
WhenGettingContainersListUsingResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -2172,7 +2182,7 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
def test_should_get_list_containers(self):
resp = self.app.get(
'/%s/containers/' % self.keystone_id,
'/containers/',
self.params
)
@ -2199,7 +2209,7 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
def test_response_should_include_total(self):
resp = self.app.get(
'/%s/containers/' % self.keystone_id,
'/containers/',
self.params
)
self.assertIn('total', resp.namespace)
@ -2210,7 +2220,7 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
del self.containers[:]
resp = self.app.get(
'/%s/containers/' % self.keystone_id,
'/containers/',
self.params
)
@ -2227,8 +2237,7 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/{0}/containers' \
'?limit={1}&offset={2}'.format(keystone_id,
limit, offset)
return '/containers' \
'?limit={0}&offset={1}'.format(limit, offset)
else:
return '/{0}/containers'.format(self.keystone_id)
return '/containers'

View File

@ -26,8 +26,25 @@ import webtest
from barbican.api import app
from barbican.api import controllers
from barbican.common import exception as excep
import barbican.context
from barbican.model import models
def get_barbican_env(keystone_id):
class NoopPolicyEnforcer(object):
def enforce(self, *args, **kwargs):
return
kwargs = {'roles': None,
'user': None,
'tenant': keystone_id,
'is_admin': True,
'policy_enforcer': NoopPolicyEnforcer()}
barbican_env = {'barbican.context':
barbican.context.RequestContext(**kwargs)}
return barbican_env
SAMPLE_TRANSPORT_KEY = """
-----BEGIN CERTIFICATE-----
MIIDlDCCAnygAwIBAgIBGDANBgkqhkiG9w0BAQsFADBCMR8wHQYDVQQKDBZ0b21j
@ -86,6 +103,7 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
WhenGettingTransKeysListUsingTransportKeysResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -125,8 +143,8 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
}
def test_should_get_list_transport_keys(self):
resp = self.app.get('/%s/transport_keys/' %
self.keystone_id, self.params)
resp = self.app.get('/transport_keys/',
self.params)
self.repo.get_by_create_date \
.assert_called_once_with(plugin_name=None,
@ -150,8 +168,8 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
(self.num_keys + 2))
def test_response_should_include_total(self):
resp = self.app.get('/%s/transport_keys/' %
self.keystone_id, self.params)
resp = self.app.get('/transport_keys/',
self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(resp.namespace['total'], self.total)
@ -159,8 +177,8 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
del self.tkeys[:]
resp = self.app.get('/%s/transport_keys/' %
self.keystone_id, self.params)
resp = self.app.get('/transport_keys/',
self.params)
self.repo.get_by_create_date \
.assert_called_once_with(plugin_name=None,
@ -175,10 +193,10 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/{0}/transport_keys?limit={1}&offset={2}'.format(
keystone_id, limit, offset)
return '/transport_keys?limit={0}&offset={1}'.format(
limit, offset)
else:
return '/{0}/transport_keys'.format(self.keystone_id)
return '/transport_keys'
class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
@ -187,6 +205,7 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
WhenCreatingTransKeysListUsingTransportKeysResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.keystone_id)
@property
def root(self):
@ -210,7 +229,7 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
def test_should_add_new_transport_key(self):
resp = self.app.post_json(
'/%s/transport_keys/' % self.keystone_id,
'/transport_keys/',
self.transport_key_req
)
self.assertEqual(resp.status_int, 201)
@ -221,7 +240,7 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
def test_should_raise_add_new_transport_key_no_secret(self):
resp = self.app.post_json(
'/%s/transport_keys/' % self.keystone_id,
'/transport_keys/',
{},
expect_errors=True
)
@ -229,7 +248,7 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
def test_should_raise_add_new_transport_key_bad_json(self):
resp = self.app.post(
'/%s/transport_keys/' % self.keystone_id,
'/transport_keys/',
'',
expect_errors=True,
content_type='application/json'
@ -238,7 +257,7 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
def test_should_raise_add_new_transport_key_no_content_type_header(self):
resp = self.app.post(
'/%s/transport_keys/' % self.keystone_id,
'/transport_keys/',
self.transport_key_req,
expect_errors=True,
)
@ -252,6 +271,7 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
WhenGettingOrDeletingTransKeyUsingTransportKeyResource, self
).setUp()
self.app = webtest.TestApp(app.PecanAPI(self.root))
self.app.extra_environ = get_barbican_env(self.tenant_keystone_id)
@property
def root(self):
@ -277,22 +297,20 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
self.repo.get.return_value = self.tkey
def test_should_get_transport_key(self):
self.app.get('/%s/transport_keys/%s/' % (self.tenant_keystone_id,
self.tkey.id))
self.app.get('/transport_keys/{0}/'.format(self.tkey.id))
self.repo.get.assert_called_once_with(entity_id=self.tkey.id)
def test_should_throw_exception_for_get_when_trans_key_not_found(self):
self.repo.get.return_value = None
resp = self.app.get(
'/%s/transport_keys/%s/' % (self.tenant_keystone_id, self.tkey.id),
'/transport_keys/{0}/'.format(self.tkey.id),
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
def test_should_delete_transport_key(self):
self.app.delete('/%s/transport_keys/%s/' % (self.tenant_keystone_id,
self.tkey.id))
self.app.delete('/transport_keys/{0}/'.format(self.tkey.id))
self.repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.tkey.id,
keystone_id=self.tenant_keystone_id)
@ -301,7 +319,7 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
self.repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
resp = self.app.delete(
'/%s/transport_keys/%s/' % (self.tenant_keystone_id, self.tkey.id),
'/transport_keys/{0}/'.format(self.tkey.id),
expect_errors=True
)
self.assertEqual(resp.status_int, 404)

View File

@ -38,23 +38,17 @@ class WhenTestingHostnameForRefsGetter(testtools.TestCase):
utils.CONF.host_href = self._old_host
utils.API_VERSION = self._old_version
def test_hostname_for_refs_no_keystone_id(self):
def test_hostname_for_refs(self):
uri = utils.hostname_for_refs(resource=self.resource)
self.assertEqual(uri, "{0}/{1}".format(self.host, self.version))
def test_hostname_for_refs_no_resource(self):
uri = utils.hostname_for_refs(keystone_id=self.keystone_id)
self.assertEqual(uri, "{0}/{1}/{2}".format(self.host,
self.version,
self.keystone_id))
def test_hostname_for_refs_with_resource_and_keystone_id(self):
uri = utils.hostname_for_refs(keystone_id=self.keystone_id,
resource=self.resource)
self.assertEqual(uri, "{0}/{1}/{2}/{3}".format(self.host, self.version,
self.keystone_id,
self.resource))
def test_hostname_for_refs_no_resource(self):
uri = utils.hostname_for_refs()
self.assertEqual(uri, "{0}/{1}".format(self.host,
self.version))
class WhenTestingAcceptEncodingGetter(testtools.TestCase):

View File

@ -5,7 +5,7 @@ use = egg:Paste#urlmap
# Use this pipeline for Barbican API - versions no authentication
[pipeline:barbican_version]
pipeline = unauthenticated-context versionapp
pipeline = versionapp
# Use this pipeline for Barbican API - DEFAULT no authentication
[pipeline:barbican_api]

View File

@ -67,9 +67,8 @@ class ConsumersTestCase(base.TestCase):
super(ConsumersTestCase, self).setUp()
# Set up two secrets
secret_json_data = json.dumps(create_secret_data)
project_id = self.client.project_id
resp, body = self.client.post(
'{0}/secrets'.format(project_id), secret_json_data, headers={
'/secrets', secret_json_data, headers={
'content-type': 'application/json'})
self.assertEqual(resp.status, 201)
@ -77,7 +76,7 @@ class ConsumersTestCase(base.TestCase):
secret_ref_1 = returned_data['secret_ref']
self.assertIsNotNone(secret_ref_1)
resp, body = self.client.post(
'{0}/secrets'.format(project_id), secret_json_data, headers={
'/secrets', secret_json_data, headers={
'content-type': 'application/json'})
self.assertEqual(resp.status, 201)
@ -90,7 +89,7 @@ class ConsumersTestCase(base.TestCase):
create_container_data['secret_refs'][1]['secret_ref'] = secret_ref_2
container_json_data = json.dumps(create_container_data)
resp, body = self.client.post(
'{0}/containers'.format(project_id), container_json_data,
'/containers', container_json_data,
headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 201)
@ -104,10 +103,8 @@ class ConsumersTestCase(base.TestCase):
create the consumer is provided in a single POST.
"""
json_data = json.dumps(create_consumer_data)
project_id = self.client.project_id
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -121,12 +118,10 @@ class ConsumersTestCase(base.TestCase):
the consumer is deleted and verified to no longer exist.
"""
json_data = json.dumps(create_consumer_data_for_delete)
project_id = self.client.project_id
#Register the consumer once
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -137,8 +132,7 @@ class ConsumersTestCase(base.TestCase):
#Delete the consumer
resp, body = self.client.delete(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
body=json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -152,12 +146,10 @@ class ConsumersTestCase(base.TestCase):
the consumer is deleted and verified to no longer exist.
"""
json_data = json.dumps(create_consumer_data_for_recreate)
project_id = self.client.project_id
#Register the consumer once
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -168,8 +160,7 @@ class ConsumersTestCase(base.TestCase):
#Delete the consumer
resp, body = self.client.delete(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
body=json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -180,8 +171,7 @@ class ConsumersTestCase(base.TestCase):
#Register the consumer again
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -195,12 +185,10 @@ class ConsumersTestCase(base.TestCase):
the consumer is deleted and verified to no longer exist.
"""
json_data = json.dumps(create_consumer_data_for_idempotency)
project_id = self.client.project_id
#Register the consumer once
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)
@ -211,8 +199,7 @@ class ConsumersTestCase(base.TestCase):
#Register the consumer again, without deleting it first
resp, body = self.client.post(
'{0}/containers/{1}/consumers'.format(project_id,
self.container_id),
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)

View File

@ -35,9 +35,8 @@ class OrdersTestCase(base.TestCase):
create the order is provided in a single POST.
"""
json_data = json.dumps(create_order_data)
project_id = self.client.project_id
resp, body = self.client.post(
'{0}/orders'.format(project_id), json_data, headers={
'/orders', json_data, headers={
'content-type': 'application/json'})
self.assertEqual(resp.status, 202)

View File

@ -51,9 +51,8 @@ class SecretsTestCase(base.TestCase):
create the secret, including payload, is provided in a single POST.
"""
json_data = json.dumps(one_phase_create_data)
project_id = self.client.project_id
resp, body = self.client.post(
'{0}/secrets'.format(project_id), json_data, headers={
'/secrets', json_data, headers={
'content-type': 'application/json'})
self.assertEqual(resp.status, 201)
@ -68,9 +67,8 @@ class SecretsTestCase(base.TestCase):
"""
# phase 1 - POST secret without payload
json_data = json.dumps(two_phase_create_data)
project_id = self.client.project_id
resp, body = self.client.post(
'{0}/secrets'.format(project_id), json_data, headers={
'/secrets', json_data, headers={
'content-type': 'application/json'})
self.assertEqual(resp.status, 201)
@ -84,7 +82,7 @@ class SecretsTestCase(base.TestCase):
# phase 2 - provide (PUT) the secret payload
json_data = json.dumps(two_phase_payload_data)
resp, body = self.client.post(
'{0}/secrets/{1}'.format(project_id, secret_id), json_data,
'/secrets/{0}'.format(secret_id), json_data,
headers={'content-type': 'application/json'})
self.assertEqual(resp.status, 200)

View File

@ -22,7 +22,7 @@ fi
echo "Successfully contacted the Barbican (non-admin) API"
if ! timeout ${API_RESPONDING_TIMEOUT} sh -c "while ! curl -s http://127.0.0.1:9312/ 2>/dev/null | grep -q 'v1' ; do sleep 1; done"; then
if ! timeout ${API_RESPONDING_TIMEOUT} sh -c "while ! curl -s http://127.0.0.1:9312/ -HX-Project-Id:123 2>/dev/null | grep -q 'v1' ; do sleep 1; done"; then
echo "The Barbican (admin) API failed to respond within ${API_RESPONDING_TIMEOUT} seconds"
exit 1
fi