charm-barbican-softhsm/src/tests/basic_deployment.py

355 lines
15 KiB
Python

import amulet
import json
import subprocess
import time
import barbicanclient.client as barbican_client
from keystoneclient import session as keystone_session
from keystoneclient.auth import identity as keystone_identity
import keystoneclient.exceptions
from keystoneclient.v2_0 import client as keystone_v2_0_client
from keystoneclient.v3 import client as keystone_v3_client
from charmhelpers.contrib.openstack.amulet.deployment import (
OpenStackAmuletDeployment
)
from charmhelpers.contrib.openstack.amulet.utils import (
OpenStackAmuletUtils,
DEBUG,
)
# Use DEBUG to turn on debug logging
u = OpenStackAmuletUtils(DEBUG)
class SoftHSMBasicDeployment(OpenStackAmuletDeployment):
"""Amulet tests on a basic SoftHSM deployment."""
def __init__(self, series, openstack=None, source=None, stable=False,
keystone_version='2'):
"""Deploy the entire test environment.
The keystone_version controls whether keystone (and barbican) are set
up to use keystone v2.0 or v3. The options are <string> 2 or 3.
"""
super(SoftHSMBasicDeployment, self).__init__(
series, openstack, source, stable)
self._keystone_version = str(keystone_version)
self._add_services()
self._add_relations()
self._configure_services(keystone_version)
self._deploy()
u.log.info('Waiting on extended status checks...')
exclude_services = ['mysql', ]
self._auto_wait_for_status(exclude_services=exclude_services)
self._initialize_tests()
def _add_services(self):
"""Add services
Add the services that we're testing, where softhsm is local,
and the rest of the service are from lp branches that are
compatible with the local charm (e.g. stable or next).
"""
this_service = {'name': 'barbican-softhsm'}
other_services = [{'name': 'barbican'},
{'name': 'mysql'},
{'name': 'rabbitmq-server'},
{'name': 'keystone'}]
super(SoftHSMBasicDeployment, self)._add_services(
this_service, other_services)
def _add_relations(self):
"""Add all of the relations for the services."""
relations = {
'barbican:hsm': 'barbican-softhsm:hsm',
'barbican:shared-db': 'mysql:shared-db',
'barbican:amqp': 'rabbitmq-server:amqp',
'barbican:identity-service': 'keystone:identity-service',
'keystone:shared-db': 'mysql:shared-db',
}
super(SoftHSMBasicDeployment, self)._add_relations(relations)
def _configure_services(self, keystone_version='2'):
"""Configure all of the services."""
keystone_config = {
'admin-password': 'openstack',
'admin-token': 'ubuntutesting',
'preferred-api-version': str(keystone_version),
}
# say we don't need an HSM for these tests
barbican_config = {
'require-hsm-plugin': True,
'verbose': True,
'keystone-api-version': str(keystone_version),
}
configs = {
'keystone': keystone_config,
'barbican': barbican_config,
}
super(SoftHSMBasicDeployment, self)._configure_services(configs)
def _initialize_tests(self):
"""Perform final initialization before tests get run."""
# Access the sentries for inspecting service units
self.softhsm_sentry = self.d.sentry['barbican-softhsm'][0]
self.barbican_sentry = self.d.sentry['barbican'][0]
self.mysql_sentry = self.d.sentry['mysql'][0]
self.keystone_sentry = self.d.sentry['keystone'][0]
self.rabbitmq_sentry = self.d.sentry['rabbitmq-server'][0]
u.log.debug('openstack release val: {}'.format(
self._get_openstack_release()))
u.log.debug('openstack release str: {}'.format(
self._get_openstack_release_string()))
keystone_ip = self.keystone_sentry.relation(
'shared-db', 'mysql:shared-db')['private-address']
# We need to auth either to v2.0 or v3 keystone
if self._keystone_version == '2':
ep = ("http://{}:35357/v2.0"
.format(keystone_ip.strip().decode('utf-8')))
auth = keystone_identity.v2.Password(
username='admin',
password='openstack',
tenant_name='admin',
auth_url=ep)
keystone_client_lib = keystone_v2_0_client
elif self._keystone_version == '3':
ep = ("http://{}:35357/v3"
.format(keystone_ip.strip().decode('utf-8')))
auth = keystone_identity.v3.Password(
user_domain_name='admin_domain',
username='admin',
password='openstack',
domain_name='admin_domain',
auth_url=ep)
keystone_client_lib = keystone_v3_client
else:
raise RuntimeError("keystone version must be '2' or '3'")
sess = keystone_session.Session(auth=auth)
self.keystone = keystone_client_lib.Client(session=sess)
# The service_catalog is missing from V3 keystone client when auth is
# done with session (via authenticate_keystone_admin()
# See https://bugs.launchpad.net/python-keystoneclient/+bug/1508374
# using session construct client will miss service_catalog property
# workaround bug # 1508374 by forcing a pre-auth and therefore, getting
# the service-catalog --
# see https://bugs.launchpad.net/python-keystoneclient/+bug/1547331
self.keystone.auth_ref = auth.get_access(sess)
def _run_action(self, unit_id, action, *args):
command = ["juju", "action", "do", "--format=json", unit_id, action]
command.extend(args)
print("Running command: %s\n" % " ".join(command))
output = subprocess.check_output(command)
output_json = output.decode(encoding="UTF-8")
data = json.loads(output_json)
action_id = data[u'Action queued with id']
return action_id
def _wait_on_action(self, action_id):
command = ["juju", "action", "fetch", "--format=json", action_id]
while True:
try:
output = subprocess.check_output(command)
except Exception as e:
print(e)
return False
output_json = output.decode(encoding="UTF-8")
data = json.loads(output_json)
if data[u"status"] == "completed":
return True
elif data[u"status"] == "failed":
return False
time.sleep(2)
def test_200_barbican_softhsm_relation(self):
"""Verify the barbican to softhsm plugin relation data"""
u.log.debug('Checking barbican to barbican-softhsm '
'relation data...')
unit = self.softhsm_sentry
relation = ['hsm', 'barbican:barbican-hsm']
expected = {
'_name': 'softhsm2',
'_plugin_data': u.not_null,
'private-address': u.not_null,
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
message = u.relation_error('barbican barbican-softhsm', ret)
amulet.raise_status(amulet.FAIL, msg=message)
u.log.debug('OK')
@staticmethod
def _find_or_create(items, key, create):
"""Find or create the thing in the items
:param items: the items to search using the key
:param key: a function that key(item) -> boolean if found.
:param create: a function to call if the key() never was true.
:returns: the item that was either found or created.
"""
for i in items:
if key(i):
return i
return create()
def test_400_api_connection(self):
"""Simple api calls to check service is up and responding"""
u.log.debug("Not running test_400_api_connection() as bug#1611393 "
"prevents the test from passing. Waiting until the "
"OpenSSL library has the EVP_aes_128_wrap_pad() function "
"that barbican needs to wrap a keys for secret.store() to "
"function.")
return
u.log.debug('Checking api functionality...')
# This handles both keystone v2 and v3.
# For keystone v2 we need a user:
# - 'demo' user
# - has a project 'demo'
# - in the 'demo' project
# - with an 'admin' role
# For keystone v3 we need a user:
# - 'default' domain
# - 'demo' user
# - 'demo' project
# - 'admin' role -- to be able to delete.
# barbican requires a user with creator or admin role on the project
# when creating a secret (which this test does). Therefore, we create
# a demo user, demo project, and then get a demo barbican client and do
# the secret. ensure that the default domain is created.
if self._keystone_version == '2':
# find or create the 'demo' tenant (project)
tenant = self._find_or_create(
items=self.keystone.tenants.list(),
key=lambda t: t.name == 'demo',
create=lambda: self.keystone.tenants.create(
tenant_name="demo",
description="Demo for testing barbican",
enabled=True))
# find or create the demo user
demo_user = self._find_or_create(
items=self.keystone.users.list(),
key=lambda u: u.name == 'demo',
create=lambda: self.keystone.users.create(
name='demo',
password='pass',
tenant_id=tenant.id))
# find the admin role
# already be created - if not, then this will fail later.
admin_role = self._find_or_create(
items=self.keystone.roles.list(),
key=lambda r: r.name.lower() == 'admin',
create=lambda: None)
# grant the role if it isn't already created.
# now grant the creator role to the demo user.
self._find_or_create(
items=self.keystone.roles.roles_for_user(
demo_user, tenant=tenant),
key=lambda r: r.name.lower() == admin_role.name.lower(),
create=lambda: self.keystone.roles.add_user_role(
demo_user, admin_role, tenant=tenant))
# now we can finally get the barbican client and create the secret
keystone_ep = self.keystone.service_catalog.url_for(
service_type='identity', endpoint_type='publicURL')
auth = keystone_identity.v2.Password(
username=demo_user.name,
password='pass',
tenant_name=tenant.name,
auth_url=keystone_ep)
else:
# find or create the 'default' domain
domain = self._find_or_create(
items=self.keystone.domains.list(),
key=lambda u: u.name == 'default',
create=lambda: self.keystone.domains.create(
"default",
description="domain for barbican testing",
enabled=True))
# find or create the 'demo' user
demo_user = self._find_or_create(
items=self.keystone.users.list(domain=domain.id),
key=lambda u: u.name == 'demo',
create=lambda: self.keystone.users.create(
'demo',
domain=domain.id,
description="Demo user for barbican tests",
enabled=True,
email="demo@example.com",
password="pass"))
# find or create the 'demo' project
demo_project = self._find_or_create(
items=self.keystone.projects.list(domain=domain.id),
key=lambda x: x.name == 'demo',
create=lambda: self.keystone.projects.create(
'demo',
domain=domain.id,
description='barbican testing project',
enabled=True))
# create the role for the user - needs to be admin so that the
# secret can be deleted - note there is only one admin role, and it
# should already be created - if not, then this will fail later.
admin_role = self._find_or_create(
items=self.keystone.roles.list(),
key=lambda r: r.name.lower() == 'admin',
create=lambda: None)
# now grant the creator role to the demo user.
try:
self.keystone.roles.check(
role=admin_role,
user=demo_user,
project=demo_project)
except keystoneclient.exceptions.NotFound:
# create it if it isn't found
self.keystone.roles.grant(
role=admin_role,
user=demo_user,
project=demo_project)
# now we can finally get the barbican client and create the secret
keystone_ep = self.keystone.service_catalog.url_for(
service_type='identity', endpoint_type='publicURL')
auth = keystone_identity.v3.Password(
user_domain_name=domain.name,
username=demo_user.name,
password='pass',
project_domain_name=domain.name,
project_name=demo_project.name,
auth_url=keystone_ep)
# Now we carry on with common v2 and v3 code
sess = keystone_session.Session(auth=auth)
# Authenticate admin with barbican endpoint
barbican_ep = self.keystone.service_catalog.url_for(
service_type='key-manager', endpoint_type='publicURL')
barbican = barbican_client.Client(session=sess,
endpoint=barbican_ep)
# before creating a secret with the HSM we have to ensure that the mkek
# is generated. This is done with an action WHICH will fail if the
# mkek has already been generated. We look for the duplicate message
# and if it exists then we assume that it is okay.
action_id = u.run_action(self.d.sentry['barbican'][0], 'generate-mkek')
assert u.wait_on_action(action_id), "generate-mkek action failed."
# now create the secret.
my_secret = barbican.secrets.create()
my_secret.name = u'Random plain text password'
my_secret.payload = u'password'
my_secret_ref = my_secret.store()
assert(my_secret_ref is not None)
# and now delete the secret
my_secret.delete()
u.log.debug('OK')