Add action to list existing shares

This change also adds typing information for all library
functionality.
This commit is contained in:
Chris MacNaughton 2022-01-26 14:33:28 +01:00
parent aed288ad38
commit c9cc451a35
7 changed files with 315 additions and 33 deletions

View File

@ -6,15 +6,22 @@ create-share:
params:
allowed-ips:
description: |
IP Addresses to grant Read/Write access to. the default allows
read/write access to any address that cana access this application.
Comma separated list of IP Addresses to grant Read/Write access to.
The default allows read/write access to any address that cana access
this application.
type: string
default: "0.0.0.0"
default: "0.0.0.0/0"
size:
description: |
Size in gigabytes of the share. When unset, the share will not be
restricted in size.
type: integer
default:
# TODO: CephFS Share name
name:
description: |
Name of the share that will be exported.
type: string
default:
list-shares:
description: List all shares that this application is managing
# TODO: Update, delete share

View File

@ -185,6 +185,9 @@ class CephNfsCharm(
self.framework.observe(
self.on.create_share_action,
self.create_share_action)
self.framework.observe(
self.on.list_shares_action,
self.list_shares_action)
def config_get(self, key, default=None):
"""Retrieve config option.
@ -254,6 +257,7 @@ class CephNfsCharm(
mode=0o750)
def daemon_reload_and_restart(service_name):
logging.debug("restarting {} after config change".format(service_name))
subprocess.check_call(['systemctl', 'daemon-reload'])
subprocess.check_call(['systemctl', 'restart', service_name])
@ -274,6 +278,7 @@ class CephNfsCharm(
logging.info("on_pools_available: status updated")
def on_departing(self, event):
logging.debug("Removing this unit from Ganesha cluster")
subprocess.check_call([
'ganesha-rados-grace', '--userid', self.client_name,
'--cephconf', self.CEPH_CONF, '--pool', self.pool_name,
@ -296,10 +301,12 @@ class CephNfsCharm(
'put', 'ganesha-export-index', '/dev/null'
]
try:
logging.debug("Creating ganesha-export-index in Ceph")
subprocess.check_call(cmd)
counter = tempfile.NamedTemporaryFile('w+')
counter.write('1000')
counter.seek(0)
logging.debug("Creating ganesha-export-counter in Ceph")
cmd = [
'rados', '-p', self.pool_name,
'-c', self.CEPH_CONF,
@ -314,6 +321,7 @@ class CephNfsCharm(
def on_pool_initialised(self, event):
try:
logging.debug("Restarting Ganesha after pool initialisation")
subprocess.check_call(['systemctl', 'restart', 'nfs-ganesha'])
except subprocess.CalledProcessError:
logging.error("Failed torestart nfs-ganesha")
@ -336,10 +344,24 @@ class CephNfsCharm(
event.fail("Share creation needs to be run from the application leader")
return
share_size = event.params.get('size')
name = event.params.get('name')
allowed_ips = event.params.get('allowed-ips')
allowed_ips = [ip.strip() for ip in allowed_ips.split(',')]
client = GaneshaNfs(self.client_name, self.pool_name)
export_path = client.create_share(size=share_size)
export_path = client.create_share(size=share_size, name=name, access_ips=allowed_ips)
self.peers.trigger_reload()
event.set_results({"message": "Share created", "path": export_path, "ip": self.access_address()})
event.set_results({
"message": "Share created",
"path": export_path,
"ip": self.access_address()})
def list_shares_action(self, event):
client = GaneshaNfs(self.client_name, self.pool_name)
exports = client.list_shares()
event.set_results({
"exports": [{"id": export.export_id, "name": export.name} for export in exports]
})
@ops_openstack.core.charm_class
class CephNFSCharmOcto(CephNfsCharm):

View File

@ -5,6 +5,7 @@
import json
import logging
import subprocess
from typing import List, Optional
import tempfile
import uuid
@ -12,7 +13,8 @@ logger = logging.getLogger(__name__)
# TODO: Add ACL with kerberos
GANESHA_EXPORT_TEMPLATE = """EXPORT {{
GANESHA_EXPORT_TEMPLATE = """## This export is managed by the CephNFS charm ##
EXPORT {{
# Each EXPORT must have a unique Export_Id.
Export_Id = {id};
@ -42,6 +44,56 @@ GANESHA_EXPORT_TEMPLATE = """EXPORT {{
"""
class Export(object):
"""Object that encodes and decodes Ganesha export blocks"""
def __init__(self, export_id: int, path: str,
user_id: str, access_key: str, clients: List[str],
name: Optional[str] = None):
self.export_id = export_id
self.path = path
self.user_id = user_id
self.access_key = access_key
self.clients = clients
if '0.0.0.0/0' in self.clients:
self.clients[self.clients.index('0.0.0.0/0')] = '0.0.0.0'
if self.path:
self.name = self.path.split('/')[-2]
def from_export(export: str) -> 'Export':
if not export.startswith('## This export is managed by the CephNFS charm ##'):
raise RuntimeError('This export is not managed by the CephNFS charm.')
clients = []
strip_chars = " ;'\""
for line in [line.strip() for line in export.splitlines()]:
if line.startswith('Export_Id'):
export_id = int(line.split('=', 1)[1].strip(strip_chars))
if line.startswith('Path'):
path = line.split('=', 1)[1].strip(strip_chars)
if line.startswith('User_Id'):
user_id = line.split('=', 1)[1].strip(strip_chars)
if line.startswith('Secret_Access_Key'):
access_key = line.split('=', 1)[1].strip(strip_chars)
if line.startswith('Clients'):
clients = line.split('=', 1)[1].strip(strip_chars)
clients = clients.split(', ')
return Export(
export_id=export_id,
path=path,
user_id=user_id,
access_key=access_key,
clients=clients
)
def to_export(self) -> str:
return GANESHA_EXPORT_TEMPLATE.format(
id=self.export_id,
path=self.path,
user_id=self.user_id,
secret_key=self.access_key,
clients=', '.join(self.clients)
)
class GaneshaNfs(object):
export_index = "ganesha-export-index"
@ -51,26 +103,36 @@ class GaneshaNfs(object):
self.client_name = client_name
self.ceph_pool = ceph_pool
def create_share(self, name=None, size=None):
def create_share(self, name: str = None, size: int = None,
access_ips: List[str] = None) -> str:
"""Create a CephFS Share and export it via Ganesha
:param name: String name of the share to create
:param size: Int size in gigabytes of the share to create
:returns: Path to the export
"""
if name is None:
name = str(uuid.uuid4())
else:
existing_shares = [share for share in self.list_shares() if share.name == name]
if existing_shares:
return existing_shares[0].path
if size is not None:
size_in_bytes = size * 1024 * 1024
if access_ips is None:
access_ips = ['0.0.0.0/0']
access_id = 'ganesha-{}'.format(name)
self.export_path = self._create_cephfs_share(name, size_in_bytes)
export_id = self._get_next_export_id()
export_template = GANESHA_EXPORT_TEMPLATE.format(
id=export_id,
export = Export(
export_id=export_id,
path=self.export_path,
user_id=access_id,
secret_key=self._ceph_auth_key(access_id),
clients='0.0.0.0'
access_key=self._ceph_auth_key(access_id),
clients=access_ips
)
export_template = export.to_export()
logging.debug("Export template::\n{}".format(export_template))
tmp_file = self._tmpfile(export_template)
self._rados_put('ganesha-export-{}'.format(export_id), tmp_file.name)
@ -78,8 +140,23 @@ class GaneshaNfs(object):
self._add_share_to_index(export_id)
return self.export_path
def list_shares(self):
pass
def list_shares(self) -> List[Export]:
share_urls = [
url.replace('%url rados://{}/'.format(self.ceph_pool), '')
for url
in self._rados_get('ganesha-export-index').splitlines()]
exports_raw = [
self._rados_get(url)
for url in share_urls
if url.strip()
]
exports = []
for export_raw in exports_raw:
try:
exports.append(Export.from_export(export_raw))
except RuntimeError:
logging.warning("Encountered an independently created export")
return exports
def get_share(self, id):
pass
@ -87,13 +164,13 @@ class GaneshaNfs(object):
def update_share(self, id):
pass
def _ganesha_add_export(self, export_path, tmp_path):
def _ganesha_add_export(self, export_path: str, tmp_path: str):
"""Add a configured NFS export to Ganesha"""
return self._dbus_send(
'ExportMgr', 'AddExport',
'string:{}'.format(tmp_path), 'string:EXPORT(Path={})'.format(export_path))
def _dbus_send(self, section, action, *args):
def _dbus_send(self, section: str, action: str, *args):
"""Send a command to Ganesha via Dbus"""
cmd = [
'dbus-send', '--print-reply', '--system', '--dest=org.ganesha.nfsd',
@ -102,7 +179,7 @@ class GaneshaNfs(object):
logging.debug("About to call: {}".format(cmd))
return subprocess.check_output(cmd)
def _create_cephfs_share(self, name, size_in_bytes=None):
def _create_cephfs_share(self, name: str, size_in_bytes: int = None):
"""Create an authorise a CephFS share.
:param name: String name of the share to create
@ -135,15 +212,15 @@ class GaneshaNfs(object):
logging.error("failed to get path")
return False
def _ceph_subvolume_command(self, *cmd):
def _ceph_subvolume_command(self, *cmd: List[str]) -> subprocess.CompletedProcess:
"""Run a ceph fs subvolume command"""
return self._ceph_fs_command('subvolume', *cmd)
def _ceph_fs_command(self, *cmd):
def _ceph_fs_command(self, *cmd: List[str]) -> subprocess.CompletedProcess:
"""Run a ceph fs command"""
return self._ceph_command('fs', *cmd)
def _ceph_auth_key(self, access_id):
def _ceph_auth_key(self, access_id: str) -> str:
"""Retrieve the CephX key associated with this id
:returns: The access key
@ -153,12 +230,12 @@ class GaneshaNfs(object):
'auth', 'get', 'client.{}'.format(access_id), '--format=json')
return json.loads(output.decode('UTF-8'))[0]['key']
def _ceph_command(self, *cmd):
def _ceph_command(self, *cmd: List[str]) -> subprocess.CompletedProcess:
"""Run a ceph command"""
cmd = ["ceph", "--id", self.client_name, "--conf=/etc/ceph/ceph.conf"] + [*cmd]
return subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
def _get_next_export_id(self):
def _get_next_export_id(self) -> int:
"""Retrieve the next available export ID, and update the rados key
:returns: The export ID
@ -169,15 +246,15 @@ class GaneshaNfs(object):
self._rados_put(self.export_counter, file.name)
return next_id
def _tmpfile(self, value):
def _tmpfile(self, value: str) -> tempfile._TemporaryFileWrapper:
file = tempfile.NamedTemporaryFile(mode='w+')
file.write(str(value))
file.seek(0)
return file
def _rados_get(self, name):
def _rados_get(self, name: str) -> str:
"""Retrieve the content of the RADOS object with a given name
:param name: Name of the RADOS object to retrieve
:returns: Contents of the RADOS object
@ -191,12 +268,12 @@ class GaneshaNfs(object):
output = subprocess.check_output(cmd)
return output.decode('utf-8')
def _rados_put(self, name, source):
def _rados_put(self, name: str, source: str):
"""Store the contents of the source file in a named RADOS object.
:param name: Name of the RADOS object to retrieve
:param source: Path to a file to upload to RADOS.
:returns: None
"""
cmd = [
@ -206,8 +283,8 @@ class GaneshaNfs(object):
logging.debug("About to call: {}".format(cmd))
subprocess.check_call(cmd)
def _add_share_to_index(self, export_id):
def _add_share_to_index(self, export_id: int):
index = self._rados_get(self.export_index)
index += '%url rados://{}/ganesha-export-{}'.format(self.ceph_pool, export_id)
index += '\n%url rados://{}/ganesha-export-{}'.format(self.ceph_pool, export_id)
tmpfile = self._tmpfile(index)
self._rados_put(self.export_index, tmpfile.name)

View File

@ -17,12 +17,15 @@ from ops.framework import (
class PoolInitialisedEvent(EventBase):
pass
class ReloadNonceEvent(EventBase):
pass
class DepartedEvent(EventBase):
pass
class CephNfsPeerEvents(ObjectEvents):
pool_initialised = EventSource(PoolInitialisedEvent)
reload_nonce = EventSource(ReloadNonceEvent)
@ -50,16 +53,20 @@ class CephNfsPeers(Object):
def on_changed(self, event):
logging.info("CephNfsPeers on_changed")
logging.debug('pool_initialised: {}'.format(self.pool_initialised))
if self.pool_initialised == 'True' and not self._stored.pool_initialised:
logging.info("emiting pool initialised")
self.on.pool_initialised.emit()
self._stored.pool_initialised = True
self._stored.pool_initialised = True
logging.debug('reload_nonce: {}'.format(self.reload_nonce))
if self._stored.reload_nonce != self.reload_nonce:
logging.info("emiting reload nonce")
self.on.reload_nonce.emit()
self._stored.reload_nonce = self.reload_nonce
def on_departed(self, event):
logging.warning("CephNfsPeers on_departed")
if this_unit.name == os.getenv('JUJU_DEPARTING_UNIT'):
if self.this_unit.name == os.getenv('JUJU_DEPARTING_UNIT'):
self.on.departing.emit()
def initialised_pool(self):

125
tests/nfs_ganesha.py Normal file
View File

@ -0,0 +1,125 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Encapsulate ``Ceph NFS`` testing."""
import logging
import subprocess
import tenacity
from typing import Dict
import unittest
import yaml
import zaza
import zaza.utilities.installers
class NfsGaneshaTest(unittest.TestCase):
mount_dir = '/mnt/test'
share_protocol = 'nfs'
mounts_share = False
def tearDown(self):
if self.mounts_share:
try:
zaza.utilities.generic.run_via_ssh(
unit_name='ubuntu/0',
cmd='sudo umount /mnt/test && sudo rmdir /mnt/test')
zaza.utilities.generic.run_via_ssh(
unit_name='ubuntu/1',
cmd='sudo umount /mnt/test && sudo rmdir /mnt/test')
except subprocess.CalledProcessError:
logging.warning("Failed to cleanup mounts")
def _create_share(self, name: str, size: int = 10) -> Dict[str, str]:
action = zaza.model.run_action_on_leader(
'ceph-nfs',
'create-share',
action_params={
'name': name,
'size': size,
})
self.assertEqual(action.status, 'completed')
results = action.results
logging.debug("Action results: {}".format(results))
return results
def _mount_share(self, unit_name: str, share_ip: str, export_path: str):
ssh_cmd = (
'sudo mkdir -p {0} && '
'sudo mount -t {1} -o nfsvers=4.1,proto=tcp {2}:{3} {0}'.format(
self.mount_dir,
self.share_protocol,
share_ip,
export_path))
for attempt in tenacity.Retrying(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=3, min=2, max=10)):
with attempt:
zaza.utilities.generic.run_via_ssh(
unit_name=unit_name,
cmd=ssh_cmd)
def _install_dependencies(self, unit: str):
logging.debug("About to install nfs-common on {}".format(unit))
zaza.utilities.generic.run_via_ssh(
unit_name=unit,
cmd='sudo apt-get install -yq nfs-common')
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=3, min=2, max=10))
def _write_testing_file_on_instance(self, instance_name: str):
zaza.utilities.generic.run_via_ssh(
unit_name=instance_name,
cmd='echo "test" | sudo tee {}/test'.format(self.mount_dir))
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=3, min=2, max=10))
def _verify_testing_file_on_instance(self, instance_name: str):
run_with_juju_ssh = zaza.utilities.installers.make_juju_ssh_fn('ubuntu/1', sudo=True)
output = run_with_juju_ssh(
'sudo cat {}/test'.format(self.mount_dir))
logging.info("Verification output: {}".format(output))
self.assertEqual('test\r\n', output)
def test_create_share(self):
for unit in ['0', '1']:
self._install_dependencies('ubuntu/{}'.format(unit))
logging.info("Creating a share")
share = self._create_share('test_ganesha_share')
export_path = share['path']
ip = share['ip']
logging.info("Mounting share on ubuntu units")
self.mounts_share = True
self._mount_share('ubuntu/0', ip, export_path)
self._mount_share('ubuntu/1', ip, export_path)
logging.info("writing to the share on ubuntu/0")
self._write_testing_file_on_instance('ubuntu/0')
logging.info("reading from the share on ubuntu/1")
self._verify_testing_file_on_instance('ubuntu/1')
def test_list_shares(self):
self._create_share('test_ganesha_list_share')
action = zaza.model.run_action_on_leader(
'ceph-nfs',
'list-shares',
action_params={})
self.assertEqual(action.status, 'completed')
results = action.results
logging.debug("Action results: {}".format(results))
logging.debug("exports: {}".format(results['exports']))
exports = yaml.safe_load(results['exports'])
self.assertIn('test_ganesha_list_share', [export['name'] for export in exports])

