Files
charms.openstack/unit_tests/charms_openstack/plugins/test_classes.py
Frode Nordahl 552fa0e1ec Add Ceph base class and relation adapter
Our Ceph packages are distributed along with our OpenStack
packages both for distro and UCA.

Reactive Ceph charms can thus reuse a large portion of the
``charms.openstack`` library, both for basic package and
service management and for default reactive handlers.

The new classes are placed in a ``plugins`` directory.

First consumer of the ``CephCharm`` class is the
``ceph-rbd-mirror`` charm accompanied by the
``charm-layer-ceph`` layer.

Existing reactive charms that consume or provide Ceph services
should also be ported to use the base functionality now added
to the ``OpenStackCharm`` and ``CephCharm`` base classes
(e.g. ``ceph-fs``, ``gnocchi``).

Adds support to OpenStackRelationAdapter for passing on
properties from Endpoint based interfaces.

Change-Id: I86bdd35b301fa39504c5d5af9a2b7d01bfd38768
2019-03-01 09:14:12 +01:00

153 lines
6.1 KiB
Python

import mock
import os
from unit_tests.charms_openstack.charm.utils import BaseOpenStackCharmTest
import charms_openstack.charm.classes as chm
import charms_openstack.plugins.classes as cpl
# Charm configuration fixture handed to the base test ``setUp`` together with
# the fake Ceph-consuming charm class (see
# TestOpenStackCephConsumingCharm.setUp).
TEST_CONFIG = {
    'config': True,
    'openstack-origin': None,
}
class FakeOpenStackCephConsumingCharm(chm.OpenStackCharm,
                                      cpl.BaseOpenStackCephCharm):
    """Test double mixing ``OpenStackCharm`` with the Ceph plugin base class.

    Used as the charm class under test for the Ceph-consuming charm test
    case in this module.
    """

    # NOTE(review): presumably marks the class as abstract for the charm
    # class machinery in charms_openstack -- confirm against that package.
    abstract_class = True
class TestOpenStackCephConsumingCharm(BaseOpenStackCharmTest):
    """Tests for the Ceph helpers exposed on a Ceph-consuming charm."""

    def setUp(self):
        """Hand the fake charm class and ``TEST_CONFIG`` to the base setUp."""
        super(TestOpenStackCephConsumingCharm, self).setUp(
            FakeOpenStackCephConsumingCharm,
            TEST_CONFIG,
        )

    def test_application_name(self):
        """``application_name`` mirrors what hookenv reports."""
        expected = 'svc1'
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value=expected)
        self.assertEqual(self.target.application_name, expected)

    def test_ceph_service_name(self):
        """Service name is the application name unless an override is set."""
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='charmname')
        charm = self.target

        self.assertEqual(charm.ceph_service_name, 'charmname')

        charm.ceph_service_name_override = 'override'
        self.assertEqual(charm.ceph_service_name, 'override')

    def test_ceph_key_name(self):
        """Key name is ``client.<app>``, plus hostname for per-unit keys."""
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='charmname')
        charm = self.target

        self.assertEqual(charm.ceph_key_name, 'client.charmname')

        # Per-unit keys are expected to carry the unit's hostname as a suffix.
        self.patch_object(cpl.socket, 'gethostname', return_value='hostname')
        charm.ceph_key_per_unit_name = True
        self.assertEqual(charm.ceph_key_name, 'client.charmname.hostname')

    def test_ceph_keyring_path(self):
        """Keyring directory is ``/etc/ceph``, also when snaps are set."""
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='charmname')
        keyring_dir = '/etc/ceph'
        charm = self.target

        self.assertEqual(charm.ceph_keyring_path, keyring_dir)

        charm.snaps = ['gnocchi']
        snap_prefix = cpl.SNAP_PATH_PREFIX_FORMAT.format('gnocchi')
        # NOTE(review): on POSIX ``os.path.join`` drops every component that
        # precedes an absolute one, so this expected value is just
        # '/etc/ceph' and the snap prefix is not actually verified -- confirm
        # whether that is intended.
        self.assertEqual(charm.ceph_keyring_path,
                         os.path.join(snap_prefix, keyring_dir))

    def test_configure_ceph_keyring(self):
        """Keyring is created through ``ceph-authtool`` and then chowned."""
        keyring_dir = '/etc/ceph'
        keyring_file = '/etc/ceph/ceph.client.sarepta.keyring'

        self.patch_object(cpl.os.path, 'isdir', return_value=False)
        self.patch_object(cpl.ch_core.host, 'mkdir')
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='sarepta')
        self.patch_object(cpl.subprocess, 'check_call')
        self.patch_object(cpl.shutil, 'chown')
        ceph_interface = mock.MagicMock(key='KEY')

        returned_path = self.target.configure_ceph_keyring(ceph_interface)

        self.assertEqual(returned_path, keyring_file)
        # ``isdir`` is patched to report False, so the directory is created.
        self.isdir.assert_called_with(keyring_dir)
        self.mkdir.assert_called_with(
            keyring_dir, owner='root', group='root', perms=0o750)
        expected_cmd = [
            'ceph-authtool',
            keyring_file,
            '--create-keyring', '--name=client.sarepta', '--add-key', 'KEY',
            '--mode', '0600',
        ]
        self.check_call.assert_called_with(expected_cmd)

        # With a user and group set on the charm, the keyring file is
        # expected to be chowned to them.
        self.target.user = 'ceph'
        self.target.group = 'ceph'
        self.target.configure_ceph_keyring(ceph_interface)
        self.chown.assert_called_with(
            keyring_file, user='ceph', group='ceph')
