test: add unit testing for new "create-cephfs-client" op

Change-Id: I9063059433c514f31ff0161bc24b20f24e19043d
This commit is contained in:
José Julián Espina 2024-04-19 17:52:38 -06:00
parent b8758b3c7e
commit 5473f69024
No known key found for this signature in database
GPG Key ID: 1E1AFCAE118C0954

View File

@ -14,6 +14,7 @@
import json import json
import unittest import unittest
import textwrap
from unittest.mock import patch, ANY from unittest.mock import patch, ANY
@ -863,3 +864,85 @@ class CephBrokerTestCase(unittest.TestCase):
} }
) )
mock_create_erasure_profile.assert_not_called() mock_create_erasure_profile.assert_not_called()
@patch.object(charms_ceph.broker, 'get_cephfs')
@patch.object(charms_ceph.broker, 'check_output')
@patch.object(charms_ceph.broker, 'log')
def test_create_cephfs_client(self, mock_log, check_output, get_cephfs):
def mock_check_output(*args, **kwargs):
cmd = args[0]
if cmd[:5] == ["ceph", "--id", "admin", "auth", "ls"]:
return textwrap.dedent("""
{
"auth_dump": [
{
"entity": "mds.ceph-fs",
"key": "mds-key",
"caps": {
"mds": "allow",
"mon": "allow rwx",
"osd": "allow *"
}
},
{
"entity": "mds.filesystem",
"key": "fs-key",
"caps": {
"mds": "allow",
"mon": "allow rwx",
"osd": "allow *"
}
},
{
"entity": "client.other-client",
"key": "other-client-key",
"caps": {
"mds": "allow rw fsname=filesystem",
"mon": "allow r fsname=filesystem",
"osd": "allow rw tag cephfs data=filesystem"
}
}
]
}
""")
elif cmd[:9] == ["ceph",
"--id",
"admin",
"fs",
"authorize",
"filesystem",
"client.fs-client",
"/",
"rw"
]:
return textwrap.dedent("""
[
{
"entity": "client.fs-client",
"key": "fs-client-key",
"caps": {
"mds": "allow rw fsname=filesystem",
"mon": "allow r fsname=filesystem",
"osd": "allow rw tag cephfs data=filesystem"
}
}
]
""")
return unittest.mock.DEFAULT
get_cephfs.return_value = ["filesystem"]
check_output.side_effect = mock_check_output
reqs = json.dumps({'api-version': 1,
'request-id': '1ef5aede',
'ops': [{
'op': 'create-cephfs-client',
'fs_name': 'filesystem',
'client_id': 'fs-client',
'path': '/',
'perms': 'rw',
}]})
rc = charms_ceph.broker.process_requests(reqs)
get_cephfs.assert_called_once_with(service='admin')
self.assertEqual(json.loads(rc)['exit-code'], 0)
self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
self.assertEqual(json.loads(rc)['key'], 'fs-client-key')