View File

@ -5,7 +5,8 @@ gate_bundles:
smoke_bundles:
- focal-octopus
configure: []
tests: []
tests:
- tests.nfs_ganesha.NfsGaneshaTest
target_deploy_status:
ubuntu:
workload-status: active

View File

@ -0,0 +1,43 @@
import unittest
import ganesha
EXAMPLE_EXPORT = """## This export is managed by the CephNFS charm ##
EXPORT {
# Each EXPORT must have a unique Export_Id.
Export_Id = 1000;
# The directory in the exported file system this export
# is rooted on.
Path = '/volumes/_nogroup/test_ganesha_share/e12a49ef-1b2b-40b3-ba6c-7e6695bcc950';
# FSAL, Ganesha's module component
FSAL {
# FSAL name
Name = "Ceph";
User_Id = "ganesha-test_ganesha_share";
Secret_Access_Key = "AQCT9+9h4cwJOxAAue2fFvvGTWziUiR9koCHEw==";
}
# Path of export in the NFSv4 pseudo filesystem
Pseudo = '/volumes/_nogroup/test_ganesha_share/e12a49ef-1b2b-40b3-ba6c-7e6695bcc950';
SecType = "sys";
CLIENT {
Access_Type = "rw";
Clients = 0.0.0.0;
}
# User id squashing, one of None, Root, All
Squash = "None";
}
"""
class ExportTest(unittest.TestCase):
def test_parser(self):
export = ganesha.Export.from_export(EXAMPLE_EXPORT)
self.assertEqual(export.export_id, 1000)
self.assertEqual(export.clients, ['0.0.0.0'])
self.assertEqual(export.to_export(), EXAMPLE_EXPORT)
self.assertEqual(export.name, 'test_ganesha_share')