class TestCephCharm(BaseOpenStackCharmTest):
    """Tests for the ``CephCharm`` base class from the plugins module."""

    def setUp(self):
        """Hand ``CephCharm`` and a minimal config to the base setUp."""
        super(TestCephCharm, self).setUp(cpl.CephCharm, {'source': None})

    def test_ceph_keyring_path(self):
        """Keyring directory is ``/var/lib/ceph/<application name>``."""
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='charmname')
        self.assertEqual(
            self.target.ceph_keyring_path,
            '/var/lib/ceph/charmname')
        self.target.snaps = ['gnocchi']
        # NOTE(review): on POSIX ``os.path.join`` drops every component that
        # precedes an absolute one, so this expected value is just
        # '/var/lib/ceph/charmname' and the snap prefix is not actually
        # verified -- confirm whether that is intended.
        self.assertEqual(
            self.target.ceph_keyring_path,
            os.path.join(cpl.SNAP_PATH_PREFIX_FORMAT.format('gnocchi'),
                         '/var/lib/ceph/charmname'))

    def test_configure_ceph_keyring(self):
        """Keyring is created under /var/lib/ceph and linked from /etc/ceph.

        Covers two passes: one where reading the existing link raises
        ``OSError`` (nothing is removed), and one where the link points
        somewhere else (the stale entry is removed).
        """
        self.patch_object(cpl.os.path, 'isdir', return_value=False)
        self.patch_object(cpl.ch_core.host, 'mkdir')
        self.patch_object(cpl.ch_core.hookenv, 'application_name',
                          return_value='sarepta')
        self.patch_object(cpl.subprocess, 'check_call')
        self.patch_object(cpl.shutil, 'chown')
        self.patch_object(cpl.os, 'symlink')
        interface = mock.MagicMock()
        interface.key = 'KEY'
        self.patch_object(cpl.os.path, 'exists', return_value=True)
        self.patch_object(cpl.os, 'readlink')
        self.patch_object(cpl.os, 'remove')

        # First pass: the path exists but cannot be read as a symlink.
        self.readlink.side_effect = OSError
        self.target.configure_ceph_keyring(interface)
        self.isdir.assert_called_with('/var/lib/ceph/sarepta')
        self.mkdir.assert_called_with('/var/lib/ceph/sarepta',
                                      owner='root', group='root', perms=0o750)
        self.check_call.assert_called_with([
            'ceph-authtool',
            '/var/lib/ceph/sarepta/ceph.client.sarepta.keyring',
            '--create-keyring', '--name=client.sarepta', '--add-key', 'KEY',
            '--mode', '0600',
        ])
        self.exists.assert_called_with(
            '/etc/ceph/ceph.client.sarepta.keyring')
        self.readlink.assert_called_with(
            '/etc/ceph/ceph.client.sarepta.keyring')
        # Use a unittest assertion rather than a bare ``assert`` so the check
        # is not compiled away when Python runs with ``-O``.
        self.assertFalse(self.remove.called)
        self.symlink.assert_called_with(
            '/var/lib/ceph/sarepta/ceph.client.sarepta.keyring',
            '/etc/ceph/ceph.client.sarepta.keyring')

        # Second pass: the link resolves to a different target, so the
        # existing entry is expected to be removed.
        self.readlink.side_effect = None
        self.readlink.return_value = '/some/where/else'
        self.target.configure_ceph_keyring(interface)
        self.remove.assert_called_with('/etc/ceph/ceph.client.sarepta.keyring')

    def test_install(self):
        """``install`` configures the package source and runs a command."""
        self.patch_object(cpl.subprocess, 'check_output', return_value=b'\n')
        self.patch_target('configure_source')
        self.target.install()
        self.target.configure_source.assert_called()
        self.check_output.assert_called()