charm-keystone/unit_tests/test_keystone_utils.py
Frode Nordahl 5de1770931 Create service credentials in SERVICE_DOMAIN
Cleanup code that references users, projects or domains without
necessary scoping or filtering throughout the charm.  Add logging
of domain name in contexts where this is relevant.

Tighten rule:service_role to require role:service and token scoped
to project config('service-tenant') created in SERVICE_DOMAIN. This
ensures that if you have a deployment with end-user access to assign
roles within their own domains they will not gain privileged access
simply by assigning the service role to one of their own users.

Allow users authorized by rule:service_role to perform
identity:list_projects. This is required to allow Ceilometer
to operate without Admin privileges.

Services are given a user in project config('service-tenant') in
SERVICE_DOMAIN for v3 authentication / authorization.  As of Mitaka
Keystone v3 policy the 'service' role is sufficient for services to
validate tokens.

Services are also given a user in project config('service-tenant') in
DEFAULT_DOMAIN to support services still configured with v2.0
authentication / authorization.

This will allow us to transition from v2.0 based authentication /
authorization and existing services and charms will continue to
operate as before.  This will also allow the end-user to roll their
deployment up to api_version 3 and back to api_version 2 as needed.

Services and charms that has made the transition to fully use the
v3 API for authentication and authorization will gain full access to
domains and projects across the deployment.  The first charm to make
use of this is charm-ceilometer.

Closes-Bug: 1636098
Change-Id: If1518029c43476a5e14bf94596197eabe663499c
2016-11-24 13:03:12 +00:00

1225 lines
54 KiB
Python

# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import patch, call, MagicMock, Mock
from test_utils import CharmTestCase
import os
from base64 import b64encode
os.environ['JUJU_UNIT_NAME'] = 'keystone'
with patch('charmhelpers.core.hookenv.config') as config:
import keystone_utils as utils
TO_PATCH = [
'api_port',
'config',
'os_release',
'log',
'get_ca',
'create_role',
'create_service_entry',
'create_endpoint_template',
'get_admin_token',
'get_local_endpoint',
'get_requested_roles',
'get_service_password',
'get_os_codename_install_source',
'git_clone_and_install',
'git_pip_venv_dir',
'git_src_dir',
'grant_role',
'configure_installation_source',
'is_elected_leader',
'is_ssl_cert_master',
'https',
'lsb_release',
'peer_store_and_set',
'service_restart',
'service_stop',
'service_start',
'relation_get',
'relation_set',
'relation_ids',
'relation_id',
'render',
'local_unit',
'related_units',
'https',
'is_relation_made',
'peer_store',
'pip_install',
# generic
'apt_update',
'apt_upgrade',
'apt_install',
'subprocess',
'time',
'pwgen',
'os_application_version_set',
]
openstack_origin_git = \
"""repositories:
- {name: requirements,
repository: 'git://git.openstack.org/openstack/requirements',
branch: stable/juno}
- {name: keystone,
repository: 'git://git.openstack.org/openstack/keystone',
branch: stable/juno}"""
class TestKeystoneUtils(CharmTestCase):
def setUp(self):
super(TestKeystoneUtils, self).setUp(utils, TO_PATCH)
self.config.side_effect = self.test_config.get
self.ctxt = MagicMock()
self.rsc_map = {
'/etc/keystone/keystone.conf': {
'services': ['keystone'],
'contexts': [self.ctxt],
},
'/etc/apache2/sites-available/openstack_https_frontend': {
'services': ['apache2'],
'contexts': [self.ctxt],
},
'/etc/apache2/sites-available/openstack_https_frontend.conf': {
'services': ['apache2'],
'contexts': [self.ctxt],
}
}
@patch('charmhelpers.contrib.openstack.templating.OSConfigRenderer')
@patch('os.path.exists')
@patch.object(utils, 'resource_map')
def test_register_configs_apache(self, resource_map, exists, renderer):
exists.return_value = False
self.os_release.return_value = 'havana'
fake_renderer = MagicMock()
fake_renderer.register = MagicMock()
renderer.return_value = fake_renderer
resource_map.return_value = self.rsc_map
utils.register_configs()
renderer.assert_called_with(
openstack_release='havana', templates_dir='templates/')
ex_reg = [
call('/etc/keystone/keystone.conf', [self.ctxt]),
call(
'/etc/apache2/sites-available/openstack_https_frontend',
[self.ctxt]),
call(
'/etc/apache2/sites-available/openstack_https_frontend.conf',
[self.ctxt]),
]
self.assertEquals(fake_renderer.register.call_args_list, ex_reg)
def test_determine_ports(self):
self.test_config.set('admin-port', '80')
self.test_config.set('service-port', '81')
result = utils.determine_ports()
self.assertEquals(result, ['80', '81'])
@patch('charmhelpers.contrib.openstack.utils.config')
def test_determine_packages(self, _config):
_config.return_value = None
result = utils.determine_packages()
ex = utils.BASE_PACKAGES + ['keystone', 'python-keystoneclient']
self.assertEquals(set(ex), set(result))
@patch('charmhelpers.contrib.openstack.utils.config')
def test_determine_packages_git(self, _config):
_config.return_value = openstack_origin_git
result = utils.determine_packages()
ex = utils.BASE_PACKAGES + utils.BASE_GIT_PACKAGES
for p in utils.GIT_PACKAGE_BLACKLIST:
ex.remove(p)
self.assertEquals(set(ex), set(result))
@patch('os.path.exists')
@patch.object(utils, 'run_in_apache')
@patch.object(utils, 'determine_packages')
@patch.object(utils, 'migrate_database')
def test_openstack_upgrade_leader(
self, migrate_database, determine_packages,
run_in_apache, os_path_exists):
configs = MagicMock()
self.test_config.set('openstack-origin', 'cloud:xenial-newton')
determine_packages.return_value = []
self.is_elected_leader.return_value = True
os_path_exists.return_value = True
run_in_apache.return_value = True
utils.do_openstack_upgrade(configs)
self.get_os_codename_install_source.assert_called_with(
'cloud:xenial-newton'
)
self.configure_installation_source.assert_called_with(
'cloud:xenial-newton'
)
self.assertTrue(self.apt_update.called)
dpkg_opts = [
'--option', 'Dpkg::Options::=--force-confnew',
'--option', 'Dpkg::Options::=--force-confdef',
]
self.apt_upgrade.assert_called_with(
options=dpkg_opts,
fatal=True,
dist=True)
self.apt_install.assert_called_with(
packages=[],
options=dpkg_opts,
fatal=True)
self.assertTrue(configs.set_release.called)
self.assertTrue(configs.write_all.called)
self.assertTrue(migrate_database.called)
os_path_exists.assert_called_with(
utils.PACKAGE_KEYSTONE_CONF
)
self.subprocess.check_call.assert_called_with(
['a2dissite', 'keystone']
)
def test_migrate_database(self):
utils.migrate_database()
self.service_stop.assert_called_with('keystone')
cmd = ['sudo', '-u', 'keystone', 'keystone-manage', 'db_sync']
self.subprocess.check_output.assert_called_with(cmd)
self.service_start.assert_called_with('keystone')
@patch.object(utils, 'get_admin_domain_id')
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_manager')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'b64encode')
def test_add_service_to_keystone_clustered_https_none_values(
self, b64encode, _resolve_address, _get_manager,
_get_api_version, _get_admin_domain_id):
_get_api_version.return_value = 2
_get_admin_domain_id.return_value = None
relation_id = 'identity-service:0'
remote_unit = 'unit/0'
_resolve_address.return_value = '10.10.10.10'
self.https.return_value = True
self.test_config.set('https-service-endpoints', 'True')
self.test_config.set('vip', '10.10.10.10')
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
b64encode.return_value = 'certificate'
self.get_requested_roles.return_value = ['role1', ]
self.relation_get.return_value = {'service': 'keystone',
'region': 'RegionOne',
'public_url': 'None',
'admin_url': '10.0.0.2',
'internal_url': '192.168.1.2'}
utils.add_service_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
self.assertTrue(self.https.called)
self.assertTrue(self.create_role.called)
relation_data = {'auth_host': '10.10.10.10',
'service_host': '10.10.10.10',
'auth_protocol': 'https',
'service_protocol': 'https',
'auth_port': 80,
'service_port': 81,
'https_keystone': 'True',
'ca_cert': 'certificate',
'region': 'RegionOne',
'api_version': 2,
'admin_domain_id': None}
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_admin_domain_id')
@patch.object(utils, 'create_user')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'ensure_valid_service')
@patch.object(utils, 'add_endpoint')
@patch.object(utils, 'get_manager')
def test_add_service_to_keystone_no_clustered_no_https_complete_values(
self, KeystoneManager, add_endpoint, ensure_valid_service,
_resolve_address, create_user, get_admin_domain_id,
get_api_version, test_api_version=2):
get_admin_domain_id.return_value = None
get_api_version.return_value = test_api_version
relation_id = 'identity-service:0'
remote_unit = 'unit/0'
self.get_admin_token.return_value = 'token'
self.get_service_password.return_value = 'password'
self.test_config.set('service-tenant', 'tenant')
self.test_config.set('admin-role', 'Admin')
self.get_requested_roles.return_value = ['role1', ]
_resolve_address.return_value = '10.0.0.3'
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
self.https.return_value = False
self.test_config.set('https-service-endpoints', 'False')
self.get_local_endpoint.return_value = 'http://localhost:80/v2.0/'
self.relation_ids.return_value = ['cluster/0']
service_domain = None
service_role = 'Admin'
if test_api_version > 2:
service_domain = 'service_domain'
service_role = 'service'
mock_keystone = MagicMock()
mock_keystone.resolve_tenant_id.return_value = 'tenant_id'
KeystoneManager.return_value = mock_keystone
self.relation_get.return_value = {'service': 'keystone',
'region': 'RegionOne',
'public_url': '10.0.0.1',
'admin_url': '10.0.0.2',
'internal_url': '192.168.1.2'}
utils.add_service_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
ensure_valid_service.assert_called_with('keystone')
add_endpoint.assert_called_with(region='RegionOne', service='keystone',
publicurl='10.0.0.1',
adminurl='10.0.0.2',
internalurl='192.168.1.2')
self.assertTrue(self.get_admin_token.called)
self.get_service_password.assert_called_with('keystone')
create_user.assert_called_with('keystone', 'password',
domain=service_domain,
tenant='tenant')
self.grant_role.assert_called_with('keystone', service_role,
project_domain=service_domain,
tenant='tenant',
user_domain=service_domain)
self.create_role.assert_called_with('role1', user='keystone',
tenant='tenant',
domain=service_domain)
relation_data = {'admin_domain_id': None,
'auth_host': '10.0.0.3',
'service_host': '10.0.0.3', 'admin_token': 'token',
'service_port': 81, 'auth_port': 80,
'service_username': 'keystone',
'service_password': 'password',
'service_domain': service_domain,
'service_tenant': 'tenant',
'https_keystone': '__null__',
'ssl_cert': '__null__', 'ssl_key': '__null__',
'ca_cert': '__null__',
'auth_protocol': 'http', 'service_protocol': 'http',
'service_tenant_id': 'tenant_id',
'api_version': test_api_version}
filtered = {}
for k, v in relation_data.iteritems():
if v == '__null__':
filtered[k] = None
else:
filtered[k] = v
self.assertTrue(self.relation_set.called)
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)
self.relation_set.assert_called_with(relation_id=relation_id,
**filtered)
def test_add_service_to_keystone_no_clustered_no_https_complete_values_v3(
self):
return self.\
test_add_service_to_keystone_no_clustered_no_https_complete_values(
test_api_version=3)
@patch('charmhelpers.contrib.openstack.ip.config')
@patch.object(utils, 'ensure_valid_service')
@patch.object(utils, 'add_endpoint')
@patch.object(utils, 'get_manager')
def test_add_service_to_keystone_nosubset(
self, KeystoneManager, add_endpoint, ensure_valid_service,
ip_config):
relation_id = 'identity-service:0'
remote_unit = 'unit/0'
self.relation_get.return_value = {'ec2_service': 'nova',
'ec2_region': 'RegionOne',
'ec2_public_url': '10.0.0.1',
'ec2_admin_url': '10.0.0.2',
'ec2_internal_url': '192.168.1.2'}
self.get_local_endpoint.return_value = 'http://localhost:80/v2.0/'
KeystoneManager.resolve_tenant_id.return_value = 'tenant_id'
utils.add_service_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
ensure_valid_service.assert_called_with('nova')
add_endpoint.assert_called_with(region='RegionOne', service='nova',
publicurl='10.0.0.1',
adminurl='10.0.0.2',
internalurl='192.168.1.2')
@patch.object(utils, 'user_exists')
@patch.object(utils, 'grant_role')
@patch.object(utils, 'create_role')
@patch.object(utils, 'create_user')
def test_create_user_credentials_no_roles(self, mock_create_user,
mock_create_role,
mock_grant_role,
mock_user_exists):
mock_user_exists.return_value = False
utils.create_user_credentials('userA', 'passA', tenant='tenantA')
mock_create_user.assert_has_calls([call('userA', 'passA',
domain=None,
tenant='tenantA')])
mock_create_role.assert_has_calls([])
mock_grant_role.assert_has_calls([])
@patch.object(utils, 'user_exists')
@patch.object(utils, 'grant_role')
@patch.object(utils, 'create_role')
@patch.object(utils, 'create_user')
def test_create_user_credentials(self, mock_create_user, mock_create_role,
mock_grant_role, mock_user_exists):
mock_user_exists.return_value = False
utils.create_user_credentials('userA', 'passA', tenant='tenantA',
grants=['roleA'], new_roles=['roleB'])
mock_create_user.assert_has_calls([call('userA', 'passA',
tenant='tenantA',
domain=None)])
mock_create_role.assert_has_calls([call('roleB', user='userA',
tenant='tenantA',
domain=None)])
mock_grant_role.assert_has_calls([call('userA', 'roleA',
tenant='tenantA',
user_domain=None,
project_domain=None)])
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'user_exists')
@patch.object(utils, 'grant_role')
@patch.object(utils, 'create_role')
@patch.object(utils, 'create_user')
def test_create_user_credentials_user_exists(self, mock_create_user,
mock_create_role,
mock_grant_role,
mock_user_exists,
mock_update_user_password):
mock_user_exists.return_value = True
utils.create_user_credentials('userA', 'passA', tenant='tenantA',
grants=['roleA'], new_roles=['roleB'])
mock_create_user.assert_has_calls([])
mock_create_role.assert_has_calls([call('roleB', user='userA',
tenant='tenantA',
domain=None)])
mock_grant_role.assert_has_calls([call('userA', 'roleA',
tenant='tenantA',
user_domain=None,
project_domain=None)])
mock_update_user_password.assert_has_calls([call('userA', 'passA')])
@patch.object(utils, 'get_manager')
def test_create_user_case_sensitivity(self, KeystoneManager):
""" Test case sensitivity of check for existence in
the user creation process """
mock_keystone = MagicMock()
KeystoneManager.return_value = mock_keystone
mock_user = MagicMock()
mock_keystone.resolve_user_id.return_value = mock_user
mock_keystone.api.users.list.return_value = [mock_user]
# User found is the same i.e. userA == userA
mock_user.name = 'userA'
utils.create_user('userA', 'passA')
mock_keystone.resolve_user_id.assert_called_with('userA',
user_domain=None)
mock_keystone.create_user.assert_not_called()
# User found has different case but is the same
# i.e. Usera != userA
mock_user.name = 'Usera'
utils.create_user('userA', 'passA')
mock_keystone.resolve_user_id.assert_called_with('userA',
user_domain=None)
mock_keystone.create_user.assert_not_called()
# User is different i.e. UserB != userA
mock_user.name = 'UserB'
utils.create_user('userA', 'passA')
mock_keystone.resolve_user_id.assert_called_with('userA',
user_domain=None)
mock_keystone.create_user.assert_called_with(name='userA',
password='passA',
tenant_id=None,
domain_id=None,
email='juju@localhost')
@patch.object(utils, 'get_service_password')
@patch.object(utils, 'create_user_credentials')
def test_create_service_credentials(self, mock_create_user_credentials,
mock_get_service_password):
mock_get_service_password.return_value = 'passA'
cfg = {'service-tenant': 'tenantA', 'admin-role': 'Admin',
'preferred-api-version': 2}
self.config.side_effect = lambda key: cfg.get(key, None)
calls = [call('serviceA', 'passA', domain=None, grants=['Admin'],
new_roles=None, tenant='tenantA')]
utils.create_service_credentials('serviceA')
mock_create_user_credentials.assert_has_calls(calls)
def test_ensure_valid_service_incorrect(self):
utils.ensure_valid_service('fakeservice')
self.log.assert_called_with("Invalid service requested: 'fakeservice'")
self.relation_set.assert_called_with(admin_token=-1)
def test_add_endpoint(self):
publicurl = '10.0.0.1'
adminurl = '10.0.0.2'
internalurl = '10.0.0.3'
utils.add_endpoint(
'RegionOne',
'nova',
publicurl,
adminurl,
internalurl)
self.create_service_entry.assert_called_with(
'nova',
'compute',
'Nova Compute Service')
self.create_endpoint_template.asssert_called_with(
region='RegionOne', service='nova',
publicurl=publicurl, adminurl=adminurl,
internalurl=internalurl)
@patch.object(utils, 'uuid')
@patch.object(utils, 'relation_set')
@patch.object(utils, 'relation_get')
@patch.object(utils, 'relation_ids')
@patch.object(utils, 'is_elected_leader')
def test_send_notifications(self, mock_is_elected_leader,
mock_relation_ids, mock_relation_get,
mock_relation_set, mock_uuid):
relation_id = 'testrel:0'
mock_uuid.uuid4.return_value = '1234'
mock_relation_ids.return_value = [relation_id]
mock_is_elected_leader.return_value = False
utils.send_notifications({'foo-endpoint-changed': 1})
self.assertFalse(mock_relation_set.called)
mock_is_elected_leader.return_value = True
utils.send_notifications({})
self.assertFalse(mock_relation_set.called)
settings = {'foo-endpoint-changed': 1}
utils.send_notifications(settings)
self.assertTrue(mock_relation_set.called)
mock_relation_set.assert_called_once_with(relation_id=relation_id,
relation_settings=settings)
mock_relation_set.reset_mock()
settings = {'foo-endpoint-changed': 1}
utils.send_notifications(settings, force=True)
self.assertTrue(mock_relation_set.called)
settings['trigger'] = '1234'
mock_relation_set.assert_called_once_with(relation_id=relation_id,
relation_settings=settings)
def test_get_admin_passwd_pwd_set(self):
self.test_config.set('admin-password', 'supersecret')
self.assertEqual(utils.get_admin_passwd(), 'supersecret')
@patch('os.path.isfile')
def test_get_admin_passwd_pwd_file_load(self, isfile):
self.test_config.set('admin-password', '')
isfile.return_value = True
with patch('__builtin__.open') as mock_open:
mock_open.return_value.__enter__ = lambda s: s
mock_open.return_value.__exit__ = Mock()
mock_open.return_value.readline.return_value = 'supersecretfilepwd'
self.assertEqual(utils.get_admin_passwd(), 'supersecretfilepwd')
@patch.object(utils, 'store_admin_passwd')
@patch('os.path.isfile')
def test_get_admin_passwd_genpass(self, isfile, store_admin_passwd):
self.test_config.set('admin-password', '')
isfile.return_value = False
self.subprocess.check_output.return_value = 'supersecretgen'
self.assertEqual(utils.get_admin_passwd(), 'supersecretgen')
def test_is_db_ready(self):
allowed_units = None
def fake_rel_get(attribute=None, *args, **kwargs):
if attribute == 'allowed_units':
return allowed_units
self.relation_get.side_effect = fake_rel_get
self.relation_id.return_value = 'shared-db:0'
self.relation_ids.return_value = ['shared-db:0']
self.local_unit.return_value = 'unit/0'
allowed_units = 'unit/0'
self.assertTrue(utils.is_db_ready(use_current_context=True))
self.relation_id.return_value = 'shared-db:0'
self.relation_ids.return_value = ['shared-db:0']
self.local_unit.return_value = 'unit/0'
allowed_units = 'unit/1'
self.assertFalse(utils.is_db_ready(use_current_context=True))
self.relation_ids.return_value = ['acme:0']
self.assertRaises(utils.is_db_ready, use_current_context=True)
allowed_units = 'unit/0'
self.related_units.return_value = ['unit/0']
self.relation_ids.return_value = ['shared-db:0', 'shared-db:1']
self.assertTrue(utils.is_db_ready())
allowed_units = 'unit/1'
self.assertFalse(utils.is_db_ready())
self.related_units.return_value = []
self.assertTrue(utils.is_db_ready())
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_ssl_no_peers(self, mock_peer_units):
def mock_rel_get(unit=None, **kwargs):
return None
self.relation_get.side_effect = mock_rel_get
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
self.related_units.return_value = []
mock_peer_units.return_value = []
# This should get ignored since we are overriding
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = False
self.assertTrue(utils.ensure_ssl_cert_master())
settings = {'ssl-cert-master': 'unit/0'}
self.relation_set.assert_called_with(relation_id='cluster:0',
relation_settings=settings)
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_ssl_master_no_peers(self,
mock_peer_units):
def mock_rel_get(unit=None, **kwargs):
if unit == 'unit/0':
return 'unit/0'
return None
self.relation_get.side_effect = mock_rel_get
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
self.related_units.return_value = []
mock_peer_units.return_value = []
# This should get ignored since we are overriding
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = False
self.assertTrue(utils.ensure_ssl_cert_master())
settings = {'ssl-cert-master': 'unit/0'}
self.relation_set.assert_called_with(relation_id='cluster:0',
relation_settings=settings)
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_ssl_not_leader(self, mock_peer_units):
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
mock_peer_units.return_value = ['unit/1']
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = False
self.assertFalse(utils.ensure_ssl_cert_master())
self.assertFalse(self.relation_set.called)
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_is_leader_new_peer(self,
mock_peer_units):
def mock_rel_get(unit=None, **kwargs):
if unit == 'unit/0':
return 'unit/0'
return 'unknown'
self.relation_get.side_effect = mock_rel_get
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
mock_peer_units.return_value = ['unit/1']
self.related_units.return_value = ['unit/1']
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = True
self.assertFalse(utils.ensure_ssl_cert_master())
settings = {'ssl-cert-master': 'unit/0'}
self.relation_set.assert_called_with(relation_id='cluster:0',
relation_settings=settings)
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_is_leader_no_new_peer(self,
mock_peer_units):
def mock_rel_get(unit=None, **kwargs):
if unit == 'unit/0':
return 'unit/0'
return 'unit/0'
self.relation_get.side_effect = mock_rel_get
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
mock_peer_units.return_value = ['unit/1']
self.related_units.return_value = ['unit/1']
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = True
self.assertFalse(utils.ensure_ssl_cert_master())
self.assertFalse(self.relation_set.called)
@patch('charmhelpers.contrib.openstack.ip.unit_get')
@patch('charmhelpers.contrib.openstack.ip.is_clustered')
@patch('charmhelpers.contrib.openstack.ip.config')
@patch.object(utils, 'create_keystone_endpoint')
@patch.object(utils, 'create_tenant')
@patch.object(utils, 'create_user_credentials')
@patch.object(utils, 'create_service_entry')
def test_ensure_initial_admin_public_name(self,
_create_service_entry,
_create_user_creds,
_create_tenant,
_create_keystone_endpoint,
_ip_config,
_is_clustered,
_unit_get):
_is_clustered.return_value = False
_ip_config.side_effect = self.test_config.get
_unit_get.return_value = '10.0.0.1'
self.test_config.set('os-public-hostname', 'keystone.example.com')
utils.ensure_initial_admin(self.config)
_create_keystone_endpoint.assert_called_with(
public_ip='keystone.example.com',
service_port=5000,
internal_ip='10.0.0.1',
admin_ip='10.0.0.1',
auth_port=35357,
region='RegionOne',
)
@patch.object(utils, 'peer_units')
def test_ensure_ssl_cert_master_is_leader_bad_votes(self,
mock_peer_units):
counter = {0: 0}
def mock_rel_get(unit=None, **kwargs):
"""Returns a mix of votes."""
if unit == 'unit/0':
return 'unit/0'
ret = 'unit/%d' % (counter[0])
counter[0] += 1
return ret
self.relation_get.side_effect = mock_rel_get
self.relation_ids.return_value = ['cluster:0']
self.local_unit.return_value = 'unit/0'
mock_peer_units.return_value = ['unit/1']
self.related_units.return_value = ['unit/1']
self.is_ssl_cert_master.return_value = False
self.is_elected_leader.return_value = True
self.assertFalse(utils.ensure_ssl_cert_master())
self.assertFalse(self.relation_set.called)
@patch.object(utils, 'git_install_requested')
@patch.object(utils, 'git_post_install')
@patch.object(utils, 'git_pre_install')
def test_git_install(self, git_requested, git_pre, git_post):
projects_yaml = openstack_origin_git
git_requested.return_value = True
utils.git_install(projects_yaml)
self.assertTrue(git_pre.called)
self.git_clone_and_install.assert_called_with(openstack_origin_git,
core_project='keystone')
self.assertTrue(git_post.called)
@patch.object(utils, 'mkdir')
@patch.object(utils, 'write_file')
@patch.object(utils, 'add_user_to_group')
@patch.object(utils, 'add_group')
@patch.object(utils, 'adduser')
def test_git_pre_install(self, adduser, add_group, add_user_to_group,
write_file, mkdir):
utils.git_pre_install()
adduser.assert_called_with('keystone', shell='/bin/bash',
system_user=True,
home_dir='/var/lib/keystone')
add_group.assert_called_with('keystone', system_group=True)
add_user_to_group.assert_called_with('keystone', 'keystone')
expected = [
call('/var/lib/keystone', owner='keystone',
group='keystone', perms=0755, force=False),
call('/var/lib/keystone/cache', owner='keystone',
group='keystone', perms=0755, force=False),
call('/var/log/keystone', owner='keystone',
group='keystone', perms=0755, force=False),
]
self.assertEquals(mkdir.call_args_list, expected)
write_file.assert_called_with('/var/log/keystone/keystone.log',
'', owner='keystone', group='keystone',
perms=0600)
@patch('os.path.join')
@patch('os.path.exists')
@patch('os.symlink')
@patch('shutil.copytree')
@patch('shutil.rmtree')
@patch('subprocess.check_call')
def test_git_post_install(self, check_call, rmtree, copytree, symlink,
exists, join):
projects_yaml = openstack_origin_git
join.return_value = 'joined-string'
self.git_pip_venv_dir.return_value = '/mnt/openstack-git/venv'
self.lsb_release.return_value = {'DISTRIB_RELEASE': '15.04'}
utils.git_post_install(projects_yaml)
expected = [
call('joined-string', '/etc/keystone'),
]
copytree.assert_has_calls(expected)
expected = [
call('joined-string', '/usr/local/bin/keystone-manage'),
]
symlink.assert_has_calls(expected, any_order=True)
keystone_context = {
'service_description': 'Keystone API server',
'service_name': 'Keystone',
'user_name': 'keystone',
'start_dir': '/var/lib/keystone',
'process_name': 'keystone',
'executable_name': 'joined-string',
'config_files': ['/etc/keystone/keystone.conf'],
'log_file': '/var/log/keystone/keystone.log',
}
expected = [
call('git/logging.conf', '/etc/keystone/logging.conf', {},
perms=0o644),
call('git.upstart', '/etc/init/keystone.conf',
keystone_context, perms=0o644, templates_dir='joined-string'),
]
self.assertEquals(self.render.call_args_list, expected)
self.service_restart.assert_called_with('keystone')
@patch.object(utils, 'get_manager')
def test_is_service_present(self, KeystoneManager):
mock_keystone = MagicMock()
mock_keystone.resolve_service_id.return_value = 'sid1'
KeystoneManager.return_value = mock_keystone
self.assertTrue(utils.is_service_present('bob', 'bill'))
@patch.object(utils, 'get_manager')
def test_is_service_present_false(self, KeystoneManager):
mock_keystone = MagicMock()
mock_keystone.resolve_service_id.return_value = None
KeystoneManager.return_value = mock_keystone
self.assertFalse(utils.is_service_present('bob', 'bill'))
@patch.object(utils, 'get_manager')
def test_delete_service_entry(self, KeystoneManager):
mock_keystone = MagicMock()
mock_keystone.resolve_service_id.return_value = 'sid1'
KeystoneManager.return_value = mock_keystone
utils.delete_service_entry('bob', 'bill')
mock_keystone.api.services.delete.assert_called_with('sid1')
@patch('os.path.isfile')
def test_get_file_stored_domain_id(self, isfile_mock):
isfile_mock.return_value = False
x = utils.get_file_stored_domain_id('/a/file')
assert x is None
from sys import version_info
if version_info.major == 2:
import __builtin__ as builtins
else:
import builtins
from mock import mock_open
with patch.object(builtins, 'open', mock_open(
read_data="some_data\n")):
isfile_mock.return_value = True
x = utils.get_file_stored_domain_id('/a/file')
self.assertEquals(x, 'some_data')
@patch.object(utils, 'get_file_stored_domain_id')
def test_get_admin_domain_id(self, mock_get_file_stored_domain_id):
utils.get_admin_domain_id()
mock_get_file_stored_domain_id.assert_called_with(
'/var/lib/keystone/keystone.admin_domain_id')
@patch.object(utils, 'get_file_stored_domain_id')
def test_get_default_domain_id(self, mock_get_file_stored_domain_id):
utils.get_default_domain_id()
mock_get_file_stored_domain_id.assert_called_with(
'/var/lib/keystone/keystone.default_domain_id')
def test_assess_status(self):
with patch.object(utils, 'assess_status_func') as asf:
callee = MagicMock()
asf.return_value = callee
utils.assess_status('test-config')
asf.assert_called_once_with('test-config')
callee.assert_called_once_with()
self.os_application_version_set.assert_called_with(
utils.VERSION_PACKAGE
)
@patch.object(utils, 'get_optional_interfaces')
@patch.object(utils, 'REQUIRED_INTERFACES')
@patch.object(utils, 'check_optional_relations')
@patch.object(utils, 'services')
@patch.object(utils, 'determine_ports')
@patch.object(utils, 'make_assess_status_func')
def test_assess_status_func(self,
make_assess_status_func,
determine_ports,
services,
check_optional_relations,
REQUIRED_INTERFACES,
get_optional_interfaces):
services.return_value = 's1'
determine_ports.return_value = 'p1'
REQUIRED_INTERFACES.copy.return_value = {'int': ['test 1']}
get_optional_interfaces.return_value = {'opt': ['test 2']}
utils.assess_status_func('test-config')
make_assess_status_func.assert_called_once_with(
'test-config',
{'int': ['test 1'], 'opt': ['test 2']},
charm_func=check_optional_relations, services='s1', ports='p1')
def test_pause_unit_helper(self):
with patch.object(utils, '_pause_resume_helper') as prh:
utils.pause_unit_helper('random-config')
prh.assert_called_once_with(utils.pause_unit, 'random-config')
with patch.object(utils, '_pause_resume_helper') as prh:
utils.resume_unit_helper('random-config')
prh.assert_called_once_with(utils.resume_unit, 'random-config')
@patch.object(utils, 'services')
@patch.object(utils, 'determine_ports')
def test_pause_resume_helper(self, determine_ports, services):
f = MagicMock()
services.return_value = 's1'
determine_ports.return_value = 'p1'
with patch.object(utils, 'assess_status_func') as asf:
asf.return_value = 'assessor'
utils._pause_resume_helper(f, 'some-config')
asf.assert_called_once_with('some-config')
f.assert_called_once_with('assessor', services='s1', ports='p1')
@patch.object(utils, 'run_in_apache')
@patch.object(utils, 'restart_pid_check')
def test_restart_function_map(self, restart_pid_check, run_in_apache):
run_in_apache.return_value = True
self.assertEqual(utils.restart_function_map(),
{'apache2': restart_pid_check})
@patch.object(utils, 'run_in_apache')
def test_restart_function_map_legacy(self, run_in_apache):
run_in_apache.return_value = False
self.assertEqual(utils.restart_function_map(), {})
def test_restart_pid_check(self):
self.subprocess.call.return_value = 1
utils.restart_pid_check('apache2')
self.service_stop.assert_called_once_with('apache2')
self.service_start.assert_called_once_with('apache2')
self.subprocess.call.assert_called_once_with(['pgrep', 'apache2'])
def test_restart_pid_check_ptable_string(self):
self.subprocess.call.return_value = 1
utils.restart_pid_check('apache2', ptable_string='httpd')
self.service_stop.assert_called_once_with('apache2')
self.service_start.assert_called_once_with('apache2')
self.subprocess.call.assert_called_once_with(['pgrep', 'httpd'])
# Do not sleep() to speed up manual runs.
@patch('charmhelpers.core.decorators.time')
def test_restart_pid_check_ptable_string_retry(self, mock_time):
call_returns = [1, 0, 0]
self.subprocess.call.side_effect = lambda x: call_returns.pop()
utils.restart_pid_check('apache2', ptable_string='httpd')
self.service_stop.assert_called_once_with('apache2')
self.service_start.assert_called_once_with('apache2')
# self.subprocess.call.assert_called_once_with(['pgrep', 'httpd'])
expected = [
call(['pgrep', 'httpd']),
call(['pgrep', 'httpd']),
call(['pgrep', 'httpd']),
]
self.assertEquals(self.subprocess.call.call_args_list, expected)
def test_get_requested_grants(self):
settings = {'requested_grants': 'Admin,Member'}
expected_results = ['Admin', 'Member']
self.assertEqual(utils.get_requested_grants(settings),
expected_results)
settings = {'not_requsted_grants': 'something else'}
expected_results = []
self.assertEqual(utils.get_requested_grants(settings),
expected_results)
@patch.object(utils, 'https')
def test_get_protocol(self, https):
# http
https.return_value = False
protocol = utils.get_protocol()
self.assertEqual(protocol, 'http')
# https
https.return_value = True
protocol = utils.get_protocol()
self.assertEqual(protocol, 'https')
def test_get_ssl_ca_settings(self):
CA = MagicMock()
CA.get_ca_bundle.return_value = 'certstring'
self.test_config.set('https-service-endpoints', 'True')
self.get_ca.return_value = CA
expected_settings = {'https_keystone': 'True',
'ca_cert': b64encode('certstring')}
settings = utils.get_ssl_ca_settings()
self.assertEqual(settings, expected_settings)
@patch.object(utils, 'get_manager')
def test_add_credentials_keystone_not_ready(self, get_manager):
""" Verify add_credentials_to_keystone when the relation
data is incomplete """
relation_id = 'identity-credentials:0'
remote_unit = 'unit/0'
self.relation_get.return_value = {}
utils.add_credentials_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
self.log.assert_called_with('identity-credentials peer has not yet '
'set username')
@patch.object(utils, 'create_user_credentials')
@patch.object(utils, 'get_protocol')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_manager')
def test_add_credentials_keystone_username_only(self, get_manager,
get_api_version,
resolve_address,
get_protocol,
create_user_credentials):
""" Verify add_credentials with only username """
manager = MagicMock()
manager.resolve_tenant_id.return_value = 'abcdef0123456789'
get_manager.return_value = manager
remote_unit = 'unit/0'
relation_id = 'identity-credentials:0'
get_api_version.return_value = 2
get_protocol.return_value = 'http'
resolve_address.return_value = '10.10.10.10'
create_user_credentials.return_value = 'password'
self.relation_get.return_value = {'username': 'requester'}
self.get_service_password.return_value = 'password'
self.get_requested_roles.return_value = []
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
self.test_config.set('service-tenant', 'services')
relation_data = {'auth_host': '10.10.10.10',
'credentials_host': '10.10.10.10',
'credentials_port': 81,
'auth_port': 80,
'auth_protocol': 'http',
'credentials_username': 'requester',
'credentials_protocol': 'http',
'credentials_password': 'password',
'credentials_project': 'services',
'credentials_project_id': 'abcdef0123456789',
'region': 'RegionOne',
'api_version': 2}
utils.add_credentials_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
create_user_credentials.assert_called_with('requester', 'password',
domain=None,
new_roles=[],
grants=['Admin'],
tenant='services')
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)
@patch.object(utils, 'create_user_credentials')
@patch.object(utils, 'get_protocol')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_manager')
def test_add_credentials_keystone_kv3(self, get_manager,
get_api_version,
resolve_address,
get_protocol,
create_user_credentials):
""" Verify add_credentials with Keystone V3 """
manager = MagicMock()
manager.resolve_tenant_id.return_value = 'abcdef0123456789'
get_manager.return_value = manager
remote_unit = 'unit/0'
relation_id = 'identity-credentials:0'
get_api_version.return_value = 3
get_protocol.return_value = 'http'
resolve_address.return_value = '10.10.10.10'
create_user_credentials.return_value = 'password'
self.relation_get.return_value = {'username': 'requester',
'domain': 'Non-Default'}
self.get_service_password.return_value = 'password'
self.get_requested_roles.return_value = []
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
relation_data = {'auth_host': '10.10.10.10',
'credentials_host': '10.10.10.10',
'credentials_port': 81,
'auth_port': 80,
'auth_protocol': 'http',
'credentials_username': 'requester',
'credentials_protocol': 'http',
'credentials_password': 'password',
'credentials_project': 'services',
'credentials_project_id': 'abcdef0123456789',
'region': 'RegionOne',
'api_version': 3}
utils.add_credentials_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
create_user_credentials.assert_called_with('requester', 'password',
domain='Non-Default',
new_roles=[],
grants=['service'],
tenant='services')
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)
@patch.object(utils, 'create_tenant')
@patch.object(utils, 'create_user_credentials')
@patch.object(utils, 'get_protocol')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_manager')
def test_add_credentials_keystone_roles_grants(self, get_manager,
get_api_version,
resolve_address,
get_protocol,
create_user_credentials,
create_tenant):
""" Verify add_credentials with all relation settings """
manager = MagicMock()
manager.resolve_tenant_id.return_value = 'abcdef0123456789'
get_manager.return_value = manager
remote_unit = 'unit/0'
relation_id = 'identity-credentials:0'
get_api_version.return_value = 2
get_protocol.return_value = 'http'
resolve_address.return_value = '10.10.10.10'
create_user_credentials.return_value = 'password'
self.relation_get.return_value = {'username': 'requester',
'project': 'myproject',
'requested_roles': 'New,Member',
'requested_grants': 'New,Member'}
self.get_service_password.return_value = 'password'
self.get_requested_roles.return_value = ['New', 'Member']
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
relation_data = {'auth_host': '10.10.10.10',
'credentials_host': '10.10.10.10',
'credentials_port': 81,
'auth_port': 80,
'auth_protocol': 'http',
'credentials_username': 'requester',
'credentials_protocol': 'http',
'credentials_password': 'password',
'credentials_project': 'myproject',
'credentials_project_id': 'abcdef0123456789',
'region': 'RegionOne',
'api_version': 2}
utils.add_credentials_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
create_tenant.assert_called_with('myproject', None)
create_user_credentials.assert_called_with('requester', 'password',
domain=None,
new_roles=['New', 'Member'],
grants=['New', 'Member'],
tenant='myproject')
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)
@patch.object(utils, 'get_ssl_ca_settings')
@patch.object(utils, 'create_user_credentials')
@patch.object(utils, 'get_protocol')
@patch.object(utils, 'resolve_address')
@patch.object(utils, 'get_api_version')
@patch.object(utils, 'get_manager')
def test_add_credentials_keystone_ssl(self, get_manager,
get_api_version,
resolve_address,
get_protocol,
create_user_credentials,
get_ssl_ca_settings):
""" Verify add_credentials with SSL """
manager = MagicMock()
manager.resolve_tenant_id.return_value = 'abcdef0123456789'
get_manager.return_value = manager
remote_unit = 'unit/0'
relation_id = 'identity-credentials:0'
get_api_version.return_value = 2
get_protocol.return_value = 'https'
resolve_address.return_value = '10.10.10.10'
create_user_credentials.return_value = 'password'
get_ssl_ca_settings.return_value = {'https_keystone': 'True',
'ca_cert': 'base64certstring'}
self.relation_get.return_value = {'username': 'requester'}
self.get_service_password.return_value = 'password'
self.get_requested_roles.return_value = []
self.test_config.set('admin-port', 80)
self.test_config.set('service-port', 81)
self.test_config.set('https-service-endpoints', 'True')
relation_data = {'auth_host': '10.10.10.10',
'credentials_host': '10.10.10.10',
'credentials_port': 81,
'auth_port': 80,
'auth_protocol': 'https',
'credentials_username': 'requester',
'credentials_protocol': 'https',
'credentials_password': 'password',
'credentials_project': 'services',
'credentials_project_id': 'abcdef0123456789',
'region': 'RegionOne',
'api_version': 2,
'https_keystone': 'True',
'ca_cert': 'base64certstring'}
utils.add_credentials_to_keystone(
relation_id=relation_id,
remote_unit=remote_unit)
create_user_credentials.assert_called_with('requester', 'password',
domain=None,
new_roles=[],
grants=['Admin'],
tenant='services')
self.peer_store_and_set.assert_called_with(relation_id=relation_id,
**relation_data)