Merge "Adds container endpoints to Barbican API"

This commit is contained in:
Jenkins 2014-02-19 20:33:32 +00:00 committed by Gerrit Code Review
commit 9a2952dbb2
9 changed files with 843 additions and 1 deletions

View File

@ -59,6 +59,8 @@ def create_main_app(global_config, **local_conf):
order = res.OrderResource()
verifications = res.VerificationsResource()
verification = res.VerificationResource()
containers = res.ContainersResource()
container = res.ContainerResource()
# For performance testing only
performance = res.PerformanceResource()
@ -76,6 +78,8 @@ def create_main_app(global_config, **local_conf):
api.add_route('/v1/{keystone_id}/verifications', verifications)
api.add_route('/v1/{keystone_id}/verifications/{verification_id}',
verification)
api.add_route('/v1/{keystone_id}/containers/', containers)
api.add_route('/v1/{keystone_id}/containers/{container_id}', container)
# For performance testing only
api.add_route('/{0}'.format(performance_uri), performance)

View File

@ -60,6 +60,13 @@ def _verification_not_found(req, resp):
'another castle.'), req, resp)
def _container_not_found(req, resp):
"""Throw exception indicating container not found."""
api.abort(falcon.HTTP_404, u._('Not Found. Sorry but your container '
'is in '
'another castle.'), req, resp)
def _put_accept_incorrect(ct, req, resp):
"""Throw exception indicating request content-type is not supported."""
api.abort(falcon.HTTP_415,
@ -112,12 +119,23 @@ def convert_order_to_href(keystone_id, order_id):
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
def convert_container_to_href(keystone_id, container_id):
"""Convert the tenant/container IDs to a HATEOS-style href."""
if container_id:
resource = 'containers/' + container_id
else:
resource = 'containers/????'
return utils.hostname_for_refs(keystone_id=keystone_id, resource=resource)
#TODO: (hgedikli) handle list of fields in here
def convert_to_hrefs(keystone_id, fields):
"""Convert id's within a fields dict to HATEOS-style hrefs."""
if 'secret_id' in fields:
fields['secret_ref'] = convert_secret_to_href(keystone_id,
fields['secret_id'])
del fields['secret_id']
if 'order_id' in fields:
fields['order_ref'] = convert_order_to_href(keystone_id,
fields['order_id'])
@ -127,6 +145,12 @@ def convert_to_hrefs(keystone_id, fields):
convert_verification_to_href(keystone_id,
fields['verification_id'])
del fields['verification_id']
if 'container_id' in fields:
fields['container_ref'] = \
convert_container_to_href(keystone_id, fields['container_id'])
del fields['container_id']
return fields
@ -668,3 +692,122 @@ class VerificationResource(api.ApiResource):
_verification_not_found(req, resp)
resp.status = falcon.HTTP_200
class ContainersResource(api.ApiResource):
""" Handles Container creation requests. """
def __init__(self, tenant_repo=None, container_repo=None,
secret_repo=None):
self.tenant_repo = tenant_repo or repo.TenantRepo()
self.container_repo = container_repo or repo.ContainerRepo()
self.secret_repo = secret_repo or repo.SecretRepo()
self.validator = validators.ContainerValidator()
@handle_exceptions(u._('Container creation'))
@handle_rbac('containers:post')
def on_post(self, req, resp, keystone_id):
tenant = res.get_or_create_tenant(keystone_id, self.tenant_repo)
data = api.load_body(req, resp, self.validator)
LOG.debug('Start on_post...{0}'.format(data))
new_container = models.Container(data)
new_container.tenant_id = tenant.id
#TODO: (hgedikli) performance optimizations
for secret_ref in new_container.container_secrets:
secret = self.secret_repo.get(entity_id=secret_ref.secret_id,
keystone_id=keystone_id,
suppress_exception=True)
if not secret:
api.abort(falcon.HTTP_404,
u._("Secret provided for '%s'"
" doesn't exist." % secret_ref.name),
req, resp)
self.container_repo.create_from(new_container)
resp.status = falcon.HTTP_202
resp.set_header('Location',
'/{0}/containers/{1}'.format(keystone_id,
new_container.id))
url = convert_container_to_href(keystone_id, new_container.id)
resp.body = json.dumps({'container_ref': url})
@handle_exceptions(u._('Containers(s) retrieval'))
@handle_rbac('containers:get')
def on_get(self, req, resp, keystone_id):
LOG.debug('Start containers on_get '
'for tenant-ID {0}:'.format(keystone_id))
result = self.container_repo.get_by_create_date(
keystone_id,
offset_arg=req.get_param('offset'),
limit_arg=req.get_param('limit'),
suppress_exception=True
)
containers, offset, limit, total = result
if not containers:
resp_ctrs_overall = {'containers': [], 'total': total}
else:
resp_ctrs = [convert_to_hrefs(keystone_id,
c.to_dict_fields())
for c in containers]
resp_ctrs_overall = add_nav_hrefs('containers',
keystone_id, offset,
limit, total,
{'containers': resp_ctrs})
resp_ctrs_overall.update({'total': total})
resp.status = falcon.HTTP_200
resp.body = json.dumps(resp_ctrs_overall,
default=json_handler)
class ContainerResource(api.ApiResource):
"""Handles Container entity retrieval and deletion requests."""
def __init__(self, tenant_repo=None, container_repo=None):
self.tenant_repo = tenant_repo or repo.TenantRepo()
self.container_repo = container_repo or repo.ContainerRepo()
self.validator = validators.ContainerValidator()
@handle_exceptions(u._('Container retrieval'))
@handle_rbac('container:get')
def on_get(self, req, resp, keystone_id, container_id):
container = self.container_repo.get(entity_id=container_id,
keystone_id=keystone_id,
suppress_exception=True)
if not container:
_container_not_found(req, resp)
resp.status = falcon.HTTP_200
dict_fields = container.to_dict_fields()
for secret_ref in dict_fields['secret_refs']:
convert_to_hrefs(keystone_id, secret_ref)
resp.body = json.dumps(
convert_to_hrefs(keystone_id,
convert_to_hrefs(keystone_id, dict_fields)),
default=json_handler)
@handle_exceptions(u._('Container deletion'))
@handle_rbac('container:delete')
def on_delete(self, req, resp, keystone_id, container_id):
try:
self.container_repo.delete_entity_by_id(entity_id=container_id,
keystone_id=keystone_id)
except exception.NotFound:
LOG.exception('Problem deleting container')
_container_not_found(req, resp)
resp.status = falcon.HTTP_200

View File

@ -308,3 +308,79 @@ class VerificationValidator(ValidatorBase):
property=get_invalid_property(e))
return json_data
class ContainerValidator(ValidatorBase):
""" Validator for all types of Container"""
def __init__(self):
self.name = 'Container'
self.schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {
"type": "string",
#TODO: (hgedikli) move this to a common location
"enum": ["generic", "rsa"]
},
"secret_refs": {"type": "array", "items": {
"type": "object",
"required": ["secret_ref"],
"properties": {
"secret_ref": {"type": "string", "minLength": 1}
}
}
}
},
"required": ["type"]
}
def validate(self, json_data, parent_schema=None):
schema_name = self._full_name(parent_schema)
try:
schema.validate(json_data, self.schema)
except schema.ValidationError as e:
raise exception.InvalidObject(schema=schema_name,
reason=e.message,
property=get_invalid_property(e))
container_type = json_data.get('type')
secret_refs = json_data.get('secret_refs')
if secret_refs:
secret_refs_names = [secret_ref['name']
if 'name' in secret_ref else ''
for secret_ref in secret_refs]
if len(set(secret_refs_names)) != len(secret_refs):
raise exception.\
InvalidObject(schema=schema_name,
reason=_("Duplicate reference names"
" are not allowed"),
property="secret_refs")
if container_type == 'rsa':
supported_names = ('public_key',
'private_key',
'private_key_passphrase')
if self.contains_unsupported_names(secret_refs,
supported_names) or len(
secret_refs) > 3:
raise exception.\
InvalidObject(schema=schema_name,
reason=_("only 'private_key',"
" 'public_key'"
" and 'private_key_passphrase'"
" reference names are allowed"
" for RSA type"),
property="secret_refs")
return json_data
def contains_unsupported_names(self, secret_refs, supported_names):
for secret_ref in secret_refs:
if secret_ref.get('name') not in supported_names:
return True

