838 lines
36 KiB
Python
838 lines
36 KiB
Python
# Copyright (c) 2013-2014 Rackspace, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import datetime
|
|
import unittest
|
|
|
|
from barbican.common import exception
|
|
from barbican.model import models
|
|
from barbican.plugin.interface import secret_store
|
|
from barbican.tests import utils
|
|
|
|
|
|
class WhenCreatingNewSecret(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecret, self).setUp()
|
|
self.parsed_secret = {'name': 'name',
|
|
'secret_type': secret_store.SecretType.OPAQUE,
|
|
'algorithm': 'algorithm',
|
|
'bit_length': 512,
|
|
'mode': 'mode',
|
|
'plain_text': 'not-encrypted',
|
|
'creator_id': 'creator12345'}
|
|
|
|
self.parsed_order = {'secret': self.parsed_secret}
|
|
|
|
def test_new_secret_is_created_from_dict(self):
|
|
date_time = datetime.datetime.utcnow().isoformat()
|
|
self.parsed_secret['expiration'] = date_time
|
|
secret = models.Secret(self.parsed_secret)
|
|
self.assertEqual(self.parsed_secret['name'], secret.name)
|
|
self.assertEqual(self.parsed_secret['secret_type'], secret.secret_type)
|
|
self.assertEqual(self.parsed_secret['algorithm'], secret.algorithm)
|
|
self.assertEqual(self.parsed_secret['bit_length'], secret.bit_length)
|
|
self.assertEqual(self.parsed_secret['mode'], secret.mode)
|
|
self.assertIsInstance(secret.expiration, datetime.datetime)
|
|
self.assertEqual(self.parsed_secret['creator_id'], secret.creator_id)
|
|
self.assertEqual(secret.created_at, secret.updated_at)
|
|
|
|
fields = secret.to_dict_fields()
|
|
self.assertEqual(self.parsed_secret['secret_type'],
|
|
fields['secret_type'])
|
|
self.assertEqual(self.parsed_secret['algorithm'], fields['algorithm'])
|
|
self.assertEqual(self.parsed_secret['creator_id'],
|
|
fields['creator_id'])
|
|
|
|
def test_new_secret_is_created_with_default_secret_type(self):
|
|
secret_spec = dict(self.parsed_secret)
|
|
date_time = datetime.datetime.utcnow().isoformat()
|
|
secret_spec['expiration'] = date_time
|
|
del secret_spec['secret_type']
|
|
secret = models.Secret(secret_spec)
|
|
self.assertEqual(self.parsed_secret['secret_type'], secret.secret_type)
|
|
|
|
|
|
class WhenCreatingNewSecretMetadata(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecretMetadata, self).setUp()
|
|
self.key = 'dog'
|
|
self.value = 'poodle'
|
|
|
|
self.metadata = {
|
|
'key': self.key,
|
|
'value': self.value
|
|
}
|
|
|
|
def test_new_secret_metadata_is_created_from_dict(self):
|
|
secret_meta = models.SecretUserMetadatum(self.key, self.value)
|
|
|
|
self.assertEqual(self.key, secret_meta.key)
|
|
self.assertEqual(self.value, secret_meta.value)
|
|
|
|
fields = secret_meta.to_dict_fields()
|
|
self.assertEqual(self.metadata['key'],
|
|
fields['key'])
|
|
self.assertEqual(self.metadata['value'],
|
|
fields['value'])
|
|
|
|
def test_should_raise_exception_metadata_with_no_key(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretUserMetadatum,
|
|
None,
|
|
self.value)
|
|
|
|
def test_should_raise_exception_metadata_with_no_value(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretUserMetadatum,
|
|
self.key,
|
|
None)
|
|
|
|
|
|
class WhenCreatingNewOrder(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewOrder, self).setUp()
|
|
self.parsed_order = {
|
|
'type': 'certificate',
|
|
'meta': {
|
|
'email': 'email@email.com'
|
|
},
|
|
'sub_status': 'Pending',
|
|
'sub_status_message': 'Waiting for instructions...',
|
|
'creator_id': 'creator12345'
|
|
}
|
|
|
|
def test_new_order_is_created(self):
|
|
order = models.Order(self.parsed_order)
|
|
|
|
self.assertEqual(self.parsed_order['type'], order.type)
|
|
self.assertEqual(self.parsed_order['meta'], order.meta)
|
|
self.assertEqual(self.parsed_order['sub_status'], order.sub_status)
|
|
self.assertEqual(self.parsed_order['creator_id'], order.creator_id)
|
|
self.assertEqual(
|
|
self.parsed_order['sub_status_message'],
|
|
order.sub_status_message
|
|
)
|
|
fields = order.to_dict_fields()
|
|
self.assertEqual(self.parsed_order['sub_status'], fields['sub_status'])
|
|
self.assertEqual(self.parsed_order['type'], fields['type'])
|
|
self.assertEqual(self.parsed_order['creator_id'],
|
|
fields['creator_id'])
|
|
|
|
|
|
class WhenCreatingNewContainer(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewContainer, self).setUp()
|
|
self.parsed_container = {'name': 'name',
|
|
'type': 'generic',
|
|
'secret_refs': [
|
|
{'name': 'test secret 1',
|
|
'secret_ref': '123'},
|
|
{'name': 'test secret 2',
|
|
'secret_ref': '123'},
|
|
{'name': 'test secret 3',
|
|
'secret_ref': '123'}
|
|
],
|
|
'creator_id': 'creator123456'}
|
|
|
|
def test_new_container_is_created_from_dict(self):
|
|
container = models.Container(self.parsed_container)
|
|
self.assertEqual(self.parsed_container['name'], container.name)
|
|
self.assertEqual(self.parsed_container['type'], container.type)
|
|
self.assertEqual(self.parsed_container['creator_id'],
|
|
container.creator_id)
|
|
self.assertEqual(len(self.parsed_container['secret_refs']),
|
|
len(container.container_secrets))
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][0]['name'],
|
|
container.container_secrets[0].name)
|
|
self.assertEqual(self.parsed_container['secret_refs'][0]['secret_ref'],
|
|
container.container_secrets[0].secret_id)
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][1]['name'],
|
|
container.container_secrets[1].name)
|
|
self.assertEqual(self.parsed_container['secret_refs'][1]['secret_ref'],
|
|
container.container_secrets[1].secret_id)
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][2]['name'],
|
|
container.container_secrets[2].name)
|
|
self.assertEqual(self.parsed_container['secret_refs'][2]['secret_ref'],
|
|
container.container_secrets[2].secret_id)
|
|
fields = container.to_dict_fields()
|
|
self.assertEqual(self.parsed_container['name'], fields['name'])
|
|
self.assertEqual(self.parsed_container['type'], fields['type'])
|
|
self.assertEqual(self.parsed_container['creator_id'],
|
|
fields['creator_id'])
|
|
|
|
def test_new_certificate_container_is_created_from_dict(self):
|
|
self.parsed_container['type'] = 'certificate'
|
|
container = models.Container(self.parsed_container)
|
|
self.assertEqual(self.parsed_container['name'], container.name)
|
|
self.assertEqual(self.parsed_container['type'], container.type)
|
|
self.assertEqual(self.parsed_container['creator_id'],
|
|
container.creator_id)
|
|
self.assertEqual(len(self.parsed_container['secret_refs']),
|
|
len(container.container_secrets))
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][0]['name'],
|
|
container.container_secrets[0].name)
|
|
self.assertEqual(self.parsed_container['secret_refs'][0]['secret_ref'],
|
|
container.container_secrets[0].secret_id)
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][1]['name'],
|
|
container.container_secrets[1].name,)
|
|
self.assertEqual(self.parsed_container['secret_refs'][1]['secret_ref'],
|
|
container.container_secrets[1].secret_id)
|
|
|
|
self.assertEqual(self.parsed_container['secret_refs'][2]['name'],
|
|
container.container_secrets[2].name)
|
|
self.assertEqual(self.parsed_container['secret_refs'][2]['secret_ref'],
|
|
container.container_secrets[2].secret_id)
|
|
|
|
def test_parse_secret_ref_uri(self):
|
|
self.parsed_container['secret_refs'][0]['secret_ref'] = (
|
|
'http://localhost:9110/123/secrets/123456')
|
|
container = models.Container(self.parsed_container)
|
|
self.assertEqual('123456', container.container_secrets[0].secret_id)
|
|
|
|
self.parsed_container['secret_refs'][0]['secret_ref'] = (
|
|
'http://localhost:9110/123/secrets/123456/')
|
|
container = models.Container(self.parsed_container)
|
|
self.assertEqual('123456', container.container_secrets[0].secret_id)
|
|
|
|
|
|
class WhenCreatingNewConsumer(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewConsumer, self).setUp()
|
|
self.parsed_consumer = {'name': 'name',
|
|
'URL': 'URL'}
|
|
self.project_id = '12345project'
|
|
self.container_id = '12345container'
|
|
|
|
def test_new_consumer_is_created_from_dict(self):
|
|
consumer = models.ContainerConsumerMetadatum(self.container_id,
|
|
self.project_id,
|
|
self.parsed_consumer)
|
|
self.assertEqual(self.parsed_consumer['name'], consumer.name)
|
|
self.assertEqual(self.parsed_consumer['URL'], consumer.URL)
|
|
self.assertEqual(models.States.ACTIVE, consumer.status)
|
|
|
|
def test_new_consumer_has_correct_hash(self):
|
|
consumer_one = models.ContainerConsumerMetadatum(self.container_id,
|
|
self.project_id,
|
|
self.parsed_consumer)
|
|
consumer_two = models.ContainerConsumerMetadatum(self.container_id,
|
|
self.project_id,
|
|
self.parsed_consumer)
|
|
different_container = '67890container'
|
|
consumer_three = models.ContainerConsumerMetadatum(
|
|
different_container, self.project_id, self.parsed_consumer)
|
|
self.assertEqual(consumer_one.data_hash, consumer_two.data_hash)
|
|
self.assertNotEqual(consumer_one.data_hash, consumer_three.data_hash)
|
|
|
|
|
|
class WhenProcessingJsonBlob(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenProcessingJsonBlob, self).setUp()
|
|
self.json_blob = models.JsonBlob()
|
|
|
|
def test_process_bind_param_w_dict(self):
|
|
res = self.json_blob.process_bind_param({'test': True}, None)
|
|
self.assertEqual('{"test": true}', res)
|
|
|
|
def test_process_result_value_w_json_str(self):
|
|
res = self.json_blob.process_result_value('{"test": true}', None)
|
|
self.assertTrue(res.get('test'))
|
|
|
|
|
|
class WhenCreatingOrderRetryTask(utils.BaseTestCase):
|
|
|
|
def test_create_new_order_task(self):
|
|
order = models.Order({
|
|
'type': 'certificate',
|
|
'meta': {
|
|
'email': 'email@email.com'
|
|
},
|
|
'sub_status': 'Pending',
|
|
'sub_status_message': 'Waiting for instructions...'
|
|
})
|
|
at = datetime.datetime.utcnow()
|
|
order_retry_task = models.OrderRetryTask()
|
|
order_retry_task.order_id = order.id
|
|
order_retry_task.retry_task = "foobar"
|
|
order_retry_task.retry_at = at
|
|
order_retry_task.retry_args = ["one", "two"]
|
|
order_retry_task.retry_kwargs = {"three": "four"}
|
|
|
|
self.assertEqual(order.id, order_retry_task.order_id)
|
|
self.assertEqual("foobar", order_retry_task.retry_task)
|
|
self.assertEqual(at, order_retry_task.retry_at)
|
|
self.assertEqual(
|
|
["one", "two"],
|
|
order_retry_task.retry_args,
|
|
)
|
|
self.assertEqual(
|
|
{"three": "four"},
|
|
order_retry_task.retry_kwargs,
|
|
)
|
|
|
|
|
|
class WhenCreatingNewCertificateAuthority(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewCertificateAuthority, self).setUp()
|
|
expiration = (datetime.datetime.utcnow() +
|
|
datetime.timedelta(minutes=10))
|
|
self.parsed_ca = {'plugin_name': 'dogtag_plugin',
|
|
'plugin_ca_id': 'ca_master',
|
|
'expiration': expiration.isoformat(),
|
|
'name': 'Dogtag CA',
|
|
'description': 'Master CA for Dogtag plugin',
|
|
'ca_signing_certificate': 'XXXXX',
|
|
'intermediates': 'YYYYY',
|
|
'creator_id': 'user12345',
|
|
'parent_ca_id': '12330-223-22',
|
|
'project_id': '12345'}
|
|
|
|
def test_new_ca_is_created_from_dict(self):
|
|
ca = models.CertificateAuthority(self.parsed_ca)
|
|
self.assertEqual(self.parsed_ca['plugin_name'], ca.plugin_name)
|
|
self.assertEqual(self.parsed_ca['plugin_ca_id'], ca.plugin_ca_id)
|
|
self.assertEqual(self.parsed_ca['name'], ca.ca_meta['name'].value)
|
|
self.assertEqual(self.parsed_ca['description'],
|
|
ca.ca_meta['description'].value)
|
|
self.assertEqual(self.parsed_ca['ca_signing_certificate'],
|
|
ca.ca_meta['ca_signing_certificate'].value)
|
|
self.assertEqual(self.parsed_ca['intermediates'],
|
|
ca.ca_meta['intermediates'].value)
|
|
self.assertIsInstance(ca.expiration, datetime.datetime)
|
|
self.assertEqual(ca.created_at, ca.updated_at)
|
|
self.assertEqual(self.parsed_ca['creator_id'], ca.creator_id)
|
|
self.assertEqual(self.parsed_ca['project_id'], ca.project_id)
|
|
|
|
|
|
class WhenCreatingNewProjectCertificateAuthority(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewProjectCertificateAuthority, self).setUp()
|
|
expiration = (datetime.datetime.utcnow() +
|
|
datetime.timedelta(minutes=10))
|
|
self.parsed_ca = {'plugin_name': 'dogtag_plugin',
|
|
'plugin_ca_id': 'ca_master',
|
|
'expiration': expiration.isoformat(),
|
|
'name': 'Dogtag CA',
|
|
'description': 'Master CA for Dogtag plugin',
|
|
'ca_signing_certificate': 'XXXXX',
|
|
'intermediates': 'YYYYY'}
|
|
|
|
def test_create_new_project_ca(self):
|
|
ca = models.CertificateAuthority(self.parsed_ca)
|
|
ca.id = '67890'
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
project_ca = models.ProjectCertificateAuthority(project.id, ca.id)
|
|
|
|
self.assertEqual(ca.id, project_ca.ca_id)
|
|
self.assertEqual(project.id, project_ca.project_id)
|
|
|
|
|
|
class WhenCreatingNewPreferredCertificateAuthority(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewPreferredCertificateAuthority, self).setUp()
|
|
expiration = (datetime.datetime.utcnow() +
|
|
datetime.timedelta(minutes=10))
|
|
self.parsed_ca = {'plugin_name': 'dogtag_plugin',
|
|
'plugin_ca_id': 'ca_master',
|
|
'expiration': expiration.isoformat(),
|
|
'name': 'Dogtag CA',
|
|
'description': 'Master CA for Dogtag plugin',
|
|
'ca_signing_certificate': 'XXXXX',
|
|
'intermediates': 'YYYYY'}
|
|
|
|
def test_create_new_preferred_ca(self):
|
|
ca = models.CertificateAuthority(self.parsed_ca)
|
|
ca.id = '67890'
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
preferred_ca = models.PreferredCertificateAuthority(project.id, ca.id)
|
|
|
|
self.assertEqual(ca.id, preferred_ca.ca_id)
|
|
self.assertEqual(project.id, preferred_ca.project_id)
|
|
|
|
|
|
class WhenCreatingNewSecretACL(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecretACL, self).setUp()
|
|
self.secret_id = 'secret123456'
|
|
self.user_ids = ['user12345', 'user67890']
|
|
self.operation = 'read'
|
|
self.project_access = True
|
|
|
|
def test_new_secretacl_for_given_all_input(self):
|
|
acl = models.SecretACL(self.secret_id, self.operation,
|
|
self.project_access, self.user_ids)
|
|
self.assertEqual(self.secret_id, acl.secret_id)
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertEqual(self.project_access, acl.project_access)
|
|
self.assertTrue(all(acl_user.user_id in self.user_ids for acl_user
|
|
in acl.acl_users))
|
|
|
|
def test_new_secretacl_check_to_dict_fields(self):
|
|
acl = models.SecretACL(self.secret_id, self.operation,
|
|
self.project_access, self.user_ids)
|
|
self.assertEqual(self.secret_id, acl.to_dict_fields()['secret_id'])
|
|
self.assertEqual(self.operation, acl.to_dict_fields()['operation'])
|
|
self.assertEqual(self.project_access,
|
|
acl.to_dict_fields()['project_access'])
|
|
self.assertTrue(all(user_id in self.user_ids for user_id in
|
|
acl.to_dict_fields()['users']))
|
|
self.assertIsNone(acl.to_dict_fields()['acl_id'])
|
|
|
|
def test_new_secretacl_for_bare_minimum_input(self):
|
|
acl = models.SecretACL(self.secret_id, self.operation,
|
|
None, None)
|
|
self.assertEqual(self.secret_id, acl.secret_id)
|
|
self.assertEqual(0, len(acl.acl_users))
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertIsNone(acl.project_access)
|
|
|
|
def test_new_secretacl_with_duplicate_userids_input(self):
|
|
user_ids = list(self.user_ids)
|
|
user_ids *= 2 # duplicate ids
|
|
acl = models.SecretACL(self.secret_id, self.operation,
|
|
None, user_ids=user_ids)
|
|
self.assertEqual(self.secret_id, acl.secret_id)
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertIsNone(acl.project_access)
|
|
self.assertEqual(2, len(acl.acl_users))
|
|
|
|
def test_should_throw_exception_missing_secret_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretACL, None, 'read',
|
|
['user246'], None)
|
|
|
|
def test_should_throw_exception_missing_operation(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretACL, self.secret_id, None,
|
|
None, ['user246'])
|
|
|
|
def test_new_secretacl_expect_user_ids_as_list(self):
|
|
acl = models.SecretACL(self.secret_id, self.operation,
|
|
None, {'aUser': '12345'})
|
|
self.assertEqual(0, len(acl.acl_users))
|
|
|
|
|
|
class WhenCreatingNewContainerACL(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewContainerACL, self).setUp()
|
|
self.container_id = 'container123456'
|
|
self.user_ids = ['user12345', 'user67890']
|
|
self.operation = 'read'
|
|
self.project_access = True
|
|
|
|
def test_new_containeracl_for_given_all_input(self):
|
|
acl = models.ContainerACL(self.container_id, self.operation,
|
|
self.project_access, self.user_ids)
|
|
self.assertEqual(self.container_id, acl.container_id)
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertEqual(self.project_access, acl.project_access)
|
|
self.assertTrue(all(acl_user.user_id in self.user_ids for acl_user
|
|
in acl.acl_users))
|
|
|
|
def test_new_containeracl_check_to_dict_fields(self):
|
|
acl = models.ContainerACL(self.container_id, self.operation,
|
|
self.project_access, self.user_ids)
|
|
self.assertEqual(self.container_id,
|
|
acl.to_dict_fields()['container_id'])
|
|
self.assertEqual(self.operation, acl.to_dict_fields()['operation'])
|
|
self.assertEqual(self.project_access,
|
|
acl.to_dict_fields()['project_access'])
|
|
self.assertTrue(all(user_id in self.user_ids for user_id
|
|
in acl.to_dict_fields()['users']))
|
|
self.assertIsNone(acl.to_dict_fields()['acl_id'])
|
|
|
|
def test_new_containeracl_for_bare_minimum_input(self):
|
|
acl = models.ContainerACL(self.container_id, self.operation,
|
|
None, None)
|
|
self.assertEqual(self.container_id, acl.container_id)
|
|
self.assertEqual(0, len(acl.acl_users))
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertIsNone(acl.project_access)
|
|
|
|
def test_new_containeracl_with_duplicate_userids_input(self):
|
|
user_ids = list(self.user_ids)
|
|
user_ids *= 2 # duplicate ids
|
|
acl = models.ContainerACL(self.container_id, self.operation,
|
|
True, user_ids=user_ids)
|
|
self.assertEqual(self.container_id, acl.container_id)
|
|
self.assertEqual(self.operation, acl.operation)
|
|
self.assertTrue(acl.project_access)
|
|
self.assertEqual(2, len(acl.acl_users))
|
|
|
|
def test_should_throw_exception_missing_container_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ContainerACL, None, 'read',
|
|
None, ['user246'])
|
|
|
|
def test_should_throw_exception_missing_operation(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ContainerACL, self.container_id, None,
|
|
None, ['user246'])
|
|
|
|
def test_new_containeracl_expect_user_ids_as_list(self):
|
|
acl = models.ContainerACL(self.container_id, self.operation,
|
|
None, {'aUser': '12345'})
|
|
self.assertEqual(0, len(acl.acl_users))
|
|
|
|
|
|
class WhenCreatingNewSecretACLUser(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecretACLUser, self).setUp()
|
|
self.secret_acl_id = 'secret_acl_123456'
|
|
self.user_ids = ['user12345', 'user67890']
|
|
|
|
def test_new_secretacl_user_for_given_all_input(self):
|
|
acl_user = models.SecretACLUser(self.secret_acl_id, self.user_ids[0])
|
|
|
|
self.assertEqual(self.secret_acl_id, acl_user.acl_id)
|
|
self.assertEqual(self.user_ids[0], acl_user.user_id)
|
|
self.assertEqual(models.States.ACTIVE, acl_user.status)
|
|
|
|
def test_new_secretacl_user_check_to_dict_fields(self):
|
|
acl_user = models.SecretACLUser(self.secret_acl_id, self.user_ids[1])
|
|
|
|
self.assertEqual(self.secret_acl_id,
|
|
acl_user.to_dict_fields()['acl_id'])
|
|
self.assertEqual(self.user_ids[1],
|
|
acl_user.to_dict_fields()['user_id'])
|
|
self.assertEqual(models.States.ACTIVE,
|
|
acl_user.to_dict_fields()['status'])
|
|
|
|
def test_should_throw_exception_missing_user_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretACLUser, self.secret_acl_id,
|
|
None)
|
|
|
|
|
|
class WhenCreatingNewContainerACLUser(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewContainerACLUser, self).setUp()
|
|
self.container_acl_id = 'container_acl_123456'
|
|
self.user_ids = ['user12345', 'user67890']
|
|
|
|
def test_new_secretacl_user_for_given_all_input(self):
|
|
acl_user = models.ContainerACLUser(self.container_acl_id,
|
|
self.user_ids[0])
|
|
|
|
self.assertEqual(self.container_acl_id, acl_user.acl_id)
|
|
self.assertEqual(self.user_ids[0], acl_user.user_id)
|
|
self.assertEqual(models.States.ACTIVE, acl_user.status)
|
|
|
|
def test_new_secretacl_user_check_to_dict_fields(self):
|
|
acl_user = models.ContainerACLUser(self.container_acl_id,
|
|
self.user_ids[1])
|
|
|
|
self.assertEqual(self.container_acl_id,
|
|
acl_user.to_dict_fields()['acl_id'])
|
|
self.assertEqual(self.user_ids[1],
|
|
acl_user.to_dict_fields()['user_id'])
|
|
self.assertEqual(models.States.ACTIVE,
|
|
acl_user.to_dict_fields()['status'])
|
|
|
|
def test_should_throw_exception_missing_user_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ContainerACLUser, self.container_acl_id,
|
|
None)
|
|
|
|
|
|
class WhenCreatingNewProjectQuotas(utils.BaseTestCase):
|
|
def test_create_new_project_quotas(self):
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
project.external_id = '67890'
|
|
parsed_project_quotas = {
|
|
'secrets': 101,
|
|
'orders': 102,
|
|
'containers': 103,
|
|
'consumers': 105,
|
|
'cas': 106}
|
|
project_quotas = models.ProjectQuotas(project.id,
|
|
parsed_project_quotas)
|
|
|
|
self.assertEqual('12345', project_quotas.project_id)
|
|
self.assertEqual(101, project_quotas.secrets)
|
|
self.assertEqual(102, project_quotas.orders)
|
|
self.assertEqual(103, project_quotas.containers)
|
|
self.assertEqual(105, project_quotas.consumers)
|
|
self.assertEqual(106, project_quotas.cas)
|
|
|
|
def test_create_new_project_quotas_with_all_default_quotas(self):
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
project.external_id = '67890'
|
|
project_quotas = models.ProjectQuotas(project.id,
|
|
None)
|
|
|
|
self.assertEqual('12345', project_quotas.project_id)
|
|
self.assertIsNone(project_quotas.secrets)
|
|
self.assertIsNone(project_quotas.orders)
|
|
self.assertIsNone(project_quotas.containers)
|
|
self.assertIsNone(project_quotas.consumers)
|
|
self.assertIsNone(project_quotas.cas)
|
|
|
|
def test_create_new_project_quotas_with_some_default_quotas(self):
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
project.external_id = '67890'
|
|
parsed_project_quotas = {
|
|
'secrets': 101,
|
|
'containers': 103,
|
|
'consumers': 105}
|
|
project_quotas = models.ProjectQuotas(project.id,
|
|
parsed_project_quotas)
|
|
|
|
self.assertEqual('12345', project_quotas.project_id)
|
|
self.assertEqual(101, project_quotas.secrets)
|
|
self.assertIsNone(project_quotas.orders)
|
|
self.assertEqual(103, project_quotas.containers)
|
|
self.assertEqual(105, project_quotas.consumers)
|
|
self.assertIsNone(project_quotas.cas)
|
|
|
|
def test_should_throw_exception_missing_project_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ProjectQuotas, None, None)
|
|
|
|
def test_project_quotas_check_to_dict_fields(self):
|
|
project = models.Project()
|
|
project.id = '12345'
|
|
project.external_id = '67890'
|
|
parsed_project_quotas = {
|
|
'secrets': 101,
|
|
'orders': 102,
|
|
'containers': 103,
|
|
'consumers': 105,
|
|
'cas': 106}
|
|
project_quotas = models.ProjectQuotas(project.id,
|
|
parsed_project_quotas)
|
|
self.assertEqual(project.id,
|
|
project_quotas.to_dict_fields()['project_id'])
|
|
self.assertEqual(101,
|
|
project_quotas.to_dict_fields()['secrets'])
|
|
self.assertEqual(102,
|
|
project_quotas.to_dict_fields()['orders'])
|
|
self.assertEqual(103,
|
|
project_quotas.to_dict_fields()['containers'])
|
|
self.assertEqual(105,
|
|
project_quotas.to_dict_fields()['consumers'])
|
|
self.assertEqual(106,
|
|
project_quotas.to_dict_fields()['cas'])
|
|
|
|
|
|
class WhenCreatingNewSecretStores(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecretStores, self).setUp()
|
|
|
|
def test_new_secret_stores_for_all_input(self):
|
|
name = "db backend"
|
|
store_plugin = 'store_crypto'
|
|
crypto_plugin = 'simple_crypto'
|
|
ss = models.SecretStores(name, store_plugin, crypto_plugin,
|
|
global_default=True)
|
|
|
|
self.assertEqual(store_plugin, ss.store_plugin)
|
|
self.assertEqual(crypto_plugin, ss.crypto_plugin)
|
|
self.assertEqual(name, ss.name)
|
|
self.assertTrue(ss.global_default)
|
|
self.assertEqual(models.States.ACTIVE, ss.status)
|
|
|
|
def test_new_secret_stores_required_input_only(self):
|
|
store_plugin = 'store_crypto'
|
|
name = "db backend"
|
|
ss = models.SecretStores(name, store_plugin)
|
|
|
|
self.assertEqual(store_plugin, ss.store_plugin)
|
|
self.assertEqual(name, ss.name)
|
|
self.assertIsNone(ss.crypto_plugin)
|
|
self.assertIsNone(ss.global_default) # False default is not used
|
|
self.assertEqual(models.States.ACTIVE, ss.status)
|
|
|
|
def test_should_throw_exception_missing_store_plugin(self):
|
|
name = "db backend"
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretStores, name, None)
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretStores, name, "")
|
|
|
|
def test_should_throw_exception_missing_name(self):
|
|
store_plugin = 'store_crypto'
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretStores, None, store_plugin)
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.SecretStores, "", store_plugin)
|
|
|
|
def test_secret_stores_check_to_dict_fields(self):
|
|
name = "pkcs11 backend"
|
|
store_plugin = 'store_crypto'
|
|
crypto_plugin = 'p11_crypto'
|
|
ss = models.SecretStores(name, store_plugin, crypto_plugin,
|
|
global_default=True)
|
|
self.assertEqual(store_plugin,
|
|
ss.to_dict_fields()['store_plugin'])
|
|
self.assertEqual(crypto_plugin,
|
|
ss.to_dict_fields()['crypto_plugin'])
|
|
self.assertTrue(ss.to_dict_fields()['global_default'])
|
|
self.assertEqual(models.States.ACTIVE,
|
|
ss.to_dict_fields()['status'])
|
|
self.assertEqual(name, ss.to_dict_fields()['name'])
|
|
|
|
# check with required input only
|
|
ss = models.SecretStores(name, store_plugin)
|
|
self.assertEqual(store_plugin,
|
|
ss.to_dict_fields()['store_plugin'])
|
|
self.assertIsNone(ss.to_dict_fields()['crypto_plugin'])
|
|
self.assertIsNone(ss.to_dict_fields()['global_default'])
|
|
self.assertEqual(models.States.ACTIVE,
|
|
ss.to_dict_fields()['status'])
|
|
self.assertEqual(name, ss.to_dict_fields()['name'])
|
|
|
|
|
|
class WhenCreatingNewProjectSecretStore(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewProjectSecretStore, self).setUp()
|
|
|
|
def test_new_project_secret_store(self):
|
|
|
|
project_id = 'proj_123456'
|
|
name = "db backend"
|
|
store_plugin = 'store_crypto'
|
|
crypto_plugin = 'simple_crypto'
|
|
ss = models.SecretStores(name, store_plugin, crypto_plugin,
|
|
global_default=True)
|
|
ss.id = "ss_123456"
|
|
|
|
project_ss = models.ProjectSecretStore(project_id, ss.id)
|
|
self.assertEqual(project_id, project_ss.project_id)
|
|
self.assertEqual(ss.id, project_ss.secret_store_id)
|
|
self.assertEqual(models.States.ACTIVE, project_ss.status)
|
|
|
|
def test_should_throw_exception_missing_project_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ProjectSecretStore, None, "ss_123456")
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ProjectSecretStore, "", "ss_123456")
|
|
|
|
def test_should_throw_exception_missing_secret_store_id(self):
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ProjectSecretStore, "proj_123456", None)
|
|
self.assertRaises(exception.MissingArgumentError,
|
|
models.ProjectSecretStore, "proj_123456", "")
|
|
|
|
def test_project_secret_store_check_to_dict_fields(self):
|
|
project_id = 'proj_123456'
|
|
secret_store_id = 'ss_7689012'
|
|
project_ss = models.ProjectSecretStore(project_id, secret_store_id)
|
|
self.assertEqual(project_id,
|
|
project_ss.to_dict_fields()['project_id'])
|
|
self.assertEqual(secret_store_id,
|
|
project_ss.to_dict_fields()['secret_store_id'])
|
|
self.assertEqual(models.States.ACTIVE,
|
|
project_ss.to_dict_fields()['status'])
|
|
|
|
|
|
class WhenCreatingNewSecretConsumer(utils.BaseTestCase):
|
|
def setUp(self):
|
|
super(WhenCreatingNewSecretConsumer, self).setUp()
|
|
self.secret_id = "12345secret"
|
|
self.project_id = "12345project"
|
|
self.service = "12345service"
|
|
self.resource_type = "12345resource_type"
|
|
self.resource_id = "12345resource_id"
|
|
|
|
def test_new_secret_consumer(self):
|
|
consumer = models.SecretConsumerMetadatum(
|
|
self.secret_id,
|
|
self.project_id,
|
|
self.service,
|
|
self.resource_type,
|
|
self.resource_id
|
|
)
|
|
self.assertEqual(self.secret_id, consumer.secret_id)
|
|
self.assertEqual(self.project_id, consumer.project_id)
|
|
self.assertEqual(self.service, consumer.service)
|
|
self.assertEqual(self.resource_type, consumer.resource_type)
|
|
self.assertEqual(self.resource_id, consumer.resource_id)
|
|
self.assertEqual(models.States.ACTIVE, consumer.status)
|
|
|
|
def test_to_dict_fields(self):
|
|
consumer = models.SecretConsumerMetadatum(
|
|
self.secret_id,
|
|
self.project_id,
|
|
self.service,
|
|
self.resource_type,
|
|
self.resource_id
|
|
)
|
|
fields = consumer.to_dict_fields()
|
|
self.assertEqual(self.service, fields["service"])
|
|
self.assertEqual(self.resource_type, fields["resource_type"])
|
|
self.assertEqual(self.resource_id, fields["resource_id"])
|
|
|
|
def test_should_raise_exception_when_missing_arguments(self):
|
|
self.assertRaises(
|
|
exception.MissingArgumentError,
|
|
models.SecretConsumerMetadatum,
|
|
None,
|
|
self.project_id,
|
|
self.service,
|
|
self.resource_type,
|
|
self.resource_id,
|
|
)
|
|
self.assertRaises(
|
|
exception.MissingArgumentError,
|
|
models.SecretConsumerMetadatum,
|
|
self.secret_id,
|
|
None,
|
|
self.service,
|
|
self.resource_type,
|
|
self.resource_id,
|
|
)
|
|
self.assertRaises(
|
|
exception.MissingArgumentError,
|
|
models.SecretConsumerMetadatum,
|
|
self.secret_id,
|
|
self.project_id,
|
|
None,
|
|
self.resource_type,
|
|
self.resource_id,
|
|
)
|
|
self.assertRaises(
|
|
exception.MissingArgumentError,
|
|
models.SecretConsumerMetadatum,
|
|
self.secret_id,
|
|
self.project_id,
|
|
self.service,
|
|
None,
|
|
self.resource_id,
|
|
)
|
|
self.assertRaises(
|
|
exception.MissingArgumentError,
|
|
models.SecretConsumerMetadatum,
|
|
self.secret_id,
|
|
self.project_id,
|
|
self.service,
|
|
self.resource_type,
|
|
None,
|
|
)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|