Add remaining verification unit test and keep files/mods.
This commit is contained in:
@@ -26,13 +26,3 @@ class BaseEntityManager(object):
|
||||
for k in dictionary.keys():
|
||||
if dictionary[k] is None:
|
||||
dictionary.pop(k)
|
||||
|
||||
def total(self):
|
||||
"""
|
||||
Returns the total number of entities stored in Barbican.
|
||||
"""
|
||||
href = '{0}/{1}'.format(self.api.base_url, self.entity)
|
||||
params = {'limit': 0, 'offset': 0}
|
||||
resp = self.api.get(href, params)
|
||||
|
||||
return resp['total']
|
||||
|
||||
@@ -15,6 +15,10 @@
|
||||
from keystoneclient.v2_0 import client as ksclient
|
||||
from keystoneclient import exceptions
|
||||
|
||||
from barbicanclient.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AuthException(Exception):
|
||||
"""Raised when authorization fails."""
|
||||
@@ -24,14 +28,14 @@ class AuthException(Exception):
|
||||
|
||||
class KeystoneAuthV2(object):
|
||||
def __init__(self, auth_url='', username='', password='',
|
||||
tenant_name='', tenant_id=''):
|
||||
tenant_name='', tenant_id='', keystone=None):
|
||||
if not all([auth_url, username, password, tenant_name or tenant_id]):
|
||||
raise ValueError('Please provide auth_url, username, password,'
|
||||
' and tenant_id or tenant_name)')
|
||||
self._keystone = ksclient.Client(username=username,
|
||||
password=password,
|
||||
tenant_name=tenant_name,
|
||||
auth_url=auth_url)
|
||||
self._keystone = keystone or ksclient.Client(username=username,
|
||||
password=password,
|
||||
tenant_name=tenant_name,
|
||||
auth_url=auth_url)
|
||||
self._barbican_url = None
|
||||
#TODO(dmend): make these configurable
|
||||
self._service_type = 'keystore'
|
||||
|
||||
@@ -46,48 +46,3 @@ def parse_args(args=None, usage=None, default_config_files=None):
|
||||
version=__version__,
|
||||
usage=usage,
|
||||
default_config_files=default_config_files)
|
||||
|
||||
|
||||
def setup_logging():
|
||||
"""
|
||||
Sets up the logging options
|
||||
"""
|
||||
|
||||
if CONF.log_config:
|
||||
# Use a logging configuration file for all settings...
|
||||
if os.path.exists(CONF.log_config):
|
||||
logging.config.fileConfig(CONF.log_config)
|
||||
return
|
||||
else:
|
||||
raise RuntimeError("Unable to locate specified logging "
|
||||
"config file: %s" % CONF.log_config)
|
||||
|
||||
root_logger = logging.root
|
||||
if CONF.debug:
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
elif CONF.verbose:
|
||||
root_logger.setLevel(logging.INFO)
|
||||
else:
|
||||
root_logger.setLevel(logging.WARNING)
|
||||
|
||||
formatter = logging.Formatter(CONF.log_format, CONF.log_date_format)
|
||||
|
||||
if CONF.use_syslog:
|
||||
try:
|
||||
facility = getattr(logging.handlers.SysLogHandler,
|
||||
CONF.syslog_log_facility)
|
||||
except AttributeError:
|
||||
raise ValueError(_("Invalid syslog facility"))
|
||||
|
||||
handler = logging.handlers.SysLogHandler(address='/dev/log',
|
||||
facility=facility)
|
||||
elif CONF.log_file:
|
||||
logfile = CONF.log_file
|
||||
if CONF.log_dir:
|
||||
logfile = os.path.join(CONF.log_dir, logfile)
|
||||
handler = logging.handlers.WatchedFileHandler(logfile)
|
||||
else:
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
|
||||
handler.setFormatter(formatter)
|
||||
root_logger.addHandler(handler)
|
||||
|
||||
@@ -33,6 +33,7 @@ class Keep:
|
||||
self._add_store_args()
|
||||
self._add_get_args()
|
||||
self._add_list_args()
|
||||
self._add_verify_args()
|
||||
self._add_delete_args()
|
||||
|
||||
def _get_main_parser(self):
|
||||
@@ -41,9 +42,9 @@ class Keep:
|
||||
)
|
||||
parser.add_argument('command',
|
||||
metavar='<entity>',
|
||||
choices=['order', 'secret'],
|
||||
choices=['order', 'secret', 'verification'],
|
||||
help='Entity used for command, e.g.,'
|
||||
' order, secret.')
|
||||
' order, secret, verification.')
|
||||
auth_group = parser.add_mutually_exclusive_group()
|
||||
auth_group.add_argument('--no-auth', '-N', action='store_true',
|
||||
help='Do not use authentication.')
|
||||
@@ -73,6 +74,22 @@ class Keep:
|
||||
help='Defaults to env[BARBICAN_ENDPOINT].')
|
||||
return parser
|
||||
|
||||
def _add_verify_args(self):
|
||||
verify_parser = self.subparsers.add_parser('verify',
|
||||
help='Create a new verification.')
|
||||
verify_parser.add_argument('--type', '-t', default='image',
|
||||
help='resource type to verify, such as "image".')
|
||||
|
||||
verify_parser.add_argument('--ref', '-r',
|
||||
help='reference URI to resource to verify.')
|
||||
|
||||
verify_parser.add_argument('--action', '-a', default='vm_attach',
|
||||
help='action to perform on resource, such as "vm_attach".')
|
||||
|
||||
verify_parser.add_argument('--impersonation', '-i', default=True,
|
||||
help='is impersonation allowed for the resource.')
|
||||
verify_parser.set_defaults(func=self.verify)
|
||||
|
||||
def _add_create_args(self):
|
||||
create_parser = self.subparsers.add_parser('create',
|
||||
help='Create a new order.')
|
||||
@@ -131,19 +148,19 @@ class Keep:
|
||||
def _add_delete_args(self):
|
||||
delete_parser = self.subparsers.add_parser(
|
||||
'delete',
|
||||
help='Delete a secret or an order by providing its href.'
|
||||
help='Delete a secret, order or verification by providing its href.'
|
||||
)
|
||||
delete_parser.add_argument('URI', help='The URI reference for the'
|
||||
' secret or order')
|
||||
' secret, order or verification')
|
||||
delete_parser.set_defaults(func=self.delete)
|
||||
|
||||
def _add_get_args(self):
|
||||
get_parser = self.subparsers.add_parser(
|
||||
'get',
|
||||
help='Retrieve a secret or an order by providing its URI.'
|
||||
help='Retrieve a secret, order or verification by providing its URI.'
|
||||
)
|
||||
get_parser.add_argument('URI', help='The URI reference for the secret'
|
||||
' or order.')
|
||||
get_parser.add_argument('URI', help='The URI reference for the secret, '
|
||||
'order or verification.')
|
||||
get_parser.add_argument('--decrypt', '-d', help='if specified, keep'
|
||||
' will retrieve the unencrypted secret data;'
|
||||
' the data type can be specified with'
|
||||
@@ -159,7 +176,7 @@ class Keep:
|
||||
|
||||
def _add_list_args(self):
|
||||
list_parser = self.subparsers.add_parser('list',
|
||||
help='List secrets or orders')
|
||||
help='List secrets, orders or verifications')
|
||||
list_parser.add_argument('--limit', '-l', default=10, help='specify t'
|
||||
'he limit to the number of items to list per'
|
||||
' page (default: %(default)s; maximum: 100)',
|
||||
@@ -200,8 +217,13 @@ class Keep:
|
||||
def delete(self, args):
|
||||
if args.command == 'secret':
|
||||
self.client.secrets.delete(args.URI)
|
||||
else:
|
||||
elif args.command == 'verification':
|
||||
self.client.verifications.delete(args.URI)
|
||||
elif args.command == 'order':
|
||||
self.client.orders.delete(args.URI)
|
||||
else:
|
||||
self.parser.exit(status=1, message='ERROR: delete is only '
|
||||
'supported for secrets, orders or verifications\n')
|
||||
|
||||
def get(self, args):
|
||||
if args.command == 'secret':
|
||||
@@ -210,19 +232,41 @@ class Keep:
|
||||
args.payload_content_type)
|
||||
else:
|
||||
print self.client.secrets.get(args.URI)
|
||||
else:
|
||||
elif args.command == 'verification':
|
||||
print self.client.verifications.get(args.URI)
|
||||
elif args.command == 'order':
|
||||
print self.client.orders.get(args.URI)
|
||||
else:
|
||||
self.parser.exit(status=1, message='ERROR: get is only '
|
||||
'supported for secrets, orders or verifications\n')
|
||||
|
||||
def list(self, args):
|
||||
if args.command == 'secret':
|
||||
ls = self.client.secrets.list(args.limit, args.offset)
|
||||
else:
|
||||
elif args.command == 'verification':
|
||||
ls = self.client.verifications.list(args.limit, args.offset)
|
||||
elif args.command == 'order':
|
||||
ls = self.client.orders.list(args.limit, args.offset)
|
||||
else:
|
||||
self.parser.exit(status=1, message='ERROR: get list is only '
|
||||
'supported for secrets, orders or verifications\n')
|
||||
for obj in ls:
|
||||
print obj
|
||||
print '{0}s displayed: {1} - offset: {2}'.format(args.command, len(ls),
|
||||
args.offset)
|
||||
|
||||
def verify(self, args):
|
||||
if args.command == 'verification':
|
||||
verify = self.client.verifications\
|
||||
.create(resource_type=args.type,
|
||||
resource_ref=args.ref,
|
||||
resource_action=args.action,
|
||||
impersonation_allowed=args.impersonation)
|
||||
print verify
|
||||
else:
|
||||
self.parser.exit(status=1, message='ERROR: verify is only '
|
||||
'supported for verifications\n')
|
||||
|
||||
def execute(self, **kwargs):
|
||||
args = self.parser.parse_args(kwargs.get('argv'))
|
||||
if args.no_auth:
|
||||
|
||||
@@ -12,12 +12,35 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import unittest2 as unittest
|
||||
|
||||
from barbicanclient.common import auth
|
||||
|
||||
|
||||
class WhenTestingKeystoneAuthentication(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.keystone_client = mock.MagicMock()
|
||||
|
||||
self.auth_url = 'https://www.yada.com'
|
||||
self.username = 'user'
|
||||
self.password = 'pw'
|
||||
self.tenant_id = '1234'
|
||||
|
||||
self.keystone_auth = auth.KeystoneAuthV2(auth_url=self.auth_url,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
tenant_id=self.tenant_id,
|
||||
keystone=
|
||||
self.keystone_client)
|
||||
|
||||
def test_endpoint_username_password_tenant_are_required(self):
|
||||
with self.assertRaises(ValueError):
|
||||
keystone = auth.KeystoneAuthV2()
|
||||
|
||||
def test_get_barbican_url(self):
|
||||
barbican_url = 'https://www.barbican.com'
|
||||
self.keystone_auth._barbican_url = barbican_url
|
||||
self.assertEquals(barbican_url, self.keystone_auth.barbican_url)
|
||||
|
||||
@@ -13,13 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
import mock
|
||||
import unittest2 as unittest
|
||||
|
||||
from barbicanclient import client
|
||||
from barbicanclient import secrets
|
||||
from barbicanclient.openstack.common import timeutils
|
||||
from barbicanclient.openstack.common import jsonutils
|
||||
|
||||
@@ -140,11 +137,12 @@ class WhenTestingClientWithSession(unittest.TestCase):
|
||||
self.tenant_id = '1234567'
|
||||
|
||||
self.entity = 'dummy-entity'
|
||||
self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/"
|
||||
base = self.endpoint + self.tenant_id + "/"
|
||||
self.entity_base = base + self.entity + "/"
|
||||
self.entity_href = self.entity_base + '1234'
|
||||
|
||||
self.entity_name = 'name'
|
||||
self.entity_dict = {'name': self.entity_name}
|
||||
self.entity_dict = {'name': self.entity_name}
|
||||
|
||||
self.session = mock.MagicMock()
|
||||
|
||||
@@ -220,178 +218,9 @@ class BaseEntityResource(unittest.TestCase):
|
||||
self.tenant_id = '1234567'
|
||||
|
||||
self.entity = entity
|
||||
self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/"
|
||||
base = self.endpoint + self.tenant_id + "/"
|
||||
self.entity_base = base + self.entity + "/"
|
||||
self.entity_href = self.entity_base + '1234'
|
||||
|
||||
self.api = mock.MagicMock()
|
||||
#
|
||||
#
|
||||
# class WhenTestingSecretsManager(BaseEntityResource):
|
||||
#
|
||||
# def setUp(self):
|
||||
# self._setUp('secrets')
|
||||
#
|
||||
# self.secret = SecretData()
|
||||
#
|
||||
# self.manager = secrets.SecretManager(self.api)
|
||||
#
|
||||
# def test_should_store(self):
|
||||
# self.api.post.return_value = {'secret_ref': self.entity_href}
|
||||
#
|
||||
# secret_href = self.manager\
|
||||
# .store(name=self.secret.name,
|
||||
# payload=self.secret.payload,
|
||||
# payload_content_type=self.secret.content)
|
||||
#
|
||||
# self.assertEqual(self.entity_href, secret_href)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.post.call_args
|
||||
# entity_resp = args[0]
|
||||
# self.assertEqual(self.entity, entity_resp)
|
||||
#
|
||||
# # Verify that correct information was sent in the call.
|
||||
# secret_resp = args[1]
|
||||
# self.assertEqual(self.secret.name, secret_resp['name'])
|
||||
# self.assertEqual(self.secret.payload, secret_resp['payload'])
|
||||
# self.assertEqual(self.secret.payload_content_type,
|
||||
# secret_resp['payload_content_type'])
|
||||
#
|
||||
# def test_should_get(self):
|
||||
# self.api.get.return_value = self.secret.get_dict(self.entity_href)
|
||||
#
|
||||
# secret = self.manager.get(secret_ref=self.entity_href)
|
||||
# self.assertIsInstance(secret, secrets.Secret)
|
||||
# self.assertEqual(self.entity_href, secret.secret_ref)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.get.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# def test_should_decrypt_with_content_type(self):
|
||||
# decrypted = 'decrypted text here'
|
||||
# self.api.get_raw.return_value = decrypted
|
||||
#
|
||||
# secret = self.manager.decrypt(secret_ref=self.entity_href,
|
||||
# content_type='application/octet-stream')
|
||||
# self.assertEqual(decrypted, secret)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.get_raw.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# # Verify that correct information was sent in the call.
|
||||
# headers = args[1]
|
||||
# self.assertEqual('application/octet-stream', headers['Accept'])
|
||||
#
|
||||
# def test_should_decrypt_without_content_type(self):
|
||||
# content_types_dict = {'default': 'application/octet-stream'}
|
||||
# self.api.get.return_value = self.secret.get_dict(self.entity_href,
|
||||
# content_types_dict)
|
||||
# decrypted = 'decrypted text here'
|
||||
# self.api.get_raw.return_value = decrypted
|
||||
#
|
||||
# secret = self.manager.decrypt(secret_ref=self.entity_href)
|
||||
# self.assertEqual(decrypted, secret)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.get.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.get_raw.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# # Verify that correct information was sent in the call.
|
||||
# headers = args[1]
|
||||
# self.assertEqual('application/octet-stream', headers['Accept'])
|
||||
#
|
||||
# def test_should_delete(self):
|
||||
# self.manager.delete(secret_ref=self.entity_href)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.delete.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# def test_should_fail_get_no_href(self):
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.get(None)
|
||||
#
|
||||
# def test_should_fail_decrypt_no_content_types(self):
|
||||
# self.api.get.return_value = self.secret.get_dict(self.entity_href)
|
||||
#
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.decrypt(secret_ref=self.entity_href)
|
||||
#
|
||||
# def test_should_fail_decrypt_no_default_content_type(self):
|
||||
# content_types_dict = {'no-default': 'application/octet-stream'}
|
||||
# self.api.get.return_value = self.secret.get_dict(self.entity_href,
|
||||
# content_types_dict)
|
||||
#
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.decrypt(secret_ref=self.entity_href)
|
||||
#
|
||||
# def test_should_fail_delete_no_href(self):
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.delete(None)
|
||||
|
||||
|
||||
# class WhenTestingVerificationsResourcePost(BaseEntityResource):
|
||||
#
|
||||
# def setUp(self):
|
||||
# self._setUp('verifications')
|
||||
#
|
||||
# self.resource_type = 'image'
|
||||
# self.resource_ref = 'https://localhost:9311/v1/images/1234567'
|
||||
# self.resource_action = 'vm_attach'
|
||||
# self.impersonation_allowed = True
|
||||
#
|
||||
# def test_should_create(self):
|
||||
# self.session.post.return_value = FakeResp(200, {'verification_ref':
|
||||
# self.entity_href})
|
||||
#
|
||||
# verif_href = self.client\
|
||||
# .verifications.create(resource_type=self.resource_type,
|
||||
# resource_ref=self.resource_ref,
|
||||
# resource_action=self.resource_action)
|
||||
#
|
||||
# self.assertEqual(self.entity_href, verif_href)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.session.post.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_base, url)
|
||||
#
|
||||
# # Verify that correct information was sent in the call.
|
||||
# data = jsonutils.loads(kwargs['data'])
|
||||
# self.assertEqual(self.resource_type, data['resource_type'])
|
||||
# self.assertEqual(self.resource_action, data['resource_action'])
|
||||
#
|
||||
#
|
||||
# class WhenTestingVerificationsResourceGet(BaseEntityResource):
|
||||
#
|
||||
# def setUp(self):
|
||||
# self._setUp('verifications')
|
||||
#
|
||||
# self.secret = SecretData()
|
||||
#
|
||||
# def test_should_get(self):
|
||||
# self.session.get.return_value = FakeResp(200,
|
||||
# self.secret.get_dict())
|
||||
#
|
||||
# secret = self.client.secrets.get(secret_ref=self.entity_href)
|
||||
# self.assertIsInstance(secret, secrets.Secret)
|
||||
# self.assertEqual(self.entity_href, secret.secret_ref)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.session.get.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# # Verify that correct information was sent in the call.
|
||||
# self.assertIsNone(kwargs['params'])
|
||||
self.api.base_url = base[:-1]
|
||||
|
||||
@@ -24,7 +24,9 @@ class OrderData(object):
|
||||
self.created = str(timeutils.utcnow())
|
||||
|
||||
self.secret = test_secrets.SecretData()
|
||||
self.status = 'ACTIVE'
|
||||
self.order_dict = {'created': self.created,
|
||||
'status': self.status,
|
||||
'secret': self.secret.get_dict()}
|
||||
|
||||
def get_dict(self, order_ref, secret_ref=None):
|
||||
@@ -35,7 +37,7 @@ class OrderData(object):
|
||||
return order
|
||||
|
||||
|
||||
class WhenTestingOrdersManager(test_client.BaseEntityResource):
|
||||
class WhenTestingOrders(test_client.BaseEntityResource):
|
||||
|
||||
def setUp(self):
|
||||
self._setUp('orders')
|
||||
@@ -44,6 +46,19 @@ class WhenTestingOrdersManager(test_client.BaseEntityResource):
|
||||
|
||||
self.manager = orders.OrderManager(self.api)
|
||||
|
||||
def test_should_entity_str(self):
|
||||
order_obj = orders.Order(self.order.get_dict(self.entity_href))
|
||||
order_obj.error_status_code = '500'
|
||||
order_obj.error_reason = 'Something is broken'
|
||||
self.assertIn('status: ' + self.order.status,
|
||||
str(order_obj))
|
||||
self.assertIn('error_status_code: 500', str(order_obj))
|
||||
|
||||
def test_should_entity_repr(self):
|
||||
order_obj = orders.Order(self.order.get_dict(self.entity_href))
|
||||
self.assertIn('order_ref=' + self.entity_href,
|
||||
repr(order_obj))
|
||||
|
||||
def test_should_create(self):
|
||||
self.api.post.return_value = {'order_ref': self.entity_href}
|
||||
|
||||
@@ -87,6 +102,26 @@ class WhenTestingOrdersManager(test_client.BaseEntityResource):
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_href, url)
|
||||
|
||||
def test_should_get_list(self):
|
||||
order_resp = self.order.get_dict(self.entity_href)
|
||||
self.api.get.return_value = {"orders":
|
||||
[order_resp for v in xrange(3)]}
|
||||
|
||||
orders_list = self.manager.list(limit=10, offset=5)
|
||||
self.assertTrue(len(orders_list) == 3)
|
||||
self.assertIsInstance(orders_list[0], orders.Order)
|
||||
self.assertEqual(self.entity_href, orders_list[0].order_ref)
|
||||
|
||||
# Verify the correct URL was used to make the call.
|
||||
args, kwargs = self.api.get.call_args
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_base[:-1], url)
|
||||
|
||||
# Verify that correct information was sent in the call.
|
||||
params = args[1]
|
||||
self.assertEqual(10, params['limit'])
|
||||
self.assertEqual(5, params['offset'])
|
||||
|
||||
def test_should_fail_get_no_href(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.manager.get(None)
|
||||
|
||||
@@ -41,7 +41,7 @@ class SecretData(object):
|
||||
return secret
|
||||
|
||||
|
||||
class WhenTestingSecretsManager(test_client.BaseEntityResource):
|
||||
class WhenTestingSecrets(test_client.BaseEntityResource):
|
||||
|
||||
def setUp(self):
|
||||
self._setUp('secrets')
|
||||
@@ -50,6 +50,15 @@ class WhenTestingSecretsManager(test_client.BaseEntityResource):
|
||||
|
||||
self.manager = secrets.SecretManager(self.api)
|
||||
|
||||
def test_should_entity_str(self):
|
||||
secret_obj = secrets.Secret(self.secret.get_dict(self.entity_href))
|
||||
self.assertIn('name: ' + self.secret.name,
|
||||
str(secret_obj))
|
||||
|
||||
def test_should_entity_repr(self):
|
||||
secret_obj = secrets.Secret(self.secret.get_dict(self.entity_href))
|
||||
self.assertIn('name="{0}"'.format(self.secret.name), repr(secret_obj))
|
||||
|
||||
def test_should_store(self):
|
||||
self.api.post.return_value = {'secret_ref': self.entity_href}
|
||||
|
||||
@@ -133,6 +142,26 @@ class WhenTestingSecretsManager(test_client.BaseEntityResource):
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_href, url)
|
||||
|
||||
def test_should_get_list(self):
|
||||
secret_resp = self.secret.get_dict(self.entity_href)
|
||||
self.api.get.return_value = {"secrets":
|
||||
[secret_resp for v in xrange(3)]}
|
||||
|
||||
secrets_list = self.manager.list(limit=10, offset=5)
|
||||
self.assertTrue(len(secrets_list) == 3)
|
||||
self.assertIsInstance(secrets_list[0], secrets.Secret)
|
||||
self.assertEqual(self.entity_href, secrets_list[0].secret_ref)
|
||||
|
||||
# Verify the correct URL was used to make the call.
|
||||
args, kwargs = self.api.get.call_args
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_base[:-1], url)
|
||||
|
||||
# Verify that correct information was sent in the call.
|
||||
params = args[1]
|
||||
self.assertEqual(10, params['limit'])
|
||||
self.assertEqual(5, params['offset'])
|
||||
|
||||
def test_should_fail_get_no_href(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.manager.get(None)
|
||||
|
||||
@@ -13,10 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from barbicanclient import verifications as verify
|
||||
from barbicanclient import verifications as vers
|
||||
from barbicanclient.openstack.common import timeutils
|
||||
from barbicanclient.test import test_client
|
||||
from barbicanclient.test import test_client_secrets as test_secrets
|
||||
|
||||
|
||||
class VerificationData(object):
|
||||
@@ -41,14 +40,27 @@ class VerificationData(object):
|
||||
return verify
|
||||
|
||||
|
||||
class WhenTestingVerificationsManager(test_client.BaseEntityResource):
|
||||
class WhenTestingVerifications(test_client.BaseEntityResource):
|
||||
|
||||
def setUp(self):
|
||||
self._setUp('verifications')
|
||||
|
||||
self.verify = VerificationData()
|
||||
|
||||
self.manager = verify.VerificationManager(self.api)
|
||||
self.manager = vers.VerificationManager(self.api)
|
||||
|
||||
def test_should_entity_str(self):
|
||||
verif_obj = vers.Verification(self.verify.get_dict(self.entity_href))
|
||||
verif_obj.error_status_code = '500'
|
||||
verif_obj.error_reason = 'Something is broken'
|
||||
self.assertIn('resource_type: ' + self.verify.resource_type,
|
||||
str(verif_obj))
|
||||
self.assertIn('error_status_code: 500', str(verif_obj))
|
||||
|
||||
def test_should_entity_repr(self):
|
||||
verif = vers.Verification(self.verify.get_dict(self.entity_href))
|
||||
self.assertIn('verification_ref=' + self.entity_href,
|
||||
repr(verif))
|
||||
|
||||
def test_should_create(self):
|
||||
self.api.post.return_value = {'verification_ref': self.entity_href}
|
||||
@@ -67,36 +79,57 @@ class WhenTestingVerificationsManager(test_client.BaseEntityResource):
|
||||
|
||||
# Verify that correct information was sent in the call.
|
||||
verify_req = args[1]
|
||||
self.assertEqual(self.verify.resource_type, verify_req['resource_type'])
|
||||
self.assertEqual(self.verify.resource_type,
|
||||
verify_req['resource_type'])
|
||||
self.assertEqual(self.verify.resource_action,
|
||||
verify_req['resource_action'])
|
||||
self.assertEqual(self.verify.resource_ref,
|
||||
verify_req['resource_ref'])
|
||||
|
||||
# def test_should_get(self):
|
||||
# self.api.get.return_value = self.order.get_dict(self.entity_href)
|
||||
#
|
||||
# order = self.manager.get(order_ref=self.entity_href)
|
||||
# self.assertIsInstance(order, orders.Order)
|
||||
# self.assertEqual(self.entity_href, order.order_ref)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.get.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# def test_should_delete(self):
|
||||
# self.manager.delete(order_ref=self.entity_href)
|
||||
#
|
||||
# # Verify the correct URL was used to make the call.
|
||||
# args, kwargs = self.api.delete.call_args
|
||||
# url = args[0]
|
||||
# self.assertEqual(self.entity_href, url)
|
||||
#
|
||||
# def test_should_fail_get_no_href(self):
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.get(None)
|
||||
#
|
||||
# def test_should_fail_delete_no_href(self):
|
||||
# with self.assertRaises(ValueError):
|
||||
# self.manager.delete(None)
|
||||
def test_should_get(self):
|
||||
self.api.get.return_value = self.verify.get_dict(self.entity_href)
|
||||
|
||||
verify = self.manager.get(verification_ref=self.entity_href)
|
||||
self.assertIsInstance(verify, vers.Verification)
|
||||
self.assertEqual(self.entity_href, verify.verif_ref)
|
||||
|
||||
# Verify the correct URL was used to make the call.
|
||||
args, kwargs = self.api.get.call_args
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_href, url)
|
||||
|
||||
def test_should_delete(self):
|
||||
self.manager.delete(verification_ref=self.entity_href)
|
||||
|
||||
# Verify the correct URL was used to make the call.
|
||||
args, kwargs = self.api.delete.call_args
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_href, url)
|
||||
|
||||
def test_should_get_list(self):
|
||||
verify_resp = self.verify.get_dict(self.entity_href)
|
||||
self.api.get.return_value = {"verifications":
|
||||
[verify_resp for v in xrange(3)]}
|
||||
|
||||
verifies = self.manager.list(limit=10, offset=5)
|
||||
self.assertTrue(len(verifies) == 3)
|
||||
self.assertIsInstance(verifies[0], vers.Verification)
|
||||
self.assertEqual(self.entity_href, verifies[0].verif_ref)
|
||||
|
||||
# Verify the correct URL was used to make the call.
|
||||
args, kwargs = self.api.get.call_args
|
||||
url = args[0]
|
||||
self.assertEqual(self.entity_base[:-1], url)
|
||||
|
||||
# Verify that correct information was sent in the call.
|
||||
params = args[1]
|
||||
self.assertEqual(10, params['limit'])
|
||||
self.assertEqual(5, params['offset'])
|
||||
|
||||
def test_should_fail_get_no_href(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.manager.get(None)
|
||||
|
||||
def test_should_fail_delete_no_href(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.manager.delete(None)
|
||||
|
||||
@@ -105,28 +105,28 @@ class VerificationManager(base.BaseEntityManager):
|
||||
resp = self.api.post(self.entity, verif_dict)
|
||||
return resp['verification_ref']
|
||||
|
||||
def get(self, verif_ref):
|
||||
def get(self, verification_ref):
|
||||
"""
|
||||
Returns a verification object
|
||||
|
||||
:param verif_ref: The href for the verification instance
|
||||
:param verification_ref: The href for the verification instance
|
||||
"""
|
||||
LOG.debug(_("Getting verification - "
|
||||
"Verification href: {0}").format(verif_ref))
|
||||
if not verif_ref:
|
||||
"Verification href: {0}").format(verification_ref))
|
||||
if not verification_ref:
|
||||
raise ValueError('verif_ref is required.')
|
||||
resp = self.api.get(verif_ref)
|
||||
resp = self.api.get(verification_ref)
|
||||
return Verification(resp)
|
||||
|
||||
def delete(self, verif_ref):
|
||||
def delete(self, verification_ref):
|
||||
"""
|
||||
Deletes a verification
|
||||
|
||||
:param verif_ref: The href for the verification instance
|
||||
:param verification_ref: The href for the verification instance
|
||||
"""
|
||||
if not verif_ref:
|
||||
if not verification_ref:
|
||||
raise ValueError('verif_ref is required.')
|
||||
self.api.delete(verif_ref)
|
||||
self.api.delete(verification_ref)
|
||||
|
||||
def list(self, limit=10, offset=0):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user