View File

@ -152,6 +152,27 @@ class TenantSecret(BASE, ModelBase):
name='_tenant_secret_uc'),)
class ContainerSecret(BASE):
"""Represents an association between a Container and a Secret."""
__tablename__ = 'container_secret'
container_id = sa.Column(sa.String(36), sa.ForeignKey('containers.id'),
primary_key=True)
secret_id = sa.Column(sa.String(36), sa.ForeignKey('secrets.id'),
primary_key=True)
name = sa.Column(sa.String(255), nullable=True)
container = orm.relationship('Container',
backref=orm.backref('container_secrets',
lazy='joined'))
secrets = orm.relationship('Secret',
backref=orm.backref('container_secrets'))
__table_args__ = (sa.UniqueConstraint('container_id', 'secret_id', 'name',
name='_container_secret_name_uc'),)
class Tenant(BASE, ModelBase):
"""Represents a Tenant in the datastore.
@ -167,6 +188,7 @@ class Tenant(BASE, ModelBase):
verifications = orm.relationship("Verification", backref="tenant")
secrets = orm.relationship("TenantSecret", backref="tenants")
keks = orm.relationship("KEKDatum", backref="tenant")
containers = orm.relationship("Container", backref="tenant")
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
@ -215,6 +237,9 @@ class Secret(BASE, ModelBase):
for datum in self.encrypted_data:
datum.delete(session)
for secret_ref in self.container_secrets:
session.delete(secret_ref)
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'secret_id': self.id,
@ -398,8 +423,70 @@ class Verification(BASE, ModelBase):
ret['error_reason'] = self.error_reason
return ret
class Container(BASE, ModelBase):
"""Represents a Container for Secrets in the datastore.
Containers store secret references. Containers are owned by Tenants.
Containers can be generic or have a predefined type. Predefined typed
containers allow users to store structured key relationship
inside Barbican.
"""
__tablename__ = 'containers'
name = sa.Column(sa.String(255))
type = sa.Column(sa.Enum('generic', 'rsa', name='container_types'))
tenant_id = sa.Column(sa.String(36), sa.ForeignKey('tenants.id'),
nullable=False)
def __init__(self, parsed_request=None):
"""Creates a Container entity from a dict."""
super(Container, self).__init__()
if parsed_request:
self.name = parsed_request.get('name')
self.type = parsed_request.get('type')
self.status = States.ACTIVE
secret_refs = parsed_request.get('secret_refs')
if secret_refs:
for secret_ref in parsed_request.get('secret_refs'):
container_secret = ContainerSecret()
container_secret.name = secret_ref.get('name')
#TODO: (hgedikli) move this into a common location
#TODO: (hgedikli) validate provided url
#TODO: (hgedikli) parse out secret_id with regex
secret_id = secret_ref.get('secret_ref')
if secret_id.endswith('/'):
secret_id = secret_id.rsplit('/', 2)[1]
elif '/' in secret_id:
secret_id = secret_id.rsplit('/', 1)[1]
else:
secret_id = secret_id
container_secret.secret_id = secret_id
self.container_secrets.append(container_secret)
def _do_delete_children(self, session):
"""Sub-class hook: delete children relationships."""
for container_secret in self.container_secrets:
session.delete(container_secret)
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'container_id': self.id,
'name': self.name or self.id,
'type': self.type,
'secret_refs': [
{
'secret_id': container_secret.secret_id,
'name': container_secret.name
if hasattr(container_secret, 'name') else None
} for container_secret in self.container_secrets]}
# Keep this tuple synchronized with the models in the file
MODELS = [TenantSecret, Tenant, Secret, EncryptedDatum, Order, Verification]
MODELS = [TenantSecret, Tenant, Secret, EncryptedDatum, Order, Verification,
Container, ContainerSecret]
def register_models(engine):

View File

@ -790,3 +790,63 @@ class VerificationRepo(BaseRepo):
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
class ContainerRepo(BaseRepo):
"""Repository for the Container entity."""
def get_by_create_date(self, keystone_id, offset_arg=None, limit_arg=None,
suppress_exception=False, session=None):
"""
Returns a list of containers, ordered by the date they were
created at and paged based on the offset and limit fields. The
keystone_id is external-to-Barbican value assigned to the tenant
by Keystone.
"""
offset, limit = clean_paging_values(offset_arg, limit_arg)
session = self.get_session(session)
try:
query = session.query(models.Container) \
.order_by(models.Container.created_at)
query = query.filter_by(deleted=False) \
.join(models.Tenant, models.Container.tenant) \
.filter(models.Tenant.keystone_id == keystone_id)
start = offset
end = offset + limit
LOG.debug('Retrieving from {0} to {1}'.format(start, end))
total = query.count()
entities = query[start:end]
LOG.debug('Number entities retrieved: {0} out of {1}'.format(
len(entities), total
))
except sa_orm.exc.NoResultFound:
entities = None
total = 0
if not suppress_exception:
raise exception.NotFound("No %s's found"
% (self._do_entity_name()))
return entities, offset, limit, total
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "Container"
def _do_create_instance(self):
return models.Container()
def _do_build_get_query(self, entity_id, keystone_id, session):
"""Sub-class hook: build a retrieve query."""
return session.query(models.Container).filter_by(id=entity_id)\
.filter_by(deleted=False)\
.join(models.Tenant, models.Container.tenant)\
.filter(models.Tenant.keystone_id == keystone_id)
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass

View File

@ -87,6 +87,19 @@ def create_verification(id_ref="id"):
return verify
def create_container(id_ref):
"""Generate a Container entity instance."""
container = models.Container()
container.id = id_ref
container.name = 'test name'
container.type = 'rsa'
container_secret = models.ContainerSecret()
container_secret.container_id = id
container_secret.secret_id = '123'
container.container_secrets.append(container_secret)
return container
class WhenTestingVersionResource(unittest.TestCase):
def setUp(self):
self.req = mock.MagicMock()
@ -1540,3 +1553,232 @@ class TestingJsonSanitization(unittest.TestCase):
.startswith(' '), "whitespace should be gone")
self.assertFalse(json_with_array['an-array'][1]['name']
.endswith(' '), "whitespace should be gone")
class WhenCreatingContainersUsingContainersResource(unittest.TestCase):
def setUp(self):
self.name = 'test container name'
self.type = 'generic'
self.secret_refs = [
{
'name': 'test secret 1',
'secret_ref': '123'
},
{
'name': 'test secret 2',
'secret_ref': '123'
},
{
'name': 'test secret 3',
'secret_ref': '123'
}
]
self.tenant_internal_id = 'tenantid1234'
self.tenant_keystone_id = 'keystoneid1234'
self.tenant = models.Tenant()
self.tenant.id = self.tenant_internal_id
self.tenant.keystone_id = self.tenant_keystone_id
self.tenant_repo = mock.MagicMock()
self.tenant_repo.get.return_value = self.tenant
self.container_repo = mock.MagicMock()
self.container_repo.create_from.return_value = None
self.secret_repo = mock.MagicMock()
self.secret_repo.create_from.return_value = None
self.stream = mock.MagicMock()
self.container_req = {'name': self.name,
'type': self.type,
'secret_refs': self.secret_refs}
self.json = json.dumps(self.container_req)
self.stream.read.return_value = self.json
self.req = mock.MagicMock()
self.req.stream = self.stream
self.resp = mock.MagicMock()
self.resource = res.ContainersResource(self.tenant_repo,
self.container_repo,
self.secret_repo)
def test_should_add_new_container(self):
self.resource.on_post(self.req, self.resp, self.tenant_keystone_id)
self.assertEquals(falcon.HTTP_202, self.resp.status)
args, kwargs = self.container_repo.create_from.call_args
container = args[0]
self.assertIsInstance(container, models.Container)
def test_should_fail_container_bad_json(self):
self.stream.read.return_value = ''
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_throw_exception_when_secret_ref_doesnt_exist(self):
self.secret_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.tenant_keystone_id)
exception = cm.exception
self.assertEqual(falcon.HTTP_404, exception.status)
class WhenGettingOrDeletingContainerUsingContainerResource(unittest.TestCase):
def setUp(self):
self.tenant_keystone_id = 'keystoneid1234'
self.tenant_internal_id = 'tenantid1234'
self.tenant = models.Tenant()
self.tenant.id = self.tenant_internal_id
self.tenant.keystone_id = self.tenant_keystone_id
self.tenant_repo = mock.MagicMock()
self.tenant_repo.get.return_value = self.tenant
self.container = create_container(id_ref='id1')
self.container_repo = mock.MagicMock()
self.container_repo.get.return_value = self.container
self.container_repo.delete_entity_by_id.return_value = None
self.req = mock.MagicMock()
self.resp = mock.MagicMock()
self.resource = res.ContainerResource(self.tenant_repo,
self.container_repo)
def test_should_get_container(self):
self.resource.on_get(self.req, self.resp, self.tenant_keystone_id,
self.container.id)
self.container_repo.get \
.assert_called_once_with(entity_id=self.container.id,
keystone_id=self.tenant_keystone_id,
suppress_exception=True)
def test_should_delete_container(self):
self.resource.on_delete(self.req, self.resp, self.tenant_keystone_id,
self.container.id)
self.container_repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.container.id,
keystone_id=self.tenant_keystone_id)
def test_should_throw_exception_for_get_when_container_not_found(self):
self.container_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.tenant_keystone_id,
self.container.id)
exception = cm.exception
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_throw_exception_for_delete_when_container_not_found(self):
self.container_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_delete(self.req, self.resp,
self.tenant_keystone_id,
self.container.id)
exception = cm.exception
self.assertEqual(falcon.HTTP_404, exception.status)
class WhenGettingContainersListUsingResource(unittest.TestCase):
def setUp(self):
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystoneid1234'
self.num_containers = 10
self.offset = 2
self.limit = 2
self.containers = [create_container(id_ref='id' + str(id_ref)) for
id_ref in xrange(self.num_containers)]
self.total = len(self.containers)
self.container_repo = mock.MagicMock()
self.container_repo.get_by_create_date.return_value = (self.containers,
self.offset,
self.limit,
self.total)
self.tenant_repo = mock.MagicMock()
self.secret_repo = mock.MagicMock()
self.req = mock.MagicMock()
self.req.accept = 'application/json'
self.req.get_param = mock.Mock()
self.req.get_param.side_effect = [self.offset, self.limit, None, None,
None, 0]
self.resp = mock.MagicMock()
self.resource = res.ContainersResource(self.tenant_repo,
self.container_repo,
self.secret_repo)
def test_should_get_list_containers(self):
self.resource.on_get(self.req, self.resp, self.keystone_id)
self.container_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=self.offset,
limit_arg=self.limit,
suppress_exception=True)
resp_body = jsonutils.loads(self.resp.body)
self.assertTrue('previous' in resp_body)
self.assertTrue('next' in resp_body)
url_nav_next = self._create_url(self.keystone_id,
self.offset + self.limit, self.limit)
self.assertTrue(self.resp.body.count(url_nav_next) == 1)
url_nav_prev = self._create_url(self.keystone_id,
0, self.limit)
self.assertTrue(self.resp.body.count(url_nav_prev) == 1)
url_hrefs = self._create_url(self.keystone_id)
self.assertTrue(self.resp.body.count(url_hrefs) ==
(self.num_containers + 2))
def test_response_should_include_total(self):
self.resource.on_get(self.req, self.resp, self.keystone_id)
resp_body = jsonutils.loads(self.resp.body)
self.assertIn('total', resp_body)
self.assertEqual(resp_body['total'], self.total)
def test_should_handle_no_containers(self):
del self.containers[:]
self.resource.on_get(self.req, self.resp, self.keystone_id)
self.container_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=self.offset,
limit_arg=self.limit,
suppress_exception=True)
resp_body = jsonutils.loads(self.resp.body)
self.assertFalse('previous' in resp_body)
self.assertFalse('next' in resp_body)
def _create_url(self, keystone_id, offset_arg=None, limit_arg=None):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/v1/{0}/containers' \
'?limit={1}&offset={2}'.format(keystone_id,
limit, offset)
else:
return '/v1/{0}/containers'.format(self.keystone_id)

View File

@ -394,5 +394,184 @@ class WhenTestingOrderValidator(unittest.TestCase):
self.assertEqual('algorithm', e.exception.invalid_field)
class WhenTestingContainerValidator(unittest.TestCase):
def setUp(self):
self.name = 'name'
self.type = 'generic'
self.secret_refs = [
{
'name': 'testname',
'secret_ref': '123'
},
{
'name': 'testname2',
'secret_ref': '123'
}
]
self.container_req = {'name': self.name,
'type': self.type,
'secret_refs': self.secret_refs}
self.validator = validators.ContainerValidator()
def test_should_validate_all_fields(self):
self.validator.validate(self.container_req)
def test_should_validate_no_name(self):
del self.container_req['name']
self.validator.validate(self.container_req)
def test_should_validate_empty_name(self):
self.container_req['name'] = ' '
self.validator.validate(self.container_req)
def test_should_fail_no_type(self):
del self.container_req['type']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
#TODO: (hgedikli) figure out why invalid_property is null here
#self.assertEqual('type', e.exception.invalid_property)
def test_should_fail_empty_type(self):
self.container_req['type'] = ''
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('type', e.exception.invalid_property)
def test_should_fail_not_supported_type(self):
self.container_req['type'] = 'testtype'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('type', e.exception.invalid_property)
def test_should_fail_numeric_name(self):
self.container_req['name'] = 123
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('name', e.exception.invalid_property)
def test_should_fail_all_nulls(self):
self.container_req = {'name': None,
'type': None,
'bit_length': None,
'secret_refs': None}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
def test_should_fail_all_empties(self):
self.container_req = {'name': '',
'type': '',
'secret_refs': []}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
def test_should_validate_empty_secret_refs(self):
self.container_req['secret_refs'] = []
self.validator.validate(self.container_req)
def test_should_fail_no_secret_ref_in_secret_refs(self):
del self.container_req['secret_refs'][0]['secret_ref']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
def test_should_fail_empty_secret_ref_in_secret_refs(self):
self.container_req['secret_refs'][0]['secret_ref'] = ''
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
def test_should_fail_numeric_secret_ref_in_secret_refs(self):
self.container_req['secret_refs'][0]['secret_ref'] = 123
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
def test_should_fail_duplicate_names_in_secret_refs(self):
self.container_req['secret_refs'].append(
self.container_req['secret_refs'][0])
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('secret_refs', e.exception.invalid_property)
class WhenTestingRSAContainerValidator(unittest.TestCase):
def setUp(self):
self.name = 'name'
self.type = 'rsa'
self.secret_refs = [
{
'name': 'public_key',
'secret_ref': '123'
},
{
'name': 'private_key',
'secret_ref': '123'
},
{
'name': 'private_key_passphrase',
'secret_ref': '123'
}
]
self.container_req = {'name': self.name,
'type': self.type,
'secret_refs': self.secret_refs}
self.validator = validators.ContainerValidator()
def test_should_fail_no_names_in_secret_refs(self):
del self.container_req['secret_refs'][0]['name']
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('secret_refs', e.exception.invalid_property)
def test_should_fail_empty_names_in_secret_refs(self):
self.container_req['secret_refs'][0]['name'] = ''
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('secret_refs', e.exception.invalid_property)
def test_should_fail_unsupported_names_in_secret_refs(self):
self.container_req['secret_refs'][0]['name'] = 'testttt'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('secret_refs', e.exception.invalid_property)
def test_should_fail_more_than_3_secret_refs(self):
new_secret_ref = {
'name': 'new secret ref',
'secret_ref': '234234'
}
self.container_req['secret_refs'].append(new_secret_ref)
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
self.assertEqual('secret_refs', e.exception.invalid_property)
if __name__ == '__main__':
unittest.main()

View File

@ -34,3 +34,50 @@ class WhenCreatingNewSecret(unittest.TestCase):
self.assertEqual(secret.algorithm, self.parsed_secret['algorithm'])
self.assertEqual(secret.bit_length, self.parsed_secret['bit_length'])
self.assertEqual(secret.mode, self.parsed_secret['mode'])
class WhenCreatingNewContainer(unittest.TestCase):
def setUp(self):
self.parsed_container = {'name': 'name',
'type': 'generic',
'secret_refs': [
{'name': 'test secret 1',
'secret_ref': '123'},
{'name': 'test secret 2',
'secret_ref': '123'},
{'name': 'test secret 3',
'secret_ref': '123'}
]}
def test_new_container_is_created_from_dict(self):
container = models.Container(self.parsed_container)
self.assertEqual(container.name, self.parsed_container['name'])
self.assertEqual(container.type, self.parsed_container['type'])
self.assertEqual(len(container.container_secrets),
len(self.parsed_container['secret_refs']))
self.assertEqual(container.container_secrets[0].name,
self.parsed_container['secret_refs'][0]['name'])
self.assertEqual(container.container_secrets[0].secret_id,
self.parsed_container['secret_refs'][0]['secret_ref'])
self.assertEqual(container.container_secrets[1].name,
self.parsed_container['secret_refs'][1]['name'])
self.assertEqual(container.container_secrets[1].secret_id,
self.parsed_container['secret_refs'][1]['secret_ref'])
self.assertEqual(container.container_secrets[2].name,
self.parsed_container['secret_refs'][2]['name'])
self.assertEqual(container.container_secrets[2].secret_id,
self.parsed_container['secret_refs'][2]['secret_ref'])
def test_parse_secret_ref_uri(self):
self.parsed_container['secret_refs'][0]['secret_ref'] =\
'http://localhost:9110/123/secrets/123456'
container = models.Container(self.parsed_container)
self.assertEqual(container.container_secrets[0].secret_id, '123456')
self.parsed_container['secret_refs'][0]['secret_ref'] =\
'http://localhost:9110/123/secrets/123456/'
container = models.Container(self.parsed_container)
self.assertEqual(container.container_secrets[0].secret_id, '123456')

View File

@ -10,6 +10,10 @@
"orders:get": "rule:all_but_audit",
"order:get": "rule:all_users",
"order:delete": "rule:admin",
"containers:post": "rule:admin_or_creator",
"containers:get": "rule:all_but_audit",
"container:get": "rule:all_users",
"container:delete": "rule:admin",
"verifications:post": "rule:admin_or_creator",
"verifications:get": "rule:all_but_audit",
"verification:get": "rule:all_users",