Add `rbd-mirror` relation

Reactive interface counterpart is here:
https://github.com/openstack-charmers/charm-interface-ceph-rbd-mirror

Sync charms.ceph.

Depends-On: I1bad5311ed034188a78dc67b493c22bff7ce4f7d
Change-Id: I509793e6c5aad9ea41fa4904c83b58e7477770e8
This commit is contained in:
Frode Nordahl 2019-02-06 11:26:54 +01:00
parent 3a094a28e7
commit 677dc1806a
No known key found for this signature in database
GPG Key ID: 6A5D59A3BA48373F
7 changed files with 357 additions and 14 deletions

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import subprocess
import socket
@ -38,6 +39,7 @@ from charmhelpers.core.hookenv import (
is_relation_made,
relation_get,
relation_set,
relation_type,
leader_set, leader_get,
is_leader,
remote_unit,
@ -401,7 +403,14 @@ def mon_relation():
moncount = int(config('monitor-count'))
if len(get_mon_hosts()) >= moncount:
if not ceph.is_bootstrapped():
if ceph.is_bootstrapped():
# The ceph-mon unit chosen for handling broker requests is based on
# internal Ceph MON leadership and not Juju leadership. To update
# the rbd-mirror relation on all ceph-mon units after pool creation
# the unit handling the broker request will update a nonce on the
# mon relation.
notify_rbd_mirrors()
else:
status_set('maintenance', 'Bootstrapping MON cluster')
# the following call raises an exception
# if it can't add the keyring
@ -445,6 +454,7 @@ def mon_relation():
notify_osds()
notify_radosgws()
notify_client()
notify_rbd_mirrors()
else:
log('Not enough mons ({}), punting.'
.format(len(get_mon_hosts())))
@ -462,6 +472,12 @@ def notify_radosgws():
radosgw_relation(relid=relid, unit=unit)
def notify_rbd_mirrors():
for relid in relation_ids('rbd-mirror'):
for unit in related_units(relid):
rbd_mirror_relation(relid=relid, unit=unit)
def notify_client():
for relid in relation_ids('client'):
client_relation_joined(relid)
@ -474,6 +490,25 @@ def notify_client():
mds_relation_joined(relid=relid, unit=unit)
def notify_mons():
"""Update a nonce on the ``mon`` relation.
This is useful for flagging that our peer mon units should update some of
their client relations.
Normally we would have handled this with leader storage, but for the Ceph
case, the unit handling the broker requests is the Ceph MON leader and not
necessarilly the Juju leader.
A non-leader unit has no way of changing data in leader-storage.
"""
nonce = uuid.uuid4()
for relid in relation_ids('mon'):
for unit in related_units(relid):
relation_set(relation_id=relid,
relation_settings={'nonce': nonce})
def handle_broker_request(relid, unit, add_legacy_response=False):
"""Retrieve broker request from relation, process, return response data.
@ -501,6 +536,20 @@ def handle_broker_request(relid, unit, add_legacy_response=False):
response.update({unit_response_key: rsp})
if add_legacy_response:
response.update({'broker_rsp': rsp})
# prevent recursion when called from rbd_mirror_relation()
if relation_type() != 'rbd-mirror':
# update ``rbd-mirror`` relations for this unit with
# information about new pools.
log('Notifying this units rbd-mirror relations after '
'processing broker request.', level=DEBUG)
notify_rbd_mirrors()
# notify mons to flag that the other mon units should update
# their ``rbd-mirror`` relations with information about new pools.
log('Notifying peers after processing broker request.',
level=DEBUG)
notify_mons()
return response
@ -527,6 +576,7 @@ def osd_relation(relid=None, unit=None):
# units so ensure that any deferred hooks are processed
notify_radosgws()
notify_client()
notify_rbd_mirrors()
else:
log('mon cluster not in quorum - deferring fsid provision')
@ -620,6 +670,37 @@ def radosgw_relation(relid=None, unit=None):
relation_set(relation_id=relid, relation_settings=data)
@hooks.hook('rbd-mirror-relation-joined')
@hooks.hook('rbd-mirror-relation-changed')
def rbd_mirror_relation(relid=None, unit=None):
if ready_for_service():
log('mon cluster in quorum and osds bootstrapped '
'- providing rbd-mirror client with keys')
if not unit:
unit = remote_unit()
# handle broker requests first to get a updated pool map
data = (handle_broker_request(relid, unit))
data.update({
'auth': config('auth-supported'),
'ceph-public-address': get_public_addr(),
'pools': json.dumps(ceph.list_pools_detail(), sort_keys=True)
})
cluster_addr = get_cluster_addr()
if cluster_addr:
data['ceph-cluster-address'] = cluster_addr
# handle both classic and reactive Endpoint peers
try:
unique_id = json.loads(
relation_get('unique_id', unit=unit, rid=relid))
except (TypeError, json.decoder.JSONDecodeError):
unique_id = relation_get('unique_id', unit=unit, rid=relid)
if unique_id:
data['{}_key'.format(unique_id)] = ceph.get_rbd_mirror_key(
'rbd-mirror.{}'.format(unique_id))
relation_set(relation_id=relid, relation_settings=data)
@hooks.hook('mds-relation-changed')
@hooks.hook('mds-relation-joined')
def mds_relation_joined(relid=None, unit=None):
@ -718,6 +799,7 @@ def upgrade_charm():
# key permission changes are applied
notify_client()
notify_radosgws()
notify_rbd_mirrors()
@hooks.hook('start')

View File

@ -0,0 +1 @@
ceph_hooks.py

View File

@ -0,0 +1 @@
ceph_hooks.py

View File

@ -2520,18 +2520,20 @@ def update_owner(path, recurse_dirs=True):
secs=elapsed_time.total_seconds(), path=path), DEBUG)
def list_pools(service):
def list_pools(client='admin'):
"""This will list the current pools that Ceph has
:param service: String service id to run under
:returns: list. Returns a list of the ceph pools.
:raises: CalledProcessError if the subprocess fails to run.
:param client: (Optional) client id for ceph key to use
Defaults to ``admin``
:type cilent: str
:returns: Returns a list of available pools.
:rtype: list
:raises: subprocess.CalledProcessError if the subprocess fails to run.
"""
try:
pool_list = []
pools = str(subprocess
.check_output(['rados', '--id', service, 'lspools'])
.decode('UTF-8'))
pools = subprocess.check_output(['rados', '--id', client, 'lspools'],
universal_newlines=True)
for pool in pools.splitlines():
pool_list.append(pool)
return pool_list
@ -2540,6 +2542,113 @@ def list_pools(service):
raise
def get_pool_param(pool, param, client='admin'):
"""Get parameter from pool.
:param pool: Name of pool to get variable from
:type pool: str
:param param: Name of variable to get
:type param: str
:param client: (Optional) client id for ceph key to use
Defaults to ``admin``
:type cilent: str
:returns: Value of variable on pool or None
:rtype: str or None
:raises: subprocess.CalledProcessError
"""
try:
output = subprocess.check_output(
['ceph', '--id', client, 'osd', 'pool', 'get',
pool, param], universal_newlines=True)
except subprocess.CalledProcessError as cp:
if cp.returncode == 2 and 'ENOENT: option' in cp.output:
return None
raise
if ':' in output:
return output.split(':')[1].lstrip().rstrip()
def get_pool_quota(pool, client='admin'):
"""Get pool quota.
:param pool: Name of pool to get variable from
:type pool: str
:param client: (Optional) client id for ceph key to use
Defaults to ``admin``
:type cilent: str
:returns: Dictionary with quota variables
:rtype: dict
:raises: subprocess.CalledProcessError
"""
output = subprocess.check_output(
['ceph', '--id', client, 'osd', 'pool', 'get-quota', pool],
universal_newlines=True)
rc = re.compile(r'\s+max\s+(\S+)\s*:\s+(\d+)')
result = {}
for line in output.splitlines():
m = rc.match(line)
if m:
result.update({'max_{}'.format(m.group(1)): m.group(2)})
return result
def get_pool_applications(pool='', client='admin'):
"""Get pool applications.
:param pool: (Optional) Name of pool to get applications for
Defaults to get for all pools
:type pool: str
:param client: (Optional) client id for ceph key to use
Defaults to ``admin``
:type cilent: str
:returns: Dictionary with pool name as key
:rtype: dict
:raises: subprocess.CalledProcessError
"""
cmd = ['ceph', '--id', client, 'osd', 'pool', 'application', 'get']
if pool:
cmd.append(pool)
try:
output = subprocess.check_output(cmd, universal_newlines=True)
except subprocess.CalledProcessError as cp:
if cp.returncode == 2 and 'ENOENT' in cp.output:
return {}
raise
return json.loads(output)
def list_pools_detail():
"""Get detailed information about pools.
Structure:
{'pool_name_1': {'applications': {'application': {}},
'parameters': {'pg_num': 42, 'size': 42},
'quota': {'max_bytes': '1000',
'max_objects': '10'},
},
'pool_name_2': ...
}
:returns: Dictionary with detailed pool information.
:rtype: dict
:raises: subproces.CalledProcessError
"""
get_params = ['pg_num', 'size']
result = {}
applications = get_pool_applications()
for pool in list_pools():
result[pool] = {
'applications': applications.get(pool, {}),
'parameters': {},
'quota': get_pool_quota(pool),
}
for param in get_params:
result[pool]['parameters'].update({
param: get_pool_param(pool, param)})
return result
def dirs_need_ownership_update(service):
"""Determines if directories still need change of ownership.

View File

@ -34,6 +34,8 @@ provides:
interface: ceph-osd
radosgw:
interface: ceph-radosgw
rbd-mirror:
interface: ceph-rbd-mirror
nrpe-external-master:
interface: nrpe-external-master
scope: container

View File

@ -2,10 +2,8 @@
# This file is managed centrally by release-tools and should not be modified
# within individual charm repos.
[tox]
;envlist = pep8,py27,py35,py36
envlist = pep8,py27,py35
envlist = pep8,py3
skipsdist = True
;skip_missing_interpreters = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
@ -25,6 +23,11 @@ deps = -r{toxinidir}/requirements.txt
# temporarily disable py27
commands = /bin/true
[testenv:py3]
basepython = python3
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
; keep zuul happy until we change the py35 job
[testenv:py35]
basepython = python3.5

View File

@ -1,4 +1,5 @@
import copy
import json
import unittest
import sys
@ -187,6 +188,7 @@ class CephHooksTestCase(unittest.TestCase):
mocks["apt_install"].assert_called_once_with(
["python-dbus", "lockfile-progs"])
@patch.object(ceph_hooks, 'notify_rbd_mirrors')
@patch.object(ceph_hooks, 'service_pause')
@patch.object(ceph_hooks, 'notify_radosgws')
@patch.object(ceph_hooks, 'ceph')
@ -198,7 +200,8 @@ class CephHooksTestCase(unittest.TestCase):
mock_notify_client,
mock_ceph,
mock_notify_radosgws,
mock_service_pause):
mock_service_pause,
mock_notify_rbd_mirrors):
config = copy.deepcopy(CHARM_CONFIG)
mock_config.side_effect = lambda key: config[key]
with patch.multiple(
@ -221,6 +224,61 @@ class CephHooksTestCase(unittest.TestCase):
mock_ceph.update_monfs.assert_called_once_with()
mock_service_pause.assert_called_with('ceph-create-keys')
@patch.object(ceph_hooks, 'mds_relation_joined')
@patch.object(ceph_hooks, 'admin_relation_joined')
@patch.object(ceph_hooks, 'client_relation_changed')
@patch.object(ceph_hooks, 'client_relation_joined')
@patch.object(ceph_hooks, 'related_units')
@patch.object(ceph_hooks, 'relation_ids')
def test_notify_client(self, mock_relation_ids, mock_related_units,
mock_client_relation_joined,
mock_client_relation_changed,
mock_admin_relation_joined,
mock_mds_relation_joined):
mock_relation_ids.return_value = ['arelid']
mock_related_units.return_value = ['aunit']
ceph_hooks.notify_client()
mock_relation_ids.assert_has_calls([
call('client'),
call('admin'),
call('mds'),
])
mock_related_units.assert_called_with('arelid')
mock_client_relation_joined.assert_called_once_with('arelid')
mock_client_relation_changed.assert_called_once_with('arelid', 'aunit')
mock_admin_relation_joined.assert_called_once_with('arelid')
mock_mds_relation_joined.assert_called_once_with(relid='arelid',
unit='aunit')
@patch.object(ceph_hooks, 'rbd_mirror_relation')
@patch.object(ceph_hooks, 'related_units')
@patch.object(ceph_hooks, 'relation_ids')
def test_notify_rbd_mirrors(self, mock_relation_ids, mock_related_units,
mock_rbd_mirror_relation):
mock_relation_ids.return_value = ['arelid']
mock_related_units.return_value = ['aunit']
ceph_hooks.notify_rbd_mirrors()
mock_relation_ids.assert_called_once_with('rbd-mirror')
mock_related_units.assert_called_once_with('arelid')
mock_rbd_mirror_relation.assert_called_once_with(relid='arelid',
unit='aunit')
@patch.object(ceph_hooks, 'uuid')
@patch.object(ceph_hooks, 'relation_set')
@patch.object(ceph_hooks, 'related_units')
@patch.object(ceph_hooks, 'relation_ids')
def test_notify_mons(self, mock_relation_ids, mock_related_units,
mock_relation_set, mock_uuid):
mock_relation_ids.return_value = ['arelid']
mock_related_units.return_value = ['aunit']
mock_uuid.uuid4.return_value = 'FAKE-UUID'
ceph_hooks.notify_mons()
mock_relation_ids.assert_called_once_with('mon')
mock_related_units.assert_called_once_with('arelid')
mock_relation_set.assert_called_once_with(relation_id='arelid',
relation_settings={
'nonce': 'FAKE-UUID'})
class RelatedUnitsTestCase(unittest.TestCase):
@ -264,6 +322,7 @@ class RelatedUnitsTestCase(unittest.TestCase):
call('osd:23')
])
@patch.object(ceph_hooks, 'relation_ids', return_value=[])
@patch.object(ceph_hooks, 'ready_for_service')
@patch.object(ceph_hooks.ceph, 'is_quorum')
@patch.object(ceph_hooks, 'remote_unit')
@ -277,7 +336,8 @@ class RelatedUnitsTestCase(unittest.TestCase):
relation_get,
remote_unit,
is_quorum,
ready_for_service):
ready_for_service,
relation_ids):
# Check for LP #1738154
ready_for_service.return_value = True
process_requests.return_value = 'AOK'
@ -300,13 +360,17 @@ class RelatedUnitsTestCase(unittest.TestCase):
'broker-rsp-glance-0': 'AOK',
'broker_rsp': 'AOK'})
@patch.object(ceph_hooks, 'notify_mons')
@patch.object(ceph_hooks, 'notify_rbd_mirrors')
@patch.object(ceph_hooks, 'process_requests')
@patch.object(ceph_hooks.ceph, 'is_leader')
@patch.object(ceph_hooks, 'relation_get')
@patch.object(ceph_hooks, 'remote_unit')
def test_handle_broker_request(self, mock_remote_unit, mock_relation_get,
mock_ceph_is_leader,
mock_broker_process_requests):
mock_broker_process_requests,
mock_notify_rbd_mirrors,
mock_notify_mons):
mock_remote_unit.return_value = 'glance/0'
ceph_hooks.handle_broker_request('rel1', None)
mock_remote_unit.assert_called_once_with()
@ -317,6 +381,8 @@ class RelatedUnitsTestCase(unittest.TestCase):
self.assertEqual(
ceph_hooks.handle_broker_request('rel1', 'glance/0'),
{'broker-rsp-glance-0': 'AOK'})
mock_notify_rbd_mirrors.assert_called_with()
mock_notify_mons.assert_called_with()
mock_relation_get.assert_called_once_with(rid='rel1', unit='glance/0')
self.assertEqual(
ceph_hooks.handle_broker_request('rel1', 'glance/0',
@ -503,3 +569,82 @@ class RGWRelationTestCase(test_utils.CharmTestCase):
}
)
self.ceph.get_radosgw_key.assert_called_once_with(name='testhostname')
class RBDMirrorRelationTestCase(test_utils.CharmTestCase):
TO_PATCH = [
'relation_get',
'get_cluster_addr',
'get_public_addr',
'ready_for_service',
'remote_unit',
'apt_install',
'filter_installed_packages',
'leader_get',
'ceph',
'process_requests',
'log',
'relation_set',
'config',
'handle_broker_request',
]
test_key = 'OTQ1MDdiODYtMmZhZi00M2IwLTkzYTgtZWI0MGRhNzdmNzBlCg=='
def setUp(self):
super(RBDMirrorRelationTestCase, self).setUp(ceph_hooks, self.TO_PATCH)
self.relation_get.side_effect = self.test_relation.get
self.config.side_effect = self.test_config.get
self.test_config.set('auth-supported', 'cephx')
self.filter_installed_packages.side_effect = lambda pkgs: pkgs
self.ready_for_service.return_value = True
self.ceph.is_leader.return_value = True
self.ceph.get_rbd_mirror_key.return_value = self.test_key
self.get_cluster_addr.return_value = '192.0.2.10'
self.get_public_addr.return_value = '198.51.100.10'
self.ceph.list_pools_detail.return_value = {'pool': {}}
def test_rbd_mirror_relation(self):
self.handle_broker_request.return_value = {}
base_relation_settings = {
'auth': self.test_config.get('auth-supported'),
'ceph-public-address': '198.51.100.10',
'ceph-cluster-address': '192.0.2.10',
'pools': json.dumps({'pool': {}}),
}
ceph_hooks.rbd_mirror_relation('rbd-mirror:51', 'ceph-rbd-mirror/0')
self.handle_broker_request.assert_called_with(
'rbd-mirror:51', 'ceph-rbd-mirror/0')
self.relation_set.assert_called_with(
relation_id='rbd-mirror:51',
relation_settings=base_relation_settings)
self.test_relation.set(
{'unique_id': None})
ceph_hooks.rbd_mirror_relation('rbd-mirror:52', 'ceph-rbd-mirror/0')
self.relation_set.assert_called_with(
relation_id='rbd-mirror:52',
relation_settings=base_relation_settings)
self.test_relation.set(
{'unique_id': json.dumps('otherSideIsReactiveEndpoint')})
ceph_hooks.rbd_mirror_relation('rbd-mirror:53', 'ceph-rbd-mirror/0')
self.ceph.get_rbd_mirror_key.assert_called_once_with(
'rbd-mirror.otherSideIsReactiveEndpoint')
key_relation_settings = base_relation_settings.copy()
key_relation_settings.update(
{'otherSideIsReactiveEndpoint_key': self.test_key})
self.relation_set.assert_called_with(
relation_id='rbd-mirror:53',
relation_settings=key_relation_settings)
self.test_relation.set({'unique_id': 'somehostname'})
ceph_hooks.rbd_mirror_relation('rbd-mirror:42', 'ceph-rbd-mirror/0')
self.ceph.get_rbd_mirror_key.assert_called_with(
'rbd-mirror.somehostname')
key_relation_settings = base_relation_settings.copy()
key_relation_settings.update({
'otherSideIsReactiveEndpoint_key': self.test_key,
'somehostname_key': self.test_key
})
self.relation_set.assert_called_with(
relation_id='rbd-mirror:42',
relation_settings=key_relation_settings)