Merge "Move TLS related helpers to ``OpenStackCharm`` base class"

This commit is contained in:
Zuul 2019-09-03 07:54:17 +00:00 committed by Gerrit Code Review
commit 42079b789f
2 changed files with 541 additions and 524 deletions

View File

@ -15,7 +15,7 @@ import charmhelpers.contrib.openstack.cert_utils as cert_utils
import charmhelpers.core.hookenv as hookenv
import charmhelpers.core.host as ch_host
import charmhelpers.fetch as fetch
import charms.reactive.relations as relations
import charms.reactive as reactive
from charms_openstack.charm.core import (
BaseOpenStackCharm,
@ -256,29 +256,94 @@ class OpenStackCharm(BaseOpenStackCharm,
os_utils.manage_payload_services('stop', services)
os_utils.manage_payload_services('start', services)
def get_certificate_requests(self):
"""Return a dict of certificate requests"""
return cert_utils.get_certificate_request(
json_encode=False).get('cert_requests', {})
@property
def rabbit_client_cert_dir(self):
return '/var/lib/charm/{}'.format(hookenv.service_name())
return '/var/lib/charm/{}'.format(self.service_name)
@property
def rabbit_cert_file(self):
return '{}/rabbit-client-ca.pem'.format(self.rabbit_client_cert_dir)
def get_default_cn(self):
"""Return the default Canonical Name to be used for TLS setup
:returns: 'canonical_name'
:rtype: str
"""
return os_ip.resolve_address(endpoint_type=os_ip.INTERNAL)
def configure_cert(self, path, cert, key, cn=None):
"""Write out TLS certificate and key to disk.
:param path: Directory to place files in
:type path: str
:param cert: TLS Certificate
:type cert: str
:param key: TLS Key
:type key: str
:param cn: Canonical name for service
:type cn: Option[None, str]
"""
if not cn:
cn = self.get_default_cn()
ch_host.mkdir(path=path)
if cn:
cert_filename = 'cert_{}'.format(cn)
key_filename = 'key_{}'.format(cn)
else:
cert_filename = 'cert'
key_filename = 'key'
ch_host.write_file(path=os.path.join(path, cert_filename),
content=cert.encode('utf-8'), group=self.group,
perms=0o640)
ch_host.write_file(path=os.path.join(path, key_filename),
content=key.encode('utf-8'), group=self.group,
perms=0o640)
def get_local_addresses(self):
"""Return list of local addresses on each configured network
For each network return an address the local unit has on that network
if one exists.
:returns: [private_addr, admin_addr, public_addr, ...]
:rtype: List[str]
"""
addresses = [
os_utils.get_host_ip(hookenv.unit_get('private-address'))]
for addr_type in os_ip.ADDRESS_MAP.keys():
laddr = os_ip.resolve_address(endpoint_type=addr_type)
if laddr:
addresses.append(laddr)
return sorted(list(set(addresses)))
def get_certs_and_keys(self, keystone_interface=None,
certificates_interface=None):
"""Collect SSL config for local endpoints
"""Collect TLS config for local endpoints
SSL keys and certs may come from user specified configuration for this
charm or they may come directly from Keystone.
TLS keys and certs may come from user specified configuration for this
charm or they may come directly from the ``certificates`` relation.
If collecting from keystone there may be a certificate and key per
endpoint (public, admin etc).
If collecting from ``certificates`` relation there may be a certificate
and key per endpoint (public, admin etc).
@returns [
:param keystone_interface: DEPRECATED Functionality removed.
:type keystone_interace: Option[None, KeystoneRequires(RelationBase)]
:param certificates_interface: Certificates interface object
:type certificates_interface: TlsRequires(Endpoint)
:returns: [
{'key': 'key1', 'cert': 'cert1', 'ca': 'ca1', 'cn': 'cn1'}
{'key': 'key2', 'cert': 'cert2', 'ca': 'ca2', 'cn': 'cn2'}
...
]
:rtype: List[Dict[str,str]]
"""
if self.config_defined_ssl_key and self.config_defined_ssl_cert:
ssl_artifacts = []
@ -290,19 +355,6 @@ class OpenStackCharm(BaseOpenStackCharm,
if self.config_defined_ssl_ca else None),
'cn': os_ip.resolve_address(endpoint_type=ep_type)})
return ssl_artifacts
elif keystone_interface:
keys_and_certs = []
for addr in self.get_local_addresses():
key = keystone_interface.get_ssl_key(addr)
cert = keystone_interface.get_ssl_cert(addr)
ca = keystone_interface.get_ssl_ca()
if key and cert:
keys_and_certs.append({
'key': key,
'cert': cert,
'ca': ca,
'cn': addr})
return keys_and_certs
elif certificates_interface:
keys_and_certs = []
reqs = certificates_interface.get_batch_requests()
@ -343,52 +395,61 @@ class OpenStackCharm(BaseOpenStackCharm,
return self._get_b64decode_for('ssl_ca')
def configure_ssl(self, keystone_interface=None):
"""Configure SSL certificates and keys
"""DEPRECATED Configure SSL certificates and keys.
NOTE(AJK): This function tries to minimise the work it does,
particularly with writing files and restarting apache.
@param keystone_interface KeystoneRequires class
Please use configure_tls insteaad.
"""
keystone_interface = (
relations.endpoint_from_flag('identity-service.available.ssl') or
relations
.endpoint_from_flag('identity-service.available.ssl_legacy'))
certificates_interface = relations.endpoint_from_flag(
'certificates.batch.cert.available')
ssl_objects = self.get_certs_and_keys(
keystone_interface=keystone_interface,
certificates_interface=certificates_interface)
with is_data_changed('configure_ssl.ssl_objects',
ssl_objects) as changed:
if ssl_objects:
# NOTE(fnordahl): regardless of changes to data we may
# have other changes we want to apply to the files.
# (e.g. ownership, permissions)
#
# Also note that c-h.host.write_file used in configure_cert()
# has it's own logic to detect data changes.
#
# LP: #1821314
for ssl in ssl_objects:
self.set_state('ssl.requested', True)
self.configure_cert(
ssl['cert'], ssl['key'], cn=ssl['cn'])
self.configure_ca(ssl['ca'])
cert_utils.create_ip_cert_links(
os.path.join('/etc/apache2/ssl/', self.name))
if not os_utils.snap_install_requested() and changed:
self.configure_apache()
ch_host.service_reload('apache2')
hookenv.log('configure_ssl method is DEPRECATED, please use '
'configure_tls instead.', level=hookenv.WARNING)
self.configure_tls(
certificates_interface=reactive.endpoint_from_flag(
'certificates.available'))
self.remove_state('ssl.requested')
self.set_state('ssl.enabled', True)
else:
self.set_state('ssl.enabled', False)
amqp_ssl = relations.endpoint_from_flag('amqp.available.ssl')
def configure_tls(self, certificates_interface=None):
"""Write out TLS certificate data.
The reactive handler counterpart in ``layer-openstack`` will make
sure this helper is called when certificate data is available or
changed.
Note that if your charm uses the OpenStackCharm base class directly
and want to write out client/server certificate and key data you will
need to override this method and call configure_cert() with a path
argument appropriate for the service you are implementing a charm
for.
:param certificates_interface: A certificates relation
:type certificates_interface: Option[None, TlsRequires(Endpoint)]
:returns: List of certificate data as returned by get_certs_and_keys()
:rtype: List[Dict[str,str]]
"""
tls_objects = self.get_certs_and_keys(
certificates_interface=certificates_interface)
if tls_objects:
# NOTE(fnordahl): regardless of changes to data we may
# have other changes we want to apply to the files.
# (e.g. ownership, permissions)
#
# Also note that update_central_cacerts() used in configure_ca()
# has it's own logic to detect data changes.
#
# LP: #1821314
for tls_object in tls_objects:
self.configure_ca(tls_object['ca'])
# NOTE(fnordahl): Retaining for in-transition compability with current
# usage. The RabbitMQ TLS configuration should be initiated by the
# layer code. Given we have non-API services consuming RabbitMQ we
# should probably move the RabbitMQ reactive handling code down to the
# ``openstack`` layer too.
#
# Will address this in separate review. LP: #1841912
amqp_ssl = reactive.endpoint_from_flag('amqp.available.ssl')
if amqp_ssl:
self.configure_rabbit_cert(amqp_ssl)
return tls_objects
def configure_rabbit_cert(self, rabbit_interface):
if not os.path.exists(self.rabbit_client_cert_dir):
os.makedirs(self.rabbit_client_cert_dir)
@ -397,7 +458,7 @@ class OpenStackCharm(BaseOpenStackCharm,
@contextlib.contextmanager
def update_central_cacerts(self, cert_files, update_certs=True):
"""Update Central certs info if once of cert_files changes"""
"""Update Central certs info if one of cert_files changes"""
checksums = {path: ch_host.path_hash(path)
for path in cert_files}
yield
@ -411,7 +472,8 @@ class OpenStackCharm(BaseOpenStackCharm,
"""Write Certificate Authority certificate"""
# TODO(jamespage): work this out for snap based installations
cert_file = (
'/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt')
'/usr/local/share/ca-certificates/{}.crt'
.format(self.service_name))
if ca_cert:
with self.update_central_cacerts([cert_file], update_certs):
with open(cert_file, 'w') as crt:
@ -442,6 +504,10 @@ class OpenStackCharm(BaseOpenStackCharm,
ch_host.mkdir(os.path.dirname(ca_certs))
shutil.copyfile(SYSTEM_CA_CERTS, ca_certs)
@property
def service_name(self):
return hookenv.service_name()
class OpenStackAPICharm(OpenStackCharm):
"""The base class for API OS charms -- this just bakes in the default
@ -554,12 +620,6 @@ class OpenStackAPICharm(OpenStackCharm):
"get_database_setup() needs to be overridden in the derived "
"class")
def get_certificate_requests(self):
"""Return a dict of certificate requests
"""
return cert_utils.get_certificate_request(
json_encode=False).get('cert_requests', {})
@property
def all_packages(self):
"""List of packages to be installed
@ -804,61 +864,51 @@ class HAOpenStackCharm(OpenStackAPICharm):
if restart:
ch_host.service_restart('apache2')
def get_default_cn(self):
"""Return the default Canonical Name to be used for SSL setup
def configure_tls(self, certificates_interface=None):
"""Configure TLS certificates and keys
@returns 'canonical_name'
NOTE(AJK): This function tries to minimise the work it does,
particularly with writing files and restarting apache.
:param certificates_interface: certificates relation endpoint
:type certificates_interface: TlsRequires(Endpoint) object
"""
return os_ip.resolve_address(endpoint_type=os_ip.INTERNAL)
# this takes care of writing out the CA certificate
tls_objects = super().configure_tls(
certificates_interface=certificates_interface)
with is_data_changed(
'configure_ssl.ssl_objects', tls_objects) as changed:
if tls_objects:
# NOTE(fnordahl): regardless of changes to data we may
# have other changes we want to apply to the files.
# (e.g. ownership, permissions)
#
# Also note that c-h.host.write_file used in configure_cert()
# has it's own logic to detect data changes.
#
# LP: #1821314
for tls_object in tls_objects:
self.set_state('ssl.requested', True)
if os_utils.snap_install_requested():
path = ('/var/snap/{snap_name}/common/etc/nginx/ssl'
.format(snap_name=self.primary_snap))
else:
path = os.path.join('/etc/apache2/ssl/', self.name)
self.configure_cert(
path,
tls_object['cert'],
tls_object['key'],
cn=tls_object['cn'])
cert_utils.create_ip_cert_links(
os.path.join('/etc/apache2/ssl/', self.name))
if not os_utils.snap_install_requested() and changed:
self.configure_apache()
ch_host.service_reload('apache2')
def configure_cert(self, cert, key, cn=None):
"""Configure service SSL cert and key
Write out service SSL certificate and key for Apache.
@param cert string SSL Certificate
@param key string SSL Key
@param cn string Canonical name for service
"""
if os_utils.snap_install_requested():
ssl_dir = '/var/snap/{snap_name}/common/etc/nginx/ssl'.format(
snap_name=self.primary_snap)
else:
ssl_dir = os.path.join('/etc/apache2/ssl/', self.name)
if not cn:
cn = self.get_default_cn()
ch_host.mkdir(path=ssl_dir)
if cn:
cert_filename = 'cert_{}'.format(cn)
key_filename = 'key_{}'.format(cn)
else:
cert_filename = 'cert'
key_filename = 'key'
ch_host.write_file(path=os.path.join(ssl_dir, cert_filename),
content=cert.encode('utf-8'), group=self.group,
perms=0o640)
ch_host.write_file(path=os.path.join(ssl_dir, key_filename),
content=key.encode('utf-8'), group=self.group,
perms=0o640)
def get_local_addresses(self):
"""Return list of local addresses on each configured network
For each network return an address the local unit has on that network
if one exists.
@returns [private_addr, admin_addr, public_addr, ...]
"""
addresses = [
os_utils.get_host_ip(hookenv.unit_get('private-address'))]
for addr_type in os_ip.ADDRESS_MAP.keys():
laddr = os_ip.resolve_address(endpoint_type=addr_type)
if laddr:
addresses.append(laddr)
return sorted(list(set(addresses)))
self.remove_state('ssl.requested')
self.set_state('ssl.enabled', True)
else:
self.set_state('ssl.enabled', False)
def update_peers(self, cluster):
"""Update peers in the cluster about the addresses that this unit
@ -938,15 +988,11 @@ class CinderStoragePluginCharm(OpenStackCharm):
def stateless(self):
raise NotImplementedError()
@property
def service_name(self):
return hookenv.service_name()
def cinder_configuration(self):
raise NotImplementedError()
def send_storage_backend_data(self):
cbend = relations.endpoint_from_flag('storage-backend.connected')
cbend = reactive.endpoint_from_flag('storage-backend.connected')
cbend.configure_principal(
backend_name=self.service_name,
configuration=self.cinder_configuration(),

View File

@ -324,6 +324,363 @@ class TestMyOpenStackCharm(BaseOpenStackCharmTest):
mock.call('stop', svcs),
mock.call('start', svcs)])
def test_configure_cert(self):
self.patch_object(chm.ch_host, 'mkdir')
self.patch_object(chm.ch_host, 'write_file')
self.target.configure_cert('/some/path', 'mycert', 'mykey', cn='mycn')
self.mkdir.assert_called_once_with(path='/some/path')
calls = [
mock.call(
path='/some/path/cert_mycn',
content=b'mycert', group='root', perms=0o640),
mock.call(
path='/some/path/key_mycn',
content=b'mykey', group='root', perms=0o640)]
self.write_file.assert_has_calls(calls)
self.write_file.reset_mock()
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.target.configure_cert('/some/path', 'mycert', 'mykey')
calls = [
mock.call(
path='/some/path/cert_addr',
content=b'mycert', group='root', perms=0o640),
mock.call(
path='/some/path/key_addr',
content=b'mykey', group='root', perms=0o640)]
self.write_file.assert_has_calls(calls)
def test_get_local_addresses(self):
self.patch_object(chm.os_utils, 'get_host_ip', return_value='privaddr')
self.patch_object(chm.os_ip, 'resolve_address')
addresses = {
'admin': 'admin_addr',
'int': 'internal_addr',
'public': 'public_addr'}
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.assertEqual(
self.target.get_local_addresses(),
['admin_addr', 'internal_addr', 'privaddr', 'public_addr'])
def test_get_certs_and_keys(self):
config = {
'ssl_key': base64.b64encode(b'key'),
'ssl_cert': base64.b64encode(b'cert'),
'ssl_ca': base64.b64encode(b'ca')}
addresses = {
'admin': 'adm_addr',
'int': 'int_addr',
'public': 'pub_addr'}
self.patch_target('config', new=config)
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.assertEqual(
self.target.get_certs_and_keys(),
[
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'int_addr'},
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'adm_addr'},
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'pub_addr'}])
def test_get_certs_and_keys_noca(self):
config = {
'ssl_key': base64.b64encode(b'key'),
'ssl_cert': base64.b64encode(b'cert')}
addresses = {
'admin': 'adm_addr',
'int': 'int_addr',
'public': 'pub_addr'}
self.patch_target('config', new=config)
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.assertEqual(
self.target.get_certs_and_keys(),
[
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'int_addr'},
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'adm_addr'},
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'pub_addr'}])
def test_get_certs_and_keys_certs_interface(self):
class CertsInterface(object):
def get_batch_requests(self):
req = {
'int_addr': {
'cert': 'int_cert',
'key': 'int_key'},
'priv_addr': {
'cert': 'priv_cert',
'key': 'priv_key'},
'pub_addr': {
'cert': 'pub_cert',
'key': 'pub_key'},
'admin_addr': {
'cert': 'admin_cert',
'key': 'admin_key'}}
return req
def get_ca(self):
return 'CA'
def get_chain(self):
return 'CHAIN'
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
expect = [
{
'ca': 'CA',
'cert': 'admin_cert\nCHAIN',
'cn': 'admin_addr',
'key': 'admin_key'},
{
'ca': 'CA',
'cert': 'int_cert\nCHAIN',
'cn': 'int_addr',
'key': 'int_key'},
{
'ca': 'CA',
'cert': 'priv_cert\nCHAIN',
'cn': 'priv_addr',
'key': 'priv_key'},
{
'ca': 'CA',
'cert': 'pub_cert\nCHAIN',
'cn': 'pub_addr',
'key': 'pub_key'},
]
self.assertEqual(
self.target.get_certs_and_keys(
certificates_interface=CertsInterface()),
expect)
def test_config_defined_certs_and_keys(self):
# test that the cached parameters do what we expect
config = {
'ssl_key': base64.b64encode(b'confkey'),
'ssl_cert': base64.b64encode(b'confcert'),
'ssl_ca': base64.b64encode(b'confca')}
self.patch_target('config', new=config)
self.assertEqual(self.target.config_defined_ssl_key, b'confkey')
self.assertEqual(self.target.config_defined_ssl_cert, b'confcert')
self.assertEqual(self.target.config_defined_ssl_ca, b'confca')
def test_configure_ssl_rabbit(self):
self.patch_target('get_certs_and_keys', return_value=[])
self.patch_target('configure_rabbit_cert')
self.patch('charms.reactive.bus.set_state', name='set_state')
self.patch_object(chm.reactive, 'endpoint_from_flag',
return_value='ssl_int')
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
with mock.patch.object(chm.reactive.helpers,
'is_data_changed'):
self.target.configure_ssl()
self.configure_rabbit_cert.assert_called_once_with('ssl_int')
def test_configure_rabbit_cert(self):
rabbit_int_mock = mock.MagicMock()
rabbit_int_mock.get_ssl_cert.return_value = 'rabbit_cert'
self.patch_object(chm.os.path, 'exists', return_value=True)
self.patch_object(chm.os, 'mkdir')
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
with utils.patch_open() as (mock_open, mock_file):
self.target.configure_rabbit_cert(rabbit_int_mock)
mock_open.assert_called_with(
'/var/lib/charm/svc1/rabbit-client-ca.pem',
'w')
mock_file.write.assert_called_with('rabbit_cert')
def test_configure_tls(self):
tls_objs = [
{
'cert': 'cert1',
'key': 'key1',
'ca': 'ca1',
'cn': 'cn1'},
{
'cert': 'cert2',
'key': 'key2',
'ca': 'ca2',
'cn': 'cn2'}]
self.patch_target('get_certs_and_keys', return_value=tls_objs)
self.patch_target('configure_ca')
self.patch('charms.reactive.bus.set_state', name='set_state')
ca_calls = [
mock.call('ca1'),
mock.call('ca2')]
self.target.configure_tls()
self.configure_ca.assert_has_calls(ca_calls)
def test_configure_ca(self):
self.patch_target('run_update_certs')
self.patch_target('install_snap_certs')
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
with utils.patch_open() as (mock_open, mock_file):
self.target.configure_ca('myca')
mock_open.assert_called_with(
'/usr/local/share/ca-certificates/svc1.crt',
'w')
mock_file.write.assert_called_with('myca')
def test_run_update_certs(self):
self.patch_object(chm.subprocess, 'check_call')
self.target.run_update_certs()
self.check_call.assert_called_once_with(
['update-ca-certificates', '--fresh'])
def test_install_snap_certs(self):
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=True)
self.patch_object(chm.shutil, 'copyfile')
self.patch_object(chm.ch_host, 'mkdir')
self.patch_object(chm.os.path, 'exists', return_value=True)
self.target.snaps = ['mysnap']
self.target.install_snap_certs()
self.exists.assert_called_with('/etc/ssl/certs/ca-certificates.crt')
self.copyfile.assert_called_with(
'/etc/ssl/certs/ca-certificates.crt',
'/var/snap/mysnap/common/etc/ssl/certs/ca-certificates.crt',
)
self.mkdir.assert_called_with('/var/snap/mysnap/common/etc/ssl/certs')
self.snap_install_requested.reset_mock()
self.snap_install_requested.return_value = True
self.exists.reset_mock()
self.exists.return_value = False
self.copyfile.reset_mock()
self.mkdir.reset_mock()
self.target.install_snap_certs()
self.exists.assert_called_with('/etc/ssl/certs/ca-certificates.crt')
self.mkdir.assert_not_called()
self.copyfile.assert_not_called()
self.snap_install_requested.reset_mock()
self.snap_install_requested.return_value = False
self.exists.reset_mock()
self.exists.return_value = True
self.copyfile.reset_mock()
self.mkdir.reset_mock()
self.target.install_snap_certs()
self.exists.assert_not_called()
self.mkdir.assert_not_called()
self.copyfile.assert_not_called()
def test_update_central_cacerts(self):
self.patch_target('run_update_certs')
change_hashes = ['hash1', 'hash2']
nochange_hashes = ['hash1', 'hash1']
def fake_hash(hash_dict):
def fake_hash_inner(filename):
return hash_dict.pop()
return fake_hash_inner
self.patch_object(chm.ch_host, 'path_hash')
self.path_hash.side_effect = fake_hash(change_hashes)
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
with self.target.update_central_cacerts(['file1']):
pass
self.run_update_certs.assert_called_with()
self.run_update_certs.reset_mock()
self.path_hash.side_effect = fake_hash(nochange_hashes)
with self.target.update_central_cacerts(['file1']):
pass
self.assertFalse(self.run_update_certs.called)
class TestCinderStoragePluginCharm(BaseOpenStackCharmTest):
def setUp(self):
super(TestCinderStoragePluginCharm, self).setUp(
chm.CinderStoragePluginCharm,
TEST_CONFIG)
def test_install(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('config', new={'driver-source': 'ppa:user/ppa'})
self.patch_target('install_resources')
self.target.install()
self.add_source.assert_called_once_with('ppa:user/ppa', key=None)
self.apt_update.assert_called_once_with()
self.install_resources.assert_called_once_with()
def test_install_with_key(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('install_resources')
self.patch_target(
'config',
new={
'driver-source': 'ppa:user/ppa',
'driver-key': 'mykey'})
self.target.install()
self.add_source.assert_called_once_with('ppa:user/ppa', key='mykey')
self.apt_update.assert_called_once_with()
def test_install_no_additional_source(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('install_resources')
self.patch_target(
'config',
new={
'driver-source': '',
'driver-key': ''})
self.target.install()
self.assertFalse(self.add_source.called)
self.assertFalse(self.apt_update.called)
def test_install_source_undefined(self):
# A charm may be based from this class but not implement the
# additonal ppa option.
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('config', new={})
self.patch_target('install_resources')
self.target.install()
self.assertFalse(self.add_source.called)
self.assertFalse(self.apt_update.called)
def test_stateless(self):
with self.assertRaises(NotImplementedError):
self.target.stateless
def test_service_name(self):
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
self.assertEqual(self.target.service_name, 'svc1')
def test_cinder_configuration(self):
with self.assertRaises(NotImplementedError):
self.target.cinder_configuration()
def test_send_storage_backend_data(self):
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
ep_mock = mock.MagicMock()
self.patch_object(
chm.reactive,
'endpoint_from_flag',
return_value=ep_mock)
with self.assertRaises(NotImplementedError):
self.target.send_storage_backend_data()
class TestOpenStackAPICharm(BaseOpenStackCharmTest):
@ -693,208 +1050,7 @@ class TestHAOpenStackCharm(BaseOpenStackCharmTest):
['a2enmod', 'proxy_http'])
self.service_restart.assert_called_once_with('apache2')
def test_configure_cert(self):
self.patch_object(chm.ch_host, 'mkdir')
self.patch_object(chm.ch_host, 'write_file')
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.target.configure_cert('mycert', 'mykey', cn='mycn')
self.mkdir.assert_called_once_with(path='/etc/apache2/ssl/charmname')
calls = [
mock.call(
path='/etc/apache2/ssl/charmname/cert_mycn',
content=b'mycert', group='root', perms=0o640),
mock.call(
path='/etc/apache2/ssl/charmname/key_mycn',
content=b'mykey', group='root', perms=0o640)]
self.write_file.assert_has_calls(calls)
self.write_file.reset_mock()
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.target.configure_cert('mycert', 'mykey')
calls = [
mock.call(
path='/etc/apache2/ssl/charmname/cert_addr',
content=b'mycert', group='root', perms=0o640),
mock.call(
path='/etc/apache2/ssl/charmname/key_addr',
content=b'mykey', group='root', perms=0o640)]
self.write_file.assert_has_calls(calls)
def test_get_local_addresses(self):
self.patch_object(chm.os_utils, 'get_host_ip', return_value='privaddr')
self.patch_object(chm.os_ip, 'resolve_address')
addresses = {
'admin': 'admin_addr',
'int': 'internal_addr',
'public': 'public_addr'}
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.assertEqual(
self.target.get_local_addresses(),
['admin_addr', 'internal_addr', 'privaddr', 'public_addr'])
def test_get_certs_and_keys(self):
config = {
'ssl_key': base64.b64encode(b'key'),
'ssl_cert': base64.b64encode(b'cert'),
'ssl_ca': base64.b64encode(b'ca')}
addresses = {
'admin': 'adm_addr',
'int': 'int_addr',
'public': 'pub_addr'}
self.patch_target('config', new=config)
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.assertEqual(
self.target.get_certs_and_keys(),
[
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'int_addr'},
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'adm_addr'},
{'key': 'key', 'cert': 'cert', 'ca': 'ca', 'cn': 'pub_addr'}])
def test_get_certs_and_keys_noca(self):
config = {
'ssl_key': base64.b64encode(b'key'),
'ssl_cert': base64.b64encode(b'cert')}
addresses = {
'admin': 'adm_addr',
'int': 'int_addr',
'public': 'pub_addr'}
self.patch_target('config', new=config)
self.patch_object(chm.os_ip, 'resolve_address', 'addr')
self.resolve_address.side_effect = \
lambda endpoint_type=None: addresses[endpoint_type]
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.assertEqual(
self.target.get_certs_and_keys(),
[
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'int_addr'},
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'adm_addr'},
{'key': 'key', 'cert': 'cert', 'ca': None, 'cn': 'pub_addr'}])
def test_get_certs_and_keys_ks_interface(self):
class KSInterface(object):
def get_ssl_key(self, key):
keys = {
'int_addr': 'int_key',
'priv_addr': 'priv_key',
'pub_addr': 'pub_key',
'admin_addr': 'admin_key'}
return keys[key]
def get_ssl_cert(self, key):
certs = {
'int_addr': 'int_cert',
'priv_addr': 'priv_cert',
'pub_addr': 'pub_cert',
'admin_addr': 'admin_cert'}
return certs[key]
def get_ssl_ca(self):
return 'ca'
self.patch_target(
'get_local_addresses',
return_value=['int_addr', 'priv_addr', 'pub_addr', 'admin_addr'])
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
expect = [
{
'ca': 'ca',
'cert': 'int_cert',
'cn': 'int_addr',
'key': 'int_key'},
{
'ca': 'ca',
'cert': 'priv_cert',
'cn': 'priv_addr',
'key': 'priv_key'},
{
'ca': 'ca',
'cert': 'pub_cert',
'cn': 'pub_addr',
'key': 'pub_key'},
{
'ca': 'ca',
'cert': 'admin_cert',
'cn': 'admin_addr',
'key': 'admin_key'}]
self.assertEqual(
self.target.get_certs_and_keys(keystone_interface=KSInterface()),
expect)
def test_get_certs_and_keys_certs_interface(self):
class CertsInterface(object):
def get_batch_requests(self):
req = {
'int_addr': {
'cert': 'int_cert',
'key': 'int_key'},
'priv_addr': {
'cert': 'priv_cert',
'key': 'priv_key'},
'pub_addr': {
'cert': 'pub_cert',
'key': 'pub_key'},
'admin_addr': {
'cert': 'admin_cert',
'key': 'admin_key'}}
return req
def get_ca(self):
return 'CA'
def get_chain(self):
return 'CHAIN'
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
expect = [
{
'ca': 'CA',
'cert': 'admin_cert\nCHAIN',
'cn': 'admin_addr',
'key': 'admin_key'},
{
'ca': 'CA',
'cert': 'int_cert\nCHAIN',
'cn': 'int_addr',
'key': 'int_key'},
{
'ca': 'CA',
'cert': 'priv_cert\nCHAIN',
'cn': 'priv_addr',
'key': 'priv_key'},
{
'ca': 'CA',
'cert': 'pub_cert\nCHAIN',
'cn': 'pub_addr',
'key': 'pub_key'},
]
self.assertEqual(
self.target.get_certs_and_keys(
certificates_interface=CertsInterface()),
expect)
def test_config_defined_certs_and_keys(self):
# test that the cached parameters do what we expect
config = {
'ssl_key': base64.b64encode(b'confkey'),
'ssl_cert': base64.b64encode(b'confcert'),
'ssl_ca': base64.b64encode(b'confca')}
self.patch_target('config', new=config)
self.assertEqual(self.target.config_defined_ssl_key, b'confkey')
self.assertEqual(self.target.config_defined_ssl_cert, b'confcert')
self.assertEqual(self.target.config_defined_ssl_ca, b'confca')
def test_configure_ssl(self):
def test_configure_tls(self):
ssl_objs = [
{
'cert': 'cert1',
@ -911,7 +1067,7 @@ class TestHAOpenStackCharm(BaseOpenStackCharmTest):
self.patch_target('configure_cert')
self.patch_target('configure_ca')
self.patch('charms.reactive.bus.set_state', name='set_state')
self.patch_object(chm.relations, 'endpoint_from_flag',
self.patch_object(chm.reactive, 'endpoint_from_flag',
return_value=None)
self.patch_object(chm_core.charmhelpers.fetch,
'filter_installed_packages',
@ -923,8 +1079,8 @@ class TestHAOpenStackCharm(BaseOpenStackCharmTest):
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
cert_calls = [
mock.call('cert1', 'key1', cn='cn1'),
mock.call('cert2', 'key2', cn='cn2')]
mock.call('/etc/apache2/ssl/charmname', 'cert1', 'key1', cn='cn1'),
mock.call('/etc/apache2/ssl/charmname', 'cert2', 'key2', cn='cn2')]
ca_calls = [
mock.call('ca1'),
mock.call('ca2')]
@ -933,212 +1089,27 @@ class TestHAOpenStackCharm(BaseOpenStackCharmTest):
mock.call('ssl.enabled', True)]
with mock.patch.object(chm, 'is_data_changed') as changed:
changed.return_value.__enter__.return_value = False
self.target.configure_ssl()
self.target.configure_tls()
self.configure_cert.assert_has_calls(cert_calls)
self.configure_ca.assert_has_calls(ca_calls)
self.assertFalse(self.configure_apache.called)
self.set_state.assert_has_calls(set_state_calls)
with mock.patch.object(chm, 'is_data_changed') as changed:
changed.return_value.__enter__.return_value = True
self.target.configure_ssl()
self.target.configure_tls()
self.configure_cert.assert_has_calls(cert_calls)
self.configure_ca.assert_has_calls(ca_calls)
self.configure_apache.called_once_with()
self.set_state.assert_has_calls(set_state_calls)
def test_configure_ssl_off(self):
def test_configure_tls_off(self):
self.patch_target('get_certs_and_keys', return_value=[])
self.patch('charms.reactive.bus.set_state', name='set_state')
self.patch_object(chm.relations, 'endpoint_from_flag',
self.patch_object(chm.reactive, 'endpoint_from_flag',
return_value=None)
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.target.configure_ssl()
self.set_state.assert_called_once_with('ssl.enabled', False)
def test_configure_ssl_rabbit(self):
self.patch_target('get_certs_and_keys', return_value=[])
self.patch_target('configure_rabbit_cert')
self.patch('charms.reactive.bus.set_state', name='set_state')
self.patch_object(chm.relations, 'endpoint_from_flag',
return_value='ssl_int')
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
self.target.configure_ssl()
self.set_state.assert_called_once_with('ssl.enabled', False)
self.configure_rabbit_cert.assert_called_once_with('ssl_int')
def test_configure_rabbit_cert(self):
rabbit_int_mock = mock.MagicMock()
rabbit_int_mock.get_ssl_cert.return_value = 'rabbit_cert'
self.patch_object(chm.os.path, 'exists', return_value=True)
self.patch_object(chm.os, 'mkdir')
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
with utils.patch_open() as (mock_open, mock_file):
self.target.configure_rabbit_cert(rabbit_int_mock)
mock_open.assert_called_with(
'/var/lib/charm/svc1/rabbit-client-ca.pem',
'w')
mock_file.write.assert_called_with('rabbit_cert')
def test_configure_ca(self):
self.patch_target('run_update_certs')
self.patch_target('install_snap_certs')
with utils.patch_open() as (mock_open, mock_file):
self.target.configure_ca('myca')
mock_open.assert_called_with(
'/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt',
'w')
mock_file.write.assert_called_with('myca')
def test_run_update_certs(self):
self.patch_object(chm.subprocess, 'check_call')
self.target.run_update_certs()
self.check_call.assert_called_once_with(
['update-ca-certificates', '--fresh'])
def test_install_snap_certs(self):
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=True)
self.patch_object(chm.shutil, 'copyfile')
self.patch_object(chm.ch_host, 'mkdir')
self.patch_object(chm.os.path, 'exists', return_value=True)
self.target.snaps = ['mysnap']
self.target.install_snap_certs()
self.exists.assert_called_with('/etc/ssl/certs/ca-certificates.crt')
self.copyfile.assert_called_with(
'/etc/ssl/certs/ca-certificates.crt',
'/var/snap/mysnap/common/etc/ssl/certs/ca-certificates.crt',
)
self.mkdir.assert_called_with('/var/snap/mysnap/common/etc/ssl/certs')
self.snap_install_requested.reset_mock()
self.snap_install_requested.return_value = True
self.exists.reset_mock()
self.exists.return_value = False
self.copyfile.reset_mock()
self.mkdir.reset_mock()
self.target.install_snap_certs()
self.exists.assert_called_with('/etc/ssl/certs/ca-certificates.crt')
self.mkdir.assert_not_called()
self.copyfile.assert_not_called()
self.snap_install_requested.reset_mock()
self.snap_install_requested.return_value = False
self.exists.reset_mock()
self.exists.return_value = True
self.copyfile.reset_mock()
self.mkdir.reset_mock()
self.target.install_snap_certs()
self.exists.assert_not_called()
self.mkdir.assert_not_called()
self.copyfile.assert_not_called()
def test_update_central_cacerts(self):
self.patch_target('run_update_certs')
change_hashes = ['hash1', 'hash2']
nochange_hashes = ['hash1', 'hash1']
def fake_hash(hash_dict):
def fake_hash_inner(filename):
return hash_dict.pop()
return fake_hash_inner
self.patch_object(chm.ch_host, 'path_hash')
self.path_hash.side_effect = fake_hash(change_hashes)
self.patch_object(chm.os_utils, 'snap_install_requested',
return_value=False)
with self.target.update_central_cacerts(['file1']):
pass
self.run_update_certs.assert_called_with()
self.run_update_certs.reset_mock()
self.path_hash.side_effect = fake_hash(nochange_hashes)
with self.target.update_central_cacerts(['file1']):
pass
self.assertFalse(self.run_update_certs.called)
class TestCinderStoragePluginCharm(BaseOpenStackCharmTest):
def setUp(self):
super(TestCinderStoragePluginCharm, self).setUp(
chm.CinderStoragePluginCharm,
TEST_CONFIG)
def test_install(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('config', new={'driver-source': 'ppa:user/ppa'})
self.patch_target('install_resources')
self.target.install()
self.add_source.assert_called_once_with('ppa:user/ppa', key=None)
self.apt_update.assert_called_once_with()
self.install_resources.assert_called_once_with()
def test_install_with_key(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('install_resources')
self.patch_target(
'config',
new={
'driver-source': 'ppa:user/ppa',
'driver-key': 'mykey'})
self.target.install()
self.add_source.assert_called_once_with('ppa:user/ppa', key='mykey')
self.apt_update.assert_called_once_with()
def test_install_no_additional_source(self):
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('install_resources')
self.patch_target(
'config',
new={
'driver-source': '',
'driver-key': ''})
self.target.install()
self.assertFalse(self.add_source.called)
self.assertFalse(self.apt_update.called)
def test_install_source_undefined(self):
# A charm may be based from this class but not implement the
# additonal ppa option.
self.patch_object(chm.subprocess, 'check_output', return_value=b'\n')
self.patch_object(chm_core.charmhelpers.fetch, 'add_source')
self.patch_object(chm_core.charmhelpers.fetch, 'apt_update')
self.patch_target('config', new={})
self.patch_target('install_resources')
self.target.install()
self.assertFalse(self.add_source.called)
self.assertFalse(self.apt_update.called)
def test_stateless(self):
with self.assertRaises(NotImplementedError):
self.target.stateless
def test_service_name(self):
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
self.assertEqual(self.target.service_name, 'svc1')
def test_cinder_configuration(self):
with self.assertRaises(NotImplementedError):
self.target.cinder_configuration()
def test_send_storage_backend_data(self):
self.patch_object(chm.hookenv, 'service_name', return_value='svc1')
ep_mock = mock.MagicMock()
self.patch_object(
chm.relations,
'endpoint_from_flag',
return_value=ep_mock)
with self.assertRaises(NotImplementedError):
self.target.send_storage_backend_data()
with mock.patch.object(chm.reactive.helpers,
'is_data_changed'):
self.target.configure_tls()
self.set_state.assert_called_once_with('ssl.enabled